198 lines
8.2 KiB
Python
198 lines
8.2 KiB
Python
import os
|
|
import requests
|
|
import asyncio
|
|
from dotenv import load_dotenv
|
|
from telegram import Update
|
|
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
|
|
from ai_agent import query_agent
|
|
import speech_recognition as sr
|
|
from pydub import AudioSegment
|
|
from gtts import gTTS
|
|
|
|
load_dotenv()
|
|
|
|
TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
|
|
ALLOWED_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
|
|
ELEVENLABS_KEY = os.getenv("ELEVENLABS_API_KEY")
|
|
VOICE_ID = os.getenv("ELEVENLABS_VOICE_ID")
|
|
|
|
def synthesize_audio(text: str) -> str:
|
|
"""Gera áudio local/gratuito usando gTTS e retorna o arquivo."""
|
|
try:
|
|
# Remove caracteres indesejados e emojis que atrapalham a fala
|
|
texto_limpo = text.replace("🤖", "").replace("🧑🏫", "").replace("*", "").replace("`", "")
|
|
|
|
filepath = "/tmp/reply_audio.mp3"
|
|
tts = gTTS(text=texto_limpo, lang='pt-br', tld='com.br', slow=False)
|
|
tts.save(filepath)
|
|
return filepath
|
|
except Exception as e:
|
|
print(f"Erro ao gerar voz tts: {e}")
|
|
return ""
|
|
|
|
async def auth_check(update: Update) -> bool:
|
|
"""Verifica se o usuário que enviou a mensagem é o Marcos (Chat ID autorizado)."""
|
|
if str(update.message.chat_id) != ALLOWED_CHAT_ID:
|
|
await update.message.reply_text("Acesso negado. Você não tem permissão para controlar esta VPS.")
|
|
return False
|
|
return True
|
|
|
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
if not await auth_check(update): return
|
|
await update.message.reply_text("👋 Olá, Marcos! Antigravity VPS Agent online e pronto para receber comandos.")
|
|
|
|
# Memória persistente da conversa (em memória RAM)
|
|
chat_histories = {}
|
|
|
|
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
if not await auth_check(update): return
|
|
|
|
chat_id = update.message.chat_id
|
|
user_msg = update.message.text
|
|
await update.message.reply_chat_action(action="typing")
|
|
|
|
# Busca histórico anterior
|
|
history = chat_histories.get(chat_id, [])
|
|
|
|
# Aciona o Agente de IA para processar o prompt e executar Tools se precisar
|
|
from config import get_config
|
|
cfg = get_config()
|
|
reply = query_agent(prompt=user_msg, override_provider=cfg.get("active_provider"), chat_history=history)
|
|
|
|
# Atualiza histórico
|
|
history.append({"user": user_msg, "bot": reply})
|
|
chat_histories[chat_id] = history[-10:] # Mantém apenas as últimas 10
|
|
|
|
# Se o usuário pedir ativamente por áudio no texto
|
|
if "áudio" in user_msg.lower() or "audio" in user_msg.lower() or "voz" in user_msg.lower():
|
|
await update.message.reply_chat_action(action="record_voice")
|
|
audio_path = synthesize_audio(reply)
|
|
if audio_path:
|
|
await update.message.reply_voice(voice=open(audio_path, 'rb'))
|
|
return
|
|
|
|
# --- NOVO: Lógica para enviar IMAGENS se a IA localizou um arquivo ---
|
|
import re
|
|
# Procura por padrões de imagem Markdown ou caminhos absolutos de imagem
|
|
img_matches = re.findall(r'!\[.*?\]\((/.*?)\)', reply) # 
|
|
if not img_matches:
|
|
# Tenta achar caminhos absolutos que terminam em extensões de imagem
|
|
img_matches = re.findall(r'(/[^\s]+?\.(?:jpg|jpeg|png|gif|webp))', reply, re.IGNORECASE)
|
|
|
|
if img_matches:
|
|
for img_path in img_matches:
|
|
# Garante que o caminho use /host_root se for um arquivo da VPS
|
|
real_path = img_path
|
|
if not real_path.startswith("/host_root") and real_path.startswith("/root"):
|
|
real_path = f"/host_root{real_path}"
|
|
|
|
if os.path.exists(real_path):
|
|
try:
|
|
await update.message.reply_chat_action(action="upload_photo")
|
|
await update.message.reply_photo(photo=open(real_path, 'rb'), caption="🖼️ Imagem localizada na VPS")
|
|
except Exception as e:
|
|
print(f"Erro ao enviar imagem {real_path}: {e}")
|
|
|
|
# Responde no chat normalmente
|
|
await update.message.reply_text(reply)
|
|
|
|
async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
if not await auth_check(update): return
|
|
|
|
await update.message.reply_chat_action(action="record_voice")
|
|
|
|
# Baixa o aúdio do telegram
|
|
voice_file = await update.message.voice.get_file()
|
|
ogg_path = "/tmp/voice.ogg"
|
|
wav_path = "/tmp/voice.wav"
|
|
|
|
await voice_file.download_to_drive(ogg_path)
|
|
|
|
# Converte para WAV (Requer ffmpeg instalado na maquina)
|
|
try:
|
|
audio = AudioSegment.from_ogg(ogg_path)
|
|
audio.export(wav_path, format="wav")
|
|
except Exception as e:
|
|
await update.message.reply_text(f"Erro ao processar áudio (O ffmpeg está instalado na VPS?): {e}")
|
|
return
|
|
|
|
# Usando SpeechRecognition nativo para transcrever (pode usar Whisper no Ollama depois)
|
|
recognizer = sr.Recognizer()
|
|
with sr.AudioFile(wav_path) as source:
|
|
audio_data = recognizer.record(source)
|
|
try:
|
|
text = recognizer.recognize_google(audio_data, language="pt-BR")
|
|
await update.message.reply_text(f"🗣️ Reconhecido: _{text}_", parse_mode="Markdown")
|
|
|
|
# Busca histórico anterior
|
|
chat_id = update.message.chat_id
|
|
history = chat_histories.get(chat_id, [])
|
|
|
|
# Envia o texto reconhecido para o Agente (respeitando a configuração ativa)
|
|
from config import get_config
|
|
cfg = get_config()
|
|
reply = query_agent(prompt=text, override_provider=cfg.get("active_provider"), chat_history=history)
|
|
|
|
# Atualiza histórico
|
|
history.append({"user": text, "bot": reply})
|
|
chat_histories[chat_id] = history[-10:]
|
|
|
|
# Sintetiza com ElevenLabs e responde com Áudio
|
|
audio_path = synthesize_audio(reply)
|
|
if audio_path:
|
|
await update.message.reply_voice(voice=open(audio_path, 'rb'))
|
|
else:
|
|
await update.message.reply_text(reply)
|
|
|
|
except sr.UnknownValueError:
|
|
await update.message.reply_text("Não consegui entender o que foi dito no áudio.")
|
|
except sr.RequestError as e:
|
|
await update.message.reply_text(f"Erro no serviço de STT: {e}")
|
|
|
|
async def llm_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
if not await auth_check(update): return
|
|
args = context.args
|
|
from config import get_config, save_config
|
|
|
|
if not args:
|
|
cfg = get_config()
|
|
await update.message.reply_text(f"Comando incompleto. Use: /llm gemini ou /llm ollama.\n*Status Atual:* {cfg.get('active_provider').upper()}")
|
|
return
|
|
|
|
new_model = args[0].lower()
|
|
if new_model in ["gemini", "ollama"]:
|
|
cfg = get_config()
|
|
cfg["active_provider"] = new_model
|
|
save_config(cfg)
|
|
await update.message.reply_text(f"✅ Inteligência Artificial comutada com sucesso para: *{new_model.upper()}*")
|
|
else:
|
|
await update.message.reply_text("Modelos disponíveis: gemini ou ollama.")
|
|
|
|
async def clear_history(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
if not await auth_check(update): return
|
|
chat_id = update.message.chat_id
|
|
if chat_id in chat_histories:
|
|
chat_histories[chat_id] = []
|
|
await update.message.reply_text("🧹 Memória limpa com sucesso!")
|
|
else:
|
|
await update.message.reply_text("A memória já está vazia.")
|
|
|
|
def get_telegram_app():
|
|
if not TOKEN:
|
|
raise ValueError("TELEGRAM_BOT_TOKEN não encontrado no .env")
|
|
app = Application.builder().token(TOKEN).build()
|
|
app.add_handler(CommandHandler("start", start))
|
|
app.add_handler(CommandHandler("llm", llm_command))
|
|
app.add_handler(CommandHandler("limpar", clear_history))
|
|
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text))
|
|
app.add_handler(MessageHandler(filters.VOICE, handle_voice))
|
|
return app
|
|
|
|
# Para testes rápidos se rodado standalone
|
|
if __name__ == "__main__":
|
|
print("--- INICIANDO BOT TELEGRAM (POLLING) ---")
|
|
print("Limpando Webhooks e mensagens pendentes para evitar CONFLITOS...")
|
|
app = get_telegram_app()
|
|
# drop_pending_updates=True limpa a fila e desativa webhooks automaticamente
|
|
app.run_polling(drop_pending_updates=True)
|