"""BotVPS web API.

FastAPI application that serves the web UI and exposes HTTP endpoints for
system status, general and LLM configuration, credential synchronisation,
maintenance actions, text/audio chat with the AI agent and task
orchestration.
"""

import asyncio
import contextlib
import json
import os
import re
import shutil
import subprocess
import time
import uuid

import psutil
from dotenv import load_dotenv
from fastapi import (
    Depends,
    FastAPI,
    File,
    Header,
    HTTPException,
    Request,
    UploadFile,
    status,
)
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from starlette.concurrency import run_in_threadpool

from ai_agent import query_agent_async
from audio_handler import transcribe_audio, text_to_speech_async
from config import get_config, save_config
from credential_manager import fetch_from_gitea_repo_async
from orchestrator import (
    orchestrate_async,
    handle_message_async,
    get_orchestrator_status,
    get_llm_config,
    set_llm_config,
    format_confirmation_message,
    format_completion_message,
)
from llm_providers import get_available_models

load_dotenv()

app = FastAPI(title="BotVPS API")
templates = Jinja2Templates(directory="templates")

# Directory where uploaded audio is staged and from which /api/audio serves.
_TMP_DIR = "/tmp"

# Pattern used to pick the part of the agent reply that should be spoken.
# NOTE(review): as written the pattern has no delimiters, so it always matches
# the empty string; the surrounding tag (the original variable was named
# `refined_match`) appears to have been lost. Confirm the intended tag and
# restore it here. Until then `_speech_text` falls back to the full reply.
_SPEECH_SECTION_RE = re.compile(r'(.*?)', flags=re.DOTALL)

# NOTE: route handlers deliberately carry no return annotations, because
# FastAPI interprets a handler's return annotation as its response model.


def _bytes_to_gib(value):
    """Convert a byte count to GiB, rounded to two decimals."""
    return round(value / (1024**3), 2)


def _upload_temp_path(client_filename):
    """Build a unique staging path inside ``_TMP_DIR`` for an upload.

    The client-supplied name is reduced to its basename so that directory
    components (e.g. ``../``) can never steer the write outside the staging
    directory. A random prefix keeps concurrent uploads from colliding.
    """
    safe_name = os.path.basename(client_filename or "")
    return os.path.join(_TMP_DIR, f"{uuid.uuid4().hex}_{safe_name}")


def _speech_text(reply):
    """Return the text that should be sent to TTS for ``reply``.

    Uses the section captured by ``_SPEECH_SECTION_RE`` when it is non-empty;
    otherwise the whole reply is spoken.
    """
    match = _SPEECH_SECTION_RE.search(reply)
    extracted = match.group(1).strip() if match else ""
    return extracted or reply


# ============================================================
# STARTUP
# ============================================================

@app.on_event("startup")
async def startup_event() -> None:
    """Force a credential sync from the Gitea repository at startup."""
    print("[INIT] Sincronizando credenciais...")
    await fetch_from_gitea_repo_async(force=True)


# --- SECURITY ---

async def verify_password(x_web_password: str = Header(None)) -> bool:
    """Authentication dependency; currently always grants access.

    Authentication was disabled on request, so the ``X-Web-Password`` header
    is accepted but never checked.
    """
    return True


@app.get("/api/login")
async def login_bypass():
    """Report that authentication is disabled."""
    return {"status": "ok", "message": "Autenticação desativada"}


# --- WEB UI ---

@app.get("/", response_class=FileResponse)
async def read_root(request: Request):
    """Serve the single-page web UI."""
    return FileResponse("templates/index.html")


@app.get("/api/status")
async def get_system_status(is_auth: bool = Depends(verify_password)):
    """Return CPU, RAM (GiB) and root-disk usage as reported by psutil."""
    vm = psutil.virtual_memory()
    return {
        "cpu": psutil.cpu_percent(),
        "ram": {
            "percent": vm.percent,
            "used": _bytes_to_gib(vm.used),
            "total": _bytes_to_gib(vm.total),
        },
        "disk": {"percent": psutil.disk_usage('/').percent},
    }


# --- GENERAL CONFIGURATION ---

@app.get("/api/config")
async def read_config(is_auth: bool = Depends(verify_password)):
    """Return the current general configuration."""
    return get_config()


@app.post("/api/config")
async def update_config(cfg: dict, is_auth: bool = Depends(verify_password)):
    """Persist the posted configuration."""
    save_config(cfg)
    return {"status": "success"}


# --- LLM CONFIGURATION (ORCHESTRATOR) ---

@app.get("/api/llm-config")
async def read_llm_config(is_auth: bool = Depends(verify_password)):
    """Return the orchestrator's LLM configuration."""
    return get_llm_config()


@app.post("/api/llm-config")
async def update_llm_config(cfg: dict, is_auth: bool = Depends(verify_password)):
    """Update planner/executor provider and model from the posted body.

    Keys missing from ``cfg`` are forwarded as ``None``.
    """
    set_llm_config(
        planner_provider=cfg.get("planner_provider"),
        planner_model=cfg.get("planner_model"),
        executor_provider=cfg.get("executor_provider"),
        executor_model=cfg.get("executor_model"),
    )
    return {"status": "success"}


@app.get("/api/llm-models")
async def list_models(is_auth: bool = Depends(verify_password)):
    """List the models reported by the LLM providers."""
    return {"models": await get_available_models()}


# --- SYNC & ACTIONS ---

@app.post("/api/sync-credentials")
async def sync_creds(is_auth: bool = Depends(verify_password)):
    """Run the credential manager's sync and return its result."""
    from credential_manager import sync_credentials
    return sync_credentials()


