import os import requests import asyncio from dotenv import load_dotenv from telegram import Update from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes from ai_agent import query_agent import speech_recognition as sr from pydub import AudioSegment from gtts import gTTS load_dotenv() TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") ALLOWED_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID") ELEVENLABS_KEY = os.getenv("ELEVENLABS_API_KEY") VOICE_ID = os.getenv("ELEVENLABS_VOICE_ID") def synthesize_audio(text: str) -> str: """Gera áudio local/gratuito usando gTTS e retorna o arquivo.""" try: # Remove caracteres indesejados e emojis que atrapalham a fala texto_limpo = text.replace("🤖", "").replace("🧑‍🏫", "").replace("*", "").replace("`", "") filepath = "/tmp/reply_audio.mp3" tts = gTTS(text=texto_limpo, lang='pt-br', tld='com.br', slow=False) tts.save(filepath) return filepath except Exception as e: print(f"Erro ao gerar voz tts: {e}") return "" async def auth_check(update: Update) -> bool: """Verifica se o usuário que enviou a mensagem é o Marcos (Chat ID autorizado).""" if str(update.message.chat_id) != ALLOWED_CHAT_ID: await update.message.reply_text("Acesso negado. Você não tem permissão para controlar esta VPS.") return False return True async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): if not await auth_check(update): return await update.message.reply_text("👋 Olá, Marcos! Antigravity VPS Agent online e pronto para receber comandos.") # Memória persistente da conversa (em memória RAM) chat_histories = {} async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE): if not await auth_check(update): return chat_id = update.message.chat_id user_msg = update.message.text await update.message.reply_chat_action(action="typing") # Busca histórico anterior history = chat_histories.get(chat_id, []) # Aciona o Agente de IA para processar o prompt e executar Tools se precisar from config import get_config cfg = get_config() reply = query_agent(prompt=user_msg, override_provider=cfg.get("active_provider"), chat_history=history) # Atualiza histórico history.append({"user": user_msg, "bot": reply}) chat_histories[chat_id] = history[-10:] # Mantém apenas as últimas 10 # Se o usuário pedir ativamente por áudio no texto if "áudio" in user_msg.lower() or "audio" in user_msg.lower() or "voz" in user_msg.lower(): await update.message.reply_chat_action(action="record_voice") audio_path = synthesize_audio(reply) if audio_path: await update.message.reply_voice(voice=open(audio_path, 'rb')) return # Responde no chat normalmente await update.message.reply_text(reply) async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE): if not await auth_check(update): return await update.message.reply_chat_action(action="record_voice") # Baixa o aúdio do telegram voice_file = await update.message.voice.get_file() ogg_path = "/tmp/voice.ogg" wav_path = "/tmp/voice.wav" await voice_file.download_to_drive(ogg_path) # Converte para WAV (Requer ffmpeg instalado na maquina) try: audio = AudioSegment.from_ogg(ogg_path) audio.export(wav_path, format="wav") except Exception as e: await update.message.reply_text(f"Erro ao processar áudio (O ffmpeg está instalado na VPS?): {e}") return # Usando SpeechRecognition nativo para transcrever (pode usar Whisper no Ollama depois) recognizer = sr.Recognizer() with sr.AudioFile(wav_path) as source: audio_data = recognizer.record(source) try: text = recognizer.recognize_google(audio_data, language="pt-BR") await update.message.reply_text(f"🗣️ Reconhecido: _{text}_", parse_mode="Markdown") # Busca histórico anterior chat_id = update.message.chat_id history = chat_histories.get(chat_id, []) # Envia o texto reconhecido para o Agente (respeitando a configuração ativa) from config import get_config cfg = get_config() reply = query_agent(prompt=text, override_provider=cfg.get("active_provider"), chat_history=history) # Atualiza histórico history.append({"user": text, "bot": reply}) chat_histories[chat_id] = history[-10:] # Sintetiza com ElevenLabs e responde com Áudio audio_path = synthesize_audio(reply) if audio_path: await update.message.reply_voice(voice=open(audio_path, 'rb')) else: await update.message.reply_text(reply) except sr.UnknownValueError: await update.message.reply_text("Não consegui entender o que foi dito no áudio.") except sr.RequestError as e: await update.message.reply_text(f"Erro no serviço de STT: {e}") async def llm_command(update: Update, context: ContextTypes.DEFAULT_TYPE): if not await auth_check(update): return args = context.args from config import get_config, save_config if not args: cfg = get_config() await update.message.reply_text(f"Comando incompleto. Use: /llm gemini ou /llm ollama.\n*Status Atual:* {cfg.get('active_provider').upper()}") return new_model = args[0].lower() if new_model in ["gemini", "ollama"]: cfg = get_config() cfg["active_provider"] = new_model save_config(cfg) await update.message.reply_text(f"✅ Inteligência Artificial comutada com sucesso para: *{new_model.upper()}*") else: await update.message.reply_text("Modelos disponíveis: gemini ou ollama.") async def clear_history(update: Update, context: ContextTypes.DEFAULT_TYPE): if not await auth_check(update): return chat_id = update.message.chat_id if chat_id in chat_histories: chat_histories[chat_id] = [] await update.message.reply_text("🧹 Memória limpa com sucesso!") else: await update.message.reply_text("A memória já está vazia.") def get_telegram_app(): if not TOKEN: raise ValueError("TELEGRAM_BOT_TOKEN não encontrado no .env") app = Application.builder().token(TOKEN).build() app.add_handler(CommandHandler("start", start)) app.add_handler(CommandHandler("llm", llm_command)) app.add_handler(CommandHandler("limpar", clear_history)) app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text)) app.add_handler(MessageHandler(filters.VOICE, handle_voice)) return app # Para testes rápidos se rodado standalone if __name__ == "__main__": print("--- INICIANDO BOT TELEGRAM (POLLING) ---") print("Limpando Webhooks e mensagens pendentes para evitar CONFLITOS...") app = get_telegram_app() # drop_pending_updates=True limpa a fila e desativa webhooks automaticamente app.run_polling(drop_pending_updates=True)