Files
BotVPS/main.py

193 lines
7.6 KiB
Python

import os
import psutil
import subprocess
import time
import json
from fastapi import FastAPI, Request, Header, Depends, HTTPException, status, File, UploadFile
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.templating import Jinja2Templates
from dotenv import load_dotenv
from starlette.concurrency import run_in_threadpool
import audio_handler
from ai_agent import query_agent
from config import get_config, save_config
# Load the variables defined in the .env file before anything else is configured.
load_dotenv()
app = FastAPI(title="VpsTelegramBot API")
# Set up the HTML template engine, reading templates from the "templates" directory.
templates = Jinja2Templates(directory="templates")
# --- SECURITY ---
async def verify_password(x_web_password: str = Header(None)):
    """Check the web password sent in the request header against the configured one.

    Intended to be used as a FastAPI dependency by the protected routes.

    Args:
        x_web_password: Password taken from the request header, or ``None``
            when the header is absent.

    Returns:
        ``True`` when the supplied password matches the configured one.

    Raises:
        HTTPException: 401 when the password is missing or does not match.
    """
    import hmac  # local import so this block does not depend on a file-level change

    cfg = get_config()
    # SECURITY(review): the hard-coded fallback below is a credential committed to
    # source control. It is kept so existing deployments keep working, but the
    # password should be required in the configuration and this default removed.
    saved_pwd = cfg.get("web_password", "@@Gi05Br;;")

    # Anything that is not a string can never match; normalise to "" so the
    # comparison below always receives text.
    supplied = x_web_password if isinstance(x_web_password, str) else ""
    expected = saved_pwd if isinstance(saved_pwd, str) else ""

    # Compare as bytes in constant time: a plain `!=` leaks timing information,
    # and compare_digest() rejects non-ASCII str arguments.
    matches = hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))
    if not supplied or not matches:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Senha Web inválida ou ausente."
        )
    return True
# --- PUBLIC ROUTES ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the web dashboard from the ``index.html`` template."""
    template_context = {"request": request}
    return templates.TemplateResponse("index.html", template_context)
@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
    """Answer favicon requests with a placeholder JSON body so they do not 404."""
    placeholder = {"status": "ok"}
    return JSONResponse(content=placeholder)
# --- PROTECTED ROUTES (API) ---
@app.get("/api/login")
async def check_login(is_auth: bool = Depends(verify_password)):
    """Report success once the ``verify_password`` dependency has accepted the request."""
    login_result = {"status": "success"}
    return login_result
@app.get("/api/status")
async def get_system_status(is_auth: bool = Depends(verify_password)):
    """Return CPU, RAM and disk usage, sampled off the event loop.

    The psutil calls run in the thread pool because ``cpu_percent`` is called
    with a sampling interval. Memory and disk sizes are reported in GiB,
    rounded to two decimals.
    """
    bytes_per_gib = 1024**3

    def usage_summary(sample):
        # Works for both virtual_memory() and disk_usage() results, which
        # expose total/used/percent attributes.
        return {
            "total": round(sample.total / bytes_per_gib, 2),
            "used": round(sample.used / bytes_per_gib, 2),
            "percent": sample.percent,
        }

    def collect_metrics():
        cpu_load = psutil.cpu_percent(interval=0.1)
        return {
            "cpu": cpu_load,
            "ram": usage_summary(psutil.virtual_memory()),
            "disk": usage_summary(psutil.disk_usage('/')),
        }

    metrics = await run_in_threadpool(collect_metrics)
    return JSONResponse(content=metrics)
@app.get("/api/config")
async def read_configuration(is_auth: bool = Depends(verify_password)):
    """Return the current configuration, as provided by ``get_config``, as JSON."""
    current_config = get_config()
    return JSONResponse(content=current_config)
@app.post("/api/config")
async def update_configuration(req: dict, is_auth: bool = Depends(verify_password)):
    """Pass the request body to ``save_config`` and acknowledge with a success status."""
    save_config(req)
    outcome = {"status": "success"}
    return JSONResponse(content=outcome)
