diff --git a/ai_agent.py b/ai_agent.py index 906a054..b0d19da 100644 --- a/ai_agent.py +++ b/ai_agent.py @@ -3,8 +3,7 @@ import re import httpx import asyncio import json -from tools import AVAILABLE_TOOLS as TOOLS_LEGACY -from tools_v2 import TOOLS_V2 as TOOLS_NEW +from core_tools import AVAILABLE_TOOLS from llm_providers import call_llm, get_available_models, get_planner_llm from config import get_config @@ -27,9 +26,8 @@ def query_agent(prompt: str, override_provider=None, chat_history=None) -> str: async def query_agent_async(prompt: str, override_provider=None, chat_history=None) -> str: cfg = get_config() provider = override_provider or cfg.get("active_provider", "openrouter") - # Unifica ferramentas legadas e novas - ALL_TOOLS = {**TOOLS_LEGACY, **TOOLS_NEW} - tools_desc = "\n".join([f"- {k}: {v.get('description') or v.get('desc')}" for k, v in ALL_TOOLS.items()]) + # Unifica ferramentas (agora centralizadas em core_tools) + tools_desc = "\n".join([f"- {k}: {v.get('description', '')}" for k, v in AVAILABLE_TOOLS.items()]) # Identifica o modelo para o prompt do sistema current_model = cfg.get("model") or "nvidia/nemotron-3-nano-omni-30b-a3b-reasoning:free" @@ -59,6 +57,7 @@ DIRETRIZES: {tools_desc} ### REGRAS DE OURO: +- DELEGAÇÃO AO HERMES (CRÍTICO): Se a requisição do usuário exigir refatoração de código complexa, análise de múltiplos arquivos, criação de novos scripts ou ações avançadas que excedam suas ferramentas básicas, NÃO tente resolver sozinho. Use IMEDIATAMENTE `[TOOL:hermes_delegate] descreva a tarefa aqui [/TOOL]` para que o Operador Master (Hermes Agent) assuma a VPS e resolva o problema. - CONSENTIMENTO PARA SCRIPTS: Se você não conseguir realizar uma tarefa técnica e precisar passar instruções, scripts ou tutoriais, você DEVE primeiro relatar o problema e PERGUNTAR se o usuário deseja receber o passo a passo técnico. Só envie se ele consentir. - FOCO NO PRESENTE: O histórico é para CONTEXTO. Foque SEMPRE no pedido ATUAL (última mensagem). - COOLIFY: NUNCA tente adivinhar caminhos de logs. Use SEMPRE a ferramenta `coolify_status`. @@ -133,7 +132,7 @@ DIRETRIZES: arg = content_after[:end_tag.start()].strip() if end_tag else content_after.strip() - all_tools = {**TOOLS_LEGACY, **TOOLS_NEW} + all_tools = AVAILABLE_TOOLS if t_name in all_tools: tool_info = all_tools[t_name] func = tool_info["func"] diff --git a/bridge_telegram.py b/bridge_telegram.py index 4792485..e65928a 100644 --- a/bridge_telegram.py +++ b/bridge_telegram.py @@ -99,6 +99,14 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): return await update.message.reply_text("⚙️ *Processando tarefa...*", parse_mode='Markdown') reply = await call_antigravity_api("/api/orchestrate", {"task": task}) + elif text.startswith('/hermes'): + task = text.replace('/hermes', '').strip() + if not task: + await update.message.reply_text("❓ Envie a tarefa para o Hermes após o comando /hermes.") + return + await update.message.reply_text("🤖 *Hermes assumindo o controle. Isso pode demorar alguns minutos...*", parse_mode='Markdown') + # Bypass history and call hermes endpoint directly + reply = await call_antigravity_api("/api/hermes", {"task": task}) else: await context.bot.send_chat_action(chat_id=chat_id, action="typing") payload = {"text": text, "history": chat_histories[chat_id][-10:]} diff --git a/core_tools.py b/core_tools.py new file mode 100644 index 0000000..c95e177 --- /dev/null +++ b/core_tools.py @@ -0,0 +1,576 @@ +import subprocess +import os +import re +import json +import time +import psutil +import asyncio +import httpx +from typing import Dict, List, Optional + +# ============================================================ +# CORE BASH EXECUTOR (unificado) +# ============================================================ + +def run_bash(command: str, timeout: int = 120) -> str: + """Executa um comando bash na VPS e retorna a saída.""" + command = command.replace("docker-compose", "docker compose") + try: + custom_env = os.environ.copy() + paths = ["/usr/local/bin", "/root/.cargo/bin", "/usr/bin", "/bin"] + current_path = custom_env.get("PATH", "") + for p in paths: + if p not in current_path: + current_path = f"{p}:{current_path}" + custom_env["PATH"] = current_path + + result = subprocess.run( + command, shell=True, capture_output=True, + text=True, timeout=timeout, env=custom_env + ) + output = result.stdout.strip() + error = result.stderr.strip() + + if output: + return output + if result.returncode != 0: + if result.returncode == 127: + return f"ERRO (127): Comando não encontrado. (Comando: {command})" + return f"ERRO ({result.returncode}): {error or 'Nada no stderr'}" + return "Sucesso (vazio)" + except subprocess.TimeoutExpired: + return "ERRO: O comando demorou muito e foi cancelado (timeout)." + except Exception as e: + return f"ERRO fatal ao rodar bash: {str(e)}" + +# Alias para compatibilidade +run_bash_command = run_bash + +# ============================================================ +# DOCKER TOOLS +# ============================================================ + +class DockerTools: + @staticmethod + def ps() -> str: + return run_bash("docker ps --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}'") + + @staticmethod + def stats() -> str: + return run_bash("docker stats --no-stream --format 'table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}'") + + @staticmethod + def logs(container: str, lines: int = 50) -> str: + return run_bash(f"docker logs --tail {lines} {container}") + + @staticmethod + def restart(container: str) -> str: + return run_bash(f"docker restart {container}") + +# Alias compat +get_docker_stats = DockerTools.stats + +# ============================================================ +# GIT TOOLS +# ============================================================ + +class GitTools: + @staticmethod + def pull(repo_path: str = ".") -> str: + return run_bash(f"git -C {repo_path} pull") + + @staticmethod + def status(repo_path: str = ".") -> str: + return run_bash(f"git -C {repo_path} status --short") + +# ============================================================ +# SYSTEM TOOLS +# ============================================================ + +class SystemTools: + @staticmethod + def execute_bash(command: str) -> str: + return run_bash(command) + + @staticmethod + def read_file(path: str) -> str: + try: + with open(path, 'r') as f: + return f.read(2000) + except Exception as e: + return f"Erro ao ler arquivo: {e}" + + @staticmethod + def write_file(path_content: str) -> str: + try: + if "|" not in path_content: + return "Erro: Use o formato 'caminho|conteúdo'" + path, content = path_content.split("|", 1) + path = path.strip() + os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True) + with open(path, 'w') as f: + f.write(content) + return f"Sucesso: {path} atualizado." + except Exception as e: + return f"Erro ao escrever: {e}" + + @staticmethod + def list_dir(path: str = ".") -> str: + try: + items = os.listdir(path) + return "\n".join(items) + except Exception as e: + return f"Erro ao listar {path}: {e}" + + @staticmethod + def pm2_status() -> str: + return run_bash("pm2 jlist") + + @staticmethod + def pm2_restart(name: str) -> str: + return run_bash(f"pm2 restart {name}") + + @staticmethod + def system_health() -> str: + cpu = psutil.cpu_percent(interval=0.1) + vm = psutil.virtual_memory() + disk = psutil.disk_usage('/') + uptime_seconds = time.time() - psutil.boot_time() + uptime_hours = round(uptime_seconds / 3600, 1) + return (f"CPU: {cpu}% | " + f"RAM: {round(vm.used / (1024**3), 2)}GB / {round(vm.total / (1024**3), 2)}GB ({vm.percent}%) | " + f"Disco: {disk.percent}% usado | " + f"Uptime: {uptime_hours}h") + +# ============================================================ +# CRONOS (MEMÓRIA DE LONGO PRAZO) +# ============================================================ + +MEMORY_ROOT = "/root/Antigravity_Memory" + +def cronos_log(arg: str) -> str: + try: + try: + data = json.loads(arg) + topic = data.get("topic", "geral") + content = data.get("content", "") + folder = data.get("folder", "current_week") + except: + topic_m = re.search(r'topic=["\'](.*?)["\']', arg) + content_m = re.search(r'content=["\'](.*?)["\']', arg, re.S) + folder_m = re.search(r'folder=["\'](.*?)["\']', arg) + topic = topic_m.group(1) if topic_m else "geral" + content = content_m.group(1) if content_m else arg + folder = folder_m.group(1) if folder_m else "current_week" + + if not content: + return "Erro: Conteúdo vazio." + + target_dir = os.path.join(MEMORY_ROOT, folder) + if not os.path.exists(target_dir): + os.makedirs(target_dir, exist_ok=True) + + filename = f"{topic.lower().replace(' ', '_')}.md" + filepath = os.path.join(target_dir, filename) + timestamp = time.strftime("%Y-%m-%d %H:%M:%S") + entry = f"\n---\n### ENTRY: {timestamp}\n{content}\n" + + with open(filepath, "a" if os.path.exists(filepath) else "w") as f: + f.write(entry) + return f"Sucesso: Salvo em Cronos/{folder}/{filename}" + except Exception as e: + return f"Erro ao salvar em Cronos: {e}" + +def cronos_query(arg: str) -> str: + query_m = re.search(r'query=["\'](.*?)["\']', arg) + folder_m = re.search(r'folder=["\'](.*?)["\']', arg) + query = query_m.group(1) if query_m else arg + folder = folder_m.group(1) if folder_m else "current_week" + target_dir = os.path.join(MEMORY_ROOT, folder) + return run_bash(f"grep -rniI '{query}' {target_dir} | head -n 20") + +# ============================================================ +# GOOGLE WORKSPACE TOOLS +# ============================================================ + +ACCOUNT_MAPPING = { + "ma": "gws-mr", "mr": "gws-mr", "marcos": "gws-mr", + "adm": "gws-adm", "empresa": "gws-adm", + "4r": "gws-4r", "familia": "gws-4r", "fam": "gws-4r" +} + +def resolve_account(account_alias: str) -> str: + clean = account_alias.strip().lower() + return ACCOUNT_MAPPING.get(clean, f"gws-{clean}") + +def _gws_clean_json(res: str) -> dict: + """Limpa lixo do stdout antes de parsear JSON.""" + json_match = re.search(r"(\{.*\})", res, re.S) + res_clean = json_match.group(1) if json_match else res + return json.loads(res_clean) + +def list_gmail_emails(account_alias: str) -> str: + account = resolve_account(account_alias) + list_cmd = f"{account} gmail users messages list --params '{{\"userId\": \"me\", \"maxResults\": 5}}'" + res = run_bash(list_cmd) + + try: + data = _gws_clean_json(res) + if "error" in data: + err_msg = data["error"].get("message", str(data["error"])) + return f"❌ Erro GWS ({account}): {err_msg}. Talvez precise reautenticar." + + messages = data.get("messages", []) + if not messages: + return "📭 Nenhum e-mail encontrado." + + result_text = "📧 **Últimos E-mails:**\n" + for i, msg in enumerate(messages, 1): + msg_id = msg["id"] + details_cmd = f"{account} gmail users messages get --params '{{\"userId\": \"me\", \"id\": \"{msg_id}\", \"format\": \"metadata\", \"metadataHeaders\": [\"Subject\", \"From\"]}}'" + details_res = run_bash(details_cmd) + try: + dm = re.search(r"(\{.*\})", details_res, re.S) + dc = dm.group(1) if dm else details_res + details = json.loads(dc) + headers = details.get("payload", {}).get("headers", []) + subject = next((h["value"] for h in headers if h["name"] == "Subject"), "Sem Assunto") + sender = next((h["value"] for h in headers if h["name"] == "From"), "Desconhecido") + result_text += f"{i}. **De:** {sender}\n **Assunto:** {subject}\n **ID:** `{msg_id}`\n\n" + except: + result_text += f"{i}. [Erro ao carregar ID: {msg_id}]\n\n" + return result_text + except Exception as e: + return f"Erro: {str(e)}\nRaw: {res[:200]}" + +def gmail_manage_label(arg: str) -> str: + try: + parts = arg.split(maxsplit=1) + account_alias = parts[0] + label_name = parts[1].strip() if len(parts) > 1 else "" + if not label_name: + return "Erro: Nome do marcador não fornecido." + account = resolve_account(account_alias) + + list_res = run_bash(f"{account} gmail users labels list --params '{{\"userId\": \"me\"}}'") + try: + json_match = re.search(r"(\{.*\})", list_res, re.S) + list_res_clean = json_match.group(1) if json_match else list_res + labels_data = json.loads(list_res_clean) + for l in labels_data.get("labels", []): + if l["name"].lower() == label_name.lower(): + return f"Marcador '{l['name']}' já existe (ID: {l['id']})." + except: + pass + + cmd = f"{account} gmail users labels create --params '{{\"userId\": \"me\"}}' --json '{{\"name\": \"{label_name}\", \"labelListVisibility\": \"labelShow\", \"messageListVisibility\": \"show\"}}'" + res = run_bash(cmd) + return f"Criação de '{label_name}': {res}" + except Exception as e: + return f"Erro marcador: {str(e)}" + +def gmail_manage_filter(arg: str) -> str: + try: + parts = arg.split(maxsplit=2) + if len(parts) < 3: + return "Erro: Use 'conta criteria marcador'. Ex: adm @alibaba.com Alibaba" + account_alias, criteria, label_name = parts[0], parts[1], parts[2] + account = resolve_account(account_alias) + + label_id = label_name + list_res = run_bash(f"{account} gmail users labels list --params '{{\"userId\": \"me\"}}'") + try: + json_match = re.search(r"(\{.*\})", list_res, re.S) + list_res_clean = json_match.group(1) if json_match else list_res + labels_data = json.loads(list_res_clean) + for l in labels_data.get("labels", []): + if l["name"].lower() == label_name.lower(): + label_id = l["id"] + break + except: + pass + + criteria_obj = {"from": criteria} if "@" in criteria else {"query": criteria} + filter_obj = { + "criteria": criteria_obj, + "action": {"addLabelIds": [label_id]} + } + cmd = f"{account} gmail users settings filters create --params '{{\"userId\": \"me\"}}' --json '{json.dumps(filter_obj)}'" + return run_bash(cmd) + except Exception as e: + return f"Erro filtro: {str(e)}" + +def drive_find(arg: str) -> str: + try: + parts = arg.split(maxsplit=1) + account_alias = parts[0] + query = parts[1] if len(parts) > 1 else "" + account = resolve_account(account_alias) + q = f"name contains '{query}'" if query else "" + cmd = f"{account} drive files list" + if q: + cmd += f" --params '{{\"q\": \"{q}\"}}'" + res = run_bash(cmd) + data = _gws_clean_json(res) if res.startswith("{") else {} + files = data.get("files", []) + if not files: + return "Nenhum arquivo encontrado." + resp = "📂 **Arquivos Encontrados:**\n" + for f in files[:10]: + resp += f"- {f['name']} (ID: `{f['id']}`)\n" + return resp + except Exception as e: + return f"Erro no Drive: {str(e)}" + +def drive_upload(arg: str) -> str: + try: + parts = arg.split(maxsplit=1) + account_alias, filepath = parts[0], parts[1] + account = resolve_account(account_alias) + filename = os.path.basename(filepath) + cmd = f"{account} drive files create --json '{{\"name\": \"{filename}\"}}' --output {filepath}" + return run_bash(cmd) + except Exception as e: + return f"Erro upload: {str(e)}" + +def calendar_agenda(arg: str) -> str: + try: + parts = arg.split(maxsplit=1) + account_alias = parts[0] + timeframe = parts[1] if len(parts) > 1 else "--today" + account = resolve_account(account_alias) + if not timeframe.startswith("--"): + timeframe = f"--{timeframe}" + cmd = f"{account} calendar +agenda {timeframe}" + return run_bash(cmd) + except Exception as e: + return f"Erro no Calendário: {str(e)}" + +# ============================================================ +# GITEA & SUPABASE (ASYNC) +# ============================================================ + +async def gitea_list_repos() -> str: + from credential_manager import gitea_api_url, gitea_token + url = f"{gitea_api_url()}/user/repos" + headers = {"Authorization": f"token {gitea_token()}"} + async with httpx.AsyncClient() as client: + try: + res = await client.get(url, headers=headers) + repos = res.json() + return "\n".join([f"• {r['name']}" for r in repos[:10]]) + except Exception as e: + return f"Erro Gitea: {e}" + +async def supabase_list_tables() -> str: + from credential_manager import supabase_url, supabase_anon_key + url = f"{supabase_url()}/rest/v1/" + headers = {"apikey": supabase_anon_key(), "Authorization": f"Bearer {supabase_anon_key()}"} + async with httpx.AsyncClient() as client: + try: + res = await client.get(url, headers=headers) + return str(res.json()) + except Exception as e: + return f"Erro Supabase: {e}" + +# ============================================================ +# WORKSPACE TOOLS +# ============================================================ + +class WorkspaceTools: + @staticmethod + def gws_command(cmd: str) -> str: + return run_bash(cmd) + + @staticmethod + def magic_deploy(git_url: str) -> str: + from deploy_manager import DeployManager + dm = DeployManager() + return dm.magic_deploy(git_url) + + @staticmethod + def coolify_deploy_status(arg: str = None) -> str: + cmd = 'docker exec coolify-db psql -U coolify -d coolify -c "SELECT application_name, status, created_at FROM application_deployment_queues ORDER BY created_at DESC LIMIT 5;"' + return run_bash(cmd) + +# ============================================================ +# HERMES ORCHESTRATOR +# ============================================================ + +def delegate_to_hermes(task: str) -> str: + """Delega uma tarefa complexa para o Hermes Agent (MiniMax 2.7) resolver na VPS.""" + import shlex + safe_task = shlex.quote(task) + # Executa o hermes no modo oneshot (-z), com timeout estendido de 5 min + command = f"hermes -z {safe_task}" + return run_bash(command, timeout=300) + +# ============================================================ +# REGISTRY CENTRALIZADO +# ============================================================ + +AVAILABLE_TOOLS = { + # Bash / Sistema + "run_bash": { + "description": "Executa comandos Linux na VPS (docker, git, pm2, etc.)", + "func": run_bash + }, + "bash": { + "description": "Alias para run_bash", + "func": run_bash + }, + # Docker + "docker_ps": { + "description": "Lista containers ativos", + "func": DockerTools.ps + }, + "docker_stats": { + "description": "CPU/RAM por container", + "func": DockerTools.stats + }, + "docker_logs": { + "description": "Logs de um container", + "func": DockerTools.logs + }, + "docker_restart": { + "description": "Reinicia container", + "func": DockerTools.restart + }, + # Git + "git_pull": { + "description": "Pull no repositório", + "func": GitTools.pull + }, + "git_status": { + "description": "Status git", + "func": GitTools.status + }, + # Sistema + "system_health": { + "description": "CPU, RAM, Disco, Uptime", + "func": SystemTools.system_health + }, + "read_file": { + "description": "Lê arquivo na VPS", + "func": SystemTools.read_file + }, + "write_file": { + "description": "Cria/edita arquivo (caminho|conteúdo)", + "func": SystemTools.write_file + }, + "ls": { + "description": "Lista arquivos num diretório", + "func": SystemTools.list_dir + }, + "pm2_status": { + "description": "Status PM2", + "func": SystemTools.pm2_status + }, + "pm2_restart": { + "description": "Reinicia PM2", + "func": SystemTools.pm2_restart + }, + # Cronos + "cronos_log": { + "description": "Salva memória de longo prazo", + "func": cronos_log + }, + "cronos_query": { + "description": "Busca na memória Cronos", + "func": cronos_query + }, + # GWS - Gmail + "list_emails": { + "description": "Lista últimos 5 e-mails (ma/adm/4r)", + "func": list_gmail_emails + }, + "gmail_label": { + "description": "Cria marcador no Gmail", + "func": gmail_manage_label + }, + "gmail_filter": { + "description": "Cria filtro no Gmail", + "func": gmail_manage_filter + }, + # GWS - Drive + "drive_find": { + "description": "Busca arquivos no Drive", + "func": drive_find + }, + "drive_upload": { + "description": "Upload para Drive", + "func": drive_upload + }, + # GWS - Calendar + "calendar_agenda": { + "description": "Eventos do calendário", + "func": calendar_agenda + }, + # Gitea / Supabase + "gitea_repos": { + "description": "Lista repositórios no Gitea", + "func": gitea_list_repos + }, + "supabase_tables": { + "description": "Lista tabelas no Supabase", + "func": supabase_list_tables + }, + # Workspace + "gws": { + "description": "Executa comando GWS CLI", + "func": WorkspaceTools.gws_command + }, + "magic_deploy": { + "description": "Deploy automático via Git URL", + "func": WorkspaceTools.magic_deploy + }, + "coolify_status": { + "description": "Status deploies Coolify", + "func": WorkspaceTools.coolify_deploy_status + }, + "hermes_delegate": { + "description": "Delega tarefas muito complexas para o Operador Master (Hermes Agent)", + "func": delegate_to_hermes + }, + # Legado (aliases) + "get_docker_stats": { + "description": "CPU/RAM por container (legacy)", + "func": DockerTools.stats + }, + "get_system_health": { + "description": "CPU, RAM, Disco, Uptime (legacy)", + "func": SystemTools.system_health + }, + "read_vps_file": { + "description": "Lê arquivo na VPS (legacy)", + "func": SystemTools.read_file + }, +} + +# ============================================================ +# HELPERS +# ============================================================ + +def get_all_tools_formatted() -> str: + res = "🛠️ **Ferramentas Antigravity Ativas**:\\n\\n" + for name, info in AVAILABLE_TOOLS.items(): + res += f"- `{name}`: {info['description']}\\n" + return res + +def get_tools_by_danger(level: str) -> List: + danger_map = { + "bash": "dangerous", + "magic_deploy": "dangerous", + "write_file": "dangerous", + "docker_restart": "dangerous", + "pm2_restart": "medium", + "git_pull": "medium", + "gws": "medium", + "hermes_delegate": "dangerous", + } + result = [] + for name, info in AVAILABLE_TOOLS.items(): + danger = danger_map.get(name, "safe") + if danger == level: + result.append({"name": name, **info, "danger": danger}) + return result diff --git a/main.py b/main.py index dceb8cd..571874c 100644 --- a/main.py +++ b/main.py @@ -201,6 +201,20 @@ async def orchestrate_task(task_data: dict, is_auth: bool = Depends(verify_passw async def get_orch_status(is_auth: bool = Depends(verify_password)): return get_orchestrator_status() +@app.post("/api/hermes") +async def call_hermes_direct(task_data: dict, is_auth: bool = Depends(verify_password)): + from core_tools import delegate_to_hermes + task = task_data.get("task", "") + if not task: + return {"reply": "Tarefa vazia enviada para o Hermes."} + + try: + # Roda a tool que faz a chamada sincrona do subprocess em uma thread + result = await run_in_threadpool(delegate_to_hermes, task) + return {"reply": f"🤖 **Hermes Agent:**\n\n{result}"} + except Exception as e: + return {"reply": f"❌ **Erro no Hermes:** {str(e)}"} + # --- SERVER --- if __name__ == "__main__": import uvicorn diff --git a/orchestrator.py b/orchestrator.py index d995c6f..89fb164 100644 --- a/orchestrator.py +++ b/orchestrator.py @@ -11,7 +11,7 @@ from llm_providers import ( call_planner_async, call_executor_async, get_planner_llm, get_executor_llm, get_available_models, LLM_PROVIDERS, set_planner, set_executor, get_config, save_config ) -from tools_v2 import TOOLS_V2, get_tools_by_danger, get_all_tools_formatted +from core_tools import AVAILABLE_TOOLS as TOOLS_V2, get_tools_by_danger, get_all_tools_formatted, run_bash from credential_manager import sync_credentials, get_services_status # ============================================================ @@ -67,7 +67,6 @@ def _format_tools_for_prompt() -> str: async def detect_git_repo_path_async(task: str) -> str: """Detecta automaticamente o caminho do repositório Git (async).""" - from tools_v2 import run_bash task_lower = task.lower() # Mapeamento de APPs conhecidos diff --git a/tools.py b/tools.py index 56ea2dd..3689fc7 100644 --- a/tools.py +++ b/tools.py @@ -1,354 +1,25 @@ -import subprocess -import os -import psutil -import time -import re -import json +""" +tools.py - Stub de compatibilidade. +Todas as funções foram movidas para core_tools.py. +Manter este arquivo para não quebrar imports existentes. +""" +from core_tools import ( + AVAILABLE_TOOLS, + run_bash_command, + get_docker_stats, + read_vps_file, + get_system_health, + cronos_log, + cronos_query, + list_gmail_emails, + gmail_manage_label, + gmail_manage_filter, + drive_find, + drive_upload, + calendar_agenda, + resolve_account, + ACCOUNT_MAPPING, +) -def run_bash_command(command: str) -> str: - """Executa um comando bash na VPS e retorna a saída.""" - try: - # Garante caminhos comuns no PATH para execução via PM2/Containers - custom_env = os.environ.copy() - paths = ["/usr/local/bin", "/root/.cargo/bin", "/usr/bin", "/bin"] - current_path = custom_env.get("PATH", "") - for p in paths: - if p not in current_path: - current_path = f"{p}:{current_path}" - custom_env["PATH"] = current_path - - # Executa comando de forma segura dentro da VPS - result = subprocess.run( - command, - shell=True, - capture_output=True, - text=True, - timeout=60, - env=custom_env - ) - - output = result.stdout.strip() - error = result.stderr.strip() - - # Se encontrou algo no stdout, retornamos o que achou mesmo com erro - if output: - return output - - if result.returncode != 0: - if result.returncode == 127: - return f"ERRO (127): Comando não encontrado. Verifique se o alias ou binário está no PATH. (Comando: {command})" - return f"ERRO ({result.returncode}): {error if error else 'Nada no stderr'}" - - return "Sucesso (vazio)" - except subprocess.TimeoutExpired: - return "ERRO: O comando demorou muito e foi cancelado (timeout)." - except Exception as e: - return f"ERRO fatal ao rodar bash: {str(e)}" - -def get_system_health(*args) -> str: - """Retorna um texto detalhado da saúde do servidor para a IA.""" - cpu = psutil.cpu_percent(interval=0.1) - vm = psutil.virtual_memory() - disk = psutil.disk_usage('/') - - # Uptime aproximado usando psutil - uptime_seconds = time.time() - psutil.boot_time() - uptime_hours = round(uptime_seconds / 3600, 1) - - return (f"CPU: {cpu}% | " - f"RAM: {round(vm.used / (1024**3), 2)}GB usada / {round(vm.total / (1024**3), 2)}GB total ({vm.percent}%) | " - f"Disco: {disk.percent}% usado | " - f"Uptime: {uptime_hours}h") - -def read_vps_file(filepath: str) -> str: - """Lê um arquivo do sistema de arquivos da VPS através do mapeamento /host_root.""" - # Garante que o caminho comece com /host_root - clean_path = filepath - if clean_path.startswith("/host_root"): - host_path = clean_path - else: - # Se o usuário mandou /root/vps..., vira /host_root/root/vps... - host_path = os.path.join("/host_root", clean_path.lstrip("/")) - - try: - if not os.path.exists("/host_root"): - return "ERRO CRÍTICO: O diretório /host_root não existe no container! O mapeamento de volume falhou." - - if not os.path.exists(host_path): - # Tenta listar o diretório pai para ajudar no debug - parent = os.path.dirname(host_path) - content = os.listdir(parent) if os.path.exists(parent) else "Pai não existe" - return f"Erro: {filepath} não encontrado (Caminho real: {host_path}). Conteúdo do pai: {content}" - - with open(host_path, 'r') as f: - return f.read(2000) - except Exception as e: - return f"Erro ao acessar {filepath}: {e}" - -def get_docker_stats() -> str: - """Retorna o uso de CPU/RAM de todos os containers ativos via comando docker stats.""" - return run_bash_command('docker stats --no-stream --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}"') - -# ============================================================ -# SISTEMA CRONOS (MEMÓRIA DE LONGO PRAZO) -# ============================================================ - -MEMORY_ROOT = "/root/Antigravity_Memory" - -def cronos_log(arg: str) -> str: - """ - Salva memórias. Formato esperado: topic="assunto", content="texto", folder="current_week" - Também aceita JSON. - """ - try: - # Tenta como JSON primeiro - try: - data = json.loads(arg) - topic = data.get("topic", "geral") - content = data.get("content", "") - folder = data.get("folder", "current_week") - except: - # Parser simples por regex para query="xxx" ou topic="xxx" - topic_m = re.search(r'topic=["\'](.*?)["\']', arg) - content_m = re.search(r'content=["\'](.*?)["\']', arg, re.S) - folder_m = re.search(r'folder=["\'](.*?)["\']', arg) - - topic = topic_m.group(1) if topic_m else "geral" - content = content_m.group(1) if content_m else arg # fallback para arg bruto - folder = folder_m.group(1) if folder_m else "current_week" - - if not content: return "Erro: Conteúdo vazio." - - target_dir = os.path.join(MEMORY_ROOT, folder) - if not os.path.exists(target_dir): - os.makedirs(target_dir, exist_ok=True) - - filename = f"{topic.lower().replace(' ', '_')}.md" - filepath = os.path.join(target_dir, filename) - - timestamp = time.strftime("%Y-%m-%d %H:%M:%S") - entry = f"\n---\n### ENTRY: {timestamp}\n{content}\n" - - with open(filepath, "a" if os.path.exists(filepath) else "w") as f: - f.write(entry) - - return f"Sucesso: Salvo em Cronos/{folder}/{filename}" - except Exception as e: - return f"Erro ao salvar em Cronos: {e}" - -def cronos_query(arg: str) -> str: - """Busca informações. Formato: query="termo", folder="pasta" """ - query_m = re.search(r'query=["\'](.*?)["\']', arg) - folder_m = re.search(r'folder=["\'](.*?)["\']', arg) - - query = query_m.group(1) if query_m else arg - folder = folder_m.group(1) if folder_m else "current_week" - - target_dir = os.path.join(MEMORY_ROOT, folder) - return run_bash_command(f"grep -rniI '{query}' {target_dir} | head -n 20") - -# ============================================================ -# GOOGLE WORKSPACE TOOLS -# ============================================================ - -ACCOUNT_MAPPING = { - "ma": "gws-mr", "mr": "gws-mr", "marcos": "gws-mr", - "adm": "gws-adm", "empresa": "gws-adm", - "4r": "gws-4r", "familia": "gws-4r", "fam": "gws-4r" -} - -def resolve_account(account_alias: str) -> str: - clean = account_alias.strip().lower() - return ACCOUNT_MAPPING.get(clean, f"gws-{clean}") - -def list_gmail_emails(account_alias: str) -> str: - """Lista os últimos 5 e-mails com Título e Remetente. Aceita apelidos: ma, mr, adm, 4r.""" - account = resolve_account(account_alias) - list_cmd = f"{account} gmail users messages list --params '{{\"userId\": \"me\", \"maxResults\": 5}}'" - res = run_bash_command(list_cmd) - - try: - # Limpeza de JSON (remove lixo do keyring/stderr no stdout) - json_match = re.search(r"(\{.*\})", res, re.S) - res_clean = json_match.group(1) if json_match else res - data = json.loads(res_clean) - - if "error" in data: - err_msg = data["error"].get("message", str(data["error"])) - return f"❌ Erro de Autenticação GWS ({account}): {err_msg}. Você pode precisar reautenticar o CLI na VPS." - - messages = data.get("messages", []) - if not messages: return "📭 Nenhum e-mail encontrado na caixa de entrada." - - result_text = "📧 **Últimos E-mails:**\n" - for i, msg in enumerate(messages, 1): - msg_id = msg["id"] - details_cmd = f"{account} gmail users messages get --params '{{\"userId\": \"me\", \"id\": \"{msg_id}\", \"format\": \"metadata\", \"metadataHeaders\": [\"Subject\", \"From\"]}}'" - details_res = run_bash_command(details_cmd) - try: - # Limpeza de JSON também nos detalhes - dm = re.search(r"(\{.*\})", details_res, re.S) - dc = dm.group(1) if dm else details_res - details = json.loads(dc) - - headers = details.get("payload", {}).get("headers", []) - subject = next((h["value"] for h in headers if h["name"] == "Subject"), "Sem Assunto") - sender = next((h["value"] for h in headers if h["name"] == "From"), "Desconhecido") - result_text += f"{i}. **De:** {sender}\n **Assunto:** {subject}\n **ID:** `{msg_id}`\n\n" - except: - result_text += f"{i}. [Erro ao carregar detalhes do ID: {msg_id}]\n\n" - return result_text - except Exception as e: - return f"Erro ao listar e-mails: {str(e)}\nResposta bruta: {res[:200]}" - -def gmail_manage_label(arg: str) -> str: - """Cria ou busca marcadores (labels). Arg: account name (ex: adm alibaba)""" - try: - parts = arg.split(maxsplit=1) - account_alias = parts[0] - label_name = parts[1].strip() if len(parts) > 1 else "" - if not label_name: return "Erro: Nome do marcador não fornecido." - account = resolve_account(account_alias) - - # 1. Verifica se já existe - list_res = run_bash_command(f"{account} gmail users labels list --params '{{\"userId\": \"me\"}}'") - try: - # Extrai apenas o JSON se houver lixo no stdout (ex: Using keyring...) - json_match = re.search(r"(\{.*\})", list_res, re.S) - list_res_clean = json_match.group(1) if json_match else list_res - labels_data = json.loads(list_res_clean) - for l in labels_data.get("labels", []): - if l["name"].lower() == label_name.lower(): - return f"Marcador '{l['name']}' já existe (ID: {l['id']})." - except: pass - - # 2. Tenta criar - cmd = f"{account} gmail users labels create --params '{{\"userId\": \"me\"}}' --json '{{\"name\": \"{label_name}\", \"labelListVisibility\": \"labelShow\", \"messageListVisibility\": \"show\"}}'" - res = run_bash_command(cmd) - return f"Criação de '{label_name}': {res}" - except Exception as e: return f"Erro marcador: {str(e)}" - -def gmail_manage_filter(arg: str) -> str: - """Cria um filtro para e-mails. Arg: account criteria label_name (ex: adm alibaba alibaba)""" - try: - parts = arg.split(maxsplit=2) - if len(parts) < 3: return "Erro: Use 'conta criteria marcador'. Ex: adm alibaba alibaba" - account_alias, criteria, label_name = parts[0], parts[1], parts[2] - account = resolve_account(account_alias) - - # Busca o ID do marcador pelo nome - label_id = label_name - list_res = run_bash_command(f"{account} gmail users labels list --params '{{\"userId\": \"me\"}}'") - try: - json_match = re.search(r"(\{.*\})", list_res, re.S) - list_res_clean = json_match.group(1) if json_match else list_res - labels_data = json.loads(list_res_clean) - for l in labels_data.get("labels", []): - if l["name"].lower() == label_name.lower(): - label_id = l["id"] - break - except: pass - - criteria_obj = {"from": criteria} if "@" in criteria else {"query": criteria} - filter_obj = { - "criteria": criteria_obj, - "action": {"addLabelIds": [label_id]} - } - cmd = f"{account} gmail users settings filters create --params '{{\"userId\": \"me\"}}' --json '{json.dumps(filter_obj)}'" - return run_bash_command(cmd) - except Exception as e: return f"Erro filtro: {str(e)}" - -def drive_find(arg: str) -> str: - """Busca arquivos no Drive por nome. Arg: account query (ex: ma financeiro)""" - try: - parts = arg.split(maxsplit=1) - account_alias = parts[0] - query = parts[1] if len(parts) > 1 else "" - account = resolve_account(account_alias) - q = f"name contains '{query}'" if query else "" - cmd = f"{account} drive files list" - if q: cmd += f" --params '{{\"q\": \"{q}\"}}'" - res = run_bash_command(cmd) - data = json.loads(res) - files = data.get("files", []) - if not files: return "Nenhum arquivo encontrado." - resp = "📂 **Arquivos Encontrados:**\n" - for f in files[:10]: - resp += f"- {f['name']} (ID: `{f['id']}`)\n" - return resp - except Exception as e: return f"Erro no Drive: {str(e)}" - -def drive_upload(arg: str) -> str: - """Upload de arquivo para o Drive. Arg: account filepath (ex: ma /tmp/relat.pdf)""" - try: - parts = arg.split(maxsplit=1) - account_alias, filepath = parts[0], parts[1] - account = resolve_account(account_alias) - filename = os.path.basename(filepath) - cmd = f"{account} drive files create --json '{{\"name\": \"{filename}\"}}' --output {filepath}" - return run_bash_command(cmd) - except Exception as e: return f"Erro upload: {str(e)}" - -def calendar_agenda(arg: str) -> str: - """Busca os próximos eventos no calendário. Arg: account timeframe (timeframe: today, tomorrow, week, days=N)""" - try: - parts = arg.split(maxsplit=1) - account_alias = parts[0] - timeframe = parts[1] if len(parts) > 1 else "--today" - account = resolve_account(account_alias) - if not timeframe.startswith("--"): timeframe = f"--{timeframe}" - cmd = f"{account} calendar +agenda {timeframe}" - return run_bash_command(cmd) - except Exception as e: return f"Erro no Calendário: {str(e)}" - -# Mapeamento para o Agente entender quais tools ele possui -AVAILABLE_TOOLS = { - "run_bash_command": { - "description": "Executa comandos Linux na VPS. Use para docker, git, mkdir, touch, etc.", - "func": run_bash_command - }, - "list_gmail_emails": { - "description": "Lista os 5 e-mails mais recentes de uma conta (ma, adm, 4r) com título e remetente.", - "func": list_gmail_emails - }, - "drive_find": { - "description": "Busca arquivos no Drive por nome. Ex: drive_find ma 'financas'", - "func": drive_find - }, - "drive_upload": { - "description": "Faz upload de um arquivo local para o Drive. Ex: drive_upload adm /tmp/doc.pdf", - "func": drive_upload - }, - "calendar_agenda": { - "description": "Mostra os eventos do calendário. Ex: calendar_agenda ma today / tomorrow / week", - "func": calendar_agenda - }, - "get_system_health": { - "description": "Verifica RAM, CPU e Disco globais da VPS.", - "func": get_system_health - }, - "read_vps_file": { - "description": "Lê o conteúdo de um arquivo na VPS (logs, configs).", - "func": read_vps_file - }, - "get_docker_stats": { - "description": "Retorna uma tabela com o consumo de CPU e Memória de cada container.", - "func": get_docker_stats - }, - "cronos_log": { - "description": "Salva memórias importantes (assunto, conteúdo, pasta=current_week|knowledge). Use sempre que algo for concluído.", - "func": cronos_log - }, - "cronos_query": { - "description": "Busca informações no histórico da Memória Cronos.", - "func": cronos_query - }, - "gmail_manage_label": { - "description": "Cria um novo marcador (Label) no Gmail. Ex: gmail_manage_label adm alibaba", - "func": gmail_manage_label - }, - "gmail_manage_filter": { - "description": "Cria um filtro automático no Gmail. Ex: gmail_manage_filter adm @alibaba.com Alibaba", - "func": gmail_manage_filter - } -} +# Mantém AVAILABLE_TOOLS com mesmo nome do arquivo antigo +AVAILABLE_TOOLS = AVAILABLE_TOOLS diff --git a/tools_v2.py b/tools_v2.py index f631f29..123d721 100644 --- a/tools_v2.py +++ b/tools_v2.py @@ -1,214 +1,33 @@ -import subprocess -import os -import httpx -import asyncio -from typing import Dict, List, Optional -from credential_manager import ( - gitea_api_url, gitea_token, supabase_url, supabase_anon_key, - supabase_service_role_key +""" +tools_v2.py - Stub de compatibilidade. +Todas as funções foram movidas para core_tools.py. +Manter este arquivo para não quebrar imports existentes. +""" +from core_tools import ( + AVAILABLE_TOOLS as TOOLS_V2, + run_bash, + DockerTools, + GitTools, + SystemTools, + WorkspaceTools, + gitea_list_repos, + supabase_list_tables, + get_all_tools_formatted, + get_tools_by_danger, ) -from deploy_manager import DeployManager - -# ============================================================ -# UTILS -# ============================================================ - -def run_bash(command: str, timeout: int = 120) -> Dict: - # Auto-moderniza docker-compose - command = command.replace("docker-compose", "docker compose") - try: - # Garante caminhos comuns no PATH para execução via PM2/Containers - custom_env = os.environ.copy() - paths = ["/usr/local/bin", "/root/.cargo/bin", "/usr/bin", "/bin"] - current_path = custom_env.get("PATH", "") - for p in paths: - if p not in current_path: - current_path = f"{p}:{current_path}" - custom_env["PATH"] = current_path - - result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=timeout, env=custom_env) - return { - "success": result.returncode == 0, - "output": (result.stdout or result.stderr).strip() or "Sucesso" - } - except Exception as e: - return {"success": False, "output": str(e)} - -# ============================================================ -# DOCKER TOOLS -# ============================================================ - -class DockerTools: - @staticmethod - def ps() -> str: - return run_bash("docker ps --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}'")["output"] - - @staticmethod - def stats() -> str: - return run_bash("docker stats --no-stream --format 'table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}'")["output"] - - @staticmethod - def logs(container: str, lines: int = 50) -> str: - return run_bash(f"docker logs --tail {lines} {container}")["output"] - - @staticmethod - def restart(container: str) -> str: - return run_bash(f"docker restart {container}")["output"] - -# ============================================================ -# GIT TOOLS -# ============================================================ - -class GitTools: - @staticmethod - def pull(repo_path: str = ".") -> str: - return run_bash(f"git -C {repo_path} pull")["output"] - - @staticmethod - def status(repo_path: str = ".") -> str: - return run_bash(f"git -C {repo_path} status --short")["output"] - -# ============================================================ -# API TOOLS (ASYNC) -# ============================================================ +# Reclassificar como classes para compat (ou simples re-export) class GiteaTools: @staticmethod - async def list_repos() -> str: - url = f"{gitea_api_url()}/user/repos" - headers = {"Authorization": f"token {gitea_token()}"} - async with httpx.AsyncClient() as client: - try: - res = await client.get(url, headers=headers) - repos = res.json() - return "\n".join([f"• {r['name']}" for r in repos[:10]]) - except Exception as e: return f"Erro Gitea: {e}" + def list_repos() -> str: + import asyncio + return asyncio.run(gitea_list_repos()) class SupabaseTools: @staticmethod - async def list_tables() -> str: - url = f"{supabase_url()}/rest/v1/" - headers = {"apikey": supabase_anon_key(), "Authorization": f"Bearer {supabase_anon_key()}"} - async with httpx.AsyncClient() as client: - try: - res = await client.get(url, headers=headers) - return str(res.json()) - except Exception as e: return f"Erro Supabase: {e}" + def list_tables() -> str: + import asyncio + return asyncio.run(supabase_list_tables()) -# ============================================================ -# SYSTEM TOOLS (MASTER SKILL) -# ============================================================ - -class SystemTools: - @staticmethod - def execute_bash(command: str) -> str: - """Executa um comando bash arbitrário na VPS.""" - return run_bash(command)["output"] - - @staticmethod - def read_file(path: str) -> str: - """Lê o conteúdo total de um arquivo na VPS.""" - try: - with open(path, 'r') as f: - return f.read() - except Exception as e: - return f"Erro ao ler arquivo: {e}" - - @staticmethod - def write_file(path_content: str) -> str: - """Escreve conteúdo em um arquivo. Formato esperado: 'caminho|conteúdo'""" - try: - if "|" not in path_content: - return "Erro: Use o formato 'caminho|conteúdo'" - path, content = path_content.split("|", 1) - path = path.strip() - # Garante que o diretório existe - os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True) - with open(path, 'w') as f: - f.write(content) - return f"Sucesso: {path} atualizado." - except Exception as e: - return f"Erro ao escrever: {e}" - - @staticmethod - def list_dir(path: str = ".") -> str: - """Lista arquivos e pastas de um diretório.""" - try: - items = os.listdir(path) - return "\n".join(items) - except Exception as e: - return f"Erro ao listar {path}: {e}" - - @staticmethod - def pm2_status() -> str: - """Lista todos os processos gerenciados pelo PM2.""" - return run_bash("pm2 jlist")["output"] - - @staticmethod - def pm2_restart(name: str) -> str: - """Reinicia um processo no PM2 pelo nome ou ID.""" - return run_bash(f"pm2 restart {name}")["output"] - -# ============================================================ -# WORKSPACE TOOLS (GWS CLI) -# ============================================================ - -class WorkspaceTools: - @staticmethod - def gws_command(cmd: str) -> str: - """Executa um comando gws completo (ex: gws-mr gmail +send ...).""" - return run_bash(cmd)["output"] - - @staticmethod - def magic_deploy(git_url: str) -> str: - """Clona um repositório git e tenta realizar o deploy automático (Docker/Node/Python).""" - dm = DeployManager() - return dm.magic_deploy(git_url) - - @staticmethod - def coolify_deploy_status(arg: str = None) -> str: - """Consulta os últimos 5 deploies registrados no Coolify via banco de dados.""" - cmd = 'docker exec coolify-db psql -U coolify -d coolify -c "SELECT application_name as application, status, created_at FROM application_deployment_queues ORDER BY created_at DESC LIMIT 5;"' - return run_bash(cmd)["output"] - -# ============================================================ -# REGISTRY -# ============================================================ - -TOOLS_V2 = { - # Docker - "docker_ps": {"desc": "Lista containers ativos", "func": DockerTools.ps, "danger": "safe"}, - "docker_stats": {"desc": "Uso de CPU/RAM por container", "func": DockerTools.stats, "danger": "safe"}, - "docker_logs": {"desc": "Ver logs de um container", "func": DockerTools.logs, "danger": "safe"}, - "docker_restart": {"desc": "Reiniciar container", "func": DockerTools.restart, "danger": "dangerous"}, - - # Git - "git_pull": {"desc": "Faz pull no repositório atual", "func": GitTools.pull, "danger": "medium"}, - "git_status": {"desc": "Verifica status do git", "func": GitTools.status, "danger": "safe"}, - "gitea_repos": {"desc": "Lista repositórios no Gitea", "func": GiteaTools.list_repos, "danger": "safe"}, - - # Supabase - "supabase_tables": {"desc": "Lista tabelas no Supabase", "func": SupabaseTools.list_tables, "danger": "safe"}, - - # System (Master Skill) - "bash": {"desc": "Executa comando shell direto", "func": SystemTools.execute_bash, "danger": "dangerous"}, - "read_file": {"desc": "Lê conteúdo de um arquivo", "func": SystemTools.read_file, "danger": "safe"}, - "write_file": {"desc": "Cria ou edita arquivo (caminho|conteúdo)", "func": SystemTools.write_file, "danger": "dangerous"}, - "ls": {"desc": "Lista arquivos num diretório", "func": SystemTools.list_dir, "danger": "safe"}, - "pm2_status": {"desc": "Status dos processos PM2", "func": SystemTools.pm2_status, "danger": "safe"}, - "pm2_restart": {"desc": "Reiniciar processo PM2", "func": SystemTools.pm2_restart, "danger": "medium"}, - "magic_deploy": {"desc": "Deploy automático via URL Git", "func": WorkspaceTools.magic_deploy, "danger": "dangerous"}, - "coolify_status": {"desc": "Status dos últimos deploies no Coolify", "func": WorkspaceTools.coolify_deploy_status, "danger": "safe"}, - - # Google Workspace - "gws": {"desc": "Executa comando GWS CLI (ex: gws-mr drive files list)", "func": WorkspaceTools.gws_command, "danger": "medium"}, -} - -def get_all_tools_formatted() -> str: - res = "🛠️ **Ferramentas Antigravity Ativas (V2)**:\n\n" - for name, info in TOOLS_V2.items(): - res += f"- `{name}`: {info['desc']} [{info['danger'].upper()}]\n" - return res - -def get_tools_by_danger(level: str) -> List: - return [{"name": k, **v} for k, v in TOOLS_V2.items() if v["danger"] == level] +# Mantém compat comorchestrator.py que referencia TOOLS_V2 +TOOLS_V2 = TOOLS_V2