import subprocess import os import psutil import time import re import json def run_bash_command(command: str) -> str: """Executa um comando bash na VPS e retorna a saída.""" try: # Garante caminhos comuns no PATH para execução via PM2/Containers custom_env = os.environ.copy() paths = ["/usr/local/bin", "/root/.cargo/bin", "/usr/bin", "/bin"] current_path = custom_env.get("PATH", "") for p in paths: if p not in current_path: current_path = f"{p}:{current_path}" custom_env["PATH"] = current_path # Executa comando de forma segura dentro da VPS result = subprocess.run( command, shell=True, capture_output=True, text=True, timeout=60, env=custom_env ) output = result.stdout.strip() error = result.stderr.strip() # Se encontrou algo no stdout, retornamos o que achou mesmo com erro if output: return output if result.returncode != 0: if result.returncode == 127: return f"ERRO (127): Comando não encontrado. Verifique se o alias ou binário está no PATH. (Comando: {command})" return f"ERRO ({result.returncode}): {error if error else 'Nada no stderr'}" return "Sucesso (vazio)" except subprocess.TimeoutExpired: return "ERRO: O comando demorou muito e foi cancelado (timeout)." except Exception as e: return f"ERRO fatal ao rodar bash: {str(e)}" def get_system_health() -> str: """Retorna um texto detalhado da saúde do servidor para a IA.""" cpu = psutil.cpu_percent(interval=0.1) vm = psutil.virtual_memory() disk = psutil.disk_usage('/') # Uptime aproximado usando psutil uptime_seconds = time.time() - psutil.boot_time() uptime_hours = round(uptime_seconds / 3600, 1) return (f"CPU: {cpu}% | " f"RAM: {round(vm.used / (1024**3), 2)}GB usada / {round(vm.total / (1024**3), 2)}GB total ({vm.percent}%) | " f"Disco: {disk.percent}% usado | " f"Uptime: {uptime_hours}h") def read_vps_file(filepath: str) -> str: """Lê um arquivo do sistema de arquivos da VPS através do mapeamento /host_root.""" # Garante que o caminho comece com /host_root clean_path = filepath if clean_path.startswith("/host_root"): host_path = clean_path else: # Se o usuário mandou /root/vps..., vira /host_root/root/vps... host_path = os.path.join("/host_root", clean_path.lstrip("/")) try: if not os.path.exists("/host_root"): return "ERRO CRÍTICO: O diretório /host_root não existe no container! O mapeamento de volume falhou." if not os.path.exists(host_path): # Tenta listar o diretório pai para ajudar no debug parent = os.path.dirname(host_path) content = os.listdir(parent) if os.path.exists(parent) else "Pai não existe" return f"Erro: {filepath} não encontrado (Caminho real: {host_path}). Conteúdo do pai: {content}" with open(host_path, 'r') as f: return f.read(2000) except Exception as e: return f"Erro ao acessar {filepath}: {e}" def get_docker_stats() -> str: """Retorna o uso de CPU/RAM de todos os containers ativos via comando docker stats.""" return run_bash_command('docker stats --no-stream --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}"') # ============================================================ # SISTEMA CRONOS (MEMÓRIA DE LONGO PRAZO) # ============================================================ MEMORY_ROOT = "/root/Antigravity_Memory" def cronos_log(arg: str) -> str: """ Salva memórias. Formato esperado: topic="assunto", content="texto", folder="current_week" Também aceita JSON. """ try: # Tenta como JSON primeiro try: data = json.loads(arg) topic = data.get("topic", "geral") content = data.get("content", "") folder = data.get("folder", "current_week") except: # Parser simples por regex para query="xxx" ou topic="xxx" topic_m = re.search(r'topic=["\'](.*?)["\']', arg) content_m = re.search(r'content=["\'](.*?)["\']', arg, re.S) folder_m = re.search(r'folder=["\'](.*?)["\']', arg) topic = topic_m.group(1) if topic_m else "geral" content = content_m.group(1) if content_m else arg # fallback para arg bruto folder = folder_m.group(1) if folder_m else "current_week" if not content: return "Erro: Conteúdo vazio." target_dir = os.path.join(MEMORY_ROOT, folder) if not os.path.exists(target_dir): os.makedirs(target_dir, exist_ok=True) filename = f"{topic.lower().replace(' ', '_')}.md" filepath = os.path.join(target_dir, filename) timestamp = time.strftime("%Y-%m-%d %H:%M:%S") entry = f"\n---\n### ENTRY: {timestamp}\n{content}\n" with open(filepath, "a" if os.path.exists(filepath) else "w") as f: f.write(entry) return f"Sucesso: Salvo em Cronos/{folder}/{filename}" except Exception as e: return f"Erro ao salvar em Cronos: {e}" def cronos_query(arg: str) -> str: """Busca informações. Formato: query="termo", folder="pasta" """ query_m = re.search(r'query=["\'](.*?)["\']', arg) folder_m = re.search(r'folder=["\'](.*?)["\']', arg) query = query_m.group(1) if query_m else arg folder = folder_m.group(1) if folder_m else "current_week" target_dir = os.path.join(MEMORY_ROOT, folder) return run_bash_command(f"grep -rniI '{query}' {target_dir} | head -n 20") # ============================================================ # GOOGLE WORKSPACE TOOLS # ============================================================ ACCOUNT_MAPPING = { "ma": "gws-mr", "mr": "gws-mr", "marcos": "gws-mr", "adm": "gws-adm", "empresa": "gws-adm", "4r": "gws-4r", "familia": "gws-4r", "fam": "gws-4r" } def resolve_account(account_alias: str) -> str: clean = account_alias.strip().lower() return ACCOUNT_MAPPING.get(clean, f"gws-{clean}") def list_gmail_emails(account_alias: str) -> str: """Lista os últimos 5 e-mails com Título e Remetente. Aceita apelidos: ma, mr, adm, 4r.""" account = resolve_account(account_alias) list_cmd = f"{account} gmail users messages list --params '{{\"userId\": \"me\", \"maxResults\": 5}}'" res = run_bash_command(list_cmd) try: # Limpeza de JSON (remove lixo do keyring/stderr no stdout) json_match = re.search(r"(\{.*\})", res, re.S) res_clean = json_match.group(1) if json_match else res data = json.loads(res_clean) if "error" in data: err_msg = data["error"].get("message", str(data["error"])) return f"❌ Erro de Autenticação GWS ({account}): {err_msg}. Você pode precisar reautenticar o CLI na VPS." messages = data.get("messages", []) if not messages: return "📭 Nenhum e-mail encontrado na caixa de entrada." result_text = "📧 **Últimos E-mails:**\n" for i, msg in enumerate(messages, 1): msg_id = msg["id"] details_cmd = f"{account} gmail users messages get --params '{{\"userId\": \"me\", \"id\": \"{msg_id}\", \"format\": \"metadata\", \"metadataHeaders\": [\"Subject\", \"From\"]}}'" details_res = run_bash_command(details_cmd) try: # Limpeza de JSON também nos detalhes dm = re.search(r"(\{.*\})", details_res, re.S) dc = dm.group(1) if dm else details_res details = json.loads(dc) headers = details.get("payload", {}).get("headers", []) subject = next((h["value"] for h in headers if h["name"] == "Subject"), "Sem Assunto") sender = next((h["value"] for h in headers if h["name"] == "From"), "Desconhecido") result_text += f"{i}. **De:** {sender}\n **Assunto:** {subject}\n **ID:** `{msg_id}`\n\n" except: result_text += f"{i}. [Erro ao carregar detalhes do ID: {msg_id}]\n\n" return result_text except Exception as e: return f"Erro ao listar e-mails: {str(e)}\nResposta bruta: {res[:200]}" def gmail_manage_label(arg: str) -> str: """Cria ou busca marcadores (labels). Arg: account name (ex: adm alibaba)""" try: parts = arg.split(maxsplit=1) account_alias = parts[0] label_name = parts[1].strip() if len(parts) > 1 else "" if not label_name: return "Erro: Nome do marcador não fornecido." account = resolve_account(account_alias) # 1. Verifica se já existe list_res = run_bash_command(f"{account} gmail users labels list --params '{{\"userId\": \"me\"}}'") try: # Extrai apenas o JSON se houver lixo no stdout (ex: Using keyring...) json_match = re.search(r"(\{.*\})", list_res, re.S) list_res_clean = json_match.group(1) if json_match else list_res labels_data = json.loads(list_res_clean) for l in labels_data.get("labels", []): if l["name"].lower() == label_name.lower(): return f"Marcador '{l['name']}' já existe (ID: {l['id']})." except: pass # 2. Tenta criar cmd = f"{account} gmail users labels create --params '{{\"userId\": \"me\"}}' --json '{{\"name\": \"{label_name}\", \"labelListVisibility\": \"labelShow\", \"messageListVisibility\": \"show\"}}'" res = run_bash_command(cmd) return f"Criação de '{label_name}': {res}" except Exception as e: return f"Erro marcador: {str(e)}" def gmail_manage_filter(arg: str) -> str: """Cria um filtro para e-mails. Arg: account criteria label_name (ex: adm alibaba alibaba)""" try: parts = arg.split(maxsplit=2) if len(parts) < 3: return "Erro: Use 'conta criteria marcador'. Ex: adm alibaba alibaba" account_alias, criteria, label_name = parts[0], parts[1], parts[2] account = resolve_account(account_alias) # Busca o ID do marcador pelo nome label_id = label_name list_res = run_bash_command(f"{account} gmail users labels list --params '{{\"userId\": \"me\"}}'") try: json_match = re.search(r"(\{.*\})", list_res, re.S) list_res_clean = json_match.group(1) if json_match else list_res labels_data = json.loads(list_res_clean) for l in labels_data.get("labels", []): if l["name"].lower() == label_name.lower(): label_id = l["id"] break except: pass criteria_obj = {"from": criteria} if "@" in criteria else {"query": criteria} filter_obj = { "criteria": criteria_obj, "action": {"addLabelIds": [label_id]} } cmd = f"{account} gmail users settings filters create --params '{{\"userId\": \"me\"}}' --json '{json.dumps(filter_obj)}'" return run_bash_command(cmd) except Exception as e: return f"Erro filtro: {str(e)}" def drive_find(arg: str) -> str: """Busca arquivos no Drive por nome. Arg: account query (ex: ma financeiro)""" try: parts = arg.split(maxsplit=1) account_alias = parts[0] query = parts[1] if len(parts) > 1 else "" account = resolve_account(account_alias) q = f"name contains '{query}'" if query else "" cmd = f"{account} drive files list" if q: cmd += f" --params '{{\"q\": \"{q}\"}}'" res = run_bash_command(cmd) data = json.loads(res) files = data.get("files", []) if not files: return "Nenhum arquivo encontrado." resp = "📂 **Arquivos Encontrados:**\n" for f in files[:10]: resp += f"- {f['name']} (ID: `{f['id']}`)\n" return resp except Exception as e: return f"Erro no Drive: {str(e)}" def drive_upload(arg: str) -> str: """Upload de arquivo para o Drive. Arg: account filepath (ex: ma /tmp/relat.pdf)""" try: parts = arg.split(maxsplit=1) account_alias, filepath = parts[0], parts[1] account = resolve_account(account_alias) filename = os.path.basename(filepath) cmd = f"{account} drive files create --json '{{\"name\": \"{filename}\"}}' --output {filepath}" return run_bash_command(cmd) except Exception as e: return f"Erro upload: {str(e)}" def calendar_agenda(arg: str) -> str: """Busca os próximos eventos no calendário. Arg: account timeframe (timeframe: today, tomorrow, week, days=N)""" try: parts = arg.split(maxsplit=1) account_alias = parts[0] timeframe = parts[1] if len(parts) > 1 else "--today" account = resolve_account(account_alias) if not timeframe.startswith("--"): timeframe = f"--{timeframe}" cmd = f"{account} calendar +agenda {timeframe}" return run_bash_command(cmd) except Exception as e: return f"Erro no Calendário: {str(e)}" # Mapeamento para o Agente entender quais tools ele possui AVAILABLE_TOOLS = { "run_bash_command": { "description": "Executa comandos Linux na VPS. Use para docker, git, mkdir, touch, etc.", "func": run_bash_command }, "list_gmail_emails": { "description": "Lista os 5 e-mails mais recentes de uma conta (ma, adm, 4r) com título e remetente.", "func": list_gmail_emails }, "drive_find": { "description": "Busca arquivos no Drive por nome. Ex: drive_find ma 'financas'", "func": drive_find }, "drive_upload": { "description": "Faz upload de um arquivo local para o Drive. Ex: drive_upload adm /tmp/doc.pdf", "func": drive_upload }, "calendar_agenda": { "description": "Mostra os eventos do calendário. Ex: calendar_agenda ma today / tomorrow / week", "func": calendar_agenda }, "get_system_health": { "description": "Verifica RAM, CPU e Disco globais da VPS.", "func": get_system_health }, "read_vps_file": { "description": "Lê o conteúdo de um arquivo na VPS (logs, configs).", "func": read_vps_file }, "get_docker_stats": { "description": "Retorna uma tabela com o consumo de CPU e Memória de cada container.", "func": get_docker_stats }, "cronos_log": { "description": "Salva memórias importantes (assunto, conteúdo, pasta=current_week|knowledge). Use sempre que algo for concluído.", "func": cronos_log }, "cronos_query": { "description": "Busca informações no histórico da Memória Cronos.", "func": cronos_query }, "gmail_manage_label": { "description": "Cria um novo marcador (Label) no Gmail. Ex: gmail_manage_label adm alibaba", "func": gmail_manage_label }, "gmail_manage_filter": { "description": "Cria um filtro automático no Gmail. Ex: gmail_manage_filter adm @alibaba.com Alibaba", "func": gmail_manage_filter } }