# Listing metadata: 257 lines, 9.3 KiB, Python
# ============================================================
|
|
# ORCHESTRATOR.PY - Orquestrador de Tarefas (Refatorado)
|
|
# ============================================================
|
|
|
|
import json
|
|
import re
|
|
import os
|
|
import asyncio
|
|
from typing import Dict, List, Optional
|
|
from llm_providers import (
|
|
call_planner_async, call_executor_async, get_planner_llm, get_executor_llm,
|
|
get_available_models, LLM_PROVIDERS, set_planner, set_executor, get_config, save_config
|
|
)
|
|
from tools_v2 import TOOLS_V2, get_tools_by_danger, get_all_tools_formatted
|
|
from credential_manager import sync_credentials, get_services_status
|
|
|
|
# ============================================================
|
|
# SYSTEM PROMPTS
|
|
# ============================================================
|
|
|
|
# System prompt template for the planner LLM.
#
# The template is rendered with ``str.format`` (see ``plan_task_async``), so
# the literal braces of the JSON example are doubled (``{{`` / ``}}``).
# Substitution fields:
#   * ``{CONTEXT_INFO}`` - one-line description of the detected repository.
#   * ``{TOOLS_LIST}``   - one line per registered tool. The caller already
#     passed this keyword, but the template had no matching field, so the
#     tool catalogue was silently dropped from the prompt.
PLANNER_SYSTEM_PROMPT = """Você é o PLANNER AGENT do BotVPS.
Seu trabalho é decompor tarefas em passos executáveis CORRETOS.

{CONTEXT_INFO}

### FERRAMENTAS DISPONÍVEIS (campo "tool"):
{TOOLS_LIST}

### REGRAS CRÍTICAS DE COMANDOS:
1. USE SEMPRE "docker compose" (COM ESPAÇO), NUNCA "docker-compose" (COM HÍFEN)
2. Use "cd /app && git pull" para atualizar
3. Use "cd /app && docker compose up -d --build" para rebuild e deploy

### EXEMPLOS DE COMANDOS CORRETOS:
✅ CORRETO: cd /root/Apps/BotVPS && git log -n 1
✅ CORRETO: cd /data/repositories/admtracksteel/AdmTrackSteel && git status

### DIRETÓRIOS REAIS (HOST):
- BotVPS / Antigravity: `/root/Apps/BotVPS`
- AdmTrackSteel: `/data/repositories/admtracksteel/AdmTrackSteel`
- Se não tiver certeza de um caminho, use `ls` ou `find` primeiro. NUNCA CHUTE CAMINHOS.

### FORMATO DE RESPOSTA:
Responda APENAS com JSON:
{{
"task_name": "Nome resumido",
"summary": "Resumo do que será feito",
"steps": [
{{
"order": 1,
"action": "Descrição",
"tool": "bash",
"command": "COMANDO LINUX COMPLETO",
"danger": "safe|medium|dangerous"
}}
]
}}
"""
|
|
|
|
# System prompt for the executor LLM. ``execute_command_async`` passes it
# as-is (it is not run through ``str.format``), hence the single JSON braces.
EXECUTOR_SYSTEM_PROMPT = (
    "Você é o EXECUTOR AGENT do BotVPS.\n"
    'Retorne JSON: {"success": true|false, "output": "resultado"}\n'
)
|
|
|
|
# ============================================================
|
|
# HELPER FUNCTIONS
|
|
# ============================================================
|
|
|
|
def _format_tools_for_prompt() -> str:
    """Render the tool registry as text for the planner prompt.

    Returns:
        One ``- <name>: <desc> [<danger>]`` line per ``TOOLS_V2`` entry,
        joined with newlines (empty string when the registry is empty).
    """
    rendered = []
    for tool_name, meta in TOOLS_V2.items():
        rendered.append(f"- {tool_name}: {meta['desc']} [{meta['danger']}]")
    return "\n".join(rendered)
|
|
|
|
async def detect_git_repo_path_async(task: str) -> str:
    """Guess which Git repository a task refers to.

    Resolution order:
      1. A known application name appearing in the (lowercased) task text.
      2. The first ``.git`` directory found under ``/data/repositories``.
      3. ``/app`` when ``/app/.git`` exists, otherwise ``/``.

    Args:
        task: Free-form task description.

    Returns:
        Path of the repository directory.
    """
    task_lower = task.lower()

    # Known applications, matched by substring against the task text.
    app_map = {
        "tracksteel": "/data/repositories/admtracksteel/AdmTrackSteel",
        "botvps": "/root/Apps/BotVPS",
        "coolify": "/data/coolify/source",
        "antigravity": "/root/Apps/BotVPS",
    }
    for key, path in app_map.items():
        if key in task_lower:
            return path

    # Imported lazily: only the dynamic lookup below needs the tools module.
    from tools_v2 import run_bash

    # run_bash is a plain (non-awaitable) call, so run it in the default
    # executor instead of blocking the event loop while `find` walks the disk.
    # `-maxdepth` goes before the tests, as GNU find expects for global options.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,
        run_bash,
        "find /data/repositories -maxdepth 3 -name '.git' -type d | head -1",
    )

    git_dir = ""
    if result.get("success"):
        git_dir = (result.get("output") or "").strip()
    if git_dir:
        # Drop only the trailing "/.git" component; a blanket replace would
        # also mangle parent directories whose names start with ".git".
        if git_dir.endswith("/.git"):
            git_dir = git_dir[: -len("/.git")]
        if git_dir:
            return git_dir

    return "/app" if os.path.exists("/app/.git") else "/"