@app.post("/api/sync-from-repo")
async def sync_from_repo(is_auth: bool = Depends(verify_password)):
    """Force a credential fetch from the Gitea repository."""
    await fetch_from_gitea_repo_async(force=True)
    return {"status": "synced"}


@app.post("/api/action")
async def run_action(data: dict, is_auth: bool = Depends(verify_password)):
    """Execute the maintenance action named by ``data["type"]``.

    Supported types are ``ping``, ``restart_bot`` and ``reboot_vps`` (the
    latter is always refused). Any other value yields an error payload.
    """
    action_type = data.get("type")

    if action_type == "ping":
        return {"status": "success", "message": "Pong! Servidor respondendo."}

    if action_type == "restart_bot":
        # Simulates a restart by killing the bot process; the Docker restart
        # policy is expected to bring it back up. Invoked without a shell and
        # kept best-effort: a missing `pkill` binary or a non-zero exit code
        # is ignored, as it was with the previous os.system call.
        with contextlib.suppress(OSError):
            subprocess.run(["pkill", "-9", "-f", "bot_logic.py"], check=False)
        return {"status": "success", "message": "Bot reiniciado."}

    if action_type == "reboot_vps":
        return {"status": "error", "message": "Reboot bloqueado via Web por segurança."}

    return {"status": "error", "message": f"Ação {action_type} desconhecida."}


@app.get("/api/test_llm")
async def test_llm_latency(is_auth: bool = Depends(verify_password)):
    """Send a trivial prompt to the agent and report the round-trip time.

    Returns the latency in seconds (two decimals) with the reply, or an
    error payload carrying the exception text if the agent call fails.
    """
    # perf_counter is monotonic, so the measurement is immune to clock jumps.
    t0 = time.perf_counter()
    try:
        reply = await query_agent_async("responda apenas 'pong'")
        latency = round(time.perf_counter() - t0, 2)
        return {"status": "success", "latency": latency, "reply": reply}
    except Exception as e:
        return {"status": "error", "message": str(e)}


# --- CHAT & ORCHESTRATION ---

@app.post("/api/chat")
async def web_chat(message: dict, is_auth: bool = Depends(verify_password)):
    """Forward a text message (plus the frontend's history) to the agent."""
    user_text = message.get("text", "")
    history = message.get("history", [])  # conversation history sent by the frontend
    if not user_text:
        return {"reply": "Vazio."}

    # Pass the history along so the agent keeps the conversation context.
    reply = await query_agent_async(user_text, chat_history=history)
    return {"reply": reply}


@app.post("/api/chat-audio")
async def web_chat_audio(audio: UploadFile = File(...), is_auth: bool = Depends(verify_password)):
    """Voice chat: transcribe the upload, query the agent, synthesise a reply.

    Returns the transcription, the agent reply and the URL of the generated
    audio. Any failure is reported as an error payload rather than raised.
    The staged upload is always removed afterwards.
    """
    temp_in = _upload_temp_path(audio.filename)
    try:
        # 1. Stage the browser's audio on disk. Done inside the try block so
        #    a partially written file is still cleaned up in `finally`.
        with open(temp_in, "wb") as buffer:
            shutil.copyfileobj(audio.file, buffer)

        # 2. Speech-to-text. transcribe_audio is synchronous, so it runs in
        #    the threadpool to keep the event loop responsive.
        text = await run_in_threadpool(transcribe_audio, temp_in)
        if not text:
            return {"reply": "Não entendi seu áudio.", "text": ""}

        # 3. Ask the agent.
        reply = await query_agent_async(text)

        # 4. Text-to-speech: speak only the extracted section when there is
        #    one, otherwise the whole reply.
        filename = await text_to_speech_async(_speech_text(reply))

        return {
            "text": text,
            "reply": reply,
            "audio_url": f"/api/audio/{filename}",
        }
    except Exception as e:
        return {"reply": f"Erro Áudio: {e}", "text": "Erro."}
    finally:
        with contextlib.suppress(FileNotFoundError):
            os.remove(temp_in)


@app.get("/api/audio/{filename}")
async def get_audio_file(filename: str):
    """Serve a generated audio file from the staging directory.

    Only plain file names are accepted: anything containing a directory
    component, or not naming a regular file, yields a 404.
    """
    safe_name = os.path.basename(filename)
    path = os.path.join(_TMP_DIR, safe_name)
    if safe_name == filename and os.path.isfile(path):
        return FileResponse(path, media_type="audio/mpeg")
    return JSONResponse({"error": "File not found"}, status_code=404)


@app.post("/api/orchestrate")
async def orchestrate_task(task_data: dict, is_auth: bool = Depends(verify_password)):
    """Run a task through the orchestrator.

    When the orchestrator asks for confirmation, the plan and a confirmation
    message are returned; otherwise the results and a completion message.
    """
    task = task_data.get("task", "")
    confirmed = task_data.get("confirmed", False)

    result = await orchestrate_async(task, user_confirmed=confirmed)

    if result["status"] == "needs_confirmation":
        return {
            "status": "needs_confirmation",
            "plan": result["plan"],
            "message": format_confirmation_message(result),
        }

    return {
        "status": "completed",
        "results": result.get("results", []),
        "message": format_completion_message(result),
    }


@app.get("/api/orchestrator-status")
async def get_orch_status(is_auth: bool = Depends(verify_password)):
    """Return the orchestrator's current status."""
    return get_orchestrator_status()


# --- SERVER ---

if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", 8001))
    uvicorn.run("main:app", host="0.0.0.0", port=port, reload=True)