604 lines
20 KiB
Python
604 lines
20 KiB
Python
# ============================================================
|
|
# CREDENTIAL_MANAGER.PY - Gestão de Credenciais
|
|
# Lê credenciais da fonte original (.env do Coolify/Docker)
|
|
# NÃO ARMAZENA CREDENCIAIS - SEMPRE LÊ DA FONTE
|
|
# ============================================================
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
import configparser
|
|
import time
|
|
import requests
|
|
from typing import Optional, Dict
|
|
|
|
# ============================================================
|
|
# CAMINHO DO ARQUIVO DE SEGREDOS (FALLBACK)
|
|
# ============================================================
|
|
|
|
SEGREDOS_PATH = "/data/segredos.md"
|
|
BOTVPS_HOST_PATH = "/app"
|
|
|
|
# ============================================================
|
|
# GITEA REPO CREDENTIALS (FONTE PRINCIPAL)
|
|
# ============================================================
|
|
|
|
GITEA_CREDS_REPO = "admtracksteel/Keys"
|
|
GITEA_CREDS_FILE = "credentials.json"
|
|
_gitea_creds_cache: Dict[str, str] = {}
|
|
_gitea_creds_cache_time: float = 0
|
|
|
|
def get_gitea_creds_url() -> str:
|
|
"""Retorna URL da API do Gitea."""
|
|
return "https://git.reifonas.cloud/api/v1"
|
|
|
|
def fetch_from_gitea_repo(force: bool = False) -> Dict[str, Dict[str, str]]:
|
|
"""
|
|
Busca credenciais do repo Gitea admtracksteel/Keys.
|
|
Faz cache com TTL de 5 minutos.
|
|
"""
|
|
global _gitea_creds_cache, _gitea_creds_cache_time
|
|
|
|
# Verifica cache
|
|
if not force and time.time() - _gitea_creds_cache_time < CACHE_TTL and _gitea_creds_cache:
|
|
return _gitea_creds_cache
|
|
|
|
try:
|
|
# Obtém token do Gitea
|
|
from credential_manager import gitea_token
|
|
token = gitea_token()
|
|
|
|
# Busca arquivo no repo
|
|
url = f"{get_gitea_creds_url()}/repos/admtracksteel/Keys/contents/{GITEA_CREDS_FILE}"
|
|
headers = {"Authorization": f"token {token}"} if token else {}
|
|
|
|
response = requests.get(url, headers=headers, timeout=30)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
# Conteúdo está em base64
|
|
import base64
|
|
content_b64 = data.get("content", "").replace("\n", "")
|
|
content = base64.b64decode(content_b64).decode("utf-8")
|
|
_gitea_creds_cache = json.loads(content)
|
|
_gitea_creds_cache_time = time.time()
|
|
print(f"[CREDMAN] Credenciais carregadas do repo Gitea ({len(_gitea_creds_cache)} serviços)")
|
|
return _gitea_creds_cache
|
|
else:
|
|
print(f"[CREDMAN] Erro ao buscar repo Gitea: {response.status_code}")
|
|
except Exception as e:
|
|
print(f"[CREDMAN] Erro ao fetch_from_gitea_repo: {e}")
|
|
|
|
return _gitea_creds_cache if _gitea_creds_cache else {}
|
|
|
|
def get_gitea_cred(service: str, key: str, force: bool = False) -> Optional[str]:
|
|
"""Busca credencial específica do repo Gitea."""
|
|
creds = fetch_from_gitea_repo(force)
|
|
return creds.get(service, {}).get(key)
|
|
|
|
# ============================================================
|
|
# FONTES DE CREDENCIAIS
|
|
# ============================================================
|
|
|
|
CREDENTIAL_SOURCES = {
|
|
"coolify": {
|
|
"path": "/data/coolify/source/.env",
|
|
"parser": "env",
|
|
"description": "Coolify (Orquestrador)"
|
|
},
|
|
"supabase": {
|
|
"path": "/data/coolify/services/h0oggskgs0ws0sco8kc4s8ws/.env",
|
|
"parser": "env",
|
|
"description": "Supabase (BaaS)"
|
|
},
|
|
"gitea": {
|
|
"path": "/var/lib/docker/volumes/yccsckck4g004gosccwc4kg4_gitea-data/_data/gitea/conf/app.ini",
|
|
"parser": "ini",
|
|
"section": "security",
|
|
"description": "Gitea (Git Server)"
|
|
},
|
|
"logto": {
|
|
"path": "/data/coolify/services/ea4tt75aeibqtu19hjqqw12f/.env",
|
|
"parser": "env",
|
|
"description": "Logto (Authentication)"
|
|
}
|
|
}
|
|
|
|
# Coolify API
|
|
COOLIFY_API_BASE = "http://localhost:8000/api"
|
|
|
|
# ============================================================
|
|
# CACHE
|
|
# ============================================================
|
|
|
|
_cache: Dict[str, str] = {}
|
|
_cache_time: Dict[str, float] = {}
|
|
CACHE_TTL = 300 # 5 minutos
|
|
|
|
# ============================================================
|
|
# PARSER FUNCTIONS
|
|
# ============================================================
|
|
|
|
def _read_env_file(path: str) -> Dict[str, str]:
|
|
"""Lê arquivo .env e retorna dict de variáveis."""
|
|
if not os.path.exists(path):
|
|
return {}
|
|
|
|
result = {}
|
|
try:
|
|
with open(path) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line and "=" in line and not line.startswith("#"):
|
|
key, _, value = line.partition("=")
|
|
result[key.strip()] = value.strip()
|
|
except Exception as e:
|
|
print(f"Erro ao ler {path}: {e}")
|
|
|
|
return result
|
|
|
|
def _read_ini_file(path: str, section: str = "security") -> Dict[str, str]:
|
|
"""Lê arquivo INI (tipo Gitea) e retorna dict."""
|
|
if not os.path.exists(path):
|
|
return {}
|
|
|
|
parser = configparser.ConfigParser()
|
|
try:
|
|
parser.read(path)
|
|
|
|
if parser.has_section(section):
|
|
return dict(parser.items(section))
|
|
except Exception as e:
|
|
print(f"Erro ao ler INI {path}: {e}")
|
|
|
|
return {}
|
|
|
|
def _get_cache_key(service: str, key: str) -> str:
|
|
return f"{service}:{key}"
|
|
|
|
# ============================================================
|
|
# SEGREDOS.MD PARSER (FALLBACK)
|
|
# ============================================================
|
|
|
|
def _parse_segredos_md() -> Dict[str, Dict[str, str]]:
|
|
"""
|
|
Parsea o arquivo segredos.md e retorna credenciais estruturadas.
|
|
Usa como fallback quando os caminhos originais não existem.
|
|
"""
|
|
# Tenta múltiplos caminhos possíveis
|
|
paths_to_try = [
|
|
SEGREDOS_PATH,
|
|
"/root/segredos.md",
|
|
"/host/segredos.md",
|
|
"/data/segredos.md",
|
|
f"{BOTVPS_HOST_PATH}/segredos.md",
|
|
"/app/segredos.md"
|
|
]
|
|
|
|
segredos_path = None
|
|
for p in paths_to_try:
|
|
if os.path.exists(p):
|
|
segredos_path = p
|
|
break
|
|
|
|
if not segredos_path:
|
|
return {}
|
|
|
|
try:
|
|
with open(segredos_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
except Exception as e:
|
|
print(f"Erro ao ler {segredos_path}: {e}")
|
|
return {}
|
|
|
|
result = {
|
|
"coolify": {},
|
|
"supabase": {},
|
|
"gitea": {},
|
|
"logto": {},
|
|
"telegram": {},
|
|
"anthropic": {},
|
|
"elevenlabs": {},
|
|
"gpi": {}
|
|
}
|
|
|
|
# Padrões para extrair valores
|
|
patterns = {
|
|
"coolify": [
|
|
(r"APP_KEY[:\s]+[`']?([^\s`']+)", "APP_KEY"),
|
|
(r"Database Password.*[:\s]+[`']?([^\s`']+)", "DB_PASSWORD"),
|
|
(r"Redis Password.*[:\s]+[`']?([^\s`']+)", "REDIS_PASSWORD"),
|
|
(r"Pusher App ID.*[:\s]+[`']?([^\s`']+)", "PUSHER_APP_ID"),
|
|
(r"Pusher App Key.*[:\s]+[`']?([^\s`']+)", "PUSHER_APP_KEY"),
|
|
(r"Pusher App Secret.*[:\s]+[`']?([^\s`']+)", "PUSHER_APP_SECRET"),
|
|
],
|
|
"supabase": [
|
|
(r"SERVICE_ROLE_KEY.*[:\s]+[`']?([^\s`']+)", "SERVICE_ROLE_KEY"),
|
|
(r"ANON_KEY.*[:\s]+[`']?([^\s`']+)", "ANON_KEY"),
|
|
(r"JWT Secret.*[:\s]+[`']?([^\s`']+)", "JWT_SECRET"),
|
|
(r"MinIO.*Access Key.*[:\s]+[`']?([^\s`']+)", "MINIO_ACCESS_KEY"),
|
|
(r"MinIO.*Secret Key.*[:\s]+[`']?([^\s`']+)", "MINIO_SECRET_KEY"),
|
|
(r"Vault Encryption Key.*[:\s]+[`']?([^\s`']+)", "VAULT_KEY"),
|
|
(r"Logflare API Key.*[:\s]+[`']?([^\s`']+)", "LOGFLARE_KEY"),
|
|
],
|
|
"gitea": [
|
|
(r"Token de Acesso Pessoal.*[:\s]+[`']?([^\s`']+)", "PAT"),
|
|
(r"Internal Token.*[:\s]+[`']?([^\s`']+)", "INTERNAL_TOKEN"),
|
|
(r"OAuth2 JWT Secret.*[:\s]+[`']?([^\s`']+)", "OAUTH2_SECRET"),
|
|
(r"LFS JWT Secret.*[:\s]+[`']?([^\s`']+)", "LFS_SECRET"),
|
|
],
|
|
"logto": [
|
|
(r"Logto.*Usuário.*[:\s]+[`']?([^\s`']+)", "DB_USER"),
|
|
(r"Logto.*Senha.*[:\s]+[`']?([^\s`']+)", "DB_PASSWORD"),
|
|
],
|
|
"telegram": [
|
|
(r"Bot Token.*[:\s]+[`']?([^\s`']+)", "BOT_TOKEN"),
|
|
(r"Chat ID.*[:\s]+[`']?([^\s`']+)", "CHAT_ID"),
|
|
],
|
|
"anthropic": [
|
|
(r"ANTHROPIC_API_KEY.*[:\s]+[`']?([^\s`']+)", "ANTHROPIC_API_KEY"),
|
|
],
|
|
"elevenlabs": [
|
|
(r"ELEVENLABS_API_KEY.*[:\s]+[`']?([^\s`']+)", "ELEVENLABS_API_KEY"),
|
|
(r"Voz Escolhida.*[:\s]+[`']?([^\s`']+)", "VOICE_ID"),
|
|
],
|
|
"gpi": [
|
|
(r"MongoDB URI.*[:\s]+[`']?([^\s`']+)", "MONGODB_URI"),
|
|
(r"Clerk Publishable Key.*[:\s]+[`']?([^\s`']+)", "CLERK_KEY"),
|
|
(r"JWT Secret.*[:\s]+[`']?([^\s`']+)", "JWT_SECRET"),
|
|
]
|
|
}
|
|
|
|
for service, service_patterns in patterns.items():
|
|
for pattern, key_name in service_patterns:
|
|
match = re.search(pattern, content, re.IGNORECASE)
|
|
if match:
|
|
result[service][key_name] = match.group(1)
|
|
|
|
return result
|
|
|
|
# Cache para segredos parseados
|
|
_segredos_cache: Dict[str, Dict[str, str]] = {}
|
|
_segredos_cache_time: float = 0
|
|
|
|
def get_segredos() -> Dict[str, Dict[str, str]]:
|
|
"""Retorna credenciais parseadas do segredos.md com cache."""
|
|
global _segredos_cache, _segredos_cache_time
|
|
|
|
if time.time() - _segredos_cache_time < CACHE_TTL and _segredos_cache:
|
|
return _segredos_cache
|
|
|
|
_segredos_cache = _parse_segredos_md()
|
|
_segredos_cache_time = time.time()
|
|
return _segredos_cache
|
|
|
|
def get_segredo(service: str, key: str) -> Optional[str]:
|
|
"""Busca uma credencial específica do segredos.md."""
|
|
segredos = get_segredos()
|
|
service_creds = segredos.get(service)
|
|
if service_creds:
|
|
return service_creds.get(key)
|
|
return None
|
|
|
|
# ============================================================
|
|
# CREDENTIAL FUNCTIONS
|
|
# ============================================================
|
|
|
|
def get_credential(service: str, key: str, use_cache: bool = True, force_reload: bool = False) -> Optional[str]:
|
|
"""
|
|
Busca credencial diretamente da fonte original.
|
|
|
|
Args:
|
|
service: Nome do serviço (coolify, gitea, supabase, logto)
|
|
key: Nome da variável/campo
|
|
use_cache: Se True, usa cache em memória (TTL 5 min)
|
|
force_reload: Se True, ignora cache e recarrega
|
|
|
|
Returns:
|
|
Valor da credencial ou None se não encontrada
|
|
"""
|
|
global _cache, _cache_time
|
|
|
|
cache_key = _get_cache_key(service, key)
|
|
|
|
# Verifica cache
|
|
if use_cache and not force_reload and cache_key in _cache:
|
|
if time.time() - _cache_time.get(cache_key, 0) < CACHE_TTL:
|
|
return _cache[cache_key]
|
|
|
|
# Busca na fonte
|
|
source = CREDENTIAL_SOURCES.get(service)
|
|
if not source:
|
|
return None
|
|
|
|
if source["parser"] == "env":
|
|
data = _read_env_file(source["path"])
|
|
else: # ini
|
|
section = source.get("section", "security")
|
|
data = _read_ini_file(source["path"], section)
|
|
|
|
value = data.get(key)
|
|
|
|
# Atualiza cache
|
|
if value is not None:
|
|
_cache[cache_key] = value
|
|
_cache_time[cache_key] = time.time()
|
|
|
|
return value
|
|
|
|
def get_all_credentials(service: str, use_cache: bool = True) -> Dict[str, str]:
|
|
"""Retorna todas as credenciais de um serviço."""
|
|
source = CREDENTIAL_SOURCES.get(service)
|
|
if not source:
|
|
return {}
|
|
|
|
if source["parser"] == "env":
|
|
return _read_env_file(source["path"])
|
|
return _read_ini_file(source["path"], source.get("section", "security"))
|
|
|
|
def get_multiple(service: str, keys: list, use_cache: bool = True) -> Dict[str, Optional[str]]:
|
|
"""Busca múltiplas credenciais de um serviço."""
|
|
return {key: get_credential(service, key, use_cache) for key in keys}
|
|
|
|
def clear_cache():
|
|
"""Limpa cache de credenciais (útil após update no Coolify)."""
|
|
global _cache, _cache_time
|
|
_cache = {}
|
|
_cache_time = {}
|
|
|
|
def reload_credential(service: str, key: str) -> Optional[str]:
|
|
"""Recarrega uma credencial específica, ignorando cache."""
|
|
return get_credential(service, key, use_cache=False, force_reload=True)
|
|
|
|
# ============================================================
|
|
# HELPER FUNCTIONS - SERVIÇOS COMUNS
|
|
# ============================================================
|
|
|
|
def gitea_token() -> str:
|
|
"""Retorna token de acesso do Gitea."""
|
|
token = get_gitea_cred("gitea", "TOKEN")
|
|
if not token:
|
|
token = get_credential("gitea", "INSTALL_LOCK")
|
|
if not token:
|
|
token = get_credential("gitea", "TOKEN")
|
|
if not token:
|
|
token = get_segredo("gitea", "PAT")
|
|
return token or ""
|
|
|
|
def gitea_url() -> str:
|
|
"""Retorna URL base do Gitea."""
|
|
return "https://git.reifonas.cloud"
|
|
|
|
def gitea_api_url() -> str:
|
|
"""Retorna URL da API do Gitea."""
|
|
return f"{gitea_url()}/api/v1"
|
|
|
|
def supabase_url() -> str:
|
|
"""Retorna URL base do Supabase."""
|
|
return "https://supabase.reifonas.cloud"
|
|
|
|
def supabase_anon_key() -> str:
|
|
"""Retorna ANON_KEY do Supabase."""
|
|
key = get_gitea_cred("supabase", "ANON_KEY")
|
|
if not key:
|
|
key = get_credential("supabase", "ANON_KEY")
|
|
if not key:
|
|
key = get_segredo("supabase", "ANON_KEY")
|
|
return key or ""
|
|
|
|
def supabase_service_role_key() -> str:
|
|
"""Retorna SERVICE_ROLE_KEY do Supabase."""
|
|
key = get_gitea_cred("supabase", "SERVICE_ROLE_KEY")
|
|
if not key:
|
|
key = get_credential("supabase", "SERVICE_ROLE_KEY")
|
|
if not key:
|
|
key = get_segredo("supabase", "SERVICE_ROLE_KEY")
|
|
return key or ""
|
|
|
|
def supabase_jwt_secret() -> str:
|
|
"""Retorna JWT_SECRET do Supabase."""
|
|
secret = get_gitea_cred("supabase", "JWT_SECRET")
|
|
if not secret:
|
|
secret = get_credential("supabase", "JWT_SECRET")
|
|
if not secret:
|
|
secret = get_segredo("supabase", "JWT_SECRET")
|
|
return secret or ""
|
|
|
|
def coolify_app_key() -> str:
|
|
"""Retorna APP_KEY do Coolify."""
|
|
key = get_gitea_cred("coolify", "APP_KEY")
|
|
if not key:
|
|
key = get_credential("coolify", "APP_KEY")
|
|
if not key:
|
|
key = get_segredo("coolify", "APP_KEY")
|
|
return key or ""
|
|
|
|
def coolify_api_base() -> str:
|
|
"""Retorna URL base da API do Coolify."""
|
|
return COOLIFY_API_BASE
|
|
|
|
# ============================================================
|
|
# COOLIFY API HELPERS
|
|
# ============================================================
|
|
|
|
def coolify_api(endpoint: str, method: str = "GET", data: dict = None) -> dict:
|
|
"""
|
|
Faz requisição à API do Coolify.
|
|
|
|
Args:
|
|
endpoint: Endpoint da API (ex: "/deployments", "/applications")
|
|
method: GET, POST, DELETE, etc.
|
|
data: Dados para enviar (JSON)
|
|
|
|
Returns:
|
|
Resposta da API como dict
|
|
"""
|
|
import requests
|
|
|
|
url = f"{COOLIFY_API_BASE}{endpoint}"
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"Authorization": f"Bearer {coolify_app_key()}"
|
|
}
|
|
|
|
try:
|
|
if method == "GET":
|
|
res = requests.get(url, headers=headers, timeout=30)
|
|
elif method == "POST":
|
|
res = requests.post(url, headers=headers, json=data, timeout=30)
|
|
elif method == "DELETE":
|
|
res = requests.delete(url, headers=headers, timeout=30)
|
|
else:
|
|
return {"error": f"Método {method} não suportado"}
|
|
|
|
if res.status_code in [200, 201]:
|
|
return res.json() if res.text else {"success": True}
|
|
return {"error": f"Status {res.status_code}", "detail": res.text}
|
|
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
def coolify_list_applications() -> list:
|
|
"""Lista aplicações no Coolify."""
|
|
result = coolify_api("/applications")
|
|
if isinstance(result, dict) and "error" in result:
|
|
return []
|
|
return result if isinstance(result, list) else []
|
|
|
|
def coolify_list_deployments() -> list:
|
|
"""Lista deployments recentes."""
|
|
result = coolify_api("/deployments")
|
|
if isinstance(result, dict) and "error" in result:
|
|
return []
|
|
return result if isinstance(result, list) else []
|
|
|
|
def coolify_get_status() -> dict:
|
|
"""Retorna status geral do Coolify."""
|
|
return coolify_api("/status")
|
|
|
|
# ============================================================
|
|
# SYNC FUNCTION
|
|
# ============================================================
|
|
|
|
def sync_credentials() -> dict:
|
|
"""
|
|
Força sync de todas as credenciais.
|
|
Limpa cache e recarrega.
|
|
|
|
Returns:
|
|
Status do sync
|
|
"""
|
|
clear_cache()
|
|
|
|
result = {
|
|
"status": "synced",
|
|
"services": {},
|
|
"timestamp": time.time()
|
|
}
|
|
|
|
for service in CREDENTIAL_SOURCES:
|
|
try:
|
|
creds = get_all_credentials(service, use_cache=False)
|
|
result["services"][service] = {
|
|
"status": "ok",
|
|
"keys": len(creds)
|
|
}
|
|
except Exception as e:
|
|
result["services"][service] = {
|
|
"status": "error",
|
|
"error": str(e)
|
|
}
|
|
|
|
return result
|
|
|
|
# ============================================================
|
|
# GITEA REPO SYNC
|
|
# ============================================================
|
|
|
|
def sync_from_gitea_repo(force: bool = False) -> dict:
|
|
"""
|
|
Força sincronização do repo Gitea admtracksteel/Keys.
|
|
Retorna status do sync.
|
|
"""
|
|
global _gitea_creds_cache, _gitea_creds_cache_time
|
|
|
|
clear_cache()
|
|
_gitea_creds_cache_time = 0
|
|
|
|
creds = fetch_from_gitea_repo(force=force)
|
|
|
|
services = list(creds.keys())
|
|
|
|
return {
|
|
"status": "synced" if creds else "failed",
|
|
"repo": GITEA_CREDS_REPO,
|
|
"file": GITEA_CREDS_FILE,
|
|
"services_count": len(creds),
|
|
"services": services,
|
|
"timestamp": time.time()
|
|
}
|
|
|
|
def get_gitea_repo_credentials() -> Dict[str, Dict[str, str]]:
|
|
"""Retorna todas as credenciais do repo Gitea."""
|
|
return fetch_from_gitea_repo()
|
|
|
|
# ============================================================
|
|
# STATUS
|
|
# ============================================================
|
|
|
|
def get_services_status() -> dict:
|
|
"""Retorna status de todos os serviços."""
|
|
status = {}
|
|
segredos = get_segredos()
|
|
gitea_creds = get_gitea_repo_credentials()
|
|
|
|
for service_id, source in CREDENTIAL_SOURCES.items():
|
|
path = source["path"]
|
|
exists = os.path.exists(path)
|
|
keys_count = 0
|
|
|
|
if exists:
|
|
creds = get_all_credentials(service_id)
|
|
keys_count = len(creds)
|
|
|
|
segredos_keys = len(segredos.get(service_id, {}))
|
|
gitea_keys = len(gitea_creds.get(service_id, {}))
|
|
|
|
status[service_id] = {
|
|
"description": source["description"],
|
|
"path": path,
|
|
"exists": exists,
|
|
"keys_count": keys_count,
|
|
"from_gitea_repo": gitea_keys > 0,
|
|
"gitea_keys": gitea_keys,
|
|
"from_segredos": segredos_keys > 0,
|
|
"segredos_keys": segredos_keys
|
|
}
|
|
|
|
status["gitea_repo"] = {
|
|
"description": "Repo Git (admtracksteel/Keys)",
|
|
"repo": GITEA_CREDS_REPO,
|
|
"file": GITEA_CREDS_FILE,
|
|
"available": len(gitea_creds) > 0,
|
|
"services_count": len(gitea_creds)
|
|
}
|
|
|
|
return status
|
|
|
|
# ============================================================
|
|
# MAIN TEST
|
|
# ============================================================
|
|
|
|
if __name__ == "__main__":
|
|
print("=== Credential Manager Test ===")
|
|
print(f"\nStatus dos serviços:")
|
|
for service, info in get_services_status().items():
|
|
print(f" {service}: {'✅' if info['exists'] else '❌'} ({info['keys_count']} chaves)")
|
|
|
|
print(f"\nCredenciais carregadas:")
|
|
print(f" Gitea URL: {gitea_url()}")
|
|
print(f" Gitea Token: {'***' + gitea_token()[-8:] if gitea_token() else 'N/A'}")
|
|
print(f" Supabase URL: {supabase_url()}")
|
|
print(f" Supabase Anon Key: {'***' + supabase_anon_key()[-8:] if supabase_anon_key() else 'N/A'}")
|
|
print(f" Coolify API: {coolify_api_base()}")
|