Files
BotVPS/main.py
Marcos d7bc7df32c feat: Auto-sync credentials on startup
- Import credential_manager
- Call sync_credentials() on FastAPI startup event
- Print sync status in logs
2026-03-22 15:36:57 -03:00

341 lines
13 KiB
Python

import os
import psutil
import subprocess
import time
import json
from fastapi import FastAPI, Request, Header, Depends, HTTPException, status, File, UploadFile
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.templating import Jinja2Templates
from dotenv import load_dotenv
from starlette.concurrency import run_in_threadpool
import audio_handler
from ai_agent import query_agent
from config import get_config, save_config
from credential_manager import sync_credentials
# Carrega as variáveis do .env
load_dotenv()
app = FastAPI(title="VpsTelegramBot API")
# Configura templates HTML
templates = Jinja2Templates(directory="templates")
# ============================================================
# AUTO-SYNC DE CREDENCIAIS NO STARTUP
# ============================================================
print("[INIT] Sincronizando credenciais...")
sync_result = sync_credentials()
print(f"[INIT] Credenciais sincronizadas: {sync_result['status']}")
print(f"[INIT] Services: {', '.join(sync_result['services'].keys())}")
# ============================================================
# EVENTO DE STARTUP
# ============================================================
@app.on_event("startup")
async def startup_event():
print("[STARTUP] Sincronizando credenciais...")
sync_credentials()
print("[STARTUP] Credenciais sincronizadas com sucesso!")
# --- SEGURANÇA ---
async def verify_password(x_web_password: str = Header(None)):
cfg = get_config()
saved_pwd = cfg.get("web_password", "@@Gi05Br;;")
if not x_web_password or x_web_password != saved_pwd:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Senha Web inválida ou ausente."
)
return True
# --- ROTAS PÚBLICAS ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
"""Renderiza o Dashboard Web."""
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
"""Favicon dummy para evitar 404."""
return JSONResponse(content={"status": "ok"})
# --- ROTAS PROTEGIDAS (API) ---
@app.get("/api/login")
async def check_login(is_auth: bool = Depends(verify_password)):
return {"status": "success"}
@app.get("/api/status")
async def get_system_status(is_auth: bool = Depends(verify_password)):
"""Retorna o status do sistema (CPU, RAM, Disco) sem travar o loop."""
def get_stats():
cpu_percent = psutil.cpu_percent(interval=0.5)
vm = psutil.virtual_memory()
disk = psutil.disk_usage('/')
return {
"cpu": cpu_percent,
"ram": {
"total": round(vm.total / (1024**3), 2),
"used": round(vm.used / (1024**3), 2),
"percent": vm.percent
},
"disk": {
"total": round(disk.total / (1024**3), 2),
"used": round(disk.used / (1024**3), 2),
"percent": disk.percent
}
}
data = await run_in_threadpool(get_stats)
return JSONResponse(content=data)
@app.get("/api/config")
async def read_configuration(is_auth: bool = Depends(verify_password)):
return JSONResponse(content=get_config())
@app.post("/api/config")
async def update_configuration(req: dict, is_auth: bool = Depends(verify_password)):
save_config(req)
return JSONResponse(content={"status": "success"})
@app.post("/api/action")
async def execute_smart_action(action: dict, is_auth: bool = Depends(verify_password)):
"""Executa ações predefinidas no servidor (Smart Actions da Web UI)."""
action_type = action.get("type")
if action_type == "ping":
return JSONResponse(content={"status": "success", "message": "Pong! Servidor online e responsivo."})
elif action_type == "restart_bot":
subprocess.Popen("sleep 1 && docker restart vps-ai-agent", shell=True)
return JSONResponse(content={"status": "success", "message": "Reboot do Agente autorizado."})
elif action_type == "clear_cache":
subprocess.Popen("docker system prune -af --volumes", shell=True)
return JSONResponse(content={"status": "success", "message": "Limpando caches obsoletos em background!"})
elif action_type == "reboot_vps":
subprocess.Popen("sleep 2 && docker run --rm --privileged --pid=host alpine nsenter -t 1 -m -u -n -i reboot", shell=True)
return JSONResponse(content={"status": "success", "message": "🚨 O REBOOT CRÍTICO COMEÇOU."})
return JSONResponse(content={"status": "error", "message": "Ação desconhecida."}, status_code=400)
@app.post("/api/chat")
async def web_chat(message: dict, is_auth: bool = Depends(verify_password)):
"""Endpoint para interagir com a IA via Web UI com suporte a histórico."""
user_text = message.get("text", "")
history = message.get("history", []) # Recebe o histórico do navegador
if not user_text:
return JSONResponse(content={"reply": "Por favor, digite um comando válido."})
# Passa o histórico para o query_agent manter o contexto
reply = await run_in_threadpool(query_agent, prompt=user_text, chat_history=history)
return JSONResponse(content={"reply": reply})
@app.post("/api/chat-audio")
async def web_chat_audio(audio: UploadFile = File(...), is_auth: bool = Depends(verify_password)):
"""Recebe áudio, transcreve, processa na IA e devolve texto + áudio de resposta."""
temp_path = f"/tmp/{audio.filename}"
with open(temp_path, "wb") as buffer:
buffer.write(await audio.read())
try:
# Transcrição (STT)
user_text = await run_in_threadpool(audio_handler.transcribe_audio, temp_path)
# IA (Processamento)
reply = await run_in_threadpool(query_agent, prompt=user_text)
# Síntese (TTS)
audio_filename = await run_in_threadpool(audio_handler.text_to_speech, reply)
audio_url = f"/api/audio-file/{audio_filename}"
return JSONResponse(content={
"text": user_text,
"reply": reply,
"audio_url": audio_url
})
except Exception as e:
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
finally:
if os.path.exists(temp_path):
os.remove(temp_path)
@app.get("/api/audio-file/{filename}")
async def get_audio_file(filename: str):
"""Serve os arquivos de áudio temporários gerados pelo TTS."""
filepath = os.path.join("/tmp", filename)
if os.path.exists(filepath):
return FileResponse(filepath, media_type="audio/mpeg")
raise HTTPException(status_code=404, detail="Arquivo de áudio não encontrado.")
@app.get("/api/host_file")
async def get_host_file(path: str, pwd: str = None, x_web_password: str = Header(None)):
"""Serve arquivos (como imagens) da máquina host para exibir no painel de insights."""
# Autenticação dupla: via Header (fetch) ou via Query Parâmetro (tag img)
cfg = get_config()
saved_pwd = cfg.get("web_password", "@@Gi05Br;;")
auth_token = pwd or x_web_password
if not auth_token or auth_token != saved_pwd:
raise HTTPException(status_code=401, detail="Não autorizado")
host_path = f"/host_root{path}" if not path.startswith("/host_root") else path
# Previne directory traversal básico garantindo que comece com /host_root
if not host_path.startswith("/host_root") or ".." in host_path:
raise HTTPException(status_code=400, detail="Caminho inválido.")
if os.path.isfile(host_path):
return FileResponse(host_path)
raise HTTPException(status_code=404, detail="Arquivo não encontrado no host.")
@app.get("/api/test_llm")
async def test_llm_speed(is_auth: bool = Depends(verify_password)):
"""Mede a velocidade de resposta da IA ativa."""
start_time = time.time()
try:
reply = await run_in_threadpool(query_agent, prompt="responda apenas com a palavra 'pong'")
latency = round(time.time() - start_time, 2)
return JSONResponse(content={"status": "success", "latency": latency, "reply": reply})
except Exception as e:
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
@app.post("/webhook")
async def telegram_webhook(request: Request):
"""Recebe as atualizações (mensagens) do Telegram."""
update = await request.json()
print("Update recebido do Telegram:", update)
return {"ok": True}
# ============================================================
# NOVOS ENDPOINTS - ORQUESTRADOR
# ============================================================
from orchestrator import (
orchestrate, handle_message, get_orchestrator_status,
get_llm_config, set_llm_config, format_confirmation_message,
format_completion_message
)
from llm_providers import get_available_models
from credential_manager import sync_credentials
@app.post("/api/orchestrate")
async def orchestrate_task(task_data: dict, is_auth: bool = Depends(verify_password)):
"""
Executa tarefa orquestrada.
POST /api/orchestrate
{
"task": "faz deploy do app X",
"confirmed": false
}
Response:
{
"status": "needs_confirmation" | "completed",
"plan": {...},
"confirmation_needed_for": [...],
"message": "..." (para display)
}
"""
task = task_data.get("task", "")
confirmed = task_data.get("confirmed", False)
if not task:
return JSONResponse(content={"status": "error", "message": "Task vazia"}, status_code=400)
result = orchestrate(task, user_confirmed=confirmed)
# Formata mensagem para display
if result["status"] == "needs_confirmation":
message = format_confirmation_message(result)
return JSONResponse(content={
"status": "needs_confirmation",
"plan": result["plan"],
"confirmation_needed_for": result["confirmation_needed_for"],
"message": message
})
return JSONResponse(content={
"status": "completed",
"plan": result["plan"],
"results": result.get("results", []),
"message": format_completion_message(result) if 'format_completion_message' in dir() else "Concluído"
})
@app.get("/api/orchestrator-status")
async def get_orch_status(is_auth: bool = Depends(verify_password)):
"""Retorna status do orquestrador."""
return JSONResponse(content=get_orchestrator_status())
@app.get("/api/llm-config")
async def get_llm_configuration(is_auth: bool = Depends(verify_password)):
"""Retorna configuração atual de LLMs."""
return JSONResponse(content=get_llm_config())
@app.post("/api/llm-config")
async def update_llm_configuration(config_data: dict, is_auth: bool = Depends(verify_password)):
"""Atualiza configuração de LLMs."""
planner_provider = config_data.get("planner_provider") or None
planner_model = config_data.get("planner_model") or None
executor_provider = config_data.get("executor_provider") or None
executor_model = config_data.get("executor_model") or None
changes = set_llm_config(
planner_provider=planner_provider,
planner_model=planner_model,
executor_provider=executor_provider,
executor_model=executor_model
)
return JSONResponse(content={"status": "success", "changes": changes})
@app.get("/api/llm-models")
async def list_llm_models(is_auth: bool = Depends(verify_password)):
"""Lista modelos disponíveis para cada provider."""
models = get_available_models()
return JSONResponse(content={"models": models})
@app.post("/api/sync-credentials")
async def sync_creds(is_auth: bool = Depends(verify_password)):
"""Força sincronização de credenciais."""
result = sync_credentials()
return JSONResponse(content=result)
@app.get("/api/tools")
async def list_tools(is_auth: bool = Depends(verify_password)):
"""Lista todas as ferramentas disponíveis."""
from tools_v2 import get_tools_by_danger
return JSONResponse(content={
"tools": {
"safe": get_tools_by_danger("safe"),
"medium": get_tools_by_danger("medium"),
"dangerous": get_tools_by_danger("dangerous")
}
})
@app.post("/api/handle-message")
async def handle_web_message(message: dict, is_auth: bool = Depends(verify_password)):
"""
Manipula mensagem do usuário (alternativa ao chat normal).
Suporta confirmação de ações perigosas.
POST /api/handle-message
{
"text": "faz deploy do app",
"confirmed": false
}
"""
text = message.get("text", "")
confirmed = message.get("confirmed", False)
if not text:
return JSONResponse(content={"reply": "Mensagem vazia"})
reply = await run_in_threadpool(handle_message, text=text, confirmed=confirmed)
return JSONResponse(content={"reply": reply})
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)