# BotVPS/core_tools.py
# Core tool implementations for the VPS bot.
# (original file listing metadata: 681 lines, 24 KiB, Python)
import asyncio
import json
import os
import re
import shlex
import subprocess
import time

import httpx
import psutil

from typing import Dict, List, Optional
# ============================================================
# CORE BASH EXECUTOR (unificado)
# ============================================================
def run_bash(command: str, timeout: int = 120) -> str:
    """Run a bash command on the VPS and return its textual result.

    Returns stdout when non-empty; otherwise an 'ERRO (...)' marker on
    failure or 'Sucesso (vazio)' on a silent success.
    """
    # Rewrite legacy compose v1 invocations to the v2 plugin syntax.
    command = command.replace("docker-compose", "docker compose")
    try:
        env = os.environ.copy()
        # Non-interactive shells (cron, systemd) may lack these locations.
        for directory in ("/usr/local/bin", "/root/.cargo/bin", "/usr/bin", "/bin"):
            if directory not in env.get("PATH", ""):
                env["PATH"] = f"{directory}:{env.get('PATH', '')}"
        proc = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env,
        )
        stdout = proc.stdout.strip()
        if stdout:
            return stdout
        if proc.returncode == 0:
            return "Sucesso (vazio)"
        if proc.returncode == 127:
            return f"ERRO (127): Comando não encontrado. (Comando: {command})"
        stderr = proc.stderr.strip()
        return f"ERRO ({proc.returncode}): {stderr or 'Nada no stderr'}"
    except subprocess.TimeoutExpired:
        return "ERRO: O comando demorou muito e foi cancelado (timeout)."
    except Exception as e:
        return f"ERRO fatal ao rodar bash: {str(e)}"
# Backwards-compatible alias: legacy call sites import run_bash_command.
run_bash_command = run_bash
# ============================================================
# DOCKER TOOLS
# ============================================================
class DockerTools:
    """Thin wrappers around the ``docker`` CLI, executed via run_bash."""

    @staticmethod
    def ps() -> str:
        """List running containers (name, status, ports)."""
        return run_bash("docker ps --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}'")

    @staticmethod
    def stats() -> str:
        """One-shot CPU/RAM usage per container."""
        return run_bash("docker stats --no-stream --format 'table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}'")

    @staticmethod
    def logs(container: str, lines: int = 50) -> str:
        """Tail the last *lines* log lines of *container*.

        The container name is shell-quoted and *lines* coerced to int,
        since both may originate from untrusted LLM/tool input.
        """
        return run_bash(f"docker logs --tail {int(lines)} {shlex.quote(container)}")

    @staticmethod
    def restart(container: str) -> str:
        """Restart *container* (name shell-quoted to prevent injection)."""
        return run_bash(f"docker restart {shlex.quote(container)}")
# Legacy alias kept so older imports of get_docker_stats keep working.
get_docker_stats = DockerTools.stats
# ============================================================
# GIT TOOLS
# ============================================================
class GitTools:
    """Git helpers scoped to a repository path."""

    @staticmethod
    def pull(repo_path: str = ".") -> str:
        """Run ``git pull`` in *repo_path* (quoted to survive spaces/metachars)."""
        return run_bash(f"git -C {shlex.quote(repo_path)} pull")

    @staticmethod
    def status(repo_path: str = ".") -> str:
        """Short-format ``git status`` for *repo_path*."""
        return run_bash(f"git -C {shlex.quote(repo_path)} status --short")
