# ============================================================ # CREDENTIAL_MANAGER.PY - Gestão de Credenciais # Lê credenciais da fonte original (.env do Coolify/Docker) # NÃO ARMAZENA CREDENCIAIS - SEMPRE LÊ DA FONTE # ============================================================ import os import re import configparser import time from typing import Optional, Dict # ============================================================ # CAMINHO DO ARQUIVO DE SEGREDOS (FALLBACK) # ============================================================ SEGREDOS_PATH = "/data/segredos.md" BOTVPS_HOST_PATH = "/app" # ============================================================ # FONTES DE CREDENCIAIS # ============================================================ CREDENTIAL_SOURCES = { "coolify": { "path": "/data/coolify/source/.env", "parser": "env", "description": "Coolify (Orquestrador)" }, "supabase": { "path": "/data/coolify/services/h0oggskgs0ws0sco8kc4s8ws/.env", "parser": "env", "description": "Supabase (BaaS)" }, "gitea": { "path": "/var/lib/docker/volumes/yccsckck4g004gosccwc4kg4_gitea-data/_data/gitea/conf/app.ini", "parser": "ini", "section": "security", "description": "Gitea (Git Server)" }, "logto": { "path": "/data/coolify/services/ea4tt75aeibqtu19hjqqw12f/.env", "parser": "env", "description": "Logto (Authentication)" } } # Coolify API COOLIFY_API_BASE = "http://localhost:8000/api" # ============================================================ # CACHE # ============================================================ _cache: Dict[str, str] = {} _cache_time: Dict[str, float] = {} CACHE_TTL = 300 # 5 minutos # ============================================================ # PARSER FUNCTIONS # ============================================================ def _read_env_file(path: str) -> Dict[str, str]: """Lê arquivo .env e retorna dict de variáveis.""" if not os.path.exists(path): return {} result = {} try: with open(path) as f: for line in f: line = line.strip() if line and "=" in line and not line.startswith("#"): key, _, value = line.partition("=") result[key.strip()] = value.strip() except Exception as e: print(f"Erro ao ler {path}: {e}") return result def _read_ini_file(path: str, section: str = "security") -> Dict[str, str]: """Lê arquivo INI (tipo Gitea) e retorna dict.""" if not os.path.exists(path): return {} parser = configparser.ConfigParser() try: parser.read(path) if parser.has_section(section): return dict(parser.items(section)) except Exception as e: print(f"Erro ao ler INI {path}: {e}") return {} def _get_cache_key(service: str, key: str) -> str: return f"{service}:{key}" # ============================================================ # SEGREDOS.MD PARSER (FALLBACK) # ============================================================ def _parse_segredos_md() -> Dict[str, Dict[str, str]]: """ Parsea o arquivo segredos.md e retorna credenciais estruturadas. Usa como fallback quando os caminhos originais não existem. """ # Tenta múltiplos caminhos possíveis paths_to_try = [ SEGREDOS_PATH, "/root/segredos.md", "/host/segredos.md", "/data/segredos.md", f"{BOTVPS_HOST_PATH}/segredos.md", "/app/segredos.md" ] segredos_path = None for p in paths_to_try: if os.path.exists(p): segredos_path = p break if not segredos_path: return {} try: with open(segredos_path, 'r', encoding='utf-8') as f: content = f.read() except Exception as e: print(f"Erro ao ler {segredos_path}: {e}") return {} result = { "coolify": {}, "supabase": {}, "gitea": {}, "logto": {}, "telegram": {}, "anthropic": {}, "elevenlabs": {}, "gpi": {} } # Padrões para extrair valores patterns = { "coolify": [ (r"APP_KEY[:\s]+[`']?([^\s`']+)", "APP_KEY"), (r"Database Password.*[:\s]+[`']?([^\s`']+)", "DB_PASSWORD"), (r"Redis Password.*[:\s]+[`']?([^\s`']+)", "REDIS_PASSWORD"), (r"Pusher App ID.*[:\s]+[`']?([^\s`']+)", "PUSHER_APP_ID"), (r"Pusher App Key.*[:\s]+[`']?([^\s`']+)", "PUSHER_APP_KEY"), (r"Pusher App Secret.*[:\s]+[`']?([^\s`']+)", "PUSHER_APP_SECRET"), ], "supabase": [ (r"SERVICE_ROLE_KEY.*[:\s]+[`']?([^\s`']+)", "SERVICE_ROLE_KEY"), (r"ANON_KEY.*[:\s]+[`']?([^\s`']+)", "ANON_KEY"), (r"JWT Secret.*[:\s]+[`']?([^\s`']+)", "JWT_SECRET"), (r"MinIO.*Access Key.*[:\s]+[`']?([^\s`']+)", "MINIO_ACCESS_KEY"), (r"MinIO.*Secret Key.*[:\s]+[`']?([^\s`']+)", "MINIO_SECRET_KEY"), (r"Vault Encryption Key.*[:\s]+[`']?([^\s`']+)", "VAULT_KEY"), (r"Logflare API Key.*[:\s]+[`']?([^\s`']+)", "LOGFLARE_KEY"), ], "gitea": [ (r"Token de Acesso Pessoal.*[:\s]+[`']?([^\s`']+)", "PAT"), (r"Internal Token.*[:\s]+[`']?([^\s`']+)", "INTERNAL_TOKEN"), (r"OAuth2 JWT Secret.*[:\s]+[`']?([^\s`']+)", "OAUTH2_SECRET"), (r"LFS JWT Secret.*[:\s]+[`']?([^\s`']+)", "LFS_SECRET"), ], "logto": [ (r"Logto.*Usuário.*[:\s]+[`']?([^\s`']+)", "DB_USER"), (r"Logto.*Senha.*[:\s]+[`']?([^\s`']+)", "DB_PASSWORD"), ], "telegram": [ (r"Bot Token.*[:\s]+[`']?([^\s`']+)", "BOT_TOKEN"), (r"Chat ID.*[:\s]+[`']?([^\s`']+)", "CHAT_ID"), ], "anthropic": [ (r"ANTHROPIC_API_KEY.*[:\s]+[`']?([^\s`']+)", "ANTHROPIC_API_KEY"), ], "elevenlabs": [ (r"ELEVENLABS_API_KEY.*[:\s]+[`']?([^\s`']+)", "ELEVENLABS_API_KEY"), (r"Voz Escolhida.*[:\s]+[`']?([^\s`']+)", "VOICE_ID"), ], "gpi": [ (r"MongoDB URI.*[:\s]+[`']?([^\s`']+)", "MONGODB_URI"), (r"Clerk Publishable Key.*[:\s]+[`']?([^\s`']+)", "CLERK_KEY"), (r"JWT Secret.*[:\s]+[`']?([^\s`']+)", "JWT_SECRET"), ] } for service, service_patterns in patterns.items(): for pattern, key_name in service_patterns: match = re.search(pattern, content, re.IGNORECASE) if match: result[service][key_name] = match.group(1) return result # Cache para segredos parseados _segredos_cache: Dict[str, Dict[str, str]] = {} _segredos_cache_time: float = 0 def get_segredos() -> Dict[str, Dict[str, str]]: """Retorna credenciais parseadas do segredos.md com cache.""" global _segredos_cache, _segredos_cache_time if time.time() - _segredos_cache_time < CACHE_TTL and _segredos_cache: return _segredos_cache _segredos_cache = _parse_segredos_md() _segredos_cache_time = time.time() return _segredos_cache def get_segredo(service: str, key: str) -> Optional[str]: """Busca uma credencial específica do segredos.md.""" segredos = get_segredos() service_creds = segredos.get(service) if service_creds: return service_creds.get(key) return None # ============================================================ # CREDENTIAL FUNCTIONS # ============================================================ def get_credential(service: str, key: str, use_cache: bool = True, force_reload: bool = False) -> Optional[str]: """ Busca credencial diretamente da fonte original. Args: service: Nome do serviço (coolify, gitea, supabase, logto) key: Nome da variável/campo use_cache: Se True, usa cache em memória (TTL 5 min) force_reload: Se True, ignora cache e recarrega Returns: Valor da credencial ou None se não encontrada """ global _cache, _cache_time cache_key = _get_cache_key(service, key) # Verifica cache if use_cache and not force_reload and cache_key in _cache: if time.time() - _cache_time.get(cache_key, 0) < CACHE_TTL: return _cache[cache_key] # Busca na fonte source = CREDENTIAL_SOURCES.get(service) if not source: return None if source["parser"] == "env": data = _read_env_file(source["path"]) else: # ini section = source.get("section", "security") data = _read_ini_file(source["path"], section) value = data.get(key) # Atualiza cache if value is not None: _cache[cache_key] = value _cache_time[cache_key] = time.time() return value def get_all_credentials(service: str, use_cache: bool = True) -> Dict[str, str]: """Retorna todas as credenciais de um serviço.""" source = CREDENTIAL_SOURCES.get(service) if not source: return {} if source["parser"] == "env": return _read_env_file(source["path"]) return _read_ini_file(source["path"], source.get("section", "security")) def get_multiple(service: str, keys: list, use_cache: bool = True) -> Dict[str, Optional[str]]: """Busca múltiplas credenciais de um serviço.""" return {key: get_credential(service, key, use_cache) for key in keys} def clear_cache(): """Limpa cache de credenciais (útil após update no Coolify).""" global _cache, _cache_time _cache = {} _cache_time = {} def reload_credential(service: str, key: str) -> Optional[str]: """Recarrega uma credencial específica, ignorando cache.""" return get_credential(service, key, use_cache=False, force_reload=True) # ============================================================ # HELPER FUNCTIONS - SERVIÇOS COMUNS # ============================================================ def gitea_token() -> str: """Retorna token de acesso do Gitea.""" token = get_credential("gitea", "INSTALL_LOCK") if not token: token = get_credential("gitea", "TOKEN") if not token: token = get_segredo("gitea", "PAT") return token or "" def gitea_url() -> str: """Retorna URL base do Gitea.""" return "https://git.reifonas.cloud" def gitea_api_url() -> str: """Retorna URL da API do Gitea.""" return f"{gitea_url()}/api/v1" def supabase_url() -> str: """Retorna URL base do Supabase.""" return "https://supabase.reifonas.cloud" def supabase_anon_key() -> str: """Retorna ANON_KEY do Supabase.""" key = get_credential("supabase", "ANON_KEY") if not key: key = get_segredo("supabase", "ANON_KEY") return key or "" def supabase_service_role_key() -> str: """Retorna SERVICE_ROLE_KEY do Supabase.""" key = get_credential("supabase", "SERVICE_ROLE_KEY") if not key: key = get_segredo("supabase", "SERVICE_ROLE_KEY") return key or "" def supabase_jwt_secret() -> str: """Retorna JWT_SECRET do Supabase.""" secret = get_credential("supabase", "JWT_SECRET") if not secret: secret = get_segredo("supabase", "JWT_SECRET") return secret or "" def coolify_app_key() -> str: """Retorna APP_KEY do Coolify.""" key = get_credential("coolify", "APP_KEY") if not key: key = get_segredo("coolify", "APP_KEY") return key or "" def coolify_api_base() -> str: """Retorna URL base da API do Coolify.""" return COOLIFY_API_BASE # ============================================================ # COOLIFY API HELPERS # ============================================================ def coolify_api(endpoint: str, method: str = "GET", data: dict = None) -> dict: """ Faz requisição à API do Coolify. Args: endpoint: Endpoint da API (ex: "/deployments", "/applications") method: GET, POST, DELETE, etc. data: Dados para enviar (JSON) Returns: Resposta da API como dict """ import requests url = f"{COOLIFY_API_BASE}{endpoint}" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {coolify_app_key()}" } try: if method == "GET": res = requests.get(url, headers=headers, timeout=30) elif method == "POST": res = requests.post(url, headers=headers, json=data, timeout=30) elif method == "DELETE": res = requests.delete(url, headers=headers, timeout=30) else: return {"error": f"Método {method} não suportado"} if res.status_code in [200, 201]: return res.json() if res.text else {"success": True} return {"error": f"Status {res.status_code}", "detail": res.text} except Exception as e: return {"error": str(e)} def coolify_list_applications() -> list: """Lista aplicações no Coolify.""" result = coolify_api("/applications") if isinstance(result, dict) and "error" in result: return [] return result if isinstance(result, list) else [] def coolify_list_deployments() -> list: """Lista deployments recentes.""" result = coolify_api("/deployments") if isinstance(result, dict) and "error" in result: return [] return result if isinstance(result, list) else [] def coolify_get_status() -> dict: """Retorna status geral do Coolify.""" return coolify_api("/status") # ============================================================ # SYNC FUNCTION # ============================================================ def sync_credentials() -> dict: """ Força sync de todas as credenciais. Limpa cache e recarrega. Returns: Status do sync """ clear_cache() result = { "status": "synced", "services": {}, "timestamp": time.time() } for service in CREDENTIAL_SOURCES: try: creds = get_all_credentials(service, use_cache=False) result["services"][service] = { "status": "ok", "keys": len(creds) } except Exception as e: result["services"][service] = { "status": "error", "error": str(e) } return result # ============================================================ # STATUS # ============================================================ def get_services_status() -> dict: """Retorna status de todos os serviços.""" status = {} segredos = get_segredos() for service_id, source in CREDENTIAL_SOURCES.items(): path = source["path"] exists = os.path.exists(path) keys_count = 0 if exists: creds = get_all_credentials(service_id) keys_count = len(creds) segredos_keys = len(segredos.get(service_id, {})) from_segredos = segredos_keys > 0 status[service_id] = { "description": source["description"], "path": path, "exists": exists, "keys_count": keys_count, "from_segredos": from_segredos, "segredos_keys": segredos_keys } return status # ============================================================ # MAIN TEST # ============================================================ if __name__ == "__main__": print("=== Credential Manager Test ===") print(f"\nStatus dos serviços:") for service, info in get_services_status().items(): print(f" {service}: {'✅' if info['exists'] else '❌'} ({info['keys_count']} chaves)") print(f"\nCredenciais carregadas:") print(f" Gitea URL: {gitea_url()}") print(f" Gitea Token: {'***' + gitea_token()[-8:] if gitea_token() else 'N/A'}") print(f" Supabase URL: {supabase_url()}") print(f" Supabase Anon Key: {'***' + supabase_anon_key()[-8:] if supabase_anon_key() else 'N/A'}") print(f" Coolify API: {coolify_api_base()}")