|
|
|
|
async def _parse_json_response(text: str) -> Optional[Dict]:
    """Extract the JSON object embedded in an LLM reply.

    Decoding starts at the first ``{`` and stops at the end of that JSON
    value, so prose, code fences or stray braces *after* the object no
    longer make parsing fail (the previous greedy ``{...}`` regex swallowed
    everything up to the last ``}`` in the reply).

    Args:
        text: Raw model reply. Non-string values are tolerated.

    Returns:
        The decoded dict, or ``None`` when the reply is not a string, has no
        ``{``, or the text starting at the first ``{`` is not valid JSON.
    """
    if not isinstance(text, str):
        return None

    start = text.find("{")
    if start == -1:
        return None

    try:
        parsed, _end = json.JSONDecoder().raw_decode(text, start)
    except json.JSONDecodeError:
        # Malformed JSON is an expected outcome; callers handle None.
        return None

    return parsed if isinstance(parsed, dict) else None
|
|
|
|
# ============================================================
|
|
# CORE AGENTS
|
|
# ============================================================
|
|
|
|
async def plan_task_async(task: str) -> Dict:
    """Ask the planner LLM to break ``task`` into executable steps.

    The planner prompt is rendered with the detected repository path and the
    tool list, and the reply is parsed as JSON.

    Args:
        task: Free-form task description from the user.

    Returns:
        A plan dict that always has ``task_name``, ``summary`` and a
        ``steps`` list containing only dict entries. When the reply holds no
        usable JSON object, a single-step fallback plan is returned that
        runs ``task`` itself as a ``bash`` command with danger ``medium``.
    """
    repo_path = await detect_git_repo_path_async(task)

    context_info = f"### CONTEXTO: Repo em {repo_path}, Bot em /app"
    system_prompt = PLANNER_SYSTEM_PROMPT.format(
        CONTEXT_INFO=context_info,
        TOOLS_LIST=_format_tools_for_prompt(),
    )

    response = await call_planner_async(task, system_prompt)
    plan = await _parse_json_response(response)

    if not plan or not isinstance(plan, dict):
        return {
            "task_name": "Comando Direto",
            "summary": f"Executando: {task}",
            "steps": [
                {"order": 1, "action": task, "tool": "bash", "command": task, "danger": "medium"}
            ],
        }

    # The model's JSON is untrusted: make sure the keys read by the
    # orchestration and message formatting code exist with sane types.
    raw_steps = plan.get("steps")
    if not isinstance(raw_steps, list):
        raw_steps = []
    plan["steps"] = [entry for entry in raw_steps if isinstance(entry, dict)]
    plan.setdefault("task_name", task)
    plan.setdefault("summary", f"Executando: {task}")
    return plan
|
|
|
|
async def execute_command_async(command: str) -> Dict:
    """Run a command through the shell, or delegate it to the executor LLM.

    Short commands (under 150 characters, no pipe or redirection) are run
    directly with the system shell. Anything else is sent to the executor
    LLM, whose JSON reply is returned.

    Args:
        command: Shell command line. Legacy ``docker-compose`` invocations
            are rewritten to ``docker compose``; file names such as
            ``docker-compose.yml`` and paths to the binary are left intact.

    Returns:
        ``{"success": bool, "output": str}``. An empty command yields a
        failure result instead of being reported as a successful no-op.
    """
    if not command or not command.strip():
        return {"success": False, "output": "Comando vazio"}

    # Rewrite only the standalone command word. A blanket str.replace also
    # turned "docker-compose.yml" into "docker compose.yml".
    command = re.sub(r"(?<![\w./-])docker-compose(?![\w.-])", "docker compose", command)

    # Simple commands: run directly. (">" also covers ">>".)
    is_simple = len(command) < 150 and not any(ch in command for ch in ("|", ">"))
    if is_simple:
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        # errors="replace": command output is not guaranteed to be UTF-8.
        text = (
            stdout.decode(errors="replace")
            or stderr.decode(errors="replace")
            or "OK"
        ).strip()
        return {"success": process.returncode == 0, "output": text}

    # Complex commands: delegate to the executor LLM.
    # NOTE(review): nothing in this function runs the command on this path;
    # whether call_executor_async actually executes it is not visible here.
    response = await call_executor_async(f"Execute: {command}", EXECUTOR_SYSTEM_PROMPT)
    parsed = await _parse_json_response(response)
    if parsed and "success" in parsed:
        return parsed
    # No usable verdict in the reply: report failure with the raw text.
    return {"success": False, "output": response}
