Files
BotVPS/bot_logic.py
Marcos b782b611a2 fix: Save full plan for confirmation instead of just task
- Store complete plan in pending_plans dict
- When user confirms, execute stored plan directly
- Fixes 0/0 steps issue when confirming
2026-03-22 15:38:27 -03:00

299 lines
12 KiB
Python

import os
import requests
import asyncio
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from ai_agent import query_agent
import speech_recognition as sr
from pydub import AudioSegment
from gtts import gTTS
from orchestrator import handle_message, orchestrate, format_confirmation_message, format_completion_message
load_dotenv()
# Settings pulled from the process environment (populated from .env just above).
# Each constant is None when the variable is not set.
TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
ALLOWED_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID")
# NOTE(review): the two ElevenLabs settings are not referenced in the code shown here.
ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY")
VOICE_ID = os.environ.get("ELEVENLABS_VOICE_ID")
def synthesize_audio(text: str) -> str:
    """Render *text* as Brazilian-Portuguese speech with gTTS.

    Returns the path of the MP3 that was written (always the same fixed file,
    overwritten on every call), or "" when synthesis fails for any reason; the
    error is printed in that case.
    """
    output_path = "/tmp/reply_audio.mp3"
    try:
        # Drop emojis and Markdown markers that the speech engine would read aloud.
        spoken = text
        for unwanted in ("🤖", "🧑‍🏫", "*", "`"):
            spoken = spoken.replace(unwanted, "")
        gTTS(text=spoken, lang='pt-br', tld='com.br', slow=False).save(output_path)
    except Exception as e:
        print(f"Erro ao gerar voz tts: {e}")
        return ""
    return output_path
async def auth_check(update: Update) -> bool:
    """Return True only when *update* comes from the authorized chat (Marcos).

    Unauthorized senders get an "access denied" reply. Updates that carry no
    ``update.message`` (e.g. edited messages or channel posts) are rejected
    silently, because there is no message to check or to reply to.
    The check fails closed: when TELEGRAM_CHAT_ID is unset or blank, nobody passes.
    """
    message = update.message
    if message is None:
        # Without this guard, edited messages matched by the text/command handlers
        # would crash on `update.message.chat_id`.
        return False
    # Tolerate stray whitespace around the id in .env; None/"" means "deny all".
    allowed_chat_id = (ALLOWED_CHAT_ID or "").strip()
    if not allowed_chat_id or str(message.chat_id) != allowed_chat_id:
        await message.reply_text("Acesso negado. Você não tem permissão para controlar esta VPS.")
        return False
    return True
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer /start with a greeting, but only for the authorized chat."""
    authorized = await auth_check(update)
    if authorized:
        await update.message.reply_text("👋 Olá, Marcos! Antigravity VPS Agent online e pronto para receber comandos.")
# Per-chat conversation memory: chat_id -> list of {"user": ..., "bot": ...}
# exchanges, trimmed to the last 10 by the handlers. Held only in this
# process's RAM (a plain dict), so it does not survive a restart.
chat_histories = {}
# Orchestrator plans awaiting a yes/no reply, keyed by f"plan_{chat_id}".
pending_plans = {}
# Replies accepted as "yes" / "no" to a pending orchestrator plan (compared
# after stripping whitespace and lower-casing).
_CONFIRM_WORDS = frozenset({'sim', 'confirmar', 'confirma', 'sim!', 'confirma!', 's'})
_CANCEL_WORDS = frozenset({'nao', 'não', 'cancelar', 'cancela', 'n', 'n!'})

# Substrings (matched case-insensitively) that route free-form text to the
# orchestrator instead of the conversational agent.
_ORCHESTRATOR_KEYWORDS = (
    'deploy', 'restart', 'restartar', 'reiniciar',
    'git pull', 'git push', 'docker', 'container',
    'mostra status', 'status dos', 'verificar',
    'faz um', 'executa', 'roda', 'rodar',
    'atualiza', 'atualizar', 'backup',
)

# Telegram rejects message texts longer than 4096 characters; stay below that
# with a margin.
_TELEGRAM_TEXT_LIMIT = 4000


def _plan_key(chat_id) -> str:
    """Return the ``pending_plans`` key used for *chat_id*."""
    return f"plan_{chat_id}"


async def _reply_long_text(message, text: str) -> None:
    """Reply with *text*, split into consecutive chunks that fit Telegram's limit."""
    chunks = [text[i:i + _TELEGRAM_TEXT_LIMIT] for i in range(0, len(text), _TELEGRAM_TEXT_LIMIT)]
    # An empty text still produces one (empty) reply, exactly as a single reply_text would.
    for chunk in chunks or [text]:
        await message.reply_text(chunk)


