import subprocess import os import re import json import time import psutil import asyncio import httpx from typing import Dict, List, Optional # ============================================================ # CORE BASH EXECUTOR (unificado) # ============================================================ def run_bash(command: str, timeout: int = 120) -> str: """Executa um comando bash na VPS e retorna a saída.""" command = command.replace("docker-compose", "docker compose") try: custom_env = os.environ.copy() paths = ["/usr/local/bin", "/root/.cargo/bin", "/usr/bin", "/bin"] current_path = custom_env.get("PATH", "") for p in paths: if p not in current_path: current_path = f"{p}:{current_path}" custom_env["PATH"] = current_path result = subprocess.run( command, shell=True, capture_output=True, text=True, timeout=timeout, env=custom_env ) output = result.stdout.strip() error = result.stderr.strip() if output: return output if result.returncode != 0: if result.returncode == 127: return f"ERRO (127): Comando não encontrado. (Comando: {command})" return f"ERRO ({result.returncode}): {error or 'Nada no stderr'}" return "Sucesso (vazio)" except subprocess.TimeoutExpired: return "ERRO: O comando demorou muito e foi cancelado (timeout)." except Exception as e: return f"ERRO fatal ao rodar bash: {str(e)}" # Alias para compatibilidade run_bash_command = run_bash # ============================================================ # DOCKER TOOLS # ============================================================ class DockerTools: @staticmethod def ps() -> str: return run_bash("docker ps --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}'") @staticmethod def stats() -> str: return run_bash("docker stats --no-stream --format 'table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}'") @staticmethod def logs(container: str, lines: int = 50) -> str: return run_bash(f"docker logs --tail {lines} {container}") @staticmethod def restart(container: str) -> str: return run_bash(f"docker restart {container}") # Alias compat get_docker_stats = DockerTools.stats # ============================================================ # GIT TOOLS # ============================================================ class GitTools: @staticmethod def pull(repo_path: str = ".") -> str: return run_bash(f"git -C {repo_path} pull") @staticmethod def status(repo_path: str = ".") -> str: return run_bash(f"git -C {repo_path} status --short") # ============================================================ # SYSTEM TOOLS # ============================================================ class SystemTools: @staticmethod def execute_bash(command: str) -> str: return run_bash(command) @staticmethod def read_file(path: str) -> str: try: with open(path, 'r') as f: return f.read(2000) except Exception as e: return f"Erro ao ler arquivo: {e}" @staticmethod def write_file(path_content: str) -> str: try: if "|" not in path_content: return "Erro: Use o formato 'caminho|conteúdo'" path, content = path_content.split("|", 1) path = path.strip() os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True) with open(path, 'w') as f: f.write(content) return f"Sucesso: {path} atualizado." except Exception as e: return f"Erro ao escrever: {e}" @staticmethod def list_dir(path: str = ".") -> str: try: items = os.listdir(path) return "\n".join(items) except Exception as e: return f"Erro ao listar {path}: {e}" @staticmethod def pm2_status() -> str: return run_bash("pm2 jlist") @staticmethod def pm2_restart(name: str) -> str: return run_bash(f"pm2 restart {name}") @staticmethod def system_health() -> str: cpu = psutil.cpu_percent(interval=0.1) vm = psutil.virtual_memory() disk = psutil.disk_usage('/') uptime_seconds = time.time() - psutil.boot_time() uptime_hours = round(uptime_seconds / 3600, 1) return (f"CPU: {cpu}% | " f"RAM: {round(vm.used / (1024**3), 2)}GB / {round(vm.total / (1024**3), 2)}GB ({vm.percent}%) | " f"Disco: {disk.percent}% usado | " f"Uptime: {uptime_hours}h") # ============================================================ # CRONOS (MEMÓRIA DE LONGO PRAZO) # ============================================================ MEMORY_ROOT = "/root/Antigravity_Memory" def cronos_log(arg: str) -> str: try: try: data = json.loads(arg) topic = data.get("topic", "geral") content = data.get("content", "") folder = data.get("folder", "current_week") except: topic_m = re.search(r'topic=["\'](.*?)["\']', arg) content_m = re.search(r'content=["\'](.*?)["\']', arg, re.S) folder_m = re.search(r'folder=["\'](.*?)["\']', arg) topic = topic_m.group(1) if topic_m else "geral" content = content_m.group(1) if content_m else arg folder = folder_m.group(1) if folder_m else "current_week" if not content: return "Erro: Conteúdo vazio." target_dir = os.path.join(MEMORY_ROOT, folder) if not os.path.exists(target_dir): os.makedirs(target_dir, exist_ok=True) filename = f"{topic.lower().replace(' ', '_')}.md" filepath = os.path.join(target_dir, filename) timestamp = time.strftime("%Y-%m-%d %H:%M:%S") entry = f"\n---\n### ENTRY: {timestamp}\n{content}\n" with open(filepath, "a" if os.path.exists(filepath) else "w") as f: f.write(entry) return f"Sucesso: Salvo em Cronos/{folder}/{filename}" except Exception as e: return f"Erro ao salvar em Cronos: {e}" def cronos_query(arg: str) -> str: query_m = re.search(r'query=["\'](.*?)["\']', arg) folder_m = re.search(r'folder=["\'](.*?)["\']', arg) query = query_m.group(1) if query_m else arg folder = folder_m.group(1) if folder_m else "current_week" target_dir = os.path.join(MEMORY_ROOT, folder) return run_bash(f"grep -rniI '{query}' {target_dir} | head -n 20") # ============================================================ # GOOGLE WORKSPACE TOOLS # ============================================================ ACCOUNT_MAPPING = { "ma": "gws-mr", "mr": "gws-mr", "marcos": "gws-mr", "adm": "gws-adm", "empresa": "gws-adm", "4r": "gws-4r", "familia": "gws-4r", "fam": "gws-4r" } def resolve_account(account_alias: str) -> str: clean = account_alias.strip().lower() return ACCOUNT_MAPPING.get(clean, f"gws-{clean}") def _gws_clean_json(res: str) -> dict: """Limpa lixo do stdout antes de parsear JSON.""" json_match = re.search(r"(\{.*\})", res, re.S) res_clean = json_match.group(1) if json_match else res return json.loads(res_clean) def list_gmail_emails(account_alias: str) -> str: account = resolve_account(account_alias) list_cmd = f"{account} gmail users messages list --params '{{\"userId\": \"me\", \"maxResults\": 5}}'" res = run_bash(list_cmd) try: data = _gws_clean_json(res) if "error" in data: err_msg = data["error"].get("message", str(data["error"])) return f"❌ Erro GWS ({account}): {err_msg}. Talvez precise reautenticar." messages = data.get("messages", []) if not messages: return "📭 Nenhum e-mail encontrado." result_text = "📧 **Últimos E-mails:**\n" for i, msg in enumerate(messages, 1): msg_id = msg["id"] details_cmd = f"{account} gmail users messages get --params '{{\"userId\": \"me\", \"id\": \"{msg_id}\", \"format\": \"metadata\", \"metadataHeaders\": [\"Subject\", \"From\"]}}'" details_res = run_bash(details_cmd) try: dm = re.search(r"(\{.*\})", details_res, re.S) dc = dm.group(1) if dm else details_res details = json.loads(dc) headers = details.get("payload", {}).get("headers", []) subject = next((h["value"] for h in headers if h["name"] == "Subject"), "Sem Assunto") sender = next((h["value"] for h in headers if h["name"] == "From"), "Desconhecido") result_text += f"{i}. **De:** {sender}\n **Assunto:** {subject}\n **ID:** `{msg_id}`\n\n" except: result_text += f"{i}. [Erro ao carregar ID: {msg_id}]\n\n" return result_text except Exception as e: return f"Erro: {str(e)}\nRaw: {res[:200]}" def gmail_manage_label(arg: str) -> str: try: parts = arg.split(maxsplit=1) account_alias = parts[0] label_name = parts[1].strip() if len(parts) > 1 else "" if not label_name: return "Erro: Nome do marcador não fornecido." account = resolve_account(account_alias) list_res = run_bash(f"{account} gmail users labels list --params '{{\"userId\": \"me\"}}'") try: json_match = re.search(r"(\{.*\})", list_res, re.S) list_res_clean = json_match.group(1) if json_match else list_res labels_data = json.loads(list_res_clean) for l in labels_data.get("labels", []): if l["name"].lower() == label_name.lower(): return f"Marcador '{l['name']}' já existe (ID: {l['id']})." except: pass cmd = f"{account} gmail users labels create --params '{{\"userId\": \"me\"}}' --json '{{\"name\": \"{label_name}\", \"labelListVisibility\": \"labelShow\", \"messageListVisibility\": \"show\"}}'" res = run_bash(cmd) return f"Criação de '{label_name}': {res}" except Exception as e: return f"Erro marcador: {str(e)}" def gmail_manage_filter(arg: str) -> str: try: parts = arg.split(maxsplit=2) if len(parts) < 3: return "Erro: Use 'conta criteria marcador'. Ex: adm @alibaba.com Alibaba" account_alias, criteria, label_name = parts[0], parts[1], parts[2] account = resolve_account(account_alias) label_id = label_name list_res = run_bash(f"{account} gmail users labels list --params '{{\"userId\": \"me\"}}'") try: json_match = re.search(r"(\{.*\})", list_res, re.S) list_res_clean = json_match.group(1) if json_match else list_res labels_data = json.loads(list_res_clean) for l in labels_data.get("labels", []): if l["name"].lower() == label_name.lower(): label_id = l["id"] break except: pass criteria_obj = {"from": criteria} if "@" in criteria else {"query": criteria} filter_obj = { "criteria": criteria_obj, "action": {"addLabelIds": [label_id]} } cmd = f"{account} gmail users settings filters create --params '{{\"userId\": \"me\"}}' --json '{json.dumps(filter_obj)}'" return run_bash(cmd) except Exception as e: return f"Erro filtro: {str(e)}" def drive_find(arg: str) -> str: try: parts = arg.split(maxsplit=1) account_alias = parts[0] query = parts[1] if len(parts) > 1 else "" account = resolve_account(account_alias) q = f"name contains '{query}'" if query else "" cmd = f"{account} drive files list" if q: cmd += f" --params '{{\"q\": \"{q}\"}}'" res = run_bash(cmd) data = _gws_clean_json(res) if res.startswith("{") else {} files = data.get("files", []) if not files: return "Nenhum arquivo encontrado." resp = "📂 **Arquivos Encontrados:**\n" for f in files[:10]: resp += f"- {f['name']} (ID: `{f['id']}`)\n" return resp except Exception as e: return f"Erro no Drive: {str(e)}" def drive_upload(arg: str) -> str: try: parts = arg.split(maxsplit=1) account_alias, filepath = parts[0], parts[1] account = resolve_account(account_alias) filename = os.path.basename(filepath) cmd = f"{account} drive files create --json '{{\"name\": \"{filename}\"}}' --output {filepath}" return run_bash(cmd) except Exception as e: return f"Erro upload: {str(e)}" def calendar_agenda(arg: str) -> str: try: parts = arg.split(maxsplit=1) account_alias = parts[0] timeframe = parts[1] if len(parts) > 1 else "--today" account = resolve_account(account_alias) if not timeframe.startswith("--"): timeframe = f"--{timeframe}" cmd = f"{account} calendar +agenda {timeframe}" return run_bash(cmd) except Exception as e: return f"Erro no Calendário: {str(e)}" # ============================================================ # GITEA & SUPABASE (ASYNC) # ============================================================ async def gitea_list_repos() -> str: from credential_manager import gitea_api_url, gitea_token url = f"{gitea_api_url()}/user/repos" headers = {"Authorization": f"token {gitea_token()}"} async with httpx.AsyncClient() as client: try: res = await client.get(url, headers=headers) repos = res.json() return "\n".join([f"• {r['name']}" for r in repos[:10]]) except Exception as e: return f"Erro Gitea: {e}" async def supabase_list_tables() -> str: from credential_manager import supabase_url, supabase_anon_key url = f"{supabase_url()}/rest/v1/" headers = {"apikey": supabase_anon_key(), "Authorization": f"Bearer {supabase_anon_key()}"} async with httpx.AsyncClient() as client: try: res = await client.get(url, headers=headers) return str(res.json()) except Exception as e: return f"Erro Supabase: {e}" # ============================================================ # WORKSPACE TOOLS # ============================================================ class WorkspaceTools: @staticmethod def gws_command(cmd: str) -> str: return run_bash(cmd) @staticmethod def magic_deploy(git_url: str) -> str: from deploy_manager import DeployManager dm = DeployManager() return dm.magic_deploy(git_url) @staticmethod def coolify_deploy_status(arg: str = None) -> str: cmd = 'docker exec coolify-db psql -U coolify -d coolify -c "SELECT application_name, status, created_at FROM application_deployment_queues ORDER BY created_at DESC LIMIT 5;"' return run_bash(cmd) # ============================================================ # BROWSER CLOUD TOOLS # ============================================================ BROWSER_CLOUD_URL = "http://localhost:8088" _current_session_id = None class BrowserCloudTools: @staticmethod def _req(method: str, path: str, data: dict = None): import urllib.request, json url = f"{BROWSER_CLOUD_URL}{path}" body = json.dumps(data).encode() if data else None req = urllib.request.Request(url, data=body, method=method, headers={"Content-Type": "application/json"}) try: with urllib.request.urlopen(req, timeout=30) as r: return json.loads(r.read()) except Exception as e: return {"error": str(e)} @staticmethod def _session(): global _current_session_id if _current_session_id: return _current_session_id r = BrowserCloudTools._req("POST", "/session", {"headless": True}) if "session_id" in r: _current_session_id = r["session_id"] return _current_session_id return None @staticmethod def navigate(url: str) -> str: sid = BrowserCloudTools._session() if not sid: return "Erro: não foi possível criar sessão no browser cloud" r = BrowserCloudTools._req("POST", f"/session/{sid}/navigate", {"url": url, "wait_until": "load"}) if "error" in r: return f"Erro: {r['error']}" return f"OK — {r.get('title', '')} ({url})" @staticmethod def screenshot(url_or_current: str = "current") -> str: sid = BrowserCloudTools._session() if not sid: return "Erro: sem sessão" # If URL provided, navigate first if url_or_current != "current" and url_or_current.startswith("http"): nav = BrowserCloudTools.navigate(url_or_current) if nav.startswith("Erro"): return nav r = BrowserCloudTools._req("GET", f"/session/{sid}/screenshot") if "error" in r: return f"Erro: {r['error']}" b64 = r.get("data", "") size = len(b64) * 3 // 4 # base64 → bytes approx return f"Screenshot OK — {r.get('format')} {size // 1024}KB (base64)" @staticmethod def click(selector: str) -> str: sid = BrowserCloudTools._session() if not sid: return "Erro: sem sessão" r = BrowserCloudTools._req("POST", f"/session/{sid}/click", {"selector": selector}) if "error" in r: return f"Erro: {r['error']}" return "OK — clicado" @staticmethod def fill(input_str: str) -> str: # Formato: selector|value|submit(optional) parts = input_str.split("|") selector = parts[0] value = parts[1] if len(parts) > 1 else "" submit = len(parts) > 2 and parts[2].lower() == "true" sid = BrowserCloudTools._session() if not sid: return "Erro: sem sessão" r = BrowserCloudTools._req("POST", f"/session/{sid}/fill", {"selector": selector, "value": value, "submit": submit}) if "error" in r: return f"Erro: {r['error']}" return "OK — preenchido" + (" e submetido" if submit else "") # ============================================================ # HERMES ORCHESTRATOR # ============================================================ def delegate_to_hermes(task: str) -> str: """Delega uma tarefa complexa para o Hermes Agent (MiniMax 2.7) resolver na VPS.""" import shlex safe_task = shlex.quote(task) # Executa o hermes no modo oneshot (-z), com timeout estendido de 5 min command = f"hermes -z {safe_task}" return run_bash(command, timeout=300) # ============================================================ # REGISTRY CENTRALIZADO # ============================================================ AVAILABLE_TOOLS = { # Bash / Sistema "run_bash": { "description": "Executa comandos Linux na VPS (docker, git, pm2, etc.)", "func": run_bash }, "bash": { "description": "Alias para run_bash", "func": run_bash }, # Docker "docker_ps": { "description": "Lista containers ativos", "func": DockerTools.ps }, "docker_stats": { "description": "CPU/RAM por container", "func": DockerTools.stats }, "docker_logs": { "description": "Logs de um container", "func": DockerTools.logs }, "docker_restart": { "description": "Reinicia container", "func": DockerTools.restart }, # Git "git_pull": { "description": "Pull no repositório", "func": GitTools.pull }, "git_status": { "description": "Status git", "func": GitTools.status }, # Sistema "system_health": { "description": "CPU, RAM, Disco, Uptime", "func": SystemTools.system_health }, "read_file": { "description": "Lê arquivo na VPS", "func": SystemTools.read_file }, "write_file": { "description": "Cria/edita arquivo (caminho|conteúdo)", "func": SystemTools.write_file }, "ls": { "description": "Lista arquivos num diretório", "func": SystemTools.list_dir }, "pm2_status": { "description": "Status PM2", "func": SystemTools.pm2_status }, "pm2_restart": { "description": "Reinicia PM2", "func": SystemTools.pm2_restart }, # Cronos "cronos_log": { "description": "Salva memória de longo prazo", "func": cronos_log }, "cronos_query": { "description": "Busca na memória Cronos", "func": cronos_query }, # GWS - Gmail "list_emails": { "description": "Lista últimos 5 e-mails (ma/adm/4r)", "func": list_gmail_emails }, "gmail_label": { "description": "Cria marcador no Gmail", "func": gmail_manage_label }, "gmail_filter": { "description": "Cria filtro no Gmail", "func": gmail_manage_filter }, # GWS - Drive "drive_find": { "description": "Busca arquivos no Drive", "func": drive_find }, "drive_upload": { "description": "Upload para Drive", "func": drive_upload }, # GWS - Calendar "calendar_agenda": { "description": "Eventos do calendário", "func": calendar_agenda }, # Gitea / Supabase "gitea_repos": { "description": "Lista repositórios no Gitea", "func": gitea_list_repos }, "supabase_tables": { "description": "Lista tabelas no Supabase", "func": supabase_list_tables }, # Workspace "gws": { "description": "Executa comando GWS CLI", "func": WorkspaceTools.gws_command }, "magic_deploy": { "description": "Deploy automático via Git URL", "func": WorkspaceTools.magic_deploy }, "coolify_status": { "description": "Status deploies Coolify", "func": WorkspaceTools.coolify_deploy_status }, "hermes_delegate": { "description": "Delega tarefas muito complexas para o Operador Master (Hermes Agent)", "func": delegate_to_hermes }, # Browser Cloud "browser_navigate": { "description": "Navega URL no browser cloud (HEADLESS). Uso: URL completa", "func": BrowserCloudTools.navigate }, "browser_screenshot": { "description": "Tira screenshot do browser cloud (PNG base64). Uso: URL ou 'current'", "func": BrowserCloudTools.screenshot }, "browser_click": { "description": "Clica em elemento CSS no browser cloud. Uso: seletor CSS", "func": BrowserCloudTools.click }, "browser_fill": { "description": "Preenche input e opcionalmente submete. Uso: seletor|valor|submeter(true/false)", "func": BrowserCloudTools.fill }, # Legado (aliases) "get_docker_stats": { "description": "CPU/RAM por container (legacy)", "func": DockerTools.stats }, "get_system_health": { "description": "CPU, RAM, Disco, Uptime (legacy)", "func": SystemTools.system_health }, "read_vps_file": { "description": "Lê arquivo na VPS (legacy)", "func": SystemTools.read_file }, } # ============================================================ # HELPERS # ============================================================ def get_all_tools_formatted() -> str: res = "🛠️ **Ferramentas Antigravity Ativas**:\\n\\n" for name, info in AVAILABLE_TOOLS.items(): res += f"- `{name}`: {info['description']}\\n" return res def get_tools_by_danger(level: str) -> List: danger_map = { "bash": "dangerous", "magic_deploy": "dangerous", "write_file": "dangerous", "docker_restart": "dangerous", "pm2_restart": "medium", "git_pull": "medium", "gws": "medium", "hermes_delegate": "dangerous", } result = [] for name, info in AVAILABLE_TOOLS.items(): danger = danger_map.get(name, "safe") if danger == level: result.append({"name": name, **info, "danger": danger}) return result