Files
BotVPS/session_manager.py

141 lines
4.6 KiB
Python

"""
Session Manager - Persistência de histórico de chat via SQLite
Mantém contexto conversacional entre mensagens do Telegram
"""
import sqlite3
import json
import os
from datetime import datetime
from typing import List, Dict, Optional
# Location of the SQLite file that _get_db() opens for all session storage.
DB_PATH = "/root/Apps/BotVPS/data/sessions.db"
def _get_db():
    """Open a connection to the sessions database, creating its folder first.

    The parent directory of ``DB_PATH`` is created when it is missing. Rows
    are produced as ``sqlite3.Row`` objects, so columns can be read either by
    position or by name. The connection is returned open.
    """
    data_dir = os.path.dirname(DB_PATH)
    os.makedirs(data_dir, exist_ok=True)
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the ``chat_sessions`` and ``orchestrator_state`` tables if missing.

    The connection is closed explicitly in ``finally``: a ``sqlite3``
    connection used as a context manager only commits or rolls back the
    transaction, it does not close the underlying handle.
    """
    conn = _get_db()
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS chat_sessions (
                chat_id INTEGER PRIMARY KEY,
                history TEXT DEFAULT '[]',
                created_at TEXT,
                updated_at TEXT
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS orchestrator_state (
                chat_id INTEGER PRIMARY KEY,
                plan TEXT,
                created_at TEXT
            )
        """)
        conn.commit()
    finally:
        conn.close()
def get_history(chat_id: int) -> List[Dict]:
    """Return the stored message history for a chat.

    Args:
        chat_id: Primary key of the ``chat_sessions`` row to read.

    Returns:
        The JSON-decoded ``history`` column, or an empty list when the chat
        has no row or the column is NULL.
    """
    conn = _get_db()
    try:
        row = conn.execute(
            "SELECT history FROM chat_sessions WHERE chat_id = ?", (chat_id,)
        ).fetchone()
        # Read the value while the connection is still open.
        raw_history = row[0] if row is not None else None
    finally:
        # ``with conn`` would only commit/rollback; close to avoid leaking
        # one connection per call.
        conn.close()
    if raw_history is None:
        return []
    return json.loads(raw_history)
def save_history(chat_id: int, history: List[Dict]):
    """Insert or overwrite the stored history for a chat.

    A new row gets both ``created_at`` and ``updated_at`` set to the current
    local time; an existing row keeps its ``created_at`` and only has
    ``history`` and ``updated_at`` replaced.

    Args:
        chat_id: Primary key of the ``chat_sessions`` row to write.
        history: JSON-serializable list stored in the ``history`` column.
    """
    now = datetime.now().isoformat()
    history_json = json.dumps(history, ensure_ascii=False)
    conn = _get_db()
    try:
        # ``excluded`` refers to the row that the INSERT tried to add, so the
        # values do not need to be bound a second time for the UPDATE branch.
        conn.execute(
            """
            INSERT INTO chat_sessions (chat_id, history, created_at, updated_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(chat_id) DO UPDATE SET
                history = excluded.history,
                updated_at = excluded.updated_at
            """,
            (chat_id, history_json, now, now),
        )
        conn.commit()
    finally:
        # ``with conn`` would only commit/rollback; close to avoid a leak.
        conn.close()
def add_message(chat_id: int, user: str, bot: str, max_history: int = 20):
    """Append one user/bot exchange to a chat's history and persist it.

    Args:
        chat_id: Chat whose history is extended.
        user: Text stored under the ``"user"`` key of the new entry.
        bot: Text stored under the ``"bot"`` key of the new entry.
        max_history: Maximum number of entries kept after appending; the
            oldest entries are dropped first. A value of zero or less keeps
            no entries at all.
    """
    history = get_history(chat_id)
    history.append({"user": user, "bot": bot})
    if max_history <= 0:
        # ``history[-0:]`` is the whole list and a negative limit would slice
        # from the front, so non-positive limits are handled explicitly.
        history = []
    elif len(history) > max_history:
        # Keep only the most recent ``max_history`` entries.
        history = history[-max_history:]
    save_history(chat_id, history)