async def _run_orchestrator(message, chat_id, task: str) -> None:
    """Plan *task*; park the plan for confirmation, or report the finished run."""
    result = orchestrate(task, user_confirmed=False)
    if result["status"] == "needs_confirmation":
        # Store the full plan so a later "sim" executes it without re-planning.
        pending_plans[_plan_key(chat_id)] = result["plan"]
        reply = format_confirmation_message(result)
        reply += "\n\nResponda *sim* para confirmar ou *nao* para cancelar."
    else:
        reply = format_completion_message(result)
    await _reply_long_text(message, reply)


def _execute_pending_plan(plan: dict) -> dict:
    """Run each step of a confirmed *plan* and return a completion result dict."""
    from orchestrator import execute_step

    results = []
    for step in plan.get("steps", []):
        result = execute_step(step)
        results.append(result)
        # Abort the remaining steps when a step marked "dangerous" fails.
        if not result.get("success") and step.get("danger") == "dangerous":
            results.append({
                "success": False,
                "output": "Execucao abortada.",
                "step": -1
            })
            break
    return {
        "status": "completed",
        "plan": plan,
        "results": results
    }


async def _send_referenced_images(message, reply: str) -> None:
    """Upload image files on this host whose absolute paths appear in *reply*.

    Markdown image links (``![alt](/path)``) take precedence; otherwise bare
    absolute paths ending in a common image extension are used. Each distinct
    path is sent at most once; failures are printed and skipped.
    """
    import re

    paths = re.findall(r'!\[.*?\]\((/.*?)\)', reply)
    if not paths:
        paths = re.findall(r'(/[^\s]+?\.(?:jpg|jpeg|png|gif|webp))', reply, re.IGNORECASE)
    # dict.fromkeys de-duplicates while keeping first-seen order.
    for img_path in dict.fromkeys(paths):
        real_path = img_path
        # /root paths are remapped under /host_root.
        # NOTE(review): presumably the host filesystem mounted into this container — confirm.
        if not real_path.startswith("/host_root") and real_path.startswith("/root"):
            real_path = f"/host_root{real_path}"
        if not os.path.isfile(real_path):
            continue
        try:
            await message.reply_chat_action(action="upload_photo")
            with open(real_path, 'rb') as photo:
                await message.reply_photo(photo=photo, caption="Imagem localizada na VPS")
        except Exception as e:
            print(f"Erro ao enviar imagem {real_path}: {e}")


