# Source: BotVPS/credential_manager.py (340 lines, 11 KiB, Python)

# ============================================================
# CREDENTIAL_MANAGER.PY - Credential management
# Reads credentials from their original source files
# (Coolify/Docker .env files and the Gitea app.ini).
# Credentials are never persisted by this module: values are read
# from the source and only held in an in-memory cache (see CACHE_TTL).
# ============================================================
import os
import configparser
import time
from typing import Optional, Dict
# ============================================================
# FONTES DE CREDENCIAIS
# ============================================================
# Registry of credential sources, keyed by service id.
# Each entry provides:
#   path        - file the credentials are read from
#   parser      - "env" for KEY=VALUE files, "ini" for INI files
#   section     - INI section to read (only used by the "ini" parser)
#   description - human-readable label used in status reports
CREDENTIAL_SOURCES = {
    "coolify": {
        "path": "/data/coolify/source/.env",
        "parser": "env",
        "description": "Coolify (Orquestrador)",
    },
    "supabase": {
        "path": "/data/coolify/services/h0oggskgs0ws0sco8kc4s8ws/.env",
        "parser": "env",
        "description": "Supabase (BaaS)",
    },
    "gitea": {
        "path": (
            "/var/lib/docker/volumes/"
            "yccsckck4g004gosccwc4kg4_gitea-data/_data/gitea/conf/app.ini"
        ),
        "parser": "ini",
        "section": "security",
        "description": "Gitea (Git Server)",
    },
    "logto": {
        "path": "/data/coolify/services/ea4tt75aeibqtu19hjqqw12f/.env",
        "parser": "env",
        "description": "Logto (Authentication)",
    },
}
# Coolify API base URL; endpoints passed to coolify_api() are appended to it.
# NOTE(review): Coolify's documented REST API is normally served under
# "/api/v1" - confirm that the bare "/api" prefix is intended.
COOLIFY_API_BASE = "http://localhost:8000/api"
# ============================================================
# CACHE
# ============================================================
# In-memory cache of credential values, keyed by "service:key".
_cache: Dict[str, str] = {}
# time.time() timestamp at which each cache entry was stored (same keys).
_cache_time: Dict[str, float] = {}
# Maximum age, in seconds, of a cache entry before it is re-read.
CACHE_TTL = 300 # 5 minutes
# ============================================================
# PARSER FUNCTIONS
# ============================================================
def _read_env_file(path: str) -> Dict[str, str]:
"""Lê arquivo .env e retorna dict de variáveis."""
if not os.path.exists(path):
return {}
result = {}
try:
with open(path) as f:
for line in f:
line = line.strip()
if line and "=" in line and not line.startswith("#"):
key, _, value = line.partition("=")
result[key.strip()] = value.strip()
except Exception as e:
print(f"Erro ao ler {path}: {e}")
return result
def _read_ini_file(path: str, section: str = "security") -> Dict[str, str]:
"""Lê arquivo INI (tipo Gitea) e retorna dict."""
if not os.path.exists(path):
return {}
parser = configparser.ConfigParser()
try:
parser.read(path)
if parser.has_section(section):
return dict(parser.items(section))
except Exception as e:
print(f"Erro ao ler INI {path}: {e}")
return {}
def _get_cache_key(service: str, key: str) -> str:
return f"{service}:{key}"
# ============================================================
# CREDENTIAL FUNCTIONS
# ============================================================
def get_credential(service: str, key: str, use_cache: bool = True, force_reload: bool = False) -> Optional[str]:
    """Fetch one credential from its source file, with an in-memory cache.

    Args:
        service: Service id, a key of ``CREDENTIAL_SOURCES``
            (coolify, gitea, supabase, logto).
        key: Name of the variable/option to look up.
        use_cache: When True, a cached value younger than ``CACHE_TTL``
            seconds is returned without touching the file.
        force_reload: When True, the cache is bypassed and the source is
            re-read.

    Returns:
        The credential value, or None when the service is unknown or the
        key is not present in the source.

    Note:
        A value read from the source is always written to the cache, even
        when ``use_cache`` is False. When the key is no longer present in
        the source, any previously cached value is dropped so a later
        cached lookup cannot return a stale secret.
    """
    cache_key = _get_cache_key(service, key)

    # Serve from cache only when allowed and the entry is still fresh.
    if use_cache and not force_reload and cache_key in _cache:
        if time.time() - _cache_time.get(cache_key, 0) < CACHE_TTL:
            return _cache[cache_key]

    source = CREDENTIAL_SOURCES.get(service)
    if not source:
        return None

    if source["parser"] == "env":
        data = _read_env_file(source["path"])
    else:
        # Any parser other than "env" is treated as INI.
        data = _read_ini_file(source["path"], source.get("section", "security"))

    value = data.get(key)
    if value is not None:
        _cache[cache_key] = value
        _cache_time[cache_key] = time.time()
    else:
        # The key vanished from the source: invalidate the stale entry.
        _cache.pop(cache_key, None)
        _cache_time.pop(cache_key, None)
    return value