def clear_history(chat_id: int):
    """Reset the stored history of a chat to an empty list."""
    empty_history: List[Dict] = []
    save_history(chat_id, empty_history)
def set_orchestrator_pending(chat_id: int, plan: dict):
    """Store an orchestration plan that awaits later confirmation.

    First makes sure the chat has a ``chat_sessions`` row (creating one with
    an empty history, or refreshing ``updated_at`` on the existing one), then
    inserts or replaces the chat's single row in ``orchestrator_state``.
    Both tables are expected to exist already; ``init_db()`` creates them when
    this module is imported.

    Args:
        chat_id: Chat the pending plan belongs to.
        plan: JSON-serializable plan to store.

    Raises:
        TypeError: If ``plan`` contains values ``json.dumps`` cannot encode.
    """
    # Serialize before opening the connection so a bad plan costs no I/O.
    plan_json = json.dumps(plan)
    now = datetime.now().isoformat()
    conn = _get_db()
    try:
        # Inside DO UPDATE an unqualified column is the existing row's value,
        # so the stored history is preserved (NULL is normalized to '[]').
        conn.execute(
            """
            INSERT INTO chat_sessions (chat_id, history, created_at, updated_at)
            VALUES (?, '[]', ?, ?)
            ON CONFLICT(chat_id) DO UPDATE SET
                history = COALESCE(history, '[]'),
                updated_at = excluded.updated_at
            """,
            (chat_id, now, now),
        )
        conn.execute(
            """
            INSERT INTO orchestrator_state (chat_id, plan, created_at)
            VALUES (?, ?, ?)
            ON CONFLICT(chat_id) DO UPDATE SET
                plan = excluded.plan,
                created_at = excluded.created_at
            """,
            (chat_id, plan_json, now),
        )
        conn.commit()
    finally:
        conn.close()
def get_orchestrator_pending(chat_id: int) -> Optional[Dict]:
    """Return the plan awaiting confirmation for a chat, if one is stored.

    Args:
        chat_id: Chat whose ``orchestrator_state`` row is looked up.

    Returns:
        A dict with the JSON-decoded plan under ``"plan"`` and the stored
        ``created_at`` value under ``"created_at"``, or ``None`` when the
        chat has no pending plan.
    """
    conn = _get_db()
    try:
        row = conn.execute(
            "SELECT plan, created_at FROM orchestrator_state WHERE chat_id = ?",
            (chat_id,),
        ).fetchone()
        if row is None:
            return None
        return {"plan": json.loads(row[0]), "created_at": row[1]}
    finally:
        conn.close()
def clear_orchestrator_pending(chat_id: int):
    """Delete the pending orchestration row of a chat, if there is one."""
    delete_sql = "DELETE FROM orchestrator_state WHERE chat_id = ?"
    connection = _get_db()
    try:
        connection.execute(delete_sql, (chat_id,))
        connection.commit()
    finally:
        # Always release the connection, even when the delete fails.
        connection.close()
def get_session_info(chat_id: int) -> Optional[Dict]:
    """Return the full ``chat_sessions`` row of a chat as a plain dict.

    Args:
        chat_id: Primary key of the row to read.

    Returns:
        A dict mapping column names to the stored values (``history`` is
        returned as the raw stored text, not decoded), or ``None`` when the
        chat has no row.
    """
    conn = _get_db()
    try:
        row = conn.execute(
            "SELECT * FROM chat_sessions WHERE chat_id = ?", (chat_id,)
        ).fetchone()
        # Copy into a dict before the connection is closed.
        return dict(row) if row is not None else None
    finally:
        # ``with conn`` would only commit/rollback; close to avoid a leak.
        conn.close()
# Initialize the database when the module is imported
init_db()