import os import logging import httpx import asyncio from telegram import Update from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, CommandHandler, filters from dotenv import load_dotenv # Carrega as variáveis do arquivo .env load_dotenv() # Configurações obtidas do .env TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") ALLOWED_USER_ID = os.getenv("TELEGRAM_CHAT_ID") # Sincroniza com a PORTA definida no .env (Dica: .env diz 8000) API_PORT = os.getenv("PORT", "8001") API_BASE_URL = f"http://localhost:{API_PORT}" # Timeout aumentado para 300s para permitir que o Agente execute múltiplas ferramentas GLOBAL_TIMEOUT = 300.0 # O ID permitido deve ser comparado como string ou int, padronizando aqui if ALLOWED_USER_ID: ALLOWED_USER_ID = int(ALLOWED_USER_ID) # Configuração de Logs logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) import uuid import time import asyncio from session_manager import get_history, add_message, clear_history as sm_clear_history, set_orchestrator_pending, get_orchestrator_pending, clear_orchestrator_pending # ============================================================ # CIRCUIT BREAKER — API degradation state # ============================================================ _api_failure_count = 0 _api_degraded_until = 0.0 # timestamp when degraded period ends DEGRADED_DURATION = 60.0 # seconds FAILURE_THRESHOLD = 3 # ============================================================ # RATE LIMITING — per chat_id # ============================================================ _processing_chats: dict[int, float] = {} RATE_LIMIT_SECONDS = 30 # ============================================================ # HANDLERS # ============================================================ async def clear_history(update: Update, context: ContextTypes.DEFAULT_TYPE): """Limpa o histórico do usuário.""" chat_id = update.effective_chat.id sm_clear_history(chat_id) await update.message.reply_text("🧹 Histórico de conversa limpo! Como posso ajudar agora?") async def call_antigravity_api(endpoint: str, payload: dict) -> str: """Faz a chamada para a API interna do BotVPS com circuit breaker.""" global _api_failure_count, _api_degraded_until # Circuit breaker: check if API is degraded now = time.time() if now < _api_degraded_until: remaining = int(_api_degraded_until - now) return f"❌ *API em Modo Degradado:* Muitas falhas recentes. Tente novamente em {remaining}s." max_retries = 3 for attempt in range(max_retries): try: async with httpx.AsyncClient(timeout=GLOBAL_TIMEOUT) as client: logger.info(f"Enviando payload para {endpoint} (Tentativa {attempt+1})") response = await client.post(f"{API_BASE_URL}{endpoint}", json=payload) response.raise_for_status() data = response.json() # Success: reset circuit breaker _api_failure_count = 0 return data.get("reply") or data.get("message") or str(data) except (httpx.ConnectError, httpx.HTTPStatusError) as e: logger.error(f"Erro de conexão na API (Tentativa {attempt+1}): {e}") if attempt < max_retries - 1: await asyncio.sleep(2) else: # Final failure: trip circuit breaker _api_failure_count += 1 if _api_failure_count >= FAILURE_THRESHOLD: _api_degraded_until = time.time() + DEGRADED_DURATION logger.warning(f"CIRCUIT BREAKER ATIVADO — API degradada por {DEGRADED_DURATION}s") return "❌ *Erro de Conexão:* A API do BotVPS parece estar offline ou reiniciando. Tente novamente em instantes." except Exception as e: logger.error(f"Erro inesperado na chamada API: {str(e)}") return f"❌ *Erro Interno:* {str(e)}" async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): """Manipulador central de mensagens com blindagem contra crashes.""" try: if not update.message or not update.message.text: return chat_id = update.effective_chat.id user_id = update.effective_user.id # Filtro de Segurança if ALLOWED_USER_ID and user_id != ALLOWED_USER_ID: logger.warning(f"Acesso negado para o ID: {user_id}") return text = update.message.text logger.info(f"Mensagem recebida: {text[:50]}...") # Rate limiting: block if a message from this chat is already processing now = time.time() if chat_id in _processing_chats: elapsed = now - _processing_chats[chat_id] if elapsed < RATE_LIMIT_SECONDS: remaining = int(RATE_LIMIT_SECONDS - elapsed) await update.message.reply_text(f"⏳ Aguarde {remaining}s antes de enviar outra mensagem.") return _processing_chats[chat_id] = now # Lógica de reset por texto cmd_limpar = text.lower().strip() if cmd_limpar in ["reset", "limpar histórico", "limpar"]: sm_clear_history(chat_id) await update.message.reply_text("🧹 Memória limpa. O que deseja fazer?") return # Carrega histórico persistente chat_history = get_history(chat_id) # Normaliza texto (remove barra inicial se existir) text_normalized = text.strip() starts_with_slash = text_normalized.startswith('/') text_clean = text_normalized.lstrip('/') is_hermes = text_clean.lower().startswith('hermes') is_cmd = text_clean.lower().startswith(('bash', 'vps', 'cmd')) text_lower = text.lower().strip() # Verifica se há orchestrator pendente para confirmação pending = get_orchestrator_pending(chat_id) is_confirmation = text_lower in ('sim', 's', 'confirmar', 'confirma', 'yes', 'y', 'confirme', 'ok', 'conf', 'aceito', 'sim!', 'sim,', 'yes!') # Processamento if starts_with_slash and is_cmd: task = text_clean[len(text_clean.split()[0]):].strip() if not task: await update.message.reply_text("❓ Envie o comando após o prefixo.") return await update.message.reply_text("⚙️ *Processando tarefa...*", parse_mode='Markdown') reply = await call_antigravity_api("/api/orchestrate", {"task": task, "chat_id": chat_id}) elif pending and is_confirmation: # Há um plano pendente — continua com confirmação clear_orchestrator_pending(chat_id) await update.message.reply_text("✅ *Confirmado! Executando o plano...*", parse_mode='Markdown') reply = await call_antigravity_api("/api/orchestrate", { "task": pending.get("task", ""), "plan": pending.get("plan"), "chat_id": chat_id, "confirmed": True }) elif pending: # Há um plano pendente mas usuário não confirmou — lembra dele clear_orchestrator_pending(chat_id) await update.message.reply_text("🛑 Execução cancelada. Enviando nova requisição...") payload = {"text": text, "history": chat_history[-10:]} reply = await call_antigravity_api("/api/chat", payload) add_message(chat_id, text, reply) elif is_hermes: # Extrai a tarefa (remove "hermes" do início, com ou sem barra) task = text_clean[6:].strip() if len(text_clean) > 6 else "" if not task: await update.message.reply_text("❓ Digite sua tarefa após 'hermes'. Ex: `hermes Instale o nginx`") return await update.message.reply_text("🤖 *Hermes assumindo o controle. Isso pode demorar alguns minutos...*", parse_mode='Markdown') # Passa contexto completo: user_id, chat_id e histórico reply = await call_antigravity_api("/api/hermes", { "task": task, "user_id": user_id, "chat_id": chat_id, "history": chat_history[-10:] }) else: await context.bot.send_chat_action(chat_id=chat_id, action="typing") payload = {"text": text, "history": chat_history[-10:]} reply = await call_antigravity_api("/api/chat", payload) # Salva no histórico persistente add_message(chat_id, text, reply) # Envia resposta if len(reply) > 4000: reply = reply[:3900] + "... [Truncado]" try: await update.message.reply_text(reply, parse_mode='Markdown') except Exception: await update.message.reply_text(reply) except Exception as e: logger.error(f"FALHA NO HANDLE_MESSAGE: {e}") try: await update.message.reply_text("⚠️ Ocorreu um erro ao processar sua mensagem. O sistema foi notificado.") except: pass async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE): """Manipula mensagens de voz com blindagem contra crashes.""" try: if not update.message or not update.message.voice: return chat_id = update.effective_chat.id user_id = update.effective_user.id if ALLOWED_USER_ID and user_id != ALLOWED_USER_ID: return await context.bot.send_chat_action(chat_id=chat_id, action="record_voice") voice_file = await update.message.voice.get_file() temp_path = f"/tmp/tg_voice_{uuid.uuid4().hex}.ogg" await voice_file.download_to_drive(temp_path) async with httpx.AsyncClient(timeout=GLOBAL_TIMEOUT) as client: with open(temp_path, "rb") as f: files = {"audio": (os.path.basename(temp_path), f, "audio/ogg")} response = await client.post(f"{API_BASE_URL}/api/chat-audio", files=files) response.raise_for_status() data = response.json() user_text = data.get("text", "[Voz não transcrita]") bot_reply = data.get("reply", "Erro no processamento.") audio_url = data.get("audio_url") await update.message.reply_text(f"🎤 *Sua mensagem:* {user_text}") try: await update.message.reply_text(bot_reply, parse_mode='Markdown') except: await update.message.reply_text(bot_reply) if audio_url: filename = audio_url.split("/")[-1] audio_path = os.path.join("/tmp", filename) logger.info(f"Tentando enviar voz: {audio_path}") if os.path.exists(audio_path): with open(audio_path, "rb") as audio_file: await context.bot.send_voice(chat_id=chat_id, voice=audio_file) logger.info(f"Voz enviada com sucesso: {audio_path}") else: logger.error(f"Arquivo de áudio não encontrado: {audio_path}") add_message(chat_id, user_text, bot_reply) except Exception as e: logger.error(f"FALHA NO HANDLE_VOICE: {e}") await update.message.reply_text("⚠️ Erro ao processar áudio.") finally: if 'temp_path' in locals() and os.path.exists(temp_path): os.remove(temp_path) if __name__ == '__main__': if not TOKEN: logger.error("ERRO: TOKEN ausente!") exit(1) # Inicializa o Bot (python-telegram-bot v20+) application = ApplicationBuilder().token(TOKEN).build() # Adiciona handlers application.add_handler(CommandHandler("limpar", clear_history)) application.add_handler(CommandHandler("clear", clear_history)) application.add_handler(MessageHandler(filters.TEXT | filters.COMMAND, handle_message)) application.add_handler(MessageHandler(filters.VOICE, handle_voice)) logger.info("Ponte Iniciada. Modo: Resiliente.") application.run_polling(drop_pending_updates=True)