"""FastAPI application exposing the VpsTelegramBot web dashboard and its API.

Public routes render the dashboard and receive the Telegram webhook; the
``/api/*`` routes (except the audio file download) require the web password
sent in the ``X-Web-Password`` header.
"""

import json
import os
import secrets
import subprocess
import tempfile
import time

import psutil
from dotenv import load_dotenv
from fastapi import FastAPI, Request, Header, Depends, HTTPException, status, File, UploadFile
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.templating import Jinja2Templates
from starlette.concurrency import run_in_threadpool

import audio_handler
from ai_agent import query_agent
from config import get_config, save_config

# Load the variables from the .env file
load_dotenv()

app = FastAPI(title="VpsTelegramBot API")

# Configure the HTML templates
templates = Jinja2Templates(directory="templates")


# --- SECURITY ---
async def verify_password(x_web_password: str = Header(None)):
    """Validate the ``X-Web-Password`` header against the configured password.

    Args:
        x_web_password: Value of the ``X-Web-Password`` request header, or
            ``None`` when the header is absent.

    Returns:
        ``True`` when the supplied password matches.

    Raises:
        HTTPException: 401 when the header is missing, empty, or wrong.
    """
    cfg = get_config()
    # NOTE(review): a hard-coded fallback password lives in source control.
    # Kept for backward compatibility; consider requiring it in the config.
    saved_pwd = cfg.get("web_password", "@@Gi05Br;;")

    # A non-string stored value can never equal the (string) header, so it is
    # rejected outright. compare_digest avoids leaking, through timing, how
    # many leading characters matched; bytes are used because compare_digest
    # only accepts ASCII-only str.
    is_valid = (
        bool(x_web_password)
        and isinstance(saved_pwd, str)
        and secrets.compare_digest(
            x_web_password.encode("utf-8"), saved_pwd.encode("utf-8")
        )
    )
    if not is_valid:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Senha Web inválida ou ausente."
        )
    return True


# --- PUBLIC ROUTES ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the web dashboard (``index.html`` template)."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
    """Dummy favicon endpoint so browsers do not get a 404."""
    return JSONResponse(content={"status": "ok"})


# --- PROTECTED ROUTES (API) ---
@app.get("/api/login")
async def check_login(is_auth: bool = Depends(verify_password)):
    """Return a success payload when the password dependency passes."""
    return {"status": "success"}


@app.get("/api/status")
async def get_system_status(is_auth: bool = Depends(verify_password)):
    """Return CPU, RAM and disk usage without blocking the event loop.

    Sizes are reported in GiB rounded to two decimals; ``percent`` values come
    straight from psutil.
    """
    gib = 1024**3

    def collect_stats():
        # cpu_percent(interval=0.1) blocks for 100 ms, hence the threadpool.
        cpu_percent = psutil.cpu_percent(interval=0.1)
        vm = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        return {
            "cpu": cpu_percent,
            "ram": {
                "total": round(vm.total / gib, 2),
                "used": round(vm.used / gib, 2),
                "percent": vm.percent,
            },
            "disk": {
                "total": round(disk.total / gib, 2),
                "used": round(disk.used / gib, 2),
                "percent": disk.percent,
            },
        }

    data = await run_in_threadpool(collect_stats)
    return JSONResponse(content=data)


@app.get("/api/config")
async def read_configuration(is_auth: bool = Depends(verify_password)):
    """Return the current configuration as JSON."""
    return JSONResponse(content=get_config())


@app.post("/api/config")
async def update_configuration(req: dict, is_auth: bool = Depends(verify_password)):
    """Persist the JSON body as the new configuration."""
    save_config(req)
    return JSONResponse(content={"status": "success"})


@app.post("/api/action")
async def execute_smart_action(action: dict, is_auth: bool = Depends(verify_password)):
    """Run a predefined server action (Smart Actions of the web UI).

    The action is selected by ``action["type"]``; unknown types yield a 400.
    All shell commands below are fixed literals (no request data is
    interpolated); ``shell=True`` is needed for the ``sleep N && ...`` form
    that lets the HTTP response go out before the command takes effect.
    """
    action_type = action.get("type")

    if action_type == "ping":
        return JSONResponse(content={"status": "success", "message": "Pong! Servidor online e responsivo."})

    elif action_type == "restart_bot":
        subprocess.Popen("sleep 1 && docker restart vps-ai-agent", shell=True)
        return JSONResponse(content={"status": "success", "message": "Reboot do Agente autorizado."})

    elif action_type == "clear_cache":
        subprocess.Popen("docker system prune -af --volumes", shell=True)
        return JSONResponse(content={"status": "success", "message": "Limpando caches obsoletos em background!"})

    elif action_type == "reboot_vps":
        subprocess.Popen("sleep 2 && docker run --rm --privileged --pid=host alpine nsenter -t 1 -m -u -n -i reboot", shell=True)
        return JSONResponse(content={"status": "success", "message": "🚨 O REBOOT CRÍTICO COMEÇOU."})

    return JSONResponse(content={"status": "error", "message": "Ação desconhecida."}, status_code=400)


