""" Session Manager - Persistência de histórico de chat via SQLite Mantém contexto conversacional entre mensagens do Telegram """ import sqlite3 import json import os from datetime import datetime from typing import List, Dict, Optional DB_PATH = "/root/Apps/BotVPS/data/sessions.db" def _get_db(): """Garante que o diretório e banco existem.""" os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): """Cria as tabelas se não existirem.""" with _get_db() as db: db.execute(""" CREATE TABLE IF NOT EXISTS chat_sessions ( chat_id INTEGER PRIMARY KEY, history TEXT DEFAULT '[]', created_at TEXT, updated_at TEXT ) """) db.execute(""" CREATE TABLE IF NOT EXISTS orchestrator_state ( chat_id INTEGER PRIMARY KEY, plan TEXT, created_at TEXT ) """) db.commit() def get_history(chat_id: int) -> List[Dict]: """Retorna o histórico de um chat.""" with _get_db() as db: row = db.execute( "SELECT history FROM chat_sessions WHERE chat_id = ?", (chat_id,) ).fetchone() if row: return json.loads(row[0]) return [] def save_history(chat_id: int, history: List[Dict]): """Salva o histórico de um chat.""" now = datetime.now().isoformat() history_json = json.dumps(history, ensure_ascii=False) with _get_db() as db: db.execute(""" INSERT INTO chat_sessions (chat_id, history, created_at, updated_at) VALUES (?, ?, ?, ?) ON CONFLICT(chat_id) DO UPDATE SET history = ?, updated_at = ? """, (chat_id, history_json, now, now, history_json, now)) db.commit() def add_message(chat_id: int, user: str, bot: str, max_history: int = 20): """Adiciona um par de mensagens ao histórico.""" history = get_history(chat_id) history.append({"user": user, "bot": bot}) # Mantém apenas os últimos N pares if len(history) > max_history: history = history[-max_history:] save_history(chat_id, history) def clear_history(chat_id: int): """Limpa o histórico de um chat.""" save_history(chat_id, []) def set_orchestrator_pending(chat_id: int, plan: dict): """Salva estado pendente de orquestração para confirmação posterior.""" conn = _get_db() try: import json as json_mod plan_json = json_mod.dumps(plan) now = datetime.now().isoformat() conn.execute(""" INSERT INTO chat_sessions (chat_id, history, created_at, updated_at) VALUES (?, ?, ?, ?) ON CONFLICT(chat_id) DO UPDATE SET history = COALESCE((SELECT history FROM chat_sessions WHERE chat_id = ?), '[]'), updated_at = ? """, (chat_id, '[]', now, now, chat_id, now)) conn.execute(""" CREATE TABLE IF NOT EXISTS orchestrator_state ( chat_id INTEGER PRIMARY KEY, plan TEXT, created_at TEXT ) """) conn.execute(""" INSERT INTO orchestrator_state (chat_id, plan, created_at) VALUES (?, ?, ?) ON CONFLICT(chat_id) DO UPDATE SET plan = ?, created_at = ? """, (chat_id, plan_json, now, plan_json, now)) conn.commit() finally: conn.close() def get_orchestrator_pending(chat_id: int) -> Optional[Dict]: """Retorna o plano pendente de confirmação, se existir.""" conn = _get_db() try: import json as json_mod row = conn.execute( "SELECT plan, created_at FROM orchestrator_state WHERE chat_id = ?", (chat_id,) ).fetchone() if row: import json as json_mod return {"plan": json_mod.loads(row[0]), "created_at": row[1]} return None finally: conn.close() def clear_orchestrator_pending(chat_id: int): """Remove o estado pendente de orquestração.""" conn = _get_db() try: conn.execute("DELETE FROM orchestrator_state WHERE chat_id = ?", (chat_id,)) conn.commit() finally: conn.close() def get_session_info(chat_id: int) -> Optional[Dict]: """Retorna info da sessão.""" with _get_db() as db: row = db.execute( "SELECT * FROM chat_sessions WHERE chat_id = ?", (chat_id,) ).fetchone() if row: return dict(row) return None # Inicializa o banco ao importar init_db()