# ============================================================
# SYSTEM TOOLS
# ============================================================
class SystemTools:
    """General VPS utilities: shell, file access, PM2, and health metrics."""

    @staticmethod
    def execute_bash(command: str) -> str:
        """Run an arbitrary shell command (delegates to run_bash)."""
        return run_bash(command)

    @staticmethod
    def read_file(path: str) -> str:
        """Read up to the first 2000 characters of a text file.

        Truncation keeps responses small; callers needing more should
        page through with shell tools.
        """
        try:
            with open(path, 'r') as f:
                return f.read(2000)
        except Exception as e:
            return f"Erro ao ler arquivo: {e}"

    @staticmethod
    def write_file(path_content: str) -> str:
        """Create or overwrite a file. Input format: 'path|content'."""
        try:
            if "|" not in path_content:
                return "Erro: Use o formato 'caminho|conteúdo'"
            path, content = path_content.split("|", 1)
            path = path.strip()
            # Ensure the parent directory exists before writing.
            os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)
            with open(path, 'w') as f:
                f.write(content)
            return f"Sucesso: {path} atualizado."
        except Exception as e:
            return f"Erro ao escrever: {e}"

    @staticmethod
    def list_dir(path: str = ".") -> str:
        """List directory entries, one name per line."""
        try:
            return "\n".join(os.listdir(path))
        except Exception as e:
            return f"Erro ao listar {path}: {e}"

    @staticmethod
    def pm2_status() -> str:
        """Return the PM2 process list as JSON (``pm2 jlist``)."""
        return run_bash("pm2 jlist")

    @staticmethod
    def pm2_restart(name: str) -> str:
        """Restart a PM2 process; the name is shell-quoted to prevent injection."""
        return run_bash(f"pm2 restart {shlex.quote(name)}")

    @staticmethod
    def system_health() -> str:
        """One-line summary of CPU, RAM, disk usage, and uptime."""
        cpu = psutil.cpu_percent(interval=0.1)
        vm = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        uptime_hours = round((time.time() - psutil.boot_time()) / 3600, 1)
        return (f"CPU: {cpu}% | "
                f"RAM: {round(vm.used / (1024**3), 2)}GB / {round(vm.total / (1024**3), 2)}GB ({vm.percent}%) | "
                f"Disco: {disk.percent}% usado | "
                f"Uptime: {uptime_hours}h")
# ============================================================
# CRONOS (MEMÓRIA DE LONGO PRAZO)
# ============================================================
# Root directory where Cronos long-term memory markdown files are stored.
MEMORY_ROOT = "/root/Antigravity_Memory"
def cronos_log(arg: str) -> str:
    """Persist a long-term memory entry under MEMORY_ROOT.

    *arg* is either a JSON object ({"topic", "content", "folder"}) or a
    loose string with key='value' pairs. Appends a timestamped markdown
    entry to ``<MEMORY_ROOT>/<folder>/<topic>.md``.
    """
    try:
        try:
            data = json.loads(arg)
            topic = data.get("topic", "geral")
            content = data.get("content", "")
            folder = data.get("folder", "current_week")
        except (ValueError, AttributeError):
            # Fallback: scrape key='value' pairs out of a free-form string;
            # AttributeError covers valid JSON that is not an object.
            topic_m = re.search(r'topic=["\'](.*?)["\']', arg)
            content_m = re.search(r'content=["\'](.*?)["\']', arg, re.S)
            folder_m = re.search(r'folder=["\'](.*?)["\']', arg)
            topic = topic_m.group(1) if topic_m else "geral"
            content = content_m.group(1) if content_m else arg
            folder = folder_m.group(1) if folder_m else "current_week"
        if not content:
            return "Erro: Conteúdo vazio."
        target_dir = os.path.join(MEMORY_ROOT, folder)
        os.makedirs(target_dir, exist_ok=True)
        filename = f"{topic.lower().replace(' ', '_')}.md"
        filepath = os.path.join(target_dir, filename)
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        entry = f"\n---\n### ENTRY: {timestamp}\n{content}\n"
        # Mode "a" creates the file when missing, so no exists() check needed.
        with open(filepath, "a") as f:
            f.write(entry)
        # BUG FIX: success message previously printed a literal "(unknown)"
        # placeholder instead of the actual file name.
        return f"Sucesso: Salvo em Cronos/{folder}/{filename}"
    except Exception as e:
        return f"Erro ao salvar em Cronos: {e}"
def cronos_query(arg: str) -> str:
    """Search the Cronos memory folder with grep (first 20 matches).

    *arg* may embed query='...' / folder='...'; otherwise the whole string
    is used as the search term.
    """
    query_m = re.search(r'query=["\'](.*?)["\']', arg)
    folder_m = re.search(r'folder=["\'](.*?)["\']', arg)
    query = query_m.group(1) if query_m else arg
    folder = folder_m.group(1) if folder_m else "current_week"
    target_dir = os.path.join(MEMORY_ROOT, folder)
    # SECURITY FIX: shell-quote both values — a query containing quotes or
    # shell metacharacters previously broke (or injected into) the command.
    return run_bash(f"grep -rniI {shlex.quote(query)} {shlex.quote(target_dir)} | head -n 20")