@app.post("/api/action")
async def execute_smart_action(action: dict, is_auth: bool = Depends(verify_password)):
    """Run one of the predefined server actions requested by the web UI.

    The action is selected by ``action["type"]``. ``"ping"`` only answers;
    the other known types launch a fixed shell command in the background and
    return immediately. Unknown types produce a 400 response.
    """
    # (action type, shell command, confirmation message) for every action that
    # spawns a background process. Commands are fixed strings; nothing from
    # the request is interpolated into them.
    shell_actions = (
        (
            "restart_bot",
            "sleep 1 && docker restart vps-ai-agent",
            "Reboot do Agente autorizado.",
        ),
        (
            "clear_cache",
            "docker system prune -af --volumes",
            "Limpando caches obsoletos em background!",
        ),
        (
            "reboot_vps",
            "sleep 2 && docker run --rm --privileged --pid=host alpine nsenter -t 1 -m -u -n -i reboot",
            "🚨 O REBOOT CRÍTICO COMEÇOU.",
        ),
    )

    requested = action.get("type")
    if requested == "ping":
        return JSONResponse(content={"status": "success", "message": "Pong! Servidor online e responsivo."})

    # Linear scan with == (rather than a dict lookup) so that any JSON value,
    # including unhashable ones, simply falls through to the error response.
    for name, shell_command, confirmation in shell_actions:
        if requested == name:
            subprocess.Popen(shell_command, shell=True)
            return JSONResponse(content={"status": "success", "message": confirmation})

    return JSONResponse(content={"status": "error", "message": "Ação desconhecida."}, status_code=400)
@app.post("/api/chat")
async def web_chat(message: dict, is_auth: bool = Depends(verify_password)):
    """Forward the ``text`` field of the request body to the AI agent.

    An empty or missing ``text`` yields a prompt asking for a valid command
    instead of calling the agent. The agent call runs in the thread pool.
    """
    prompt_text = message.get("text", "")
    if prompt_text:
        agent_reply = await run_in_threadpool(query_agent, prompt=prompt_text)
        response_body = {"reply": agent_reply}
    else:
        response_body = {"reply": "Por favor, digite um comando válido."}
    return JSONResponse(content=response_body)
@app.post("/api/chat-audio")
async def web_chat_audio(audio: UploadFile = File(...), is_auth: bool = Depends(verify_password)):
    """Transcribe an uploaded audio clip, query the AI agent and synthesise the reply.

    Args:
        audio: Uploaded audio file.
        is_auth: Result of the ``verify_password`` dependency.

    Returns:
        JSON with the transcribed ``text``, the agent ``reply`` and an
        ``audio_url`` pointing at the synthesised answer, or a 500 JSON error
        if any step fails.
    """
    import tempfile  # local import so this block does not depend on a file-level change

    # The client-supplied filename must never be used to build the path
    # (e.g. "../../etc/cron.d/x" would escape /tmp, and two uploads with the
    # same name would overwrite each other). Keep only a short alphanumeric
    # extension, in case the transcriber relies on it to detect the format.
    suffix = os.path.splitext(os.path.basename(audio.filename or ""))[1]
    if not suffix[1:].isalnum() or len(suffix) > 10:
        suffix = ""

    temp_path = None
    try:
        # delete=False: the file must outlive the `with` so the worker thread
        # can open it by path; it is removed in `finally`.
        with tempfile.NamedTemporaryFile(dir="/tmp", suffix=suffix, delete=False) as buffer:
            temp_path = buffer.name
            buffer.write(await audio.read())
        # Speech-to-text
        user_text = await run_in_threadpool(audio_handler.transcribe_audio, temp_path)
        # AI processing
        reply = await run_in_threadpool(query_agent, prompt=user_text)
        # Text-to-speech
        audio_filename = await run_in_threadpool(audio_handler.text_to_speech, reply)
        audio_url = f"/api/audio-file/{audio_filename}"
        return JSONResponse(content={
            "text": user_text,
            "reply": reply,
            "audio_url": audio_url
        })
    except Exception as e:
        return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
    finally:
        # Always discard the upload, including when writing it failed midway.
        if temp_path is not None and os.path.exists(temp_path):
            os.remove(temp_path)