async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Route an incoming text message, or one of the commands aliased to it.

    Dispatch order:
      1. ``/orchestrate <task>``: plan the task with the orchestrator.
      2. ``/status`` (but not ``/statusall``), ``/tools``, ``/sync``: answered by
         ``handle_message``.
      3. A yes-word ("sim", "s", ...) while a plan is pending: execute that plan.
         With nothing pending, the text falls through to the steps below.
      4. A no-word ("nao", "n", ...): drop the pending plan, or report that
         nothing is pending.
      5. Text containing an orchestrator keyword: plan it like ``/orchestrate``.
      6. Anything else: ask ``query_agent`` with this chat's history. Reply by
         voice when the text mentions "áudio"/"audio"/"voz", falling back to
         text if synthesis fails. Otherwise upload any referenced images and
         reply with text.
    """
    if not await auth_check(update): return
    message = update.message
    chat_id = message.chat_id
    user_msg = message.text
    lowered = user_msg.lower()
    answer = lowered.strip()
    await message.reply_chat_action(action="typing")

    # ---------------- Orchestrator commands ----------------
    if user_msg.startswith('/orchestrate'):
        # Remove only the command prefix, not later occurrences inside the task.
        task = user_msg[len('/orchestrate'):].strip()
        if not task:
            await message.reply_text("Uso: /orchestrate <tarefa>")
            return
        await _run_orchestrator(message, chat_id, task)
        return
    if user_msg.startswith('/status') and not user_msg.startswith('/statusall'):
        await _reply_long_text(message, handle_message('/status'))
        return
    if user_msg.startswith('/tools'):
        await _reply_long_text(message, handle_message('/tools'))
        return
    if user_msg.startswith('/sync'):
        await _reply_long_text(message, handle_message('/sync'))
        return

    # ---------------- Confirmation / cancellation ----------------
    key = _plan_key(chat_id)
    if answer in _CONFIRM_WORDS and key in pending_plans:
        plan = pending_plans.pop(key)
        reply = format_completion_message(_execute_pending_plan(plan))
        await _reply_long_text(message, reply)
        return
    if answer in _CANCEL_WORDS:
        if key in pending_plans:
            del pending_plans[key]
            await message.reply_text("Operacao cancelada.")
        else:
            await message.reply_text("Nada pendente para cancelar.")
        return

    # ---------------- Keyword-detected orchestration ----------------
    if any(kw in lowered for kw in _ORCHESTRATOR_KEYWORDS):
        await _run_orchestrator(message, chat_id, user_msg)
        return

    # ---------------- Fallback: conversational agent ----------------
    from config import get_config
    history = chat_histories.get(chat_id, [])
    cfg = get_config()
    reply = query_agent(prompt=user_msg, override_provider=cfg.get("active_provider"), chat_history=history)
    history.append({"user": user_msg, "bot": reply})
    chat_histories[chat_id] = history[-10:]

    if "áudio" in lowered or "audio" in lowered or "voz" in lowered:
        await message.reply_chat_action(action="record_voice")
        audio_path = synthesize_audio(reply)
        if audio_path:
            with open(audio_path, 'rb') as audio_file:
                await message.reply_voice(voice=audio_file)
            return
        # Synthesis failed: answer with text below instead of staying silent.

    await _send_referenced_images(message, reply)
    await _reply_long_text(message, reply)
async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Transcribe a voice note, ask the agent, and answer with synthesized speech.

    Steps:
      1. Download the voice note to a fixed temp file.
      2. Convert it to WAV with pydub.
      3. Transcribe it with Google STT (pt-BR) and echo the transcript.
      4. Query ``query_agent`` with this chat's history, which is updated and
         trimmed to the last 10 exchanges.
      5. Reply with ``synthesize_audio`` output, or with text if synthesis fails.

    Conversion and STT failures are reported to the user instead of raised.
    """
    if not await auth_check(update): return
    import html

    message = update.message
    await message.reply_chat_action(action="record_voice")

    # Download the Telegram voice note.
    voice_file = await message.voice.get_file()
    ogg_path = "/tmp/voice.ogg"
    wav_path = "/tmp/voice.wav"
    await voice_file.download_to_drive(ogg_path)

    # Convert to WAV (requires ffmpeg on the machine).
    try:
        audio = AudioSegment.from_ogg(ogg_path)
        audio.export(wav_path, format="wav")
    except Exception as e:
        await message.reply_text(f"Erro ao processar áudio (O ffmpeg está instalado na VPS?): {e}")
        return

    # Transcribe with SpeechRecognition's Google backend.
    recognizer = sr.Recognizer()
    with sr.AudioFile(wav_path) as source:
        audio_data = recognizer.record(source)
    try:
        text = recognizer.recognize_google(audio_data, language="pt-BR")
    except sr.UnknownValueError:
        await message.reply_text("Não consegui entender o que foi dito no áudio.")
        return
    except sr.RequestError as e:
        await message.reply_text(f"Erro no serviço de STT: {e}")
        return

    # Escape the transcript: characters such as * or _ (Google masks profanity
    # with asterisks) would otherwise break entity parsing and abort the reply.
    await message.reply_text(f"🗣️ Reconhecido: <i>{html.escape(text, quote=False)}</i>", parse_mode="HTML")

    # Send the transcript to the agent, honoring the active provider setting.
    chat_id = message.chat_id
    history = chat_histories.get(chat_id, [])
    from config import get_config
    cfg = get_config()
    reply = query_agent(prompt=text, override_provider=cfg.get("active_provider"), chat_history=history)
    history.append({"user": text, "bot": reply})
    chat_histories[chat_id] = history[-10:]

    # Answer with speech (gTTS via synthesize_audio); fall back to text on failure.
    audio_path = synthesize_audio(reply)
    if audio_path:
        with open(audio_path, 'rb') as audio_file:
            await message.reply_voice(voice=audio_file)
    else:
        await message.reply_text(reply)
