🚀 Auto-deploy: BotVPS atualizado em 01/05/2026 21:16:46
This commit is contained in:
377
tools.py
377
tools.py
@@ -1,354 +1,25 @@
|
||||
import subprocess
|
||||
import os
|
||||
import psutil
|
||||
import time
|
||||
import re
|
||||
import json
|
||||
"""
|
||||
tools.py - Stub de compatibilidade.
|
||||
Todas as funções foram movidas para core_tools.py.
|
||||
Manter este arquivo para não quebrar imports existentes.
|
||||
"""
|
||||
from core_tools import (
|
||||
AVAILABLE_TOOLS,
|
||||
run_bash_command,
|
||||
get_docker_stats,
|
||||
read_vps_file,
|
||||
get_system_health,
|
||||
cronos_log,
|
||||
cronos_query,
|
||||
list_gmail_emails,
|
||||
gmail_manage_label,
|
||||
gmail_manage_filter,
|
||||
drive_find,
|
||||
drive_upload,
|
||||
calendar_agenda,
|
||||
resolve_account,
|
||||
ACCOUNT_MAPPING,
|
||||
)
|
||||
|
||||
def run_bash_command(command: str) -> str:
|
||||
"""Executa um comando bash na VPS e retorna a saída."""
|
||||
try:
|
||||
# Garante caminhos comuns no PATH para execução via PM2/Containers
|
||||
custom_env = os.environ.copy()
|
||||
paths = ["/usr/local/bin", "/root/.cargo/bin", "/usr/bin", "/bin"]
|
||||
current_path = custom_env.get("PATH", "")
|
||||
for p in paths:
|
||||
if p not in current_path:
|
||||
current_path = f"{p}:{current_path}"
|
||||
custom_env["PATH"] = current_path
|
||||
|
||||
# Executa comando de forma segura dentro da VPS
|
||||
result = subprocess.run(
|
||||
command,
|
||||
shell=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=60,
|
||||
env=custom_env
|
||||
)
|
||||
|
||||
output = result.stdout.strip()
|
||||
error = result.stderr.strip()
|
||||
|
||||
# Se encontrou algo no stdout, retornamos o que achou mesmo com erro
|
||||
if output:
|
||||
return output
|
||||
|
||||
if result.returncode != 0:
|
||||
if result.returncode == 127:
|
||||
return f"ERRO (127): Comando não encontrado. Verifique se o alias ou binário está no PATH. (Comando: {command})"
|
||||
return f"ERRO ({result.returncode}): {error if error else 'Nada no stderr'}"
|
||||
|
||||
return "Sucesso (vazio)"
|
||||
except subprocess.TimeoutExpired:
|
||||
return "ERRO: O comando demorou muito e foi cancelado (timeout)."
|
||||
except Exception as e:
|
||||
return f"ERRO fatal ao rodar bash: {str(e)}"
|
||||
|
||||
def get_system_health(*args) -> str:
|
||||
"""Retorna um texto detalhado da saúde do servidor para a IA."""
|
||||
cpu = psutil.cpu_percent(interval=0.1)
|
||||
vm = psutil.virtual_memory()
|
||||
disk = psutil.disk_usage('/')
|
||||
|
||||
# Uptime aproximado usando psutil
|
||||
uptime_seconds = time.time() - psutil.boot_time()
|
||||
uptime_hours = round(uptime_seconds / 3600, 1)
|
||||
|
||||
return (f"CPU: {cpu}% | "
|
||||
f"RAM: {round(vm.used / (1024**3), 2)}GB usada / {round(vm.total / (1024**3), 2)}GB total ({vm.percent}%) | "
|
||||
f"Disco: {disk.percent}% usado | "
|
||||
f"Uptime: {uptime_hours}h")
|
||||
|
||||
def read_vps_file(filepath: str) -> str:
|
||||
"""Lê um arquivo do sistema de arquivos da VPS através do mapeamento /host_root."""
|
||||
# Garante que o caminho comece com /host_root
|
||||
clean_path = filepath
|
||||
if clean_path.startswith("/host_root"):
|
||||
host_path = clean_path
|
||||
else:
|
||||
# Se o usuário mandou /root/vps..., vira /host_root/root/vps...
|
||||
host_path = os.path.join("/host_root", clean_path.lstrip("/"))
|
||||
|
||||
try:
|
||||
if not os.path.exists("/host_root"):
|
||||
return "ERRO CRÍTICO: O diretório /host_root não existe no container! O mapeamento de volume falhou."
|
||||
|
||||
if not os.path.exists(host_path):
|
||||
# Tenta listar o diretório pai para ajudar no debug
|
||||
parent = os.path.dirname(host_path)
|
||||
content = os.listdir(parent) if os.path.exists(parent) else "Pai não existe"
|
||||
return f"Erro: {filepath} não encontrado (Caminho real: {host_path}). Conteúdo do pai: {content}"
|
||||
|
||||
with open(host_path, 'r') as f:
|
||||
return f.read(2000)
|
||||
except Exception as e:
|
||||
return f"Erro ao acessar {filepath}: {e}"
|
||||
|
||||
def get_docker_stats() -> str:
|
||||
"""Retorna o uso de CPU/RAM de todos os containers ativos via comando docker stats."""
|
||||
return run_bash_command('docker stats --no-stream --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}"')
|
||||
|
||||
# ============================================================
|
||||
# SISTEMA CRONOS (MEMÓRIA DE LONGO PRAZO)
|
||||
# ============================================================
|
||||
|
||||
MEMORY_ROOT = "/root/Antigravity_Memory"
|
||||
|
||||
def cronos_log(arg: str) -> str:
|
||||
"""
|
||||
Salva memórias. Formato esperado: topic="assunto", content="texto", folder="current_week"
|
||||
Também aceita JSON.
|
||||
"""
|
||||
try:
|
||||
# Tenta como JSON primeiro
|
||||
try:
|
||||
data = json.loads(arg)
|
||||
topic = data.get("topic", "geral")
|
||||
content = data.get("content", "")
|
||||
folder = data.get("folder", "current_week")
|
||||
except:
|
||||
# Parser simples por regex para query="xxx" ou topic="xxx"
|
||||
topic_m = re.search(r'topic=["\'](.*?)["\']', arg)
|
||||
content_m = re.search(r'content=["\'](.*?)["\']', arg, re.S)
|
||||
folder_m = re.search(r'folder=["\'](.*?)["\']', arg)
|
||||
|
||||
topic = topic_m.group(1) if topic_m else "geral"
|
||||
content = content_m.group(1) if content_m else arg # fallback para arg bruto
|
||||
folder = folder_m.group(1) if folder_m else "current_week"
|
||||
|
||||
if not content: return "Erro: Conteúdo vazio."
|
||||
|
||||
target_dir = os.path.join(MEMORY_ROOT, folder)
|
||||
if not os.path.exists(target_dir):
|
||||
os.makedirs(target_dir, exist_ok=True)
|
||||
|
||||
filename = f"{topic.lower().replace(' ', '_')}.md"
|
||||
filepath = os.path.join(target_dir, filename)
|
||||
|
||||
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
entry = f"\n---\n### ENTRY: {timestamp}\n{content}\n"
|
||||
|
||||
with open(filepath, "a" if os.path.exists(filepath) else "w") as f:
|
||||
f.write(entry)
|
||||
|
||||
return f"Sucesso: Salvo em Cronos/{folder}/{filename}"
|
||||
except Exception as e:
|
||||
return f"Erro ao salvar em Cronos: {e}"
|
||||
|
||||
def cronos_query(arg: str) -> str:
|
||||
"""Busca informações. Formato: query="termo", folder="pasta" """
|
||||
query_m = re.search(r'query=["\'](.*?)["\']', arg)
|
||||
folder_m = re.search(r'folder=["\'](.*?)["\']', arg)
|
||||
|
||||
query = query_m.group(1) if query_m else arg
|
||||
folder = folder_m.group(1) if folder_m else "current_week"
|
||||
|
||||
target_dir = os.path.join(MEMORY_ROOT, folder)
|
||||
return run_bash_command(f"grep -rniI '{query}' {target_dir} | head -n 20")
|
||||
|
||||
# ============================================================
|
||||
# GOOGLE WORKSPACE TOOLS
|
||||
# ============================================================
|
||||
|
||||
ACCOUNT_MAPPING = {
|
||||
"ma": "gws-mr", "mr": "gws-mr", "marcos": "gws-mr",
|
||||
"adm": "gws-adm", "empresa": "gws-adm",
|
||||
"4r": "gws-4r", "familia": "gws-4r", "fam": "gws-4r"
|
||||
}
|
||||
|
||||
def resolve_account(account_alias: str) -> str:
|
||||
clean = account_alias.strip().lower()
|
||||
return ACCOUNT_MAPPING.get(clean, f"gws-{clean}")
|
||||
|
||||
def list_gmail_emails(account_alias: str) -> str:
|
||||
"""Lista os últimos 5 e-mails com Título e Remetente. Aceita apelidos: ma, mr, adm, 4r."""
|
||||
account = resolve_account(account_alias)
|
||||
list_cmd = f"{account} gmail users messages list --params '{{\"userId\": \"me\", \"maxResults\": 5}}'"
|
||||
res = run_bash_command(list_cmd)
|
||||
|
||||
try:
|
||||
# Limpeza de JSON (remove lixo do keyring/stderr no stdout)
|
||||
json_match = re.search(r"(\{.*\})", res, re.S)
|
||||
res_clean = json_match.group(1) if json_match else res
|
||||
data = json.loads(res_clean)
|
||||
|
||||
if "error" in data:
|
||||
err_msg = data["error"].get("message", str(data["error"]))
|
||||
return f"❌ Erro de Autenticação GWS ({account}): {err_msg}. Você pode precisar reautenticar o CLI na VPS."
|
||||
|
||||
messages = data.get("messages", [])
|
||||
if not messages: return "📭 Nenhum e-mail encontrado na caixa de entrada."
|
||||
|
||||
result_text = "📧 **Últimos E-mails:**\n"
|
||||
for i, msg in enumerate(messages, 1):
|
||||
msg_id = msg["id"]
|
||||
details_cmd = f"{account} gmail users messages get --params '{{\"userId\": \"me\", \"id\": \"{msg_id}\", \"format\": \"metadata\", \"metadataHeaders\": [\"Subject\", \"From\"]}}'"
|
||||
details_res = run_bash_command(details_cmd)
|
||||
try:
|
||||
# Limpeza de JSON também nos detalhes
|
||||
dm = re.search(r"(\{.*\})", details_res, re.S)
|
||||
dc = dm.group(1) if dm else details_res
|
||||
details = json.loads(dc)
|
||||
|
||||
headers = details.get("payload", {}).get("headers", [])
|
||||
subject = next((h["value"] for h in headers if h["name"] == "Subject"), "Sem Assunto")
|
||||
sender = next((h["value"] for h in headers if h["name"] == "From"), "Desconhecido")
|
||||
result_text += f"{i}. **De:** {sender}\n **Assunto:** {subject}\n **ID:** `{msg_id}`\n\n"
|
||||
except:
|
||||
result_text += f"{i}. [Erro ao carregar detalhes do ID: {msg_id}]\n\n"
|
||||
return result_text
|
||||
except Exception as e:
|
||||
return f"Erro ao listar e-mails: {str(e)}\nResposta bruta: {res[:200]}"
|
||||
|
||||
def gmail_manage_label(arg: str) -> str:
|
||||
"""Cria ou busca marcadores (labels). Arg: account name (ex: adm alibaba)"""
|
||||
try:
|
||||
parts = arg.split(maxsplit=1)
|
||||
account_alias = parts[0]
|
||||
label_name = parts[1].strip() if len(parts) > 1 else ""
|
||||
if not label_name: return "Erro: Nome do marcador não fornecido."
|
||||
account = resolve_account(account_alias)
|
||||
|
||||
# 1. Verifica se já existe
|
||||
list_res = run_bash_command(f"{account} gmail users labels list --params '{{\"userId\": \"me\"}}'")
|
||||
try:
|
||||
# Extrai apenas o JSON se houver lixo no stdout (ex: Using keyring...)
|
||||
json_match = re.search(r"(\{.*\})", list_res, re.S)
|
||||
list_res_clean = json_match.group(1) if json_match else list_res
|
||||
labels_data = json.loads(list_res_clean)
|
||||
for l in labels_data.get("labels", []):
|
||||
if l["name"].lower() == label_name.lower():
|
||||
return f"Marcador '{l['name']}' já existe (ID: {l['id']})."
|
||||
except: pass
|
||||
|
||||
# 2. Tenta criar
|
||||
cmd = f"{account} gmail users labels create --params '{{\"userId\": \"me\"}}' --json '{{\"name\": \"{label_name}\", \"labelListVisibility\": \"labelShow\", \"messageListVisibility\": \"show\"}}'"
|
||||
res = run_bash_command(cmd)
|
||||
return f"Criação de '{label_name}': {res}"
|
||||
except Exception as e: return f"Erro marcador: {str(e)}"
|
||||
|
||||
def gmail_manage_filter(arg: str) -> str:
|
||||
"""Cria um filtro para e-mails. Arg: account criteria label_name (ex: adm alibaba alibaba)"""
|
||||
try:
|
||||
parts = arg.split(maxsplit=2)
|
||||
if len(parts) < 3: return "Erro: Use 'conta criteria marcador'. Ex: adm alibaba alibaba"
|
||||
account_alias, criteria, label_name = parts[0], parts[1], parts[2]
|
||||
account = resolve_account(account_alias)
|
||||
|
||||
# Busca o ID do marcador pelo nome
|
||||
label_id = label_name
|
||||
list_res = run_bash_command(f"{account} gmail users labels list --params '{{\"userId\": \"me\"}}'")
|
||||
try:
|
||||
json_match = re.search(r"(\{.*\})", list_res, re.S)
|
||||
list_res_clean = json_match.group(1) if json_match else list_res
|
||||
labels_data = json.loads(list_res_clean)
|
||||
for l in labels_data.get("labels", []):
|
||||
if l["name"].lower() == label_name.lower():
|
||||
label_id = l["id"]
|
||||
break
|
||||
except: pass
|
||||
|
||||
criteria_obj = {"from": criteria} if "@" in criteria else {"query": criteria}
|
||||
filter_obj = {
|
||||
"criteria": criteria_obj,
|
||||
"action": {"addLabelIds": [label_id]}
|
||||
}
|
||||
cmd = f"{account} gmail users settings filters create --params '{{\"userId\": \"me\"}}' --json '{json.dumps(filter_obj)}'"
|
||||
return run_bash_command(cmd)
|
||||
except Exception as e: return f"Erro filtro: {str(e)}"
|
||||
|
||||
def drive_find(arg: str) -> str:
|
||||
"""Busca arquivos no Drive por nome. Arg: account query (ex: ma financeiro)"""
|
||||
try:
|
||||
parts = arg.split(maxsplit=1)
|
||||
account_alias = parts[0]
|
||||
query = parts[1] if len(parts) > 1 else ""
|
||||
account = resolve_account(account_alias)
|
||||
q = f"name contains '{query}'" if query else ""
|
||||
cmd = f"{account} drive files list"
|
||||
if q: cmd += f" --params '{{\"q\": \"{q}\"}}'"
|
||||
res = run_bash_command(cmd)
|
||||
data = json.loads(res)
|
||||
files = data.get("files", [])
|
||||
if not files: return "Nenhum arquivo encontrado."
|
||||
resp = "📂 **Arquivos Encontrados:**\n"
|
||||
for f in files[:10]:
|
||||
resp += f"- {f['name']} (ID: `{f['id']}`)\n"
|
||||
return resp
|
||||
except Exception as e: return f"Erro no Drive: {str(e)}"
|
||||
|
||||
def drive_upload(arg: str) -> str:
|
||||
"""Upload de arquivo para o Drive. Arg: account filepath (ex: ma /tmp/relat.pdf)"""
|
||||
try:
|
||||
parts = arg.split(maxsplit=1)
|
||||
account_alias, filepath = parts[0], parts[1]
|
||||
account = resolve_account(account_alias)
|
||||
filename = os.path.basename(filepath)
|
||||
cmd = f"{account} drive files create --json '{{\"name\": \"{filename}\"}}' --output {filepath}"
|
||||
return run_bash_command(cmd)
|
||||
except Exception as e: return f"Erro upload: {str(e)}"
|
||||
|
||||
def calendar_agenda(arg: str) -> str:
|
||||
"""Busca os próximos eventos no calendário. Arg: account timeframe (timeframe: today, tomorrow, week, days=N)"""
|
||||
try:
|
||||
parts = arg.split(maxsplit=1)
|
||||
account_alias = parts[0]
|
||||
timeframe = parts[1] if len(parts) > 1 else "--today"
|
||||
account = resolve_account(account_alias)
|
||||
if not timeframe.startswith("--"): timeframe = f"--{timeframe}"
|
||||
cmd = f"{account} calendar +agenda {timeframe}"
|
||||
return run_bash_command(cmd)
|
||||
except Exception as e: return f"Erro no Calendário: {str(e)}"
|
||||
|
||||
# Mapeamento para o Agente entender quais tools ele possui
|
||||
AVAILABLE_TOOLS = {
|
||||
"run_bash_command": {
|
||||
"description": "Executa comandos Linux na VPS. Use para docker, git, mkdir, touch, etc.",
|
||||
"func": run_bash_command
|
||||
},
|
||||
"list_gmail_emails": {
|
||||
"description": "Lista os 5 e-mails mais recentes de uma conta (ma, adm, 4r) com título e remetente.",
|
||||
"func": list_gmail_emails
|
||||
},
|
||||
"drive_find": {
|
||||
"description": "Busca arquivos no Drive por nome. Ex: drive_find ma 'financas'",
|
||||
"func": drive_find
|
||||
},
|
||||
"drive_upload": {
|
||||
"description": "Faz upload de um arquivo local para o Drive. Ex: drive_upload adm /tmp/doc.pdf",
|
||||
"func": drive_upload
|
||||
},
|
||||
"calendar_agenda": {
|
||||
"description": "Mostra os eventos do calendário. Ex: calendar_agenda ma today / tomorrow / week",
|
||||
"func": calendar_agenda
|
||||
},
|
||||
"get_system_health": {
|
||||
"description": "Verifica RAM, CPU e Disco globais da VPS.",
|
||||
"func": get_system_health
|
||||
},
|
||||
"read_vps_file": {
|
||||
"description": "Lê o conteúdo de um arquivo na VPS (logs, configs).",
|
||||
"func": read_vps_file
|
||||
},
|
||||
"get_docker_stats": {
|
||||
"description": "Retorna uma tabela com o consumo de CPU e Memória de cada container.",
|
||||
"func": get_docker_stats
|
||||
},
|
||||
"cronos_log": {
|
||||
"description": "Salva memórias importantes (assunto, conteúdo, pasta=current_week|knowledge). Use sempre que algo for concluído.",
|
||||
"func": cronos_log
|
||||
},
|
||||
"cronos_query": {
|
||||
"description": "Busca informações no histórico da Memória Cronos.",
|
||||
"func": cronos_query
|
||||
},
|
||||
"gmail_manage_label": {
|
||||
"description": "Cria um novo marcador (Label) no Gmail. Ex: gmail_manage_label adm alibaba",
|
||||
"func": gmail_manage_label
|
||||
},
|
||||
"gmail_manage_filter": {
|
||||
"description": "Cria um filtro automático no Gmail. Ex: gmail_manage_filter adm @alibaba.com Alibaba",
|
||||
"func": gmail_manage_filter
|
||||
}
|
||||
}
|
||||
# Mantém AVAILABLE_TOOLS com mesmo nome do arquivo antigo
|
||||
AVAILABLE_TOOLS = AVAILABLE_TOOLS
|
||||
|
||||
Reference in New Issue
Block a user