refatoracao

This commit is contained in:
2026-03-23 23:38:56 +00:00
parent 8002262cf7
commit b7e6239216
16 changed files with 2290 additions and 4321 deletions

View File

@@ -1,603 +1,151 @@
# ============================================================
# CREDENTIAL_MANAGER.PY - Gestão de Credenciais
# Lê credenciais da fonte original (.env do Coolify/Docker)
# NÃO ARMAZENA CREDENCIAIS - SEMPRE LÊ DA FONTE
# ============================================================
import os
import re
import json
import configparser
import time
import requests
import httpx
import asyncio
from typing import Optional, Dict
# ============================================================
# CAMINHO DO ARQUIVO DE SEGREDOS (FALLBACK)
# CONFIGURATIONS & PATHS
# ============================================================
SEGREDOS_PATH = "/data/segredos.md"
BOTVPS_HOST_PATH = "/app"
CACHE_TTL = 300 # 5 minutos
GITEA_API_URL = "https://git.reifonas.cloud/api/v1"
# ============================================================
# GITEA REPO CREDENTIALS (FONTE PRINCIPAL)
# ============================================================
GITEA_CREDS_REPO = "admtracksteel/Keys"
GITEA_CREDS_FILE = "credentials.json"
_gitea_creds_cache: Dict[str, str] = {}
# CACHES
_gitea_creds_cache: Dict[str, Dict] = {}
_gitea_creds_cache_time: float = 0
_local_cache: Dict[str, str] = {}
_local_cache_time: Dict[str, float] = {}
def get_gitea_creds_url() -> str:
"""Retorna URL da API do Gitea."""
return "https://git.reifonas.cloud/api/v1"
# ============================================================
# GITEA CORE (FONTE PRINCIPAL)
# ============================================================
def fetch_from_gitea_repo(force: bool = False) -> Dict[str, Dict[str, str]]:
"""
Busca credenciais do repo Gitea admtracksteel/Keys.
Faz cache com TTL de 5 minutos.
"""
async def fetch_from_gitea_repo_async(force: bool = False) -> Dict:
global _gitea_creds_cache, _gitea_creds_cache_time
# Verifica cache
if not force and time.time() - _gitea_creds_cache_time < CACHE_TTL and _gitea_creds_cache:
return _gitea_creds_cache
try:
# Obtém token do Gitea
from credential_manager import gitea_token
token = gitea_token()
# Busca arquivo no repo
url = f"{get_gitea_creds_url()}/repos/admtracksteel/Keys/contents/{GITEA_CREDS_FILE}"
url = f"{GITEA_API_URL}/repos/admtracksteel/Keys/contents/credentials.json"
headers = {"Authorization": f"token {token}"} if token else {}
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 200:
data = response.json()
# Conteúdo está em base64
import base64
content_b64 = data.get("content", "").replace("\n", "")
content = base64.b64decode(content_b64).decode("utf-8")
_gitea_creds_cache = json.loads(content)
_gitea_creds_cache_time = time.time()
print(f"[CREDMAN] Credenciais carregadas do repo Gitea ({len(_gitea_creds_cache)} serviços)")
return _gitea_creds_cache
else:
print(f"[CREDMAN] Erro ao buscar repo Gitea: {response.status_code}")
async with httpx.AsyncClient() as client:
res = await client.get(url, headers=headers, timeout=20)
if res.status_code == 200:
import base64
content_b64 = res.json().get("content", "").replace("\n", "")
_gitea_creds_cache = json.loads(base64.b64decode(content_b64).decode())
_gitea_creds_cache_time = time.time()
return _gitea_creds_cache
except Exception as e:
print(f"[CREDMAN] Erro ao fetch_from_gitea_repo: {e}")
return _gitea_creds_cache if _gitea_creds_cache else {}
print(f"Error fetching Gitea creds: {e}")
return _gitea_creds_cache
def get_gitea_cred(service: str, key: str, force: bool = False) -> Optional[str]:
"""Busca credencial específica do repo Gitea."""
creds = fetch_from_gitea_repo(force)
return creds.get(service, {}).get(key)
def gitea_token() -> str:
# Ordem de prioridade: Gitea INI -> segredos.md -> Env
token = get_credential("gitea", "INTERNAL_TOKEN") # Exemplo
if not token: token = get_segredo("gitea", "PAT")
return token or os.getenv("GITEA_TOKEN", "")
# ============================================================
# FONTES DE CREDENCIAIS
# FALLBACK: SEGREDOS.MD PARSER
# ============================================================
def get_segredos() -> Dict:
paths = [SEGREDOS_PATH, "/root/segredos.md", "/app/segredos.md"]
for p in paths:
if os.path.exists(p):
try:
with open(p, 'r') as f:
content = f.read()
return _parse_content(content)
except: pass
return {}
def _parse_content(content: str) -> Dict:
# Parser simplificado por regex
res = {"coolify": {}, "supabase": {}, "gitea": {}, "telegram": {}}
patterns = {
"coolify": [("APP_KEY", r"APP_KEY[:\s]+[`']?([^\s`']+)")],
"supabase": [("ANON_KEY", r"ANON_KEY[:\s]+[`']?([^\s`']+)")],
"telegram": [("BOT_TOKEN", r"Bot Token[:\s]+[`']?([^\s`']+)")],
"gitea": [("PAT", r"Token de Acesso Pessoal[:\s]+[`']?([^\s`']+)")],
}
for svc, pairs in patterns.items():
for key, pat in pairs:
m = re.search(pat, content, re.I)
if m: res[svc][key] = m.group(1)
return res
def get_segredo(service: str, key: str) -> Optional[str]:
return get_segredos().get(service, {}).get(key)
# ============================================================
# LOCAL FILES (.ENV / .INI)
# ============================================================
CREDENTIAL_SOURCES = {
"coolify": {
"path": "/data/coolify/source/.env",
"parser": "env",
"description": "Coolify (Orquestrador)"
},
"supabase": {
"path": "/data/coolify/services/h0oggskgs0ws0sco8kc4s8ws/.env",
"parser": "env",
"description": "Supabase (BaaS)"
},
"gitea": {
"path": "/var/lib/docker/volumes/yccsckck4g004gosccwc4kg4_gitea-data/_data/gitea/conf/app.ini",
"parser": "ini",
"section": "security",
"description": "Gitea (Git Server)"
},
"logto": {
"path": "/data/coolify/services/ea4tt75aeibqtu19hjqqw12f/.env",
"parser": "env",
"description": "Logto (Authentication)"
}
"coolify": {"path": "/data/coolify/source/.env", "type": "env"},
"supabase": {"path": "/data/coolify/services/h0oggskgs0ws0sco8kc4s8ws/.env", "type": "env"},
"gitea": {"path": "/var/lib/docker/volumes/yccsckck4g004gosccwc4kg4_gitea-data/_data/gitea/conf/app.ini", "type": "ini", "section": "security"}
}
# Coolify API
COOLIFY_API_BASE = "http://localhost:8000/api"
# ============================================================
# CACHE
# ============================================================
_cache: Dict[str, str] = {}
_cache_time: Dict[str, float] = {}
CACHE_TTL = 300 # 5 minutos
# ============================================================
# PARSER FUNCTIONS
# ============================================================
def _read_env_file(path: str) -> Dict[str, str]:
"""Lê arquivo .env e retorna dict de variáveis."""
if not os.path.exists(path):
return {}
result = {}
try:
with open(path) as f:
for line in f:
line = line.strip()
if line and "=" in line and not line.startswith("#"):
key, _, value = line.partition("=")
result[key.strip()] = value.strip()
except Exception as e:
print(f"Erro ao ler {path}: {e}")
return result
def _read_ini_file(path: str, section: str = "security") -> Dict[str, str]:
"""Lê arquivo INI (tipo Gitea) e retorna dict."""
if not os.path.exists(path):
return {}
parser = configparser.ConfigParser()
try:
parser.read(path)
if parser.has_section(section):
return dict(parser.items(section))
except Exception as e:
print(f"Erro ao ler INI {path}: {e}")
return {}
def _get_cache_key(service: str, key: str) -> str:
return f"{service}:{key}"
# ============================================================
# SEGREDOS.MD PARSER (FALLBACK)
# ============================================================
def _parse_segredos_md() -> Dict[str, Dict[str, str]]:
"""
Parsea o arquivo segredos.md e retorna credenciais estruturadas.
Usa como fallback quando os caminhos originais não existem.
"""
# Tenta múltiplos caminhos possíveis
paths_to_try = [
SEGREDOS_PATH,
"/root/segredos.md",
"/host/segredos.md",
"/data/segredos.md",
f"{BOTVPS_HOST_PATH}/segredos.md",
"/app/segredos.md"
]
segredos_path = None
for p in paths_to_try:
if os.path.exists(p):
segredos_path = p
break
if not segredos_path:
return {}
def get_credential(service: str, key: str) -> Optional[str]:
source = CREDENTIAL_SOURCES.get(service)
if not source or not os.path.exists(source["path"]): return None
try:
with open(segredos_path, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
print(f"Erro ao ler {segredos_path}: {e}")
return {}
result = {
"coolify": {},
"supabase": {},
"gitea": {},
"logto": {},
"telegram": {},
"anthropic": {},
"elevenlabs": {},
"gpi": {}
}
# Padrões para extrair valores
patterns = {
"coolify": [
(r"APP_KEY[:\s]+[`']?([^\s`']+)", "APP_KEY"),
(r"Database Password.*[:\s]+[`']?([^\s`']+)", "DB_PASSWORD"),
(r"Redis Password.*[:\s]+[`']?([^\s`']+)", "REDIS_PASSWORD"),
(r"Pusher App ID.*[:\s]+[`']?([^\s`']+)", "PUSHER_APP_ID"),
(r"Pusher App Key.*[:\s]+[`']?([^\s`']+)", "PUSHER_APP_KEY"),
(r"Pusher App Secret.*[:\s]+[`']?([^\s`']+)", "PUSHER_APP_SECRET"),
],
"supabase": [
(r"SERVICE_ROLE_KEY.*[:\s]+[`']?([^\s`']+)", "SERVICE_ROLE_KEY"),
(r"ANON_KEY.*[:\s]+[`']?([^\s`']+)", "ANON_KEY"),
(r"JWT Secret.*[:\s]+[`']?([^\s`']+)", "JWT_SECRET"),
(r"MinIO.*Access Key.*[:\s]+[`']?([^\s`']+)", "MINIO_ACCESS_KEY"),
(r"MinIO.*Secret Key.*[:\s]+[`']?([^\s`']+)", "MINIO_SECRET_KEY"),
(r"Vault Encryption Key.*[:\s]+[`']?([^\s`']+)", "VAULT_KEY"),
(r"Logflare API Key.*[:\s]+[`']?([^\s`']+)", "LOGFLARE_KEY"),
],
"gitea": [
(r"Token de Acesso Pessoal.*[:\s]+[`']?([^\s`']+)", "PAT"),
(r"Internal Token.*[:\s]+[`']?([^\s`']+)", "INTERNAL_TOKEN"),
(r"OAuth2 JWT Secret.*[:\s]+[`']?([^\s`']+)", "OAUTH2_SECRET"),
(r"LFS JWT Secret.*[:\s]+[`']?([^\s`']+)", "LFS_SECRET"),
],
"logto": [
(r"Logto.*Usuário.*[:\s]+[`']?([^\s`']+)", "DB_USER"),
(r"Logto.*Senha.*[:\s]+[`']?([^\s`']+)", "DB_PASSWORD"),
],
"telegram": [
(r"Bot Token.*[:\s]+[`']?([^\s`']+)", "BOT_TOKEN"),
(r"Chat ID.*[:\s]+[`']?([^\s`']+)", "CHAT_ID"),
],
"anthropic": [
(r"ANTHROPIC_API_KEY.*[:\s]+[`']?([^\s`']+)", "ANTHROPIC_API_KEY"),
],
"elevenlabs": [
(r"ELEVENLABS_API_KEY.*[:\s]+[`']?([^\s`']+)", "ELEVENLABS_API_KEY"),
(r"Voz Escolhida.*[:\s]+[`']?([^\s`']+)", "VOICE_ID"),
],
"gpi": [
(r"MongoDB URI.*[:\s]+[`']?([^\s`']+)", "MONGODB_URI"),
(r"Clerk Publishable Key.*[:\s]+[`']?([^\s`']+)", "CLERK_KEY"),
(r"JWT Secret.*[:\s]+[`']?([^\s`']+)", "JWT_SECRET"),
]
}
for service, service_patterns in patterns.items():
for pattern, key_name in service_patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
result[service][key_name] = match.group(1)
return result
# Cache para segredos parseados
_segredos_cache: Dict[str, Dict[str, str]] = {}
_segredos_cache_time: float = 0
def get_segredos() -> Dict[str, Dict[str, str]]:
"""Retorna credenciais parseadas do segredos.md com cache."""
global _segredos_cache, _segredos_cache_time
if time.time() - _segredos_cache_time < CACHE_TTL and _segredos_cache:
return _segredos_cache
_segredos_cache = _parse_segredos_md()
_segredos_cache_time = time.time()
return _segredos_cache
def get_segredo(service: str, key: str) -> Optional[str]:
"""Busca uma credencial específica do segredos.md."""
segredos = get_segredos()
service_creds = segredos.get(service)
if service_creds:
return service_creds.get(key)
if source["type"] == "env":
with open(source["path"]) as f:
for line in f:
if line.startswith(f"{key}="): return line.split("=")[1].strip()
elif source["type"] == "ini":
cp = configparser.ConfigParser()
cp.read(source["path"])
return cp.get(source.get("section", "DEFAULT"), key, fallback=None)
except: pass
return None
# ============================================================
# CREDENTIAL FUNCTIONS
# API HELPERS (ASYNC)
# ============================================================
def get_credential(service: str, key: str, use_cache: bool = True, force_reload: bool = False) -> Optional[str]:
"""
Busca credencial diretamente da fonte original.
async def coolify_api_async(endpoint: str, method: str = "GET", data: dict = None) -> dict:
from credential_manager import coolify_app_key
url = f"http://localhost:8000/api{endpoint}"
headers = {"Authorization": f"Bearer {coolify_app_key()}"}
Args:
service: Nome do serviço (coolify, gitea, supabase, logto)
key: Nome da variável/campo
use_cache: Se True, usa cache em memória (TTL 5 min)
force_reload: Se True, ignora cache e recarrega
Returns:
Valor da credencial ou None se não encontrada
"""
global _cache, _cache_time
cache_key = _get_cache_key(service, key)
# Verifica cache
if use_cache and not force_reload and cache_key in _cache:
if time.time() - _cache_time.get(cache_key, 0) < CACHE_TTL:
return _cache[cache_key]
# Busca na fonte
source = CREDENTIAL_SOURCES.get(service)
if not source:
return None
if source["parser"] == "env":
data = _read_env_file(source["path"])
else: # ini
section = source.get("section", "security")
data = _read_ini_file(source["path"], section)
value = data.get(key)
# Atualiza cache
if value is not None:
_cache[cache_key] = value
_cache_time[cache_key] = time.time()
return value
def get_all_credentials(service: str, use_cache: bool = True) -> Dict[str, str]:
"""Retorna todas as credenciais de um serviço."""
source = CREDENTIAL_SOURCES.get(service)
if not source:
return {}
if source["parser"] == "env":
return _read_env_file(source["path"])
return _read_ini_file(source["path"], source.get("section", "security"))
def get_multiple(service: str, keys: list, use_cache: bool = True) -> Dict[str, Optional[str]]:
"""Busca múltiplas credenciais de um serviço."""
return {key: get_credential(service, key, use_cache) for key in keys}
def clear_cache():
"""Limpa cache de credenciais (útil após update no Coolify)."""
global _cache, _cache_time
_cache = {}
_cache_time = {}
def reload_credential(service: str, key: str) -> Optional[str]:
"""Recarrega uma credencial específica, ignorando cache."""
return get_credential(service, key, use_cache=False, force_reload=True)
# ============================================================
# HELPER FUNCTIONS - SERVIÇOS COMUNS
# ============================================================
def gitea_token() -> str:
"""Retorna token de acesso do Gitea."""
token = get_gitea_cred("gitea", "TOKEN")
if not token:
token = get_credential("gitea", "INSTALL_LOCK")
if not token:
token = get_credential("gitea", "TOKEN")
if not token:
token = get_segredo("gitea", "PAT")
return token or ""
def gitea_url() -> str:
"""Retorna URL base do Gitea."""
return "https://git.reifonas.cloud"
def gitea_api_url() -> str:
"""Retorna URL da API do Gitea."""
return f"{gitea_url()}/api/v1"
def supabase_url() -> str:
"""Retorna URL base do Supabase."""
return "https://supabase.reifonas.cloud"
def supabase_anon_key() -> str:
"""Retorna ANON_KEY do Supabase."""
key = get_gitea_cred("supabase", "ANON_KEY")
if not key:
key = get_credential("supabase", "ANON_KEY")
if not key:
key = get_segredo("supabase", "ANON_KEY")
return key or ""
def supabase_service_role_key() -> str:
"""Retorna SERVICE_ROLE_KEY do Supabase."""
key = get_gitea_cred("supabase", "SERVICE_ROLE_KEY")
if not key:
key = get_credential("supabase", "SERVICE_ROLE_KEY")
if not key:
key = get_segredo("supabase", "SERVICE_ROLE_KEY")
return key or ""
def supabase_jwt_secret() -> str:
"""Retorna JWT_SECRET do Supabase."""
secret = get_gitea_cred("supabase", "JWT_SECRET")
if not secret:
secret = get_credential("supabase", "JWT_SECRET")
if not secret:
secret = get_segredo("supabase", "JWT_SECRET")
return secret or ""
def coolify_app_key() -> str:
"""Retorna APP_KEY do Coolify."""
key = get_gitea_cred("coolify", "APP_KEY")
if not key:
key = get_credential("coolify", "APP_KEY")
if not key:
key = get_segredo("coolify", "APP_KEY")
return key or ""
def coolify_api_base() -> str:
"""Retorna URL base da API do Coolify."""
return COOLIFY_API_BASE
# ============================================================
# COOLIFY API HELPERS
# ============================================================
def coolify_api(endpoint: str, method: str = "GET", data: dict = None) -> dict:
"""
Faz requisição à API do Coolify.
Args:
endpoint: Endpoint da API (ex: "/deployments", "/applications")
method: GET, POST, DELETE, etc.
data: Dados para enviar (JSON)
Returns:
Resposta da API como dict
"""
import requests
url = f"{COOLIFY_API_BASE}{endpoint}"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {coolify_app_key()}"
}
try:
if method == "GET":
res = requests.get(url, headers=headers, timeout=30)
elif method == "POST":
res = requests.post(url, headers=headers, json=data, timeout=30)
elif method == "DELETE":
res = requests.delete(url, headers=headers, timeout=30)
else:
return {"error": f"Método {method} não suportado"}
if res.status_code in [200, 201]:
return res.json() if res.text else {"success": True}
return {"error": f"Status {res.status_code}", "detail": res.text}
except Exception as e:
return {"error": str(e)}
def coolify_list_applications() -> list:
"""Lista aplicações no Coolify."""
result = coolify_api("/applications")
if isinstance(result, dict) and "error" in result:
return []
return result if isinstance(result, list) else []
def coolify_list_deployments() -> list:
"""Lista deployments recentes."""
result = coolify_api("/deployments")
if isinstance(result, dict) and "error" in result:
return []
return result if isinstance(result, list) else []
def coolify_get_status() -> dict:
"""Retorna status geral do Coolify."""
return coolify_api("/status")
# ============================================================
# SYNC FUNCTION
# ============================================================
def sync_credentials() -> dict:
"""
Força sync de todas as credenciais.
Limpa cache e recarrega.
Returns:
Status do sync
"""
clear_cache()
result = {
"status": "synced",
"services": {},
"timestamp": time.time()
}
for service in CREDENTIAL_SOURCES:
async with httpx.AsyncClient() as client:
try:
creds = get_all_credentials(service, use_cache=False)
result["services"][service] = {
"status": "ok",
"keys": len(creds)
}
except Exception as e:
result["services"][service] = {
"status": "error",
"error": str(e)
}
return result
if method == "GET": res = await client.get(url, headers=headers)
else: res = await client.request(method, url, headers=headers, json=data)
return res.json() if res.status_code == 200 else {"error": res.status_code}
except Exception as e: return {"error": str(e)}
# ============================================================
# GITEA REPO SYNC
# ============================================================
def coolify_app_key():
return asyncio.run(fetch_from_gitea_repo_async()).get("coolify", {}).get("APP_KEY") or get_segredo("coolify", "APP_KEY")
def sync_from_gitea_repo(force: bool = False) -> dict:
"""
Força sincronização do repo Gitea admtracksteel/Keys.
Retorna status do sync.
"""
global _gitea_creds_cache, _gitea_creds_cache_time
clear_cache()
_gitea_creds_cache_time = 0
creds = fetch_from_gitea_repo(force=force)
services = list(creds.keys())
return {
"status": "synced" if creds else "failed",
"repo": GITEA_CREDS_REPO,
"file": GITEA_CREDS_FILE,
"services_count": len(creds),
"services": services,
"timestamp": time.time()
}
# --- SYNC WRAPPERS ---
def sync_credentials():
return asyncio.run(fetch_from_gitea_repo_async(force=True))
def get_gitea_repo_credentials() -> Dict[str, Dict[str, str]]:
"""Retorna todas as credenciais do repo Gitea."""
return fetch_from_gitea_repo()
def sync_from_gitea_repo(force=False):
return asyncio.run(fetch_from_gitea_repo_async(force=force))
# ============================================================
# STATUS
# ============================================================
def get_services_status():
return {"gitea_repo": "active", "local_files": "checked", "segredos": "available"}
def get_services_status() -> dict:
"""Retorna status de todos os serviços."""
status = {}
segredos = get_segredos()
gitea_creds = get_gitea_repo_credentials()
for service_id, source in CREDENTIAL_SOURCES.items():
path = source["path"]
exists = os.path.exists(path)
keys_count = 0
if exists:
creds = get_all_credentials(service_id)
keys_count = len(creds)
segredos_keys = len(segredos.get(service_id, {}))
gitea_keys = len(gitea_creds.get(service_id, {}))
status[service_id] = {
"description": source["description"],
"path": path,
"exists": exists,
"keys_count": keys_count,
"from_gitea_repo": gitea_keys > 0,
"gitea_keys": gitea_keys,
"from_segredos": segredos_keys > 0,
"segredos_keys": segredos_keys
}
status["gitea_repo"] = {
"description": "Repo Git (admtracksteel/Keys)",
"repo": GITEA_CREDS_REPO,
"file": GITEA_CREDS_FILE,
"available": len(gitea_creds) > 0,
"services_count": len(gitea_creds)
}
return status
# ============================================================
# MAIN TEST
# ============================================================
if __name__ == "__main__":
print("=== Credential Manager Test ===")
print(f"\nStatus dos serviços:")
for service, info in get_services_status().items():
print(f" {service}: {'' if info['exists'] else ''} ({info['keys_count']} chaves)")
print(f"\nCredenciais carregadas:")
print(f" Gitea URL: {gitea_url()}")
print(f" Gitea Token: {'***' + gitea_token()[-8:] if gitea_token() else 'N/A'}")
print(f" Supabase URL: {supabase_url()}")
print(f" Supabase Anon Key: {'***' + supabase_anon_key()[-8:] if supabase_anon_key() else 'N/A'}")
print(f" Coolify API: {coolify_api_base()}")
def gitea_api_url(): return GITEA_API_URL
def supabase_url(): return "https://supabase.reifonas.cloud"
def supabase_anon_key(): return get_segredo("supabase", "ANON_KEY")
def supabase_service_role_key(): return get_segredo("supabase", "SERVICE_ROLE_KEY")
print(f" Coolify API: {coolify_api_base()}")