def get_all_credentials(service: str, use_cache: bool = True) -> Dict[str, str]:
    """Return every key/value pair found in a service's credential source.

    The source file is read on every call; ``use_cache`` is accepted but
    not consulted by this function.

    Args:
        service: Service id, a key of ``CREDENTIAL_SOURCES``.
        use_cache: Currently unused.

    Returns:
        All entries of the service's source, or an empty dict when the
        service is unknown.
    """
    source = CREDENTIAL_SOURCES.get(service)
    if source:
        if source["parser"] != "env":
            # Non-"env" sources are read as INI, limited to one section.
            ini_section = source.get("section", "security")
            return _read_ini_file(source["path"], ini_section)
        return _read_env_file(source["path"])
    return {}
def get_multiple(service: str, keys: list, use_cache: bool = True) -> Dict[str, Optional[str]]:
    """Look up several credentials of one service.

    Args:
        service: Service id passed through to ``get_credential``.
        keys: Names of the credentials to fetch.
        use_cache: Passed through to ``get_credential``.

    Returns:
        Dict mapping each requested key to its value (None when missing).
    """
    found: Dict[str, Optional[str]] = {}
    for name in keys:
        found[name] = get_credential(service, name, use_cache)
    return found
def clear_cache():
    """Drop all cached credentials by rebinding both cache dicts to empty ones.

    Useful after credentials were updated in Coolify.
    """
    global _cache, _cache_time
    _cache, _cache_time = {}, {}
def reload_credential(service: str, key: str) -> Optional[str]:
    """Re-read a single credential from its source, bypassing the cache.

    Args:
        service: Service id passed to ``get_credential``.
        key: Credential name passed to ``get_credential``.

    Returns:
        Whatever ``get_credential`` returns for a forced reload.
    """
    bypass_cache = {"use_cache": False, "force_reload": True}
    return get_credential(service, key, **bypass_cache)
# ============================================================
# HELPER FUNCTIONS - SERVIÇOS COMUNS
# ============================================================
def gitea_token() -> str:
    """Return the Gitea access token from the configured source.

    Only the ``TOKEN`` option of the Gitea source is consulted.
    ``INSTALL_LOCK`` is deliberately not used as a fallback: per the Gitea
    configuration reference it is a boolean flag, so returning its value
    ("true"/"false") as a token would hand callers a bogus credential.

    Returns:
        The token, or an empty string when no ``TOKEN`` option is found.
    """
    # NOTE(review): confirm which app.ini option (if any) actually holds
    # the API token for this deployment; a stock [security] section may
    # not define TOKEN at all.
    token = get_credential("gitea", "TOKEN")
    return token or ""
def gitea_url() -> str:
    """Return the Gitea base URL (a fixed value)."""
    base_url = "https://git.reifonas.cloud"
    return base_url
def gitea_api_url() -> str:
    """Return the Gitea API URL: the base URL followed by ``/api/v1``."""
    api_suffix = "/api/v1"
    return "{}{}".format(gitea_url(), api_suffix)
def supabase_url() -> str:
    """Return the Supabase base URL (a fixed value)."""
    base_url = "https://supabase.reifonas.cloud"
    return base_url
def supabase_anon_key() -> str:
    """Return the Supabase ``ANON_KEY``, or an empty string when unavailable."""
    anon_key = get_credential("supabase", "ANON_KEY")
    return anon_key if anon_key else ""
def supabase_service_role_key() -> str:
    """Return the Supabase ``SERVICE_ROLE_KEY``, or an empty string when unavailable."""
    role_key = get_credential("supabase", "SERVICE_ROLE_KEY")
    return role_key if role_key else ""
def supabase_jwt_secret() -> str:
    """Return the Supabase ``JWT_SECRET``, or an empty string when unavailable."""
    jwt_secret = get_credential("supabase", "JWT_SECRET")
    return jwt_secret if jwt_secret else ""
def coolify_app_key() -> str:
    """Return the Coolify ``APP_KEY``, or an empty string when unavailable."""
    app_key = get_credential("coolify", "APP_KEY")
    return app_key if app_key else ""
def coolify_api_base() -> str:
    """Return the Coolify API base URL (the ``COOLIFY_API_BASE`` constant)."""
    api_base = COOLIFY_API_BASE
    return api_base