|
|
|
|
async def execute_step_async(step: Dict) -> Dict:
    """Execute one plan step and report its outcome.

    When the step's ``tool`` is registered in ``TOOLS_V2`` its ``func`` is
    called (awaited if it is a coroutine function), with the command as the
    only argument when one is present. Otherwise the command goes to
    ``execute_command_async``.

    Args:
        step: Plan step; the keys read are ``tool`` (default ``"bash"``),
            ``command`` and ``order``.

    Returns:
        ``{"success": bool, "output": ..., "step": <order>}``. If the tool
        returns a dict carrying its own ``success`` flag, that flag is
        propagated and its ``output`` (when present) is unwrapped, so a
        failed tool call is no longer reported as a success. Exceptions
        raised by the tool become a failure result with the error text.
    """
    tool = step.get("tool", "bash")
    command = step.get("command") or ""  # tolerate an explicit JSON null
    order = step.get("order")

    if tool not in TOOLS_V2:
        outcome = await execute_command_async(command)
        # Tag the result so completion messages can show the step number.
        outcome.setdefault("step", order)
        return outcome

    func = TOOLS_V2[tool]["func"]
    try:
        if asyncio.iscoroutinefunction(func):
            result = await func(command) if command else await func()
        else:
            result = func(command) if command else func()
    except Exception as exc:  # tool boundary: surface any failure as a result
        return {"success": False, "output": str(exc), "step": order}

    if isinstance(result, dict) and "success" in result:
        return {
            "success": bool(result["success"]),
            "output": result.get("output", result),
            "step": order,
        }
    return {"success": True, "output": result, "step": order}
|
|
|
|
# ============================================================
|
|
# MAIN ORCHESTRATION
|
|
# ============================================================
|
|
|
|
async def orchestrate_async(task: str, user_confirmed: bool = False) -> Dict:
    """Plan a task and, once authorised, execute its steps in order.

    Each step's ``danger`` label is normalised to ``safe``, ``medium`` or
    ``dangerous``. A missing or unrecognised label is treated as
    ``dangerous``, so a malformed plan cannot bypass the confirmation gate.

    Args:
        task: Free-form task description.
        user_confirmed: Whether the user has already authorised non-safe
            steps.

    Returns:
        ``{"status": "needs_confirmation", "plan", "confirmation_needed_for"}``
        when a non-safe step exists and ``user_confirmed`` is false;
        otherwise ``{"status": "completed", "plan", "results"}``. Execution
        stops early only when a ``dangerous`` step fails.
    """
    plan = await plan_task_async(task)
    steps = [entry for entry in (plan.get("steps") or []) if isinstance(entry, dict)]

    # Fail closed on the danger label (see docstring).
    known_levels = ("safe", "medium", "dangerous")
    for step in steps:
        level = str(step.get("danger", "")).strip().lower()
        step["danger"] = level if level in known_levels else "dangerous"

    needs_confirmation = [step for step in steps if step["danger"] != "safe"]
    if needs_confirmation and not user_confirmed:
        return {
            "status": "needs_confirmation",
            "plan": plan,
            "confirmation_needed_for": needs_confirmation,
        }

    results = []
    for step in steps:
        res = await execute_step_async(step)
        results.append(res)
        # A result without a "success" key counts as a failure.
        if not res.get("success") and step["danger"] == "dangerous":
            break

    return {"status": "completed", "plan": plan, "results": results}
|
|
|
|
# --- SYNC WRAPPERS PARA COMPATIBILIDADE ---
|
|
def orchestrate(task: str, user_confirmed: bool = False) -> Dict:
    """Synchronous wrapper around ``orchestrate_async``.

    Uses ``asyncio.run``, so it must not be called from code that is already
    running inside an event loop.
    """
    coroutine = orchestrate_async(task, user_confirmed)
    return asyncio.run(coroutine)
|
|
|
|
def handle_message(text: str, confirmed: bool = False) -> str:
    """Synchronous wrapper around ``handle_message_async``.

    Kept as a thin sync shim so existing callers need no changes; all
    message parsing lives in the async implementation.
    """
    coroutine = handle_message_async(text, confirmed)
    return asyncio.run(coroutine)
