# ============================================================ # CREDENTIAL_MANAGER.PY - Gestão de Credenciais # Lê credenciais da fonte original (.env do Coolify/Docker) # NÃO ARMAZENA CREDENCIAIS - SEMPRE LÊ DA FONTE # ============================================================ import os import configparser import time from typing import Optional, Dict # ============================================================ # FONTES DE CREDENCIAIS # ============================================================ CREDENTIAL_SOURCES = { "coolify": { "path": "/data/coolify/source/.env", "parser": "env", "description": "Coolify (Orquestrador)" }, "supabase": { "path": "/data/coolify/services/h0oggskgs0ws0sco8kc4s8ws/.env", "parser": "env", "description": "Supabase (BaaS)" }, "gitea": { "path": "/var/lib/docker/volumes/yccsckck4g004gosccwc4kg4_gitea-data/_data/gitea/conf/app.ini", "parser": "ini", "section": "security", "description": "Gitea (Git Server)" }, "logto": { "path": "/data/coolify/services/ea4tt75aeibqtu19hjqqw12f/.env", "parser": "env", "description": "Logto (Authentication)" } } # Coolify API COOLIFY_API_BASE = "http://localhost:8000/api" # ============================================================ # CACHE # ============================================================ _cache: Dict[str, str] = {} _cache_time: Dict[str, float] = {} CACHE_TTL = 300 # 5 minutos # ============================================================ # PARSER FUNCTIONS # ============================================================ def _read_env_file(path: str) -> Dict[str, str]: """Lê arquivo .env e retorna dict de variáveis.""" if not os.path.exists(path): return {} result = {} try: with open(path) as f: for line in f: line = line.strip() if line and "=" in line and not line.startswith("#"): key, _, value = line.partition("=") result[key.strip()] = value.strip() except Exception as e: print(f"Erro ao ler {path}: {e}") return result def _read_ini_file(path: str, section: str = "security") -> Dict[str, str]: """Lê arquivo INI (tipo Gitea) e retorna dict.""" if not os.path.exists(path): return {} parser = configparser.ConfigParser() try: parser.read(path) if parser.has_section(section): return dict(parser.items(section)) except Exception as e: print(f"Erro ao ler INI {path}: {e}") return {} def _get_cache_key(service: str, key: str) -> str: return f"{service}:{key}" # ============================================================ # CREDENTIAL FUNCTIONS # ============================================================ def get_credential(service: str, key: str, use_cache: bool = True, force_reload: bool = False) -> Optional[str]: """ Busca credencial diretamente da fonte original. Args: service: Nome do serviço (coolify, gitea, supabase, logto) key: Nome da variável/campo use_cache: Se True, usa cache em memória (TTL 5 min) force_reload: Se True, ignora cache e recarrega Returns: Valor da credencial ou None se não encontrada """ global _cache, _cache_time cache_key = _get_cache_key(service, key) # Verifica cache if use_cache and not force_reload and cache_key in _cache: if time.time() - _cache_time.get(cache_key, 0) < CACHE_TTL: return _cache[cache_key] # Busca na fonte source = CREDENTIAL_SOURCES.get(service) if not source: return None if source["parser"] == "env": data = _read_env_file(source["path"]) else: # ini section = source.get("section", "security") data = _read_ini_file(source["path"], section) value = data.get(key) # Atualiza cache if value is not None: _cache[cache_key] = value _cache_time[cache_key] = time.time() return value def get_all_credentials(service: str, use_cache: bool = True) -> Dict[str, str]: """Retorna todas as credenciais de um serviço.""" source = CREDENTIAL_SOURCES.get(service) if not source: return {} if source["parser"] == "env": return _read_env_file(source["path"]) return _read_ini_file(source["path"], source.get("section", "security")) def get_multiple(service: str, keys: list, use_cache: bool = True) -> Dict[str, Optional[str]]: """Busca múltiplas credenciais de um serviço.""" return {key: get_credential(service, key, use_cache) for key in keys} def clear_cache(): """Limpa cache de credenciais (útil após update no Coolify).""" global _cache, _cache_time _cache = {} _cache_time = {} def reload_credential(service: str, key: str) -> Optional[str]: """Recarrega uma credencial específica, ignorando cache.""" return get_credential(service, key, use_cache=False, force_reload=True) # ============================================================ # HELPER FUNCTIONS - SERVIÇOS COMUNS # ============================================================ def gitea_token() -> str: """Retorna token de acesso do Gitea.""" # Tenta buscar do app.ini (INSTALL_LOCK ou TOKEN) token = get_credential("gitea", "INSTALL_LOCK") if not token: token = get_credential("gitea", "TOKEN") return token or "" def gitea_url() -> str: """Retorna URL base do Gitea.""" return "https://git.reifonas.cloud" def gitea_api_url() -> str: """Retorna URL da API do Gitea.""" return f"{gitea_url()}/api/v1" def supabase_url() -> str: """Retorna URL base do Supabase.""" return "https://supabase.reifonas.cloud" def supabase_anon_key() -> str: """Retorna ANON_KEY do Supabase.""" return get_credential("supabase", "ANON_KEY") or "" def supabase_service_role_key() -> str: """Retorna SERVICE_ROLE_KEY do Supabase.""" return get_credential("supabase", "SERVICE_ROLE_KEY") or "" def supabase_jwt_secret() -> str: """Retorna JWT_SECRET do Supabase.""" return get_credential("supabase", "JWT_SECRET") or "" def coolify_app_key() -> str: """Retorna APP_KEY do Coolify.""" return get_credential("coolify", "APP_KEY") or "" def coolify_api_base() -> str: """Retorna URL base da API do Coolify.""" return COOLIFY_API_BASE # ============================================================ # COOLIFY API HELPERS # ============================================================ def coolify_api(endpoint: str, method: str = "GET", data: dict = None) -> dict: """ Faz requisição à API do Coolify. Args: endpoint: Endpoint da API (ex: "/deployments", "/applications") method: GET, POST, DELETE, etc. data: Dados para enviar (JSON) Returns: Resposta da API como dict """ import requests url = f"{COOLIFY_API_BASE}{endpoint}" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {coolify_app_key()}" } try: if method == "GET": res = requests.get(url, headers=headers, timeout=30) elif method == "POST": res = requests.post(url, headers=headers, json=data, timeout=30) elif method == "DELETE": res = requests.delete(url, headers=headers, timeout=30) else: return {"error": f"Método {method} não suportado"} if res.status_code in [200, 201]: return res.json() if res.text else {"success": True} return {"error": f"Status {res.status_code}", "detail": res.text} except Exception as e: return {"error": str(e)} def coolify_list_applications() -> list: """Lista aplicações no Coolify.""" result = coolify_api("/applications") if isinstance(result, dict) and "error" in result: return [] return result if isinstance(result, list) else [] def coolify_list_deployments() -> list: """Lista deployments recentes.""" result = coolify_api("/deployments") if isinstance(result, dict) and "error" in result: return [] return result if isinstance(result, list) else [] def coolify_get_status() -> dict: """Retorna status geral do Coolify.""" return coolify_api("/status") # ============================================================ # SYNC FUNCTION # ============================================================ def sync_credentials() -> dict: """ Força sync de todas as credenciais. Limpa cache e recarrega. Returns: Status do sync """ clear_cache() result = { "status": "synced", "services": {}, "timestamp": time.time() } for service in CREDENTIAL_SOURCES: try: creds = get_all_credentials(service, use_cache=False) result["services"][service] = { "status": "ok", "keys": len(creds) } except Exception as e: result["services"][service] = { "status": "error", "error": str(e) } return result # ============================================================ # STATUS # ============================================================ def get_services_status() -> dict: """Retorna status de todos os serviços.""" status = {} for service_id, source in CREDENTIAL_SOURCES.items(): path = source["path"] exists = os.path.exists(path) status[service_id] = { "description": source["description"], "path": path, "exists": exists, "keys_count": 0 } if exists: creds = get_all_credentials(service_id) status[service_id]["keys_count"] = len(creds) return status # ============================================================ # MAIN TEST # ============================================================ if __name__ == "__main__": print("=== Credential Manager Test ===") print(f"\nStatus dos serviços:") for service, info in get_services_status().items(): print(f" {service}: {'✅' if info['exists'] else '❌'} ({info['keys_count']} chaves)") print(f"\nCredenciais carregadas:") print(f" Gitea URL: {gitea_url()}") print(f" Gitea Token: {'***' + gitea_token()[-8:] if gitea_token() else 'N/A'}") print(f" Supabase URL: {supabase_url()}") print(f" Supabase Anon Key: {'***' + supabase_anon_key()[-8:] if supabase_anon_key() else 'N/A'}") print(f" Coolify API: {coolify_api_base()}")