import subprocess import os import psutil import time import re import json def run_bash_command(command: str) -> str: """Executa um comando bash na VPS e retorna a saída.""" try: # Garante caminhos comuns no PATH para execução via PM2/Containers custom_env = os.environ.copy() paths = ["/usr/local/bin", "/root/.cargo/bin", "/usr/bin", "/bin"] current_path = custom_env.get("PATH", "") for p in paths: if p not in current_path: current_path = f"{p}:{current_path}" custom_env["PATH"] = current_path # Executa comando de forma segura dentro da VPS result = subprocess.run( command, shell=True, capture_output=True, text=True, timeout=120, env=custom_env ) output = result.stdout.strip() error = result.stderr.strip() # Se encontrou algo no stdout, retornamos o que achou mesmo com erro if output: return output if result.returncode != 0: if result.returncode == 127: return f"ERRO (127): Comando não encontrado. Verifique se o alias ou binário está no PATH. (Comando: {command})" return f"ERRO ({result.returncode}): {error if error else 'Nada no stderr'}" return "Sucesso (vazio)" except subprocess.TimeoutExpired: return "ERRO: O comando demorou muito e foi cancelado (timeout)." except Exception as e: return f"ERRO fatal ao rodar bash: {str(e)}" def get_system_health() -> str: """Retorna um texto detalhado da saúde do servidor para a IA.""" cpu = psutil.cpu_percent(interval=0.1) vm = psutil.virtual_memory() disk = psutil.disk_usage('/') # Uptime aproximado usando psutil uptime_seconds = time.time() - psutil.boot_time() uptime_hours = round(uptime_seconds / 3600, 1) return (f"CPU: {cpu}% | " f"RAM: {round(vm.used / (1024**3), 2)}GB usada / {round(vm.total / (1024**3), 2)}GB total ({vm.percent}%) | " f"Disco: {disk.percent}% usado | " f"Uptime: {uptime_hours}h") def read_vps_file(filepath: str) -> str: """Lê um arquivo do sistema de arquivos da VPS através do mapeamento /host_root.""" # Garante que o caminho comece com /host_root clean_path = filepath if clean_path.startswith("/host_root"): host_path = clean_path else: # Se o usuário mandou /root/vps..., vira /host_root/root/vps... host_path = os.path.join("/host_root", clean_path.lstrip("/")) try: if not os.path.exists("/host_root"): return "ERRO CRÍTICO: O diretório /host_root não existe no container! O mapeamento de volume falhou." if not os.path.exists(host_path): # Tenta listar o diretório pai para ajudar no debug parent = os.path.dirname(host_path) content = os.listdir(parent) if os.path.exists(parent) else "Pai não existe" return f"Erro: {filepath} não encontrado (Caminho real: {host_path}). Conteúdo do pai: {content}" with open(host_path, 'r') as f: return f.read(2000) except Exception as e: return f"Erro ao acessar {filepath}: {e}" def get_docker_stats() -> str: """Retorna o uso de CPU/RAM de todos os containers ativos via comando docker stats.""" return run_bash_command('docker stats --no-stream --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}"') # ============================================================ # SISTEMA CRONOS (MEMÓRIA DE LONGO PRAZO) # ============================================================ MEMORY_ROOT = "/root/Antigravity_Memory" def cronos_log(arg: str) -> str: """ Salva memórias. Formato esperado: topic="assunto", content="texto", folder="current_week" Também aceita JSON. """ try: # Tenta como JSON primeiro try: data = json.loads(arg) topic = data.get("topic", "geral") content = data.get("content", "") folder = data.get("folder", "current_week") except: # Parser simples por regex para query="xxx" ou topic="xxx" topic_m = re.search(r'topic=["\'](.*?)["\']', arg) content_m = re.search(r'content=["\'](.*?)["\']', arg, re.S) folder_m = re.search(r'folder=["\'](.*?)["\']', arg) topic = topic_m.group(1) if topic_m else "geral" content = content_m.group(1) if content_m else arg # fallback para arg bruto folder = folder_m.group(1) if folder_m else "current_week" if not content: return "Erro: Conteúdo vazio." target_dir = os.path.join(MEMORY_ROOT, folder) if not os.path.exists(target_dir): os.makedirs(target_dir, exist_ok=True) filename = f"{topic.lower().replace(' ', '_')}.md" filepath = os.path.join(target_dir, filename) timestamp = time.strftime("%Y-%m-%d %H:%M:%S") entry = f"\n---\n### ENTRY: {timestamp}\n{content}\n" with open(filepath, "a" if os.path.exists(filepath) else "w") as f: f.write(entry) return f"Sucesso: Salvo em Cronos/{folder}/{filename}" except Exception as e: return f"Erro ao salvar em Cronos: {e}" def cronos_query(arg: str) -> str: """Busca informações. Formato: query="termo", folder="pasta" """ query_m = re.search(r'query=["\'](.*?)["\']', arg) folder_m = re.search(r'folder=["\'](.*?)["\']', arg) query = query_m.group(1) if query_m else arg folder = folder_m.group(1) if folder_m else "current_week" target_dir = os.path.join(MEMORY_ROOT, folder) return run_bash_command(f"grep -rniI '{query}' {target_dir} | head -n 20") def list_gmail_emails(account: str) -> str: """Lista os últimos 5 e-mails com Título e Remetente. Aceita apelidos: ma, mr, adm, 4r.""" mapping = { "ma": "gws-mr", "mr": "gws-mr", "marcos": "gws-mr", "adm": "gws-adm", "empresa": "gws-adm", "4r": "gws-4r", "familia": "gws-4r", "fam": "gws-4r" } clean_account = account.strip().lower().replace("gws-", "") account = mapping.get(clean_account, f"gws-{clean_account}" if not clean_account.startswith("gws") else clean_account) # 1. Obtém a lista de IDs list_cmd = f"{account} gmail users messages list --params '{{\"userId\": \"me\", \"maxResults\": 5}}'" res = run_bash_command(list_cmd) try: data = json.loads(res) messages = data.get("messages", []) if not messages: return "Nenhum e-mail encontrado." result_text = "📧 **Últimos E-mails:**\n" for i, msg in enumerate(messages, 1): msg_id = msg["id"] # 2. Busca detalhes de cada e-mail (Metadata apenas para ser rápido) details_cmd = f"{account} gmail users messages get --params '{{\"userId\": \"me\", \"id\": \"{msg_id}\", \"format\": \"metadata\", \"metadataHeaders\": [\"Subject\", \"From\"]}}'" details_res = run_bash_command(details_cmd) try: details = json.loads(details_res) headers = details.get("payload", {}).get("headers", []) subject = next((h["value"] for h in headers if h["name"] == "Subject"), "Sem Assunto") sender = next((h["value"] for h in headers if h["name"] == "From"), "Desconhecido") result_text += f"{i}. **De:** {sender}\n **Assunto:** {subject}\n **ID:** `{msg_id}`\n\n" except: result_text += f"{i}. [Erro ao carregar detalhes do ID: {msg_id}]\n\n" return result_text except Exception as e: return f"Erro ao listar e-mails: {str(e)}\nResposta bruta: {res[:200]}" def gmail_manage_label(arg: str) -> str: """Cria ou busca marcadores (labels). Arg: account name (ex: adm alibaba)""" try: parts = arg.split(maxsplit=1) account = parts[0] label_name = parts[1] if len(parts) > 1 else "" if not label_name: return "Erro: Nome do marcador não fornecido." mapping = {"ma": "gws-mr", "mr": "gws-mr", "adm": "gws-adm", "4r": "gws-4r"} account = mapping.get(account.lower(), account) # Tenta criar o marcador cmd = f"{account} gmail users labels create --json '{{\"name\": \"{label_name}\", \"labelListVisibility\": \"labelShow\", \"messageListVisibility\": \"show\"}}'" return run_bash_command(cmd) except Exception as e: return f"Erro ao gerenciar marcador: {str(e)}" def gmail_manage_filter(arg: str) -> str: """Cria um filtro para e-mails. Arg: account subject_or_from label_name (ex: adm alibaba alibaba)""" try: parts = arg.split(maxsplit=2) if len(parts) < 3: return "Erro: Use 'conta termo marcador'. Ex: adm alibaba alibaba" account, criteria, label_id = parts[0], parts[1], parts[2] mapping = {"ma": "gws-mr", "mr": "gws-mr", "adm": "gws-adm", "4r": "gws-4r"} account = mapping.get(account.lower(), account) # Tenta criar o filtro (exemplo: assunto contém o critério ou de quem vem) # Primeiro, buscamos se o marcador existe para pegar o ID? # Na verdade no Gmail API você pode usar o nome do marcador se for por --json direto. # Mas gws gmail users settings filters create requer um Filter object. filter_obj = { "criteria": {"from": criteria}, # Simplificado: busca pelo remetente "action": {"addLabelIds": [label_id]} } cmd = f"{account} gmail users settings filters create --json '{json.dumps(filter_obj)}'" return run_bash_command(cmd) except Exception as e: return f"Erro ao gerenciar filtro: {str(e)}" def drive_find(arg: str) -> str: """Busca arquivos no Drive por nome. Arg: account query (ex: ma financeiro)""" try: parts = arg.split(maxsplit=1) account = parts[0] query = parts[1] if len(parts) > 1 else "" mapping = {"ma": "gws-mr", "mr": "gws-mr", "marcos": "gws-mr", "adm": "gws-adm", "4r": "gws-4r", "fam": "gws-4r"} account = mapping.get(account.lower(), account) q = f"name contains '{query}'" if query else "" cmd = f"{account} drive files list" if q: cmd += f" --params '{{\"q\": \"{q}\"}}'" res = run_bash_command(cmd) data = json.loads(res) files = data.get("files", []) if not files: return "Nenhum arquivo encontrado." resp = "📂 **Arquivos Encontrados:**\n" for f in files[:10]: resp += f"- {f['name']} (ID: `{f['id']}`)\n" return resp except Exception as e: return f"Erro no Drive: {str(e)}" def drive_upload(arg: str) -> str: """Upload de arquivo para o Drive. Arg: account filepath (ex: ma /tmp/relat.pdf)""" try: parts = arg.split(maxsplit=1) account, filepath = parts[0], parts[1] mapping = {"ma": "gws-mr", "mr": "gws-mr", "adm": "gws-adm", "4r": "gws-4r"} account = mapping.get(account.lower(), account) filename = os.path.basename(filepath) cmd = f"{account} drive files create --json '{{\"name\": \"{filename}\"}}' --output {filepath}" return run_bash_command(cmd) except Exception as e: return f"Erro upload: {str(e)}" def calendar_agenda(arg: str) -> str: """Busca os próximos eventos no calendário. Arg: account timeframe (timeframe: today, tomorrow, week, days=N)""" try: parts = arg.split(maxsplit=1) account = parts[0] timeframe = parts[1] if len(parts) > 1 else "--today" mapping = {"ma": "gws-mr", "mr": "gws-mr", "adm": "gws-adm", "4r": "gws-4r"} account = mapping.get(account.lower(), account) # Converte timeframe para flag correta if not timeframe.startswith("--"): timeframe = f"--{timeframe}" cmd = f"{account} calendar +agenda {timeframe}" return run_bash_command(cmd) except Exception as e: return f"Erro no Calendário: {str(e)}" # Mapeamento para o Agente entender quais tools ele possui (será usado no loop ReAct) AVAILABLE_TOOLS = { "run_bash_command": { "description": "Executa comandos Linux na VPS. Use para docker, git, mkdir, touch, etc.", "func": run_bash_command }, "list_gmail_emails": { "description": "Lista os 5 e-mails mais recentes de uma conta (ma, adm, 4r) com título e remetente.", "func": list_gmail_emails }, "drive_find": { "description": "Busca arquivos no Drive por nome. Ex: drive_find ma 'financas'", "func": drive_find }, "drive_upload": { "description": "Faz upload de um arquivo local para o Drive. Ex: drive_upload adm /tmp/doc.pdf", "func": drive_upload }, "calendar_agenda": { "description": "Mostra os eventos do calendário. Ex: calendar_agenda ma today / tomorrow / week", "func": calendar_agenda }, "get_system_health": { "description": "Verifica RAM, CPU e Disco globais da VPS.", "func": get_system_health }, "read_vps_file": { "description": "Lê o conteúdo de um arquivo na VPS (logs, configs).", "func": read_vps_file }, "get_docker_stats": { "description": "Retorna uma tabela com o consumo de CPU e Memória de cada container.", "func": get_docker_stats }, "cronos_log": { "description": "Salva memórias importantes (assunto, conteúdo, pasta=current_week|knowledge). Use sempre que algo for concluído.", "func": cronos_log }, "cronos_query": { "description": "Busca informações no histórico da Memória Cronos.", "func": cronos_query }, "gmail_manage_label": { "description": "Cria um novo marcador (Label) no Gmail. Ex: gmail_manage_label adm alibaba", "func": gmail_manage_label }, "gmail_manage_filter": { "description": "Cria um filtro automático no Gmail. Ex: gmail_manage_filter adm @alibaba.com Alibaba", "func": gmail_manage_filter } }