|
|
|
|
async def handle_message_async(text: str, confirmed: bool = False) -> str:
    """Turn an incoming chat message into a reply string.

    ``/status`` and ``/tools`` (case-insensitive, surrounding whitespace
    ignored) are answered directly. Any other text is orchestrated as a
    task, and the reply is either a confirmation request or a completion
    report.

    Args:
        text: Raw message text.
        confirmed: Forwarded to ``orchestrate_async`` as ``user_confirmed``.
    """
    normalized = text.strip().lower()

    if normalized == "/status":
        status = get_orchestrator_status()
        planner_model = status["planner"]["model"]
        executor_model = status["executor"]["model"]
        return f"[BOT] Status: Planner={planner_model}, Executor={executor_model}"

    if normalized == "/tools":
        return get_all_tools_formatted()

    # Everything else is a task; note the original (non-lowercased) text is used.
    outcome = await orchestrate_async(text, confirmed)
    if outcome["status"] != "needs_confirmation":
        return format_completion_message(outcome)
    return format_confirmation_message(outcome)
|
|
|
|
def format_confirmation_message(result: Dict) -> str:
    """Build the message asking the user to authorise risky steps.

    Plan and step dicts originate from LLM output, so every field is read
    defensively: a missing ``task_name``, ``order``, ``action`` or
    ``danger`` is rendered as ``?`` instead of raising ``KeyError``.

    Args:
        result: Orchestration result with ``plan`` and
            ``confirmation_needed_for`` (list of step dicts).

    Returns:
        Header line, one bullet per step needing confirmation, and the
        instruction to reply ``sim``.
    """
    plan = result.get("plan") or {}
    pending = result.get("confirmation_needed_for") or []

    header = f"⚠️ **Confirmação Necessária**: {plan.get('task_name', '?')}\n\n"
    bullets = []
    for step in pending:
        danger = str(step.get("danger", "?")).upper()
        bullets.append(
            f"• Passo {step.get('order', '?')}: {step.get('action', '?')} ({danger})\n"
        )
    return header + "".join(bullets) + "\nDigite 'sim' para autorizar."
|
|
|
|
def format_completion_message(result: Dict) -> str:
    """Build the report shown after a plan has been executed.

    Args:
        result: Orchestration result with ``plan`` and ``results`` (one dict
            per executed step).

    Returns:
        A header with a check mark when every result succeeded (cross
        otherwise), followed by one ``[S]``/``[F]`` line per step with its
        output truncated to 100 characters. A missing ``plan`` or
        ``task_name`` is rendered as ``?`` instead of raising ``KeyError``.
    """
    plan = result.get("plan") or {}
    results = result.get("results") or []
    success = all(r.get("success", False) for r in results)

    icon = "✅" if success else "❌"
    lines = [f"{icon} **Concluído**: {plan.get('task_name', '?')}\n"]
    for r in results:
        flag = "S" if r.get("success") else "F"
        lines.append(f"[{flag}] Step {r.get('step', '?')}: {str(r.get('output'))[:100]}\n")
    return "".join(lines)
|
|
|
|
def get_orchestrator_status() -> Dict:
    """Report the configured planner/executor LLMs and the tool count.

    Returns:
        Dict with ``planner`` and ``executor`` entries (each holding
        ``provider`` and ``model``) plus ``tools_count``, the number of
        entries in ``TOOLS_V2``.
    """
    planner_provider, planner_model = get_planner_llm()
    executor_provider, executor_model = get_executor_llm()

    status = {}
    status["planner"] = {"provider": planner_provider, "model": planner_model}
    status["executor"] = {"provider": executor_provider, "model": executor_model}
    status["tools_count"] = len(TOOLS_V2)
    return status
|
|
|
|
def get_llm_config() -> Dict:
    """Describe the current LLM selection for both agents.

    Returns:
        Dict with ``planner`` and ``executor`` entries; each holds the
        selected ``provider`` and ``model`` plus ``available``, a fresh list
        of the keys of ``LLM_PROVIDERS``.
    """
    planner_provider, planner_model = get_planner_llm()
    executor_provider, executor_model = get_executor_llm()

    def describe(provider, model):
        # A new list per entry, so the two entries never share one object.
        return {
            "provider": provider,
            "model": model,
            "available": list(LLM_PROVIDERS.keys()),
        }

    planner_entry = describe(planner_provider, planner_model)
    executor_entry = describe(executor_provider, executor_model)
    return {"planner": planner_entry, "executor": executor_entry}
|
|
|
|
def set_llm_config(planner_provider=None, planner_model=None, executor_provider=None, executor_model=None):
    """Update the planner and/or executor LLM selection.

    An agent is updated only when its provider argument is truthy; a model
    passed without its provider is ignored.

    Returns:
        ``{"status": "updated"}`` in every case.
    """
    if planner_provider:
        set_planner(planner_provider, planner_model)

    if executor_provider:
        set_executor(executor_provider, executor_model)

    return dict(status="updated")
|