# ============================================================
# GOOGLE WORKSPACE TOOLS
# ============================================================
# Short user-facing aliases mapped to GWS CLI profile names.
ACCOUNT_MAPPING = {
    "ma": "gws-mr", "mr": "gws-mr", "marcos": "gws-mr",
    "adm": "gws-adm", "empresa": "gws-adm",
    "4r": "gws-4r", "familia": "gws-4r", "fam": "gws-4r",
}


def resolve_account(account_alias: str) -> str:
    """Translate an account alias into its GWS CLI profile name.

    Unknown aliases fall back to the ``gws-<alias>`` convention.
    """
    key = account_alias.strip().lower()
    return ACCOUNT_MAPPING.get(key, f"gws-{key}")
def _gws_clean_json(res: str) -> dict:
"""Limpa lixo do stdout antes de parsear JSON."""
json_match = re.search(r"(\{.*\})", res, re.S)
res_clean = json_match.group(1) if json_match else res
return json.loads(res_clean)
def list_gmail_emails(account_alias: str) -> str:
    """List the 5 most recent Gmail messages for the given account alias.

    Returns a markdown summary (sender, subject, id) or an error string.
    """
    account = resolve_account(account_alias)
    list_cmd = f"{account} gmail users messages list --params '{{\"userId\": \"me\", \"maxResults\": 5}}'"
    res = run_bash(list_cmd)
    try:
        data = _gws_clean_json(res)
        if "error" in data:
            err_msg = data["error"].get("message", str(data["error"]))
            return f"❌ Erro GWS ({account}): {err_msg}. Talvez precise reautenticar."
        messages = data.get("messages", [])
        if not messages:
            return "📭 Nenhum e-mail encontrado."
        result_text = "📧 **Últimos E-mails:**\n"
        for i, msg in enumerate(messages, 1):
            msg_id = msg["id"]
            details_cmd = f"{account} gmail users messages get --params '{{\"userId\": \"me\", \"id\": \"{msg_id}\", \"format\": \"metadata\", \"metadataHeaders\": [\"Subject\", \"From\"]}}'"
            details_res = run_bash(details_cmd)
            try:
                # CONSISTENCY: reuse the shared cleaner instead of duplicating
                # the stdout-scrubbing regex inline.
                details = _gws_clean_json(details_res)
                headers = details.get("payload", {}).get("headers", [])
                subject = next((h["value"] for h in headers if h["name"] == "Subject"), "Sem Assunto")
                sender = next((h["value"] for h in headers if h["name"] == "From"), "Desconhecido")
                result_text += f"{i}. **De:** {sender}\n **Assunto:** {subject}\n **ID:** `{msg_id}`\n\n"
            except Exception:
                # FIX: bare except narrowed — one unreadable message should
                # not hide the rest, but system exits must propagate.
                result_text += f"{i}. [Erro ao carregar ID: {msg_id}]\n\n"
        return result_text
    except Exception as e:
        return f"Erro: {str(e)}\nRaw: {res[:200]}"
def gmail_manage_label(arg: str) -> str:
    """Create a Gmail label. Format: '<account_alias> <label name>'.

    If a label with the same (case-insensitive) name already exists, reports
    it instead of creating a duplicate.
    """
    try:
        parts = arg.split(maxsplit=1)
        account_alias = parts[0]
        label_name = parts[1].strip() if len(parts) > 1 else ""
        if not label_name:
            return "Erro: Nome do marcador não fornecido."
        account = resolve_account(account_alias)
        list_res = run_bash(f"{account} gmail users labels list --params '{{\"userId\": \"me\"}}'")
        try:
            # CONSISTENCY: use the shared JSON cleaner instead of inlining it.
            labels_data = _gws_clean_json(list_res)
            for label in labels_data.get("labels", []):
                if label["name"].lower() == label_name.lower():
                    return f"Marcador '{label['name']}' já existe (ID: {label['id']})."
        except Exception:
            # Best effort: if listing fails we still attempt creation.
            pass
        # BUG FIX: the label name was previously f-string-interpolated into a
        # hand-built JSON literal inside single shell quotes — any quote in
        # the name broke the payload. Build it with json.dumps + shlex.quote.
        payload = json.dumps({
            "name": label_name,
            "labelListVisibility": "labelShow",
            "messageListVisibility": "show",
        })
        cmd = f"{account} gmail users labels create --params '{{\"userId\": \"me\"}}' --json {shlex.quote(payload)}"
        res = run_bash(cmd)
        return f"Criação de '{label_name}': {res}"
    except Exception as e:
        return f"Erro marcador: {str(e)}"