@app.get("/api/audio-file/{filename}")
async def get_audio_file(filename: str):
    """Serve a temporary audio file from ``/tmp`` by its bare file name.

    Args:
        filename: Name of the file inside ``/tmp``; must not contain any
            directory component.

    Returns:
        The file as an ``audio/mpeg`` response.

    Raises:
        HTTPException: 404 when the name is not a plain file name or no such
            regular file exists.
    """
    # NOTE(review): this route is unauthenticated, so any regular file in /tmp
    # is readable by name. Confirm that is acceptable, or have the TTS step
    # write into a dedicated directory.
    audio_dir = "/tmp"

    # Accept only a bare file name: ".." or an embedded separator would make
    # os.path.join() resolve outside the audio directory.
    is_plain_name = filename not in ("", ".", "..") and os.path.basename(filename) == filename
    if is_plain_name:
        filepath = os.path.join(audio_dir, filename)
        # isfile (not exists) so directories are never handed to FileResponse.
        if os.path.isfile(filepath):
            return FileResponse(filepath, media_type="audio/mpeg")
    raise HTTPException(status_code=404, detail="Arquivo de áudio não encontrado.")
@app.get("/api/host_file")
async def get_host_file(path: str, pwd: str = None, x_web_password: str = Header(None)):
    """Serve a file (such as an image) located under ``/host_root``.

    Authentication accepts the password either from the request header (for
    fetch calls) or from the ``pwd`` query parameter (for ``<img>`` tags,
    which cannot send custom headers).

    Args:
        path: File path, with or without the ``/host_root`` prefix.
        pwd: Password supplied as a query parameter.
        x_web_password: Password supplied as a request header.

    Returns:
        The requested file.

    Raises:
        HTTPException: 401 when the password is missing or wrong, 400 when the
            path contains a ``..`` component, 404 when no such file exists.
    """
    import hmac  # local import so this block does not depend on a file-level change

    host_root = "/host_root"

    cfg = get_config()
    # SECURITY(review): hard-coded fallback credential kept for backward
    # compatibility; it should be removed once the password is always configured.
    saved_pwd = cfg.get("web_password", "@@Gi05Br;;")
    auth_token = pwd or x_web_password
    supplied = auth_token if isinstance(auth_token, str) else ""
    expected = saved_pwd if isinstance(saved_pwd, str) else ""
    # Constant-time comparison on bytes (compare_digest rejects non-ASCII str).
    if not supplied or not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Não autorizado")

    # Only treat the path as already rooted when the prefix ends on a path
    # boundary. A bare startswith("/host_root") also accepts siblings such as
    # "/host_root_backup/x", and a path without a leading slash used to become
    # "/host_rootetc/...", both of which lie outside the root.
    if path == host_root or path.startswith(host_root + "/"):
        host_path = path
    else:
        host_path = host_root + "/" + path.lstrip("/")

    # Reject parent-directory components. Checking components rather than the
    # raw substring keeps legitimate names like "a..b.png" working.
    # NOTE(review): symlinks are not resolved here.
    if ".." in host_path.split("/"):
        raise HTTPException(status_code=400, detail="Caminho inválido.")

    if os.path.isfile(host_path):
        return FileResponse(host_path)
    raise HTTPException(status_code=404, detail="Arquivo não encontrado no host.")
@app.get("/api/test_llm")
async def test_llm_speed(is_auth: bool = Depends(verify_password)):
    """Measure how long the active AI agent takes to answer a trivial prompt.

    Returns:
        JSON with ``latency`` in seconds (rounded to two decimals) and the
        agent's ``reply``, or a 500 JSON error if the agent call raises.
    """
    # perf_counter() is monotonic, so the measurement cannot be skewed (or go
    # negative) if the system clock is adjusted during the call.
    start_time = time.perf_counter()
    try:
        reply = await run_in_threadpool(query_agent, prompt="responda apenas com a palavra 'pong'")
        latency = round(time.perf_counter() - start_time, 2)
        return JSONResponse(content={"status": "success", "latency": latency, "reply": reply})
    except Exception as e:
        return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
@app.post("/webhook")
async def telegram_webhook(request: Request):
    """Receive a Telegram update, log it to stdout and acknowledge it."""
    payload = await request.json()
    print("Update recebido do Telegram:", payload)
    acknowledgement = {"ok": True}
    return acknowledgement
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8000, with auto-reload enabled.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}
    uvicorn.run("main:app", **server_options)