From 62f90a381c10f7ef20a7622793ae6e0397d15bda Mon Sep 17 00:00:00 2001 From: Marcos Date: Sun, 22 Mar 2026 16:18:18 -0300 Subject: [PATCH] Add segredos.md parser as fallback for credentials --- credential_manager.py | 173 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 163 insertions(+), 10 deletions(-) diff --git a/credential_manager.py b/credential_manager.py index 12d8b00..e9a70c6 100644 --- a/credential_manager.py +++ b/credential_manager.py @@ -5,10 +5,18 @@ # ============================================================ import os +import re import configparser import time from typing import Optional, Dict +# ============================================================ +# CAMINHO DO ARQUIVO DE SEGREDOS (FALLBACK) +# ============================================================ + +SEGREDOS_PATH = "/data/segredos.md" +BOTVPS_HOST_PATH = "/app" + # ============================================================ # FONTES DE CREDENCIAIS # ============================================================ @@ -89,6 +97,130 @@ def _read_ini_file(path: str, section: str = "security") -> Dict[str, str]: def _get_cache_key(service: str, key: str) -> str: return f"{service}:{key}" +# ============================================================ +# SEGREDOS.MD PARSER (FALLBACK) +# ============================================================ + +def _parse_segredos_md() -> Dict[str, Dict[str, str]]: + """ + Parsea o arquivo segredos.md e retorna credenciais estruturadas. + Usa como fallback quando os caminhos originais não existem. + """ + # Tenta múltiplos caminhos possíveis + paths_to_try = [ + SEGREDOS_PATH, + "/root/segredos.md", + "/host/segredos.md", + "/data/segredos.md", + f"{BOTVPS_HOST_PATH}/segredos.md", + "/app/segredos.md" + ] + + segredos_path = None + for p in paths_to_try: + if os.path.exists(p): + segredos_path = p + break + + if not segredos_path: + return {} + + try: + with open(segredos_path, 'r', encoding='utf-8') as f: + content = f.read() + except Exception as e: + print(f"Erro ao ler {segredos_path}: {e}") + return {} + + result = { + "coolify": {}, + "supabase": {}, + "gitea": {}, + "logto": {}, + "telegram": {}, + "anthropic": {}, + "elevenlabs": {}, + "gpi": {} + } + + # Padrões para extrair valores + patterns = { + "coolify": [ + (r"APP_KEY[:\s]+[`']?([^\s`']+)", "APP_KEY"), + (r"Database Password.*[:\s]+[`']?([^\s`']+)", "DB_PASSWORD"), + (r"Redis Password.*[:\s]+[`']?([^\s`']+)", "REDIS_PASSWORD"), + (r"Pusher App ID.*[:\s]+[`']?([^\s`']+)", "PUSHER_APP_ID"), + (r"Pusher App Key.*[:\s]+[`']?([^\s`']+)", "PUSHER_APP_KEY"), + (r"Pusher App Secret.*[:\s]+[`']?([^\s`']+)", "PUSHER_APP_SECRET"), + ], + "supabase": [ + (r"SERVICE_ROLE_KEY.*[:\s]+[`']?([^\s`']+)", "SERVICE_ROLE_KEY"), + (r"ANON_KEY.*[:\s]+[`']?([^\s`']+)", "ANON_KEY"), + (r"JWT Secret.*[:\s]+[`']?([^\s`']+)", "JWT_SECRET"), + (r"MinIO.*Access Key.*[:\s]+[`']?([^\s`']+)", "MINIO_ACCESS_KEY"), + (r"MinIO.*Secret Key.*[:\s]+[`']?([^\s`']+)", "MINIO_SECRET_KEY"), + (r"Vault Encryption Key.*[:\s]+[`']?([^\s`']+)", "VAULT_KEY"), + (r"Logflare API Key.*[:\s]+[`']?([^\s`']+)", "LOGFLARE_KEY"), + ], + "gitea": [ + (r"Token de Acesso Pessoal.*[:\s]+[`']?([^\s`']+)", "PAT"), + (r"Internal Token.*[:\s]+[`']?([^\s`']+)", "INTERNAL_TOKEN"), + (r"OAuth2 JWT Secret.*[:\s]+[`']?([^\s`']+)", "OAUTH2_SECRET"), + (r"LFS JWT Secret.*[:\s]+[`']?([^\s`']+)", "LFS_SECRET"), + ], + "logto": [ + (r"Logto.*Usuário.*[:\s]+[`']?([^\s`']+)", "DB_USER"), + (r"Logto.*Senha.*[:\s]+[`']?([^\s`']+)", "DB_PASSWORD"), + ], + "telegram": [ + (r"Bot Token.*[:\s]+[`']?([^\s`']+)", "BOT_TOKEN"), + (r"Chat ID.*[:\s]+[`']?([^\s`']+)", "CHAT_ID"), + ], + "anthropic": [ + (r"ANTHROPIC_API_KEY.*[:\s]+[`']?([^\s`']+)", "ANTHROPIC_API_KEY"), + ], + "elevenlabs": [ + (r"ELEVENLABS_API_KEY.*[:\s]+[`']?([^\s`']+)", "ELEVENLABS_API_KEY"), + (r"Voz Escolhida.*[:\s]+[`']?([^\s`']+)", "VOICE_ID"), + ], + "gpi": [ + (r"MongoDB URI.*[:\s]+[`']?([^\s`']+)", "MONGODB_URI"), + (r"Clerk Publishable Key.*[:\s]+[`']?([^\s`']+)", "CLERK_KEY"), + (r"JWT Secret.*[:\s]+[`']?([^\s`']+)", "JWT_SECRET"), + ] + } + + for service, service_patterns in patterns.items(): + for pattern, key_name in service_patterns: + match = re.search(pattern, content, re.IGNORECASE) + if match: + result[service][key_name] = match.group(1) + + return result + +# Cache para segredos parseados +_segredos_cache: Dict[str, Dict[str, str]] = {} +_segredos_cache_time: float = 0 + +def get_segredos() -> Dict[str, Dict[str, str]]: + """Retorna credenciais parseadas do segredos.md com cache.""" + global _segredos_cache, _segredos_cache_time + + if time.time() - _segredos_cache_time < CACHE_TTL and _segredos_cache: + return _segredos_cache + + _segredos_cache = _parse_segredos_md() + _segredos_cache_time = time.time() + return _segredos_cache + +def get_segredo(service: str, key: str) -> Optional[str]: + """Busca uma credencial específica do segredos.md.""" + segredos = get_segredos() + service_creds = segredos.get(service) + if service_creds: + return service_creds.get(key) + return None + # ============================================================ # CREDENTIAL FUNCTIONS # ============================================================ @@ -165,10 +297,11 @@ def reload_credential(service: str, key: str) -> Optional[str]: def gitea_token() -> str: """Retorna token de acesso do Gitea.""" - # Tenta buscar do app.ini (INSTALL_LOCK ou TOKEN) token = get_credential("gitea", "INSTALL_LOCK") if not token: token = get_credential("gitea", "TOKEN") + if not token: + token = get_segredo("gitea", "PAT") return token or "" def gitea_url() -> str: @@ -185,19 +318,31 @@ def supabase_url() -> str: def supabase_anon_key() -> str: """Retorna ANON_KEY do Supabase.""" - return get_credential("supabase", "ANON_KEY") or "" + key = get_credential("supabase", "ANON_KEY") + if not key: + key = get_segredo("supabase", "ANON_KEY") + return key or "" def supabase_service_role_key() -> str: """Retorna SERVICE_ROLE_KEY do Supabase.""" - return get_credential("supabase", "SERVICE_ROLE_KEY") or "" + key = get_credential("supabase", "SERVICE_ROLE_KEY") + if not key: + key = get_segredo("supabase", "SERVICE_ROLE_KEY") + return key or "" def supabase_jwt_secret() -> str: """Retorna JWT_SECRET do Supabase.""" - return get_credential("supabase", "JWT_SECRET") or "" + secret = get_credential("supabase", "JWT_SECRET") + if not secret: + secret = get_segredo("supabase", "JWT_SECRET") + return secret or "" def coolify_app_key() -> str: """Retorna APP_KEY do Coolify.""" - return get_credential("coolify", "APP_KEY") or "" + key = get_credential("coolify", "APP_KEY") + if not key: + key = get_segredo("coolify", "APP_KEY") + return key or "" def coolify_api_base() -> str: """Retorna URL base da API do Coolify.""" @@ -304,20 +449,28 @@ def sync_credentials() -> dict: def get_services_status() -> dict: """Retorna status de todos os serviços.""" status = {} + segredos = get_segredos() for service_id, source in CREDENTIAL_SOURCES.items(): path = source["path"] exists = os.path.exists(path) + keys_count = 0 + + if exists: + creds = get_all_credentials(service_id) + keys_count = len(creds) + + segredos_keys = len(segredos.get(service_id, {})) + from_segredos = segredos_keys > 0 + status[service_id] = { "description": source["description"], "path": path, "exists": exists, - "keys_count": 0 + "keys_count": keys_count, + "from_segredos": from_segredos, + "segredos_keys": segredos_keys } - - if exists: - creds = get_all_credentials(service_id) - status[service_id]["keys_count"] = len(creds) return status