async def llm_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Show or switch the active LLM provider: ``/llm [gemini|ollama]``.

    Behavior by argument:
      - none: reply with usage plus the provider currently stored in the config
        (``active_provider``), or "N/A" when it is unset;
      - "gemini" / "ollama" (case-insensitive): save it as ``active_provider``;
      - anything else: reply with the list of available providers.
    """
    if not await auth_check(update): return
    from telegram.helpers import escape_markdown
    from config import get_config, save_config

    args = context.args
    if not args:
        # A missing key would previously crash on None.upper().
        current = str(get_config().get("active_provider") or "N/A").upper()
        await update.message.reply_text(
            f"Comando incompleto. Use: /llm gemini ou /llm ollama.\n*Status Atual:* {escape_markdown(current)}",
            parse_mode="Markdown",
        )
        return

    new_model = args[0].lower()
    if new_model not in ("gemini", "ollama"):
        await update.message.reply_text("Modelos disponíveis: gemini ou ollama.")
        return

    cfg = get_config()
    cfg["active_provider"] = new_model
    save_config(cfg)
    # new_model is one of the whitelisted names above, so it is Markdown-safe.
    await update.message.reply_text(
        f"✅ Inteligência Artificial comutada com sucesso para: *{new_model.upper()}*",
        parse_mode="Markdown",
    )
async def clear_history(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /limpar: reset the stored conversation history for this chat."""
    if not await auth_check(update):
        return
    chat_id = update.message.chat_id
    if chat_id not in chat_histories:
        await update.message.reply_text("A memória já está vazia.")
        return
    chat_histories[chat_id] = []
    await update.message.reply_text("🧹 Memória limpa com sucesso!")
def get_telegram_app():
    """Build the Telegram ``Application`` with all command and message handlers.

    Raises:
        ValueError: if TELEGRAM_BOT_TOKEN is not configured.
    """
    if not TOKEN:
        raise ValueError("TELEGRAM_BOT_TOKEN não encontrado no .env")
    app = Application.builder().token(TOKEN).build()
    app.add_handler(CommandHandler("start", start))
    app.add_handler(CommandHandler("llm", llm_command))
    app.add_handler(CommandHandler("limpar", clear_history))
    # handle_text parses these commands itself by message prefix, so one handler
    # covering all four replaces the former per-command lambda wrappers.
    app.add_handler(CommandHandler(["status", "tools", "sync", "orchestrate"], handle_text))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text))
    app.add_handler(MessageHandler(filters.VOICE, handle_voice))
    return app
# Standalone entry point for quick testing: run the bot with long polling.
if __name__ == "__main__":
    print("--- INICIANDO BOT TELEGRAM (POLLING) ---")
    print("Limpando Webhooks e mensagens pendentes para evitar CONFLITOS...")
    telegram_app = get_telegram_app()
    # drop_pending_updates=True discards the queued backlog; polling start-up
    # also removes any registered webhook.
    telegram_app.run_polling(drop_pending_updates=True)