def gmail_manage_filter(arg: str) -> str:
    """Create a Gmail filter. Format: '<account> <criteria> <label>'.

    Criteria containing '@' become a from: match; anything else a query
    match. The label name is resolved to its id when possible.
    """
    try:
        parts = arg.split(maxsplit=2)
        if len(parts) < 3:
            return "Erro: Use 'conta criteria marcador'. Ex: adm @alibaba.com Alibaba"
        account_alias, criteria, label_name = parts[0], parts[1], parts[2]
        account = resolve_account(account_alias)
        # Resolve the label name to its id; fall back to the raw name.
        label_id = label_name
        list_res = run_bash(f"{account} gmail users labels list --params '{{\"userId\": \"me\"}}'")
        try:
            # CONSISTENCY: use the shared JSON cleaner instead of inlining it.
            labels_data = _gws_clean_json(list_res)
            for label in labels_data.get("labels", []):
                if label["name"].lower() == label_name.lower():
                    label_id = label["id"]
                    break
        except Exception:
            pass
        criteria_obj = {"from": criteria} if "@" in criteria else {"query": criteria}
        filter_obj = {
            "criteria": criteria_obj,
            "action": {"addLabelIds": [label_id]},
        }
        # ROBUSTNESS FIX: shlex.quote the JSON payload — the previous
        # hard-coded single quotes broke on any quote inside the payload.
        payload = shlex.quote(json.dumps(filter_obj))
        cmd = f"{account} gmail users settings filters create --params '{{\"userId\": \"me\"}}' --json {payload}"
        return run_bash(cmd)
    except Exception as e:
        return f"Erro filtro: {str(e)}"
def drive_find(arg: str) -> str:
    """Search Drive files. Format: '<account_alias> [name fragment]'."""
    try:
        parts = arg.split(maxsplit=1)
        account_alias = parts[0]
        query = parts[1] if len(parts) > 1 else ""
        account = resolve_account(account_alias)
        cmd = f"{account} drive files list"
        if query:
            # BUG FIX: the query was interpolated raw into a hand-built JSON
            # string; quotes in the query corrupted it. Use json.dumps and
            # shell-quote the whole parameter.
            params = json.dumps({"q": f"name contains '{query}'"})
            cmd += f" --params {shlex.quote(params)}"
        res = run_bash(cmd)
        # ROBUSTNESS FIX: previously JSON was only parsed when the output
        # started with '{', silently dropping noise-prefixed responses.
        try:
            data = _gws_clean_json(res)
        except Exception:
            data = {}
        files = data.get("files", [])
        if not files:
            return "Nenhum arquivo encontrado."
        resp = "📂 **Arquivos Encontrados:**\n"
        for f in files[:10]:
            resp += f"- {f['name']} (ID: `{f['id']}`)\n"
        return resp
    except Exception as e:
        return f"Erro no Drive: {str(e)}"
def drive_upload(arg: str) -> str:
    """Upload a local file to Drive. Format: '<account_alias> <filepath>'."""
    try:
        parts = arg.split(maxsplit=1)
        account_alias, filepath = parts[0], parts[1]
        account = resolve_account(account_alias)
        filename = os.path.basename(filepath)
        # BUG FIX: the metadata previously contained a literal "(unknown)"
        # placeholder; the computed `filename` was never used. Build proper
        # JSON and shell-quote both the payload and the path.
        metadata = json.dumps({"name": filename})
        cmd = f"{account} drive files create --json {shlex.quote(metadata)} --output {shlex.quote(filepath)}"
        return run_bash(cmd)
    except Exception as e:
        return f"Erro upload: {str(e)}"