@app.post("/api/chat")
async def web_chat(message: dict, is_auth: bool = Depends(verify_password)):
    """Forward the ``text`` field of the body to the AI agent and return its reply."""
    user_text = message.get("text", "")
    if not user_text:
        return JSONResponse(content={"reply": "Por favor, digite um comando válido."})

    # query_agent is a blocking call, so it runs in the threadpool.
    reply = await run_in_threadpool(query_agent, prompt=user_text)
    return JSONResponse(content={"reply": reply})


@app.post("/api/chat-audio")
async def web_chat_audio(audio: UploadFile = File(...), is_auth: bool = Depends(verify_password)):
    """Receive audio, transcribe it, query the AI and return text plus a reply audio URL.

    The upload is stored in a uniquely named temporary file under ``/tmp``.
    The client-supplied filename is never used as a path (it could contain
    ``../`` or collide with a concurrent upload); only a sanitized extension
    is kept, in case the transcriber relies on it.

    Returns:
        JSON with ``text`` (transcription), ``reply`` and ``audio_url`` on
        success, or a 500 JSON error payload when any step fails.
    """
    suffix = os.path.splitext(os.path.basename(audio.filename or ""))[1]
    if not suffix[1:].isalnum() or len(suffix) > 10:
        suffix = ""

    fd, temp_path = tempfile.mkstemp(prefix="upload_", suffix=suffix, dir="/tmp")

    try:
        with os.fdopen(fd, "wb") as buffer:
            buffer.write(await audio.read())

        # Transcription (STT)
        user_text = await run_in_threadpool(audio_handler.transcribe_audio, temp_path)

        # AI (processing)
        reply = await run_in_threadpool(query_agent, prompt=user_text)

        # Synthesis (TTS)
        audio_filename = await run_in_threadpool(audio_handler.text_to_speech, reply)
        audio_url = f"/api/audio-file/{audio_filename}"

        return JSONResponse(content={
            "text": user_text,
            "reply": reply,
            "audio_url": audio_url
        })
    except Exception as e:
        return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
    finally:
        # Always drop the uploaded file, whatever happened above.
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            pass


@app.get("/api/audio-file/{filename}")
async def get_audio_file(filename: str):
    """Serve a temporary audio file generated by the TTS step from ``/tmp``.

    NOTE(review): this route has no password dependency — presumably so the
    browser can load it directly as an audio source; confirm this is intended.

    Raises:
        HTTPException: 404 when the name is not a plain file name or the file
            does not exist.
    """
    # Only bare file names are accepted, so the lookup cannot leave /tmp.
    if filename in ("", ".", "..") or os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="Arquivo de áudio não encontrado.")

    filepath = os.path.join("/tmp", filename)
    # isfile (not exists): a directory cannot be sent as a FileResponse.
    if os.path.isfile(filepath):
        return FileResponse(filepath, media_type="audio/mpeg")
    raise HTTPException(status_code=404, detail="Arquivo de áudio não encontrado.")


@app.get("/api/test_llm")
async def test_llm_speed(is_auth: bool = Depends(verify_password)):
    """Measure the response latency (seconds, two decimals) of the active AI."""
    # perf_counter is monotonic, so the measurement is immune to clock changes.
    start_time = time.perf_counter()
    try:
        reply = await run_in_threadpool(query_agent, prompt="responda apenas com a palavra 'pong'")
        latency = round(time.perf_counter() - start_time, 2)
        return JSONResponse(content={"status": "success", "latency": latency, "reply": reply})
    except Exception as e:
        return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)


@app.post("/webhook")
async def telegram_webhook(request: Request):
    """Receive updates (messages) from Telegram and acknowledge them."""
    update = await request.json()
    print("Update recebido do Telegram:", update)
    return {"ok": True}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)