BotVPS/ai_agent.py

import os
import re
import asyncio

from tools import AVAILABLE_TOOLS as TOOLS_LEGACY
from tools_v2 import TOOLS_V2 as TOOLS_NEW
from llm_providers import call_llm
from config import get_config
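
# A minimal sketch (assumption, not the real registries) of the schema this
# module expects from AVAILABLE_TOOLS / TOOLS_V2: each tool name maps to a
# dict holding a sync or async callable under "func" and a human-readable
# "description" (or legacy "desc") text, e.g.:
#
#   {"run_bash_command": {"func": <callable>, "description": "Executa um comando bash"}}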

async def get_llm_response_async(prompt: str, provider: str, cfg: dict) -> str:
    """Invoke the centralized LLM provider in llm_providers."""
    # Pick the default model depending on the provider
    if provider == "openrouter":
        model = cfg.get("model") or "qwen/qwen-2.5-72b-instruct"
    elif provider == "ollama":
        model = os.getenv("OLLAMA_MODEL", "llama3.2:1b")
    else:
        model = cfg.get("model") or "qwen/qwen-2.5-72b-instruct"
    # call_llm returns a dict (see note below); return only the text so the
    # annotated return type of str actually holds
    res = await call_llm(provider, model, prompt)
    return res["content"]
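
# Assumed call_llm contract, inferred from how this module consumes it (the
# llm_providers implementation is not shown here): it returns a dict roughly
# shaped like
#   {"content": "<model reply text>",
#    "usage": {"prompt_tokens": 123, "completion_tokens": 45},
#    "model": "qwen/qwen-2.5-72b-instruct"}
# where "usage" and "model" may be missing (the agent loop uses .get()).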

def query_agent(prompt: str, override_provider=None, chat_history=None) -> str:
    """Synchronous wrapper around query_agent_async.

    Note: asyncio.run() cannot be called from inside a running event loop,
    so call query_agent_async directly from async contexts.
    """
    return asyncio.run(query_agent_async(prompt, override_provider, chat_history))

async def query_agent_async(prompt: str, override_provider=None, chat_history=None) -> str:
    cfg = get_config()
    provider = override_provider or cfg.get("active_provider", "openrouter")

    # Merge legacy and new tool registries (new entries win on name clashes)
    ALL_TOOLS = {**TOOLS_LEGACY, **TOOLS_NEW}
    tools_desc = "\n".join(
        f"- {k}: {v.get('description') or v.get('desc')}" for k, v in ALL_TOOLS.items()
    )

    # Model advertised in the system prompt
    current_model = cfg.get("model") or "qwen/qwen-2.5-72b-instruct"
    system_prompt = f"""Antigravity (VPS Marcos). Mestre em Linux/GWS.
Use `[TOOL:nome] arg [/TOOL]` ou `[TOOL:run] cmd [/TOOL]`.
Contas GWS: `gws-mr` (Marcos), `gws-adm` (Empresa), `gws-4r` (Familiar).
Regras: Foco no pedido ATUAL. NUNCA use tags <REFINED>.
Ferramentas:
{tools_desc}
Resposta: Sempre inicie a conclusão com `RESUMO:`."""
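
    # Illustrative only (hypothetical exchange, not real output): with this
    # prompt the model is expected to emit tool calls inline, e.g.
    #   Agente: [TOOL:run] df -h / [/TOOL]
    #   SISTEMA (run_bash_command): Filesystem ... 42% /
    # The parsing loop below extracts the tool name and argument from that tag.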
history_str = ""
if chat_history:
for m in chat_history[-5:]:
history_str += f"\nUsuário: {m['user']}\nAgente: {m['bot']}\n"
history_str += f"\nUsuário: {prompt}\n"
current_history = history_str
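
    # Illustrative transcript shape fed to the LLM (hypothetical content);
    # tool observations are appended later under the "SISTEMA" label:
    #
    #   Usuário: oi
    #   Agente: RESUMO: pronto para ajudar.
    #
    #   Usuário: qual o uptime da VPS?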
    max_iterations = 6
    total_in = 0   # accumulated prompt tokens across iterations
    total_out = 0  # accumulated completion tokens across iterations
    final_model = current_model
    response = ""  # keeps the final return well-defined even if the loop never runs
    for i in range(max_iterations):
        print(f"[AGENT] Iteration {i + 1} - sending to {provider} (default model)...")
        try:
            res_dict = await call_llm(provider, current_model, system_prompt + current_history)
            # FALLBACK logic: if Qwen fails or the API returns an error, try ling-2.6-flash
            if (
                res_dict["content"].startswith("Erro OpenRouter")
                and provider == "openrouter"
                and current_model == "qwen/qwen-2.5-72b-instruct"
            ):
                backup_model = "inclusionai/ling-2.6-flash:free"
                print(f"⚠️ [FALLBACK CHAT] Qwen failed. Trying {backup_model}...")
                res_dict = await call_llm("openrouter", backup_model, system_prompt + current_history)
        except Exception as e:
            if provider == "openrouter" and current_model == "qwen/qwen-2.5-72b-instruct":
                backup_model = "inclusionai/ling-2.6-flash:free"
                print(f"⚠️ [FALLBACK CHAT] Qwen raised an exception ({e}). Trying {backup_model}...")
                res_dict = await call_llm("openrouter", backup_model, system_prompt + current_history)
            else:
                return f"Erro Crítico no Agente: {e}"

        response = res_dict["content"]
        usage = res_dict.get("usage", {})
        total_in += usage.get("prompt_tokens", 0)
        total_out += usage.get("completion_tokens", 0)
        final_model = res_dict.get("model", final_model)
        print(f"[LLM RESPONSE]: {response}")
        # Tool-call parsing: match [TOOL:name] (tolerating a missing opening
        # bracket in sloppy model output) and take everything up to [/TOOL],
        # or to the end of the string when the closing tag is absent
        match = re.search(r"\[?TOOL:(\w+)\]?", response, re.I)
        if match:
            t_name = match.group(1).lower()
            if t_name == "run":
                t_name = "run_bash_command"
            content_after = response[match.end():]
            end_tag = re.search(r"\[/TOOL\]", content_after, re.I)
            arg = content_after[:end_tag.start()].strip() if end_tag else content_after.strip()

            if t_name in ALL_TOOLS:
                func = ALL_TOOLS[t_name]["func"]
                print(f"[AGENT] Executing {t_name} with argument: {arg[:50]}...")
                # Tools may be sync or async; pass the argument only when present
                if asyncio.iscoroutinefunction(func):
                    obs = await func(arg) if arg else await func()
                else:
                    obs = func(arg) if arg else func()
                # Normalize dict results to a plain-string observation
                if isinstance(obs, dict):
                    obs = obs.get("output") or obs.get("message") or str(obs)
                print(f"[TOOL:{t_name}] Observation: {str(obs)[:100]}...")
                if len(str(obs)) > 3000:
                    obs = str(obs)[:3000] + "... [TRUNCATED]"
                current_history += f"\nAgente: {response}\nSISTEMA ({t_name}): {obs}\n"
            else:
                print(f"[AGENT] Error: tool '{t_name}' not found.")
                current_history += f"\nAgente: {response}\nSISTEMA: Erro: Ferramenta '{t_name}' inexistente no sistema.\n"
        else:
            # No tool call found: the agent is done. Append the model/token footer.
            footer = (
                f"\n\n---\n⚙️ **Modelo:** `{final_model}`\n"
                f"📊 **Tokens:** `{total_in} IN` / `{total_out} OUT`"
            )
            return response + footer

    # Iteration limit reached: return the last response with a warning footer
    footer = (
        f"\n\n---\n⚠️ *Limite de iterações atingido*\n⚙️ **Modelo:** `{final_model}`\n"
        f"📊 **Tokens:** `{total_in} IN` / `{total_out} OUT`"
    )
    return f"RESUMO: {response or 'Nenhuma'}" + footer