def calendar_agenda(arg: str) -> str:
    """List calendar events. Format: '<account_alias> [timeframe]'.

    The timeframe defaults to --today and gets a '--' prefix when missing.
    """
    try:
        tokens = arg.split(maxsplit=1)
        account = resolve_account(tokens[0])
        window = tokens[1] if len(tokens) > 1 else "--today"
        if not window.startswith("--"):
            window = f"--{window}"
        return run_bash(f"{account} calendar +agenda {window}")
    except Exception as e:
        return f"Erro no Calendário: {str(e)}"
# ============================================================
# GITEA & SUPABASE (ASYNC)
# ============================================================
async def gitea_list_repos() -> str:
    """Return the names of up to 10 repositories from the Gitea API."""
    from credential_manager import gitea_api_url, gitea_token
    endpoint = f"{gitea_api_url()}/user/repos"
    auth_headers = {"Authorization": f"token {gitea_token()}"}
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(endpoint, headers=auth_headers)
            names = [repo["name"] for repo in response.json()[:10]]
            return "\n".join(names)
        except Exception as e:
            return f"Erro Gitea: {e}"
async def supabase_list_tables() -> str:
    """Fetch the Supabase REST root (table catalog) and return it as text."""
    from credential_manager import supabase_url, supabase_anon_key
    key = supabase_anon_key()
    endpoint = f"{supabase_url()}/rest/v1/"
    auth_headers = {"apikey": key, "Authorization": f"Bearer {key}"}
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(endpoint, headers=auth_headers)
            return str(response.json())
        except Exception as e:
            return f"Erro Supabase: {e}"
# ============================================================
# WORKSPACE TOOLS
# ============================================================
class WorkspaceTools:
    """Deployment and workspace helpers (GWS CLI, magic deploy, Coolify)."""

    @staticmethod
    def gws_command(cmd: str) -> str:
        """Run an arbitrary GWS CLI command."""
        return run_bash(cmd)

    @staticmethod
    def magic_deploy(git_url: str) -> str:
        """Hand a Git URL to the DeployManager for automatic deployment."""
        from deploy_manager import DeployManager
        return DeployManager().magic_deploy(git_url)

    @staticmethod
    def coolify_deploy_status(arg: str = None) -> str:
        """Query Coolify's Postgres for the 5 most recent deployments."""
        query = (
            'docker exec coolify-db psql -U coolify -d coolify -c '
            '"SELECT application_name, status, created_at FROM application_deployment_queues '
            'ORDER BY created_at DESC LIMIT 5;"'
        )
        return run_bash(query)
