Files
BotVPS/bot_logic.py

148 lines
5.9 KiB
Python

import os
import asyncio
import re
import logging
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from orchestrator import (
handle_message_async, orchestrate_async, format_confirmation_message,
format_completion_message, execute_step_async
)
from ai_agent import query_agent_async
from audio_handler import transcribe_audio, text_to_speech_async
from config import get_config
# Logging setup: timestamped records, INFO level and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Bot token and the id of the chat allowed to use the bot, both read from the
# environment. Each one is None when its variable is not set.
TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
ALLOWED_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID")
async def synthesize_audio(text: str) -> str:
    """Synthesize speech for ``text`` and return the path of the audio file.

    Thin async wrapper around the shared ``text_to_speech_async`` helper; the
    name it returns is joined onto ``/tmp``.

    Args:
        text: Text to convert to speech.

    Returns:
        The joined file path, or ``""`` when synthesis raises or returns no
        file name. Exceptions from synthesis are logged, not propagated.
    """
    try:
        filename = await text_to_speech_async(text)
        if not filename:
            # Joining an empty name would give "/tmp/", an existing directory
            # that callers could mistake for a ready audio file.
            logger.error("TTS Error: no audio file name returned")
            return ""
        return os.path.join("/tmp", filename)
    except Exception as e:
        # Best effort: a failed synthesis only means no voice reply is sent.
        # logger.exception keeps the traceback alongside the message.
        logger.exception("TTS Error: %s", e)
        return ""
async def auth_check(update: Update) -> bool:
    """Tell whether ``update`` comes from the allowed chat.

    An update without a message is rejected silently. A message from any chat
    whose id (as a string) differs from ``ALLOWED_CHAT_ID`` is answered with
    "Acesso negado." and rejected.

    Returns:
        True only when the update carries a message from the allowed chat.
    """
    message = update.message
    if not message:
        return False
    if str(message.chat_id) == ALLOWED_CHAT_ID:
        return True
    await message.reply_text("Acesso negado.")
    return False
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle a text message: authorize it, show "typing", then process it.

    Does nothing further when ``auth_check`` rejects the update.
    """
    authorized = await auth_check(update)
    if authorized:
        incoming = update.message
        await incoming.reply_chat_action(action="typing")
        await process_logic(update, context, incoming.text)
async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle a voice message: download it, transcribe it, process the text.

    After authorization the voice note is saved to ``/tmp/<file_id>.ogg``,
    transcribed with ``transcribe_audio`` and the resulting text is passed to
    ``process_logic`` with ``is_voice=True``. Any exception raised along the
    way is logged and reported to the user, and the downloaded file is removed
    in every case.
    """
    if not await auth_check(update):
        return
    message = update.message
    await message.reply_chat_action(action="record_voice")

    ogg_path = f"/tmp/{message.voice.file_id}.ogg"
    try:
        # Download inside the try block so that a failed or partial download
        # is reported to the user and its leftover file is cleaned up.
        voice_file = await context.bot.get_file(message.voice.file_id)
        await voice_file.download_to_drive(ogg_path)

        # Speech-to-text. transcribe_audio is called as a plain blocking
        # function, so run it in the default executor to keep the event loop
        # free while it works.
        loop = asyncio.get_running_loop()
        text = await loop.run_in_executor(None, transcribe_audio, ogg_path)
        if not text:
            await message.reply_text("Não consegui entender o áudio.")
            return
        await message.reply_text(f"🎤 Entendi: \"{text}\"")

        # From here on the transcript follows the same path as typed text.
        await process_logic(update, context, text, is_voice=True)
    except Exception as e:
        logger.exception("Erro Voz: %s", e)
        await message.reply_text(f"Erro ao processar voz: {e}")
    finally:
        if os.path.exists(ogg_path):
            os.remove(ogg_path)
# Telegram rejects a text message longer than 4096 characters. Stay a little
# below that so long replies are split instead of failing to send.
_MAX_MESSAGE_CHARS = 4000

# Sent in place of a reply that turned out to be empty, which Telegram would
# otherwise reject.
_EMPTY_REPLY_NOTICE = "Sem resposta."


def _split_message(text: str, limit: int = _MAX_MESSAGE_CHARS) -> list:
    """Split ``text`` into consecutive pieces of at most ``limit`` characters."""
    return [text[start:start + limit] for start in range(0, len(text), limit)]


async def _reply_long_text(message, text) -> None:
    """Reply with ``text``, split over several messages when it is too long."""
    text = "" if text is None else str(text)
    pieces = [piece for piece in _split_message(text) if piece.strip()]
    if not pieces:
        logger.warning("Empty reply text; sending a placeholder instead")
        pieces = [_EMPTY_REPLY_NOTICE]
    for piece in pieces:
        await message.reply_text(piece)


async def process_logic(update: Update, context: ContextTypes.DEFAULT_TYPE, user_msg: str, is_voice: bool = False):
    """Route one user message and send the reply.

    The message is handled by the first matching rule:

    1. ``/status``, ``/tools`` and ``/sync`` go straight to
       ``handle_message_async``.
    2. "sim", "s" or "confirmar" runs the plan stored under
       ``context.bot_data['pending_plan']``, if there is one.
    3. A message containing an orchestrator keyword, or starting with
       ``/orchestrate``, goes to ``orchestrate_async``; a plan that needs
       confirmation is stored for rule 2.
    4. Anything else is sent to the AI agent.

    Args:
        update: Update whose message receives the replies.
        context: Handler context; ``bot_data`` holds the pending plan.
        user_msg: Text to process (typed, or transcribed from voice).
        is_voice: When true, replies from rules 3 and 4 are also sent as a
            synthesized voice message.
    """
    message = update.message

    # 1. Direct commands
    if user_msg.startswith('/') and user_msg.split()[0] in ['/status', '/tools', '/sync']:
        reply = await handle_message_async(user_msg)
        await _reply_long_text(message, reply)
        return

    # 2. Plan confirmation
    if user_msg.lower() in ['sim', 's', 'confirmar'] and 'pending_plan' in context.bot_data:
        # pop() so the same plan cannot be confirmed twice.
        plan = context.bot_data.pop('pending_plan')
        results = []
        for step in plan.get("steps", []):
            res = await execute_step_async(step)
            results.append(res)
            # Stop at the first failed step that is marked dangerous.
            if not res["success"] and step.get("danger") == "dangerous":
                break
        reply = format_completion_message({"plan": plan, "results": results})
        await _reply_long_text(message, reply)
        return

    # 3. Orchestrator or AI agent
    orchestrator_keywords = ['deploy', 'restart', 'git', 'docker', 'atualiza', 'status']
    lowered = user_msg.lower()
    is_task = any(kw in lowered for kw in orchestrator_keywords) or user_msg.startswith('/orchestrate')
    if is_task:
        task = user_msg.replace('/orchestrate', '').strip()
        result = await orchestrate_async(task)
        if result["status"] == "needs_confirmation":
            context.bot_data['pending_plan'] = result["plan"]
            reply = format_confirmation_message(result)
        else:
            reply = format_completion_message(result)
    else:
        # Fallback: AI agent. No conversation history is kept in bot_data yet,
        # so only the current message is passed along.
        cfg = get_config()
        reply = await query_agent_async(user_msg, override_provider=cfg.get("active_provider"))

    # Strip legacy <REFINED> / [REFINED] tags. Nothing else is rewritten: the
    # text is sent as the orchestrator or agent produced it.
    reply_clean = re.sub(r'[<\[]/?REFINED[>\]]', '', reply or "", flags=re.IGNORECASE).strip()
    await _reply_long_text(message, reply_clean)

    # A voice request also gets a spoken answer, when there is text to speak.
    if is_voice and reply_clean:
        audio_path = await synthesize_audio(reply_clean)
        # isfile() rather than exists(): a directory path must not be opened.
        if audio_path and os.path.isfile(audio_path):
            try:
                with open(audio_path, 'rb') as voice_file:
                    await message.reply_voice(voice_file)
            finally:
                # Remove the temporary audio even when sending fails.
                os.remove(audio_path)
async def _handle_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer /start with "Online!", for the authorized chat only."""
    # auth_check also rejects updates without a message (e.g. an edited
    # /start), which would otherwise fail on update.message being None.
    if not await auth_check(update):
        return
    await update.message.reply_text("Online!")


def get_telegram_app():
    """Build the Telegram application and register the bot's handlers.

    Handlers, in registration order: ``/start``, plain text, any other
    command (both routed to ``handle_text``), and voice messages.

    A missing ``TELEGRAM_BOT_TOKEN`` is only logged as a warning here; the
    builder is still called with the empty token.
    NOTE(review): the builder presumably rejects a missing token - confirm.

    Returns:
        The configured ``Application``; polling is not started here.
    """
    if not TOKEN:
        logger.warning("AVISO: TELEGRAM_BOT_TOKEN não encontrado.")
    app = Application.builder().token(TOKEN).build()
    app.add_handler(CommandHandler("start", _handle_start))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text))
    app.add_handler(MessageHandler(filters.COMMAND, handle_text))
    app.add_handler(MessageHandler(filters.VOICE, handle_voice))
    return app
# Script entry point: build the application and poll until stopped.
if __name__ == "__main__":
    logger.info("Bot começando a inicialização...")
    try:
        app = get_telegram_app()
        logger.info("Bot online. Iniciando polling...")
        # Updates that arrived while the bot was offline are discarded.
        app.run_polling(drop_pending_updates=True)
    except Exception as e:
        # Keep the traceback in the log and exit non-zero, so a process
        # supervisor can tell a crash from a clean shutdown.
        logger.exception("Erro fatal ao iniciar bot: %s", e)
        raise SystemExit(1)