141 lines
4.6 KiB
Python
141 lines
4.6 KiB
Python
"""
|
|
Session Manager - Persistência de histórico de chat via SQLite
|
|
Mantém contexto conversacional entre mensagens do Telegram
|
|
"""
|
|
import sqlite3
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from typing import List, Dict, Optional
|
|
|
|
DB_PATH = "/root/Apps/BotVPS/data/sessions.db"
|
|
|
|
def _get_db():
|
|
"""Garante que o diretório e banco existem."""
|
|
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
def init_db():
|
|
"""Cria as tabelas se não existirem."""
|
|
with _get_db() as db:
|
|
db.execute("""
|
|
CREATE TABLE IF NOT EXISTS chat_sessions (
|
|
chat_id INTEGER PRIMARY KEY,
|
|
history TEXT DEFAULT '[]',
|
|
created_at TEXT,
|
|
updated_at TEXT
|
|
)
|
|
""")
|
|
db.execute("""
|
|
CREATE TABLE IF NOT EXISTS orchestrator_state (
|
|
chat_id INTEGER PRIMARY KEY,
|
|
plan TEXT,
|
|
created_at TEXT
|
|
)
|
|
""")
|
|
db.commit()
|
|
|
|
def get_history(chat_id: int) -> List[Dict]:
|
|
"""Retorna o histórico de um chat."""
|
|
with _get_db() as db:
|
|
row = db.execute(
|
|
"SELECT history FROM chat_sessions WHERE chat_id = ?", (chat_id,)
|
|
).fetchone()
|
|
if row:
|
|
return json.loads(row[0])
|
|
return []
|
|
|
|
def save_history(chat_id: int, history: List[Dict]):
|
|
"""Salva o histórico de um chat."""
|
|
now = datetime.now().isoformat()
|
|
history_json = json.dumps(history, ensure_ascii=False)
|
|
with _get_db() as db:
|
|
db.execute("""
|
|
INSERT INTO chat_sessions (chat_id, history, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?)
|
|
ON CONFLICT(chat_id) DO UPDATE SET history = ?, updated_at = ?
|
|
""", (chat_id, history_json, now, now, history_json, now))
|
|
db.commit()
|
|
|
|
def add_message(chat_id: int, user: str, bot: str, max_history: int = 20):
|
|
"""Adiciona um par de mensagens ao histórico."""
|
|
history = get_history(chat_id)
|
|
history.append({"user": user, "bot": bot})
|
|
# Mantém apenas os últimos N pares
|
|
if len(history) > max_history:
|
|
history = history[-max_history:]
|
|
save_history(chat_id, history)
|
|
|
|
def clear_history(chat_id: int):
|
|
"""Limpa o histórico de um chat."""
|
|
save_history(chat_id, [])
|
|
|
|
def set_orchestrator_pending(chat_id: int, plan: dict):
|
|
"""Salva estado pendente de orquestração para confirmação posterior."""
|
|
conn = _get_db()
|
|
try:
|
|
import json as json_mod
|
|
plan_json = json_mod.dumps(plan)
|
|
now = datetime.now().isoformat()
|
|
conn.execute("""
|
|
INSERT INTO chat_sessions (chat_id, history, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?)
|
|
ON CONFLICT(chat_id) DO UPDATE SET
|
|
history = COALESCE((SELECT history FROM chat_sessions WHERE chat_id = ?), '[]'),
|
|
updated_at = ?
|
|
""", (chat_id, '[]', now, now, chat_id, now))
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS orchestrator_state (
|
|
chat_id INTEGER PRIMARY KEY,
|
|
plan TEXT,
|
|
created_at TEXT
|
|
)
|
|
""")
|
|
conn.execute("""
|
|
INSERT INTO orchestrator_state (chat_id, plan, created_at)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(chat_id) DO UPDATE SET plan = ?, created_at = ?
|
|
""", (chat_id, plan_json, now, plan_json, now))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
def get_orchestrator_pending(chat_id: int) -> Optional[Dict]:
|
|
"""Retorna o plano pendente de confirmação, se existir."""
|
|
conn = _get_db()
|
|
try:
|
|
import json as json_mod
|
|
row = conn.execute(
|
|
"SELECT plan, created_at FROM orchestrator_state WHERE chat_id = ?", (chat_id,)
|
|
).fetchone()
|
|
if row:
|
|
import json as json_mod
|
|
return {"plan": json_mod.loads(row[0]), "created_at": row[1]}
|
|
return None
|
|
finally:
|
|
conn.close()
|
|
|
|
def clear_orchestrator_pending(chat_id: int):
|
|
"""Remove o estado pendente de orquestração."""
|
|
conn = _get_db()
|
|
try:
|
|
conn.execute("DELETE FROM orchestrator_state WHERE chat_id = ?", (chat_id,))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
def get_session_info(chat_id: int) -> Optional[Dict]:
|
|
"""Retorna info da sessão."""
|
|
with _get_db() as db:
|
|
row = db.execute(
|
|
"SELECT * FROM chat_sessions WHERE chat_id = ?", (chat_id,)
|
|
).fetchone()
|
|
if row:
|
|
return dict(row)
|
|
return None
|
|
|
|
# Inicializa o banco ao importar
|
|
init_db()
|