# ============================================================
# BROWSER CLOUD TOOLS
# ============================================================
# Base URL of the local browser-cloud HTTP service.
BROWSER_CLOUD_URL = "http://localhost:8088"
# Module-level cache of the active browser session id, shared by all calls.
_current_session_id = None
class BrowserCloudTools:
    """HTTP client helpers for the local browser-cloud automation service."""

    @staticmethod
    def _req(method: str, path: str, data: dict = None):
        """Send a JSON request to the API; returns parsed JSON or {"error": ...}."""
        import urllib.request, json
        url = f"{BROWSER_CLOUD_URL}{path}"
        payload = json.dumps(data).encode() if data else None
        request = urllib.request.Request(
            url, data=payload, method=method,
            headers={"Content-Type": "application/json"},
        )
        try:
            with urllib.request.urlopen(request, timeout=30) as response:
                return json.loads(response.read())
        except Exception as e:
            return {"error": str(e)}

    @staticmethod
    def _session():
        """Return the cached session id, creating a headless session on demand."""
        global _current_session_id
        if not _current_session_id:
            created = BrowserCloudTools._req("POST", "/session", {"headless": True})
            if "session_id" not in created:
                return None
            _current_session_id = created["session_id"]
        return _current_session_id

    @staticmethod
    def navigate(url: str) -> str:
        """Navigate the shared session to *url* and report the page title."""
        sid = BrowserCloudTools._session()
        if not sid:
            return "Erro: não foi possível criar sessão no browser cloud"
        resp = BrowserCloudTools._req(
            "POST", f"/session/{sid}/navigate",
            {"url": url, "wait_until": "load"},
        )
        if "error" in resp:
            return f"Erro: {resp['error']}"
        return f"OK — {resp.get('title', '')} ({url})"

    @staticmethod
    def screenshot(url_or_current: str = "current") -> str:
        """Capture a screenshot, optionally navigating to a URL first."""
        sid = BrowserCloudTools._session()
        if not sid:
            return "Erro: sem sessão"
        if url_or_current != "current" and url_or_current.startswith("http"):
            outcome = BrowserCloudTools.navigate(url_or_current)
            if outcome.startswith("Erro"):
                return outcome
        resp = BrowserCloudTools._req("GET", f"/session/{sid}/screenshot")
        if "error" in resp:
            return f"Erro: {resp['error']}"
        # Approximate decoded size: base64 inflates raw bytes by about 4/3.
        approx_bytes = len(resp.get("data", "")) * 3 // 4
        return f"Screenshot OK — {resp.get('format')} {approx_bytes // 1024}KB (base64)"

    @staticmethod
    def click(selector: str) -> str:
        """Click the element matching a CSS selector."""
        sid = BrowserCloudTools._session()
        if not sid:
            return "Erro: sem sessão"
        resp = BrowserCloudTools._req("POST", f"/session/{sid}/click", {"selector": selector})
        if "error" in resp:
            return f"Erro: {resp['error']}"
        return "OK — clicado"

    @staticmethod
    def fill(input_str: str) -> str:
        """Fill an input. Format: 'selector|value|submit' (submit optional, 'true')."""
        pieces = input_str.split("|")
        selector = pieces[0]
        value = pieces[1] if len(pieces) > 1 else ""
        submit = len(pieces) > 2 and pieces[2].lower() == "true"
        sid = BrowserCloudTools._session()
        if not sid:
            return "Erro: sem sessão"
        resp = BrowserCloudTools._req(
            "POST", f"/session/{sid}/fill",
            {"selector": selector, "value": value, "submit": submit},
        )
        if "error" in resp:
            return f"Erro: {resp['error']}"
        return "OK — preenchido" + (" e submetido" if submit else "")
# ============================================================
# HERMES ORCHESTRATOR
# ============================================================
def delegate_to_hermes(task: str) -> str:
    """Delegate a complex task to the Hermes Agent (MiniMax 2.7) on the VPS.

    Runs hermes in one-shot mode (-z) with an extended 5-minute timeout;
    the task text is shell-quoted before being passed along.
    """
    import shlex
    return run_bash(f"hermes -z {shlex.quote(task)}", timeout=300)
