# ============================================================ # ORCHESTRATOR.PY - Orquestrador de Tarefas (Refatorado) # ============================================================ import json import re import os import asyncio from typing import Dict, List, Optional from llm_providers import ( call_planner_async, call_executor_async, get_planner_llm, get_executor_llm, get_available_models, LLM_PROVIDERS, set_planner, set_executor, get_config, save_config ) from tools_v2 import TOOLS_V2, get_tools_by_danger, get_all_tools_formatted from credential_manager import sync_credentials, get_services_status # ============================================================ # SYSTEM PROMPTS # ============================================================ PLANNER_SYSTEM_PROMPT = """Você é o PLANNER AGENT do BotVPS. Seu trabalho é decompor tarefas em passos executáveis CORRETOS. {CONTEXT_INFO} ### REGRAS CRÍTICAS DE COMANDOS: 1. USE SEMPRE "docker compose" (COM ESPAÇO), NUNCA "docker-compose" (COM HÍFEN) 2. Use "cd /app && git pull" para atualizar 3. Use "cd /app && docker compose up -d --build" para rebuild e deploy ### EXEMPLOS DE COMANDOS CORRETOS: ✅ CORRETO: cd /root/Apps/BotVPS && git log -n 1 ✅ CORRETO: cd /data/repositories/admtracksteel/AdmTrackSteel && git status ### DIRETÓRIOS REAIS (HOST): - BotVPS / Antigravity: `/root/Apps/BotVPS` - AdmTrackSteel: `/data/repositories/admtracksteel/AdmTrackSteel` - Se não tiver certeza de um caminho, use `ls` ou `find` primeiro. NUNCA CHUTE CAMINHOS. ### FORMATO DE RESPOSTA: Responda APENAS com JSON: {{ "task_name": "Nome resumido", "summary": "Resumo do que será feito", "steps": [ {{ "order": 1, "action": "Descrição", "tool": "bash", "command": "COMANDO LINUX COMPLETO", "danger": "safe|medium|dangerous" }} ] }} """ EXECUTOR_SYSTEM_PROMPT = """Você é o EXECUTOR AGENT do BotVPS. Retorne JSON: {"success": true|false, "output": "resultado"} """ # ============================================================ # HELPER FUNCTIONS # ============================================================ def _format_tools_for_prompt() -> str: return "\n".join([f"- {name}: {info['desc']} [{info['danger']}]" for name, info in TOOLS_V2.items()]) async def detect_git_repo_path_async(task: str) -> str: """Detecta automaticamente o caminho do repositório Git (async).""" from tools_v2 import run_bash task_lower = task.lower() # Mapeamento de APPs conhecidos app_map = { "tracksteel": "/data/repositories/admtracksteel/AdmTrackSteel", "botvps": "/root/Apps/BotVPS", "coolify": "/data/coolify/source", "antigravity": "/root/Apps/BotVPS" } for key, path in app_map.items(): if key in task_lower: return path # Busca dinâmica rápida result = run_bash("find /data/repositories -name '.git' -type d -maxdepth 3 | head -1") if result.get("success") and result.get("output"): return result["output"].replace("/.git", "").strip() return "/app" if os.path.exists("/app/.git") else "/" async def _parse_json_response(text: str) -> Optional[Dict]: json_match = re.search(r'\{[\s\S]*\}', text) if json_match: try: return json.loads(json_match.group()) except: pass return None # ============================================================ # CORE AGENTS # ============================================================ async def plan_task_async(task: str) -> Dict: provider, model = get_planner_llm() repo_path = await detect_git_repo_path_async(task) context_info = f"### CONTEXTO: Repo em {repo_path}, Bot em /app" system_prompt = PLANNER_SYSTEM_PROMPT.format( CONTEXT_INFO=context_info, TOOLS_LIST=_format_tools_for_prompt() ) response = await call_planner_async(task, system_prompt) plan = await _parse_json_response(response) if not plan: return { "task_name": "Comando Direto", "summary": f"Executando: {task}", "steps": [{"order": 1, "action": task, "tool": "bash", "command": task, "danger": "medium"}] } return plan async def execute_command_async(command: str) -> Dict: # Moderniza comando se necessário command = command.replace("docker-compose", "docker compose") # Comandos simples: execução direta (segurança e velocidade) if len(command) < 150 and not any(c in command for c in ["|", ">", ">>"]): process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() return { "success": process.returncode == 0, "output": (stdout.decode() or stderr.decode() or "OK").strip() } # Comandos complexos: usa Executor LLM para validar/executar response = await call_executor_async(f"Execute: {command}", EXECUTOR_SYSTEM_PROMPT) return await _parse_json_response(response) or {"success": False, "output": response} async def execute_step_async(step: Dict) -> Dict: tool = step.get("tool", "bash") command = step.get("command", "") if tool in TOOLS_V2: func = TOOLS_V2[tool]["func"] try: # Se for async, await if asyncio.iscoroutinefunction(func): result = await func(command) if command else await func() else: result = func(command) if command else func() return {"success": True, "output": result, "step": step.get("order")} except Exception as e: return {"success": False, "output": str(e), "step": step.get("order")} return await execute_command_async(command) # ============================================================ # MAIN ORCHESTRATION # ============================================================ async def orchestrate_async(task: str, user_confirmed: bool = False) -> Dict: plan = await plan_task_async(task) # Verifica perigo dangerous = [s for s in plan.get("steps", []) if s.get("danger") in ["medium", "dangerous"]] if dangerous and not user_confirmed: return { "status": "needs_confirmation", "plan": plan, "confirmation_needed_for": dangerous } results = [] for step in plan.get("steps", []): res = await execute_step_async(step) results.append(res) if not res["success"] and step.get("danger") == "dangerous": break return {"status": "completed", "plan": plan, "results": results} # --- SYNC WRAPPERS PARA COMPATIBILIDADE --- def orchestrate(task: str, user_confirmed: bool = False) -> Dict: return asyncio.run(orchestrate_async(task, user_confirmed)) def handle_message(text: str, confirmed: bool = False) -> str: # Mantendo lógica de parsing mas chamando orchestrate_async internamente facilitaria # No entanto, para evitar mudanças drásticas agora, faremos o wrapper sync return asyncio.run(handle_message_async(text, confirmed)) async def handle_message_async(text: str, confirmed: bool = False) -> str: # Reimplementação levemente mais limpa text_clean = text.strip().lower() if text_clean == "/status": s = get_orchestrator_status() return f"[BOT] Status: Planner={s['planner']['model']}, Executor={s['executor']['model']}" if text_clean == "/tools": return get_all_tools_formatted() # Orchestration res = await orchestrate_async(text, confirmed) if res["status"] == "needs_confirmation": return format_confirmation_message(res) return format_completion_message(res) def format_confirmation_message(result: Dict) -> str: plan = result["plan"] msg = f"⚠️ **Confirmação Necessária**: {plan['task_name']}\n\n" for s in result["confirmation_needed_for"]: msg += f"• Passo {s['order']}: {s['action']} ({s['danger'].upper()})\n" msg += "\nDigite 'sim' para autorizar." return msg def format_completion_message(result: Dict) -> str: plan = result["plan"] results = result.get("results", []) success = all(r.get("success", False) for r in results) msg = f"{'✅' if success else '❌'} **Concluído**: {plan['task_name']}\n" for r in results: char = "S" if r.get("success") else "F" msg += f"[{char}] Step {r.get('step', '?')}: {str(r.get('output'))[:100]}\n" return msg def get_orchestrator_status() -> Dict: p_p, p_m = get_planner_llm() e_p, e_m = get_executor_llm() return { "planner": {"provider": p_p, "model": p_m}, "executor": {"provider": e_p, "model": e_m}, "tools_count": len(TOOLS_V2) } def get_llm_config() -> Dict: p_p, p_m = get_planner_llm() e_p, e_m = get_executor_llm() return { "planner": {"provider": p_p, "model": p_m, "available": list(LLM_PROVIDERS.keys())}, "executor": {"provider": e_p, "model": e_m, "available": list(LLM_PROVIDERS.keys())} } def set_llm_config(planner_provider=None, planner_model=None, executor_provider=None, executor_model=None): if planner_provider: set_planner(planner_provider, planner_model) if executor_provider: set_executor(executor_provider, executor_model) return {"status": "updated"}