# ============================================================
# COOLIFY API HELPERS
# ============================================================
def coolify_api(endpoint: str, method: str = "GET", data: Optional[dict] = None) -> dict:
    """Send a request to the Coolify API and return the decoded response.

    Args:
        endpoint: API path appended to ``COOLIFY_API_BASE``
            (e.g. "/deployments", "/applications").
        method: HTTP method, case-insensitive. Supported: GET, POST, PUT,
            PATCH, DELETE.
        data: JSON payload; sent only with POST, PUT and PATCH.

    Returns:
        On any 2xx status: the decoded JSON body (which may be a list for
        collection endpoints, despite the ``dict`` annotation), or
        ``{"success": True}`` when the body is empty.
        Otherwise ``{"error": ..., "detail": ...}``. Unsupported methods
        and any exception raised while performing the request are also
        reported as ``{"error": ...}`` rather than raised.
    """
    verb = str(method).upper()
    if verb not in ("GET", "POST", "PUT", "PATCH", "DELETE"):
        # Rejected before anything else so no request is attempted.
        return {"error": f"Método {method} não suportado"}

    import requests

    url = f"{COOLIFY_API_BASE}{endpoint}"
    # NOTE(review): this sends APP_KEY as the bearer token. Coolify's API
    # presumably expects an API token issued from its UI instead - confirm
    # that APP_KEY actually authenticates.
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {coolify_app_key()}"
    }
    # GET/DELETE never carried a body in this helper; keep it that way.
    payload = data if verb in ("POST", "PUT", "PATCH") else None
    try:
        res = requests.request(verb, url, headers=headers, json=payload, timeout=30)
        # Any 2xx is success (202 Accepted and 204 No Content included).
        if 200 <= res.status_code < 300:
            return res.json() if res.text else {"success": True}
        return {"error": f"Status {res.status_code}", "detail": res.text}
    except Exception as e:
        # Boundary helper: callers get an error dict instead of an exception.
        return {"error": str(e)}
def coolify_list_applications() -> list:
    """List the applications known to Coolify.

    Returns:
        The list returned by ``coolify_api("/applications")``, or an empty
        list when the response is anything other than a list (including
        error dicts).
    """
    response = coolify_api("/applications")
    if not isinstance(response, list):
        return []
    return response
def coolify_list_deployments() -> list:
    """List deployments reported by Coolify.

    Returns:
        The list returned by ``coolify_api("/deployments")``, or an empty
        list when the response is anything other than a list (including
        error dicts).
    """
    response = coolify_api("/deployments")
    if not isinstance(response, list):
        return []
    return response
def coolify_get_status() -> dict:
    """Return the response of the Coolify ``/status`` endpoint, unmodified."""
    status_response = coolify_api("/status")
    return status_response
# ============================================================
# SYNC FUNCTION
# ============================================================
def sync_credentials() -> dict:
    """Clear the cache and re-read every configured credential source.

    Returns:
        A report dict with ``"status"`` ("synced"), ``"timestamp"``
        (``time.time()`` taken before the sources are read) and
        ``"services"``: one entry per service holding either
        ``{"status": "ok", "keys": <count>}`` or
        ``{"status": "error", "error": <message>}``.
    """
    clear_cache()
    per_service = {}
    report = {
        "status": "synced",
        "services": per_service,
        "timestamp": time.time()
    }
    for name in CREDENTIAL_SOURCES:
        try:
            loaded = get_all_credentials(name, use_cache=False)
            entry = {"status": "ok", "keys": len(loaded)}
        except Exception as exc:
            entry = {"status": "error", "error": str(exc)}
        per_service[name] = entry
    return report
# ============================================================
# STATUS
# ============================================================
def get_services_status() -> dict:
    """Report, per configured service, whether its source file exists.

    Returns:
        Dict keyed by service id. Each value holds ``"description"``,
        ``"path"``, ``"exists"`` (result of ``os.path.exists``) and
        ``"keys_count"`` (number of entries read from the source; 0 when
        the file does not exist).
    """
    report = {}
    for service_id, source in CREDENTIAL_SOURCES.items():
        source_path = source["path"]
        is_present = os.path.exists(source_path)
        entry = {
            "description": source["description"],
            "path": source_path,
            "exists": is_present,
            "keys_count": 0
        }
        report[service_id] = entry
        if not is_present:
            continue
        # The source is only read when the file is actually there.
        entry["keys_count"] = len(get_all_credentials(service_id))
    return report
# ============================================================
# MAIN TEST
# ============================================================
if __name__ == "__main__":
    # Manual smoke test: prints source availability and masked credentials.

    def _mask(secret: str) -> str:
        """Return '***' plus the last 8 characters of *secret*, or 'N/A' if empty."""
        return "***" + secret[-8:] if secret else "N/A"

    print("=== Credential Manager Test ===")
    print("\nStatus dos serviços:")
    for service, info in get_services_status().items():
        # The original markers were both empty strings, so the line never
        # showed whether the source file exists; use explicit ASCII markers.
        marker = "OK" if info["exists"] else "MISSING"
        print(f" {service}: {marker} ({info['keys_count']} chaves)")
    print("\nCredenciais carregadas:")
    print(f" Gitea URL: {gitea_url()}")
    # Each secret is fetched once and masked before printing.
    print(f" Gitea Token: {_mask(gitea_token())}")
    print(f" Supabase URL: {supabase_url()}")
    print(f" Supabase Anon Key: {_mask(supabase_anon_key())}")
    print(f" Coolify API: {coolify_api_base()}")