# ============================================================
# REGISTRY CENTRALIZADO
# ============================================================
# Central registry: tool name -> {"description", "func"}. Insertion order is
# preserved and reflected by get_all_tools_formatted().
AVAILABLE_TOOLS = {
    # --- Bash / system ---
    "run_bash": {"description": "Executa comandos Linux na VPS (docker, git, pm2, etc.)", "func": run_bash},
    "bash": {"description": "Alias para run_bash", "func": run_bash},
    # --- Docker ---
    "docker_ps": {"description": "Lista containers ativos", "func": DockerTools.ps},
    "docker_stats": {"description": "CPU/RAM por container", "func": DockerTools.stats},
    "docker_logs": {"description": "Logs de um container", "func": DockerTools.logs},
    "docker_restart": {"description": "Reinicia container", "func": DockerTools.restart},
    # --- Git ---
    "git_pull": {"description": "Pull no repositório", "func": GitTools.pull},
    "git_status": {"description": "Status git", "func": GitTools.status},
    # --- System / files / PM2 ---
    "system_health": {"description": "CPU, RAM, Disco, Uptime", "func": SystemTools.system_health},
    "read_file": {"description": "Lê arquivo na VPS", "func": SystemTools.read_file},
    "write_file": {"description": "Cria/edita arquivo (caminho|conteúdo)", "func": SystemTools.write_file},
    "ls": {"description": "Lista arquivos num diretório", "func": SystemTools.list_dir},
    "pm2_status": {"description": "Status PM2", "func": SystemTools.pm2_status},
    "pm2_restart": {"description": "Reinicia PM2", "func": SystemTools.pm2_restart},
    # --- Cronos (long-term memory) ---
    "cronos_log": {"description": "Salva memória de longo prazo", "func": cronos_log},
    "cronos_query": {"description": "Busca na memória Cronos", "func": cronos_query},
    # --- Google Workspace: Gmail ---
    "list_emails": {"description": "Lista últimos 5 e-mails (ma/adm/4r)", "func": list_gmail_emails},
    "gmail_label": {"description": "Cria marcador no Gmail", "func": gmail_manage_label},
    "gmail_filter": {"description": "Cria filtro no Gmail", "func": gmail_manage_filter},
    # --- Google Workspace: Drive ---
    "drive_find": {"description": "Busca arquivos no Drive", "func": drive_find},
    "drive_upload": {"description": "Upload para Drive", "func": drive_upload},
    # --- Google Workspace: Calendar ---
    "calendar_agenda": {"description": "Eventos do calendário", "func": calendar_agenda},
    # --- Gitea / Supabase ---
    "gitea_repos": {"description": "Lista repositórios no Gitea", "func": gitea_list_repos},
    "supabase_tables": {"description": "Lista tabelas no Supabase", "func": supabase_list_tables},
    # --- Workspace / deploy / orchestration ---
    "gws": {"description": "Executa comando GWS CLI", "func": WorkspaceTools.gws_command},
    "magic_deploy": {"description": "Deploy automático via Git URL", "func": WorkspaceTools.magic_deploy},
    "coolify_status": {"description": "Status deploies Coolify", "func": WorkspaceTools.coolify_deploy_status},
    "hermes_delegate": {"description": "Delega tarefas muito complexas para o Operador Master (Hermes Agent)", "func": delegate_to_hermes},
    # --- Browser cloud ---
    "browser_navigate": {"description": "Navega URL no browser cloud (HEADLESS). Uso: URL completa", "func": BrowserCloudTools.navigate},
    "browser_screenshot": {"description": "Tira screenshot do browser cloud (PNG base64). Uso: URL ou 'current'", "func": BrowserCloudTools.screenshot},
    "browser_click": {"description": "Clica em elemento CSS no browser cloud. Uso: seletor CSS", "func": BrowserCloudTools.click},
    "browser_fill": {"description": "Preenche input e opcionalmente submete. Uso: seletor|valor|submeter(true/false)", "func": BrowserCloudTools.fill},
    # --- Legacy aliases ---
    "get_docker_stats": {"description": "CPU/RAM por container (legacy)", "func": DockerTools.stats},
    "get_system_health": {"description": "CPU, RAM, Disco, Uptime (legacy)", "func": SystemTools.system_health},
    "read_vps_file": {"description": "Lê arquivo na VPS (legacy)", "func": SystemTools.read_file},
}
# ============================================================
# HELPERS
# ============================================================
def get_all_tools_formatted() -> str:
    """Render a human-readable bullet list of every registered tool."""
    # BUG FIX: the original used doubly-escaped "\\n", so the user-facing
    # message contained literal backslash-n text instead of real line breaks.
    lines = [f"- `{name}`: {info['description']}" for name, info in AVAILABLE_TOOLS.items()]
    return "🛠️ **Ferramentas Antigravity Ativas**:\n\n" + "\n".join(lines) + "\n"
def get_tools_by_danger(level: str) -> List[Dict]:
    """Return registry entries whose danger classification equals *level*.

    Levels: 'dangerous', 'medium', or 'safe' (the default for any tool not
    listed in the map below).
    """
    danger_map = {
        # BUG FIX: "run_bash" was missing, so the primary shell executor was
        # classified "safe" while its alias "bash" was "dangerous".
        "run_bash": "dangerous",
        "bash": "dangerous",
        "magic_deploy": "dangerous",
        "write_file": "dangerous",
        "docker_restart": "dangerous",
        "hermes_delegate": "dangerous",
        "pm2_restart": "medium",
        "git_pull": "medium",
        "gws": "medium",
    }
    return [
        {"name": name, **info, "danger": danger_map.get(name, "safe")}
        for name, info in AVAILABLE_TOOLS.items()
        if danger_map.get(name, "safe") == level
    ]