"""Telegram bot front-end.

Routes text messages from a single allowed chat to direct commands, the
orchestrator (with a confirmation step for plans), or the AI agent fallback.
"""

import asyncio
import logging
import os
import re

import speech_recognition as sr
from gtts import gTTS
from pydub import AudioSegment
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes

from ai_agent import query_agent
from orchestrator import (
    handle_message_async,
    orchestrate_async,
    format_confirmation_message,
    format_completion_message,
    execute_step_async,
)

# Logging configuration
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
ALLOWED_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")

# Telegram's sendMessage rejects texts longer than this many characters.
_TELEGRAM_MAX_MESSAGE_LENGTH = 4096

# Slash commands that are forwarded verbatim to handle_message_async.
_DIRECT_COMMANDS = ('/status', '/tools', '/sync')

# Replies that confirm execution of a pending plan.
_CONFIRMATION_WORDS = ('sim', 's', 'confirmar')

# Substrings that route a message to the orchestrator instead of the AI agent.
_ORCHESTRATOR_KEYWORDS = ('deploy', 'restart', 'git', 'docker', 'atualiza', 'status')


def synthesize_audio(text: str) -> str:
    """Render *text* to an MP3 file with gTTS and return the file path.

    The characters ``*``, `````, and ``#`` are stripped and only the first
    500 characters are synthesized. The output always goes to the same path,
    so each call overwrites the previous file.

    Returns:
        The path of the written MP3, or ``""`` if synthesis failed for any
        reason (the failure is logged with its traceback).
    """
    try:
        texto_limpo = re.sub(r'[*`#]', '', text)
        filepath = "/tmp/reply_audio.mp3"
        tts = gTTS(text=texto_limpo[:500], lang='pt-br', slow=False)
        tts.save(filepath)
        return filepath
    except Exception as e:
        # Best-effort by design: callers get "" rather than an exception.
        logger.exception("TTS Error: %s", e)
        return ""


async def auth_check(update: Update) -> bool:
    """Return True only when the update is a message from the allowed chat.

    Updates without ``update.message`` are rejected silently; messages from
    any other chat are answered with "Acesso negado." and rejected. When
    ``TELEGRAM_CHAT_ID`` is unset the comparison never matches, so every
    chat is rejected.
    """
    message = update.message
    if message is None:
        return False
    if str(message.chat_id) != ALLOWED_CHAT_ID:
        await message.reply_text("Acesso negado.")
        return False
    return True


async def _reply_long(message, text) -> None:
    """Reply with *text*, splitting it when it exceeds Telegram's size limit.

    Anything that is not an over-long string is passed to ``reply_text``
    unchanged, so short, empty, or non-string values behave exactly as a
    direct ``reply_text`` call would.
    """
    if not isinstance(text, str) or len(text) <= _TELEGRAM_MAX_MESSAGE_LENGTH:
        await message.reply_text(text)
        return
    for start in range(0, len(text), _TELEGRAM_MAX_MESSAGE_LENGTH):
        await message.reply_text(text[start:start + _TELEGRAM_MAX_MESSAGE_LENGTH])


async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Dispatch an authorized text or command message.

    Routing order:
      1. Direct commands (``/status``, ``/tools``, ``/sync``) go to
         ``handle_message_async``.
      2. A confirmation word executes the plan stored under
         ``context.bot_data['pending_plan']``, if any.
      3. Messages containing an orchestrator keyword, or starting with
         ``/orchestrate``, go to ``orchestrate_async``.
      4. Everything else goes to the AI agent.
    """
    if not await auth_check(update):
        return

    message = update.message
    user_msg = message.text or ""
    await message.reply_chat_action(action="typing")

    # 1. Direct commands
    if user_msg.startswith('/') and user_msg.split()[0] in _DIRECT_COMMANDS:
        reply = await handle_message_async(user_msg)
        await _reply_long(message, reply)
        return

    # 2. Plan confirmation
    # NOTE(review): the pending plan lives in bot_data (shared across chats) and
    # is not discarded when the user sends something other than a confirmation,
    # so a later "sim" still runs it — confirm this is intended.
    if user_msg.strip().lower() in _CONFIRMATION_WORDS and 'pending_plan' in context.bot_data:
        plan = context.bot_data.pop('pending_plan')
        results = []
        for step in plan.get("steps", []):
            res = await execute_step_async(step)
            results.append(res)
            # Stop at the first failed step that is marked dangerous; failures
            # of other steps do not abort the plan.
            if not res["success"] and step.get("danger") == "dangerous":
                break
        reply = format_completion_message({"plan": plan, "results": results})
        await _reply_long(message, reply)
        return

    # 3. Orchestrator or AI agent
    lowered = user_msg.lower()
    is_task = (
        any(kw in lowered for kw in _ORCHESTRATOR_KEYWORDS)
        or user_msg.startswith('/orchestrate')
    )
    if is_task:
        task = user_msg.replace('/orchestrate', '').strip()
        result = await orchestrate_async(task)
        if result["status"] == "needs_confirmation":
            # Keep the plan until the user confirms it in a later message.
            context.bot_data['pending_plan'] = result["plan"]
            reply = format_confirmation_message(result)
        else:
            reply = format_completion_message(result)
    else:
        # Fallback: AI agent (imports kept local, as in the original code)
        from config import get_config
        from ai_agent import query_agent_async
        cfg = get_config()
        reply = await query_agent_async(user_msg, override_provider=cfg.get("active_provider"))

    await _reply_long(message, reply)


async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Acknowledge a voice message; speech-to-text is not wired up here."""
    if not await auth_check(update):
        return
    await update.message.reply_text("Processando áudio...")
    await update.message.reply_text("Comando de voz recebido (STT não configurado neste passo).")


async def handle_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Answer ``/start`` with a liveness message.

    Uses ``effective_message`` so the handler also works for updates where
    ``update.message`` is None (e.g. an edited ``/start`` message).
    NOTE(review): unlike the other handlers this one performs no auth check,
    matching the original behaviour — confirm that is intended.
    """
    message = update.effective_message
    if message is not None:
        await message.reply_text("Online!")


def get_telegram_app() -> Application:
    """Build the Application and register the bot's handlers.

    A missing ``TELEGRAM_BOT_TOKEN`` is only logged as a warning; the builder
    is still invoked with the (empty) token and any error it raises
    propagates to the caller.
    """
    if not TOKEN:
        logger.warning("AVISO: TELEGRAM_BOT_TOKEN não encontrado.")
    app = Application.builder().token(TOKEN).build()
    # Registered first so /start is answered here rather than by handle_text.
    app.add_handler(CommandHandler("start", handle_start))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text))
    app.add_handler(MessageHandler(filters.COMMAND, handle_text))
    app.add_handler(MessageHandler(filters.VOICE, handle_voice))
    return app


if __name__ == "__main__":
    logger.info("Bot começando a inicialização...")
    try:
        app = get_telegram_app()
        logger.info("Bot online. Iniciando polling...")
        app.run_polling(drop_pending_updates=True)
    except Exception as e:
        # Top-level boundary: log with traceback so startup failures are diagnosable.
        logger.exception("Erro fatal ao iniciar bot: %s", e)