Add Gitea repo sync for credentials (admtracksteel/Keys)
This commit is contained in:
@@ -6,8 +6,10 @@
|
||||
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import configparser
|
||||
import time
|
||||
import requests
|
||||
from typing import Optional, Dict
|
||||
|
||||
# ============================================================
|
||||
@@ -17,6 +19,63 @@ from typing import Optional, Dict
|
||||
SEGREDOS_PATH = "/data/segredos.md"
|
||||
BOTVPS_HOST_PATH = "/app"
|
||||
|
||||
# ============================================================
|
||||
# GITEA REPO CREDENTIALS (FONTE PRINCIPAL)
|
||||
# ============================================================
|
||||
|
||||
GITEA_CREDS_REPO = "admtracksteel/Keys"
|
||||
GITEA_CREDS_FILE = "credentials.json"
|
||||
_gitea_creds_cache: Dict[str, str] = {}
|
||||
_gitea_creds_cache_time: float = 0
|
||||
|
||||
def get_gitea_creds_url() -> str:
|
||||
"""Retorna URL da API do Gitea."""
|
||||
return "https://git.reifonas.cloud/api/v1"
|
||||
|
||||
def fetch_from_gitea_repo(force: bool = False) -> Dict[str, Dict[str, str]]:
|
||||
"""
|
||||
Busca credenciais do repo Gitea admtracksteel/Keys.
|
||||
Faz cache com TTL de 5 minutos.
|
||||
"""
|
||||
global _gitea_creds_cache, _gitea_creds_cache_time
|
||||
|
||||
# Verifica cache
|
||||
if not force and time.time() - _gitea_creds_cache_time < CACHE_TTL and _gitea_creds_cache:
|
||||
return _gitea_creds_cache
|
||||
|
||||
try:
|
||||
# Obtém token do Gitea
|
||||
from credential_manager import gitea_token
|
||||
token = gitea_token()
|
||||
|
||||
# Busca arquivo no repo
|
||||
url = f"{get_gitea_creds_url()}/repos/admtracksteel/Keys/contents/{GITEA_CREDS_FILE}"
|
||||
headers = {"Authorization": f"token {token}"} if token else {}
|
||||
|
||||
response = requests.get(url, headers=headers, timeout=30)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
# Conteúdo está em base64
|
||||
import base64
|
||||
content_b64 = data.get("content", "").replace("\n", "")
|
||||
content = base64.b64decode(content_b64).decode("utf-8")
|
||||
_gitea_creds_cache = json.loads(content)
|
||||
_gitea_creds_cache_time = time.time()
|
||||
print(f"[CREDMAN] Credenciais carregadas do repo Gitea ({len(_gitea_creds_cache)} serviços)")
|
||||
return _gitea_creds_cache
|
||||
else:
|
||||
print(f"[CREDMAN] Erro ao buscar repo Gitea: {response.status_code}")
|
||||
except Exception as e:
|
||||
print(f"[CREDMAN] Erro ao fetch_from_gitea_repo: {e}")
|
||||
|
||||
return _gitea_creds_cache if _gitea_creds_cache else {}
|
||||
|
||||
def get_gitea_cred(service: str, key: str, force: bool = False) -> Optional[str]:
|
||||
"""Busca credencial específica do repo Gitea."""
|
||||
creds = fetch_from_gitea_repo(force)
|
||||
return creds.get(service, {}).get(key)
|
||||
|
||||
# ============================================================
|
||||
# FONTES DE CREDENCIAIS
|
||||
# ============================================================
|
||||
@@ -297,7 +356,9 @@ def reload_credential(service: str, key: str) -> Optional[str]:
|
||||
|
||||
def gitea_token() -> str:
|
||||
"""Retorna token de acesso do Gitea."""
|
||||
token = get_credential("gitea", "INSTALL_LOCK")
|
||||
token = get_gitea_cred("gitea", "TOKEN")
|
||||
if not token:
|
||||
token = get_credential("gitea", "INSTALL_LOCK")
|
||||
if not token:
|
||||
token = get_credential("gitea", "TOKEN")
|
||||
if not token:
|
||||
@@ -318,28 +379,36 @@ def supabase_url() -> str:
|
||||
|
||||
def supabase_anon_key() -> str:
|
||||
"""Retorna ANON_KEY do Supabase."""
|
||||
key = get_credential("supabase", "ANON_KEY")
|
||||
key = get_gitea_cred("supabase", "ANON_KEY")
|
||||
if not key:
|
||||
key = get_credential("supabase", "ANON_KEY")
|
||||
if not key:
|
||||
key = get_segredo("supabase", "ANON_KEY")
|
||||
return key or ""
|
||||
|
||||
def supabase_service_role_key() -> str:
|
||||
"""Retorna SERVICE_ROLE_KEY do Supabase."""
|
||||
key = get_credential("supabase", "SERVICE_ROLE_KEY")
|
||||
key = get_gitea_cred("supabase", "SERVICE_ROLE_KEY")
|
||||
if not key:
|
||||
key = get_credential("supabase", "SERVICE_ROLE_KEY")
|
||||
if not key:
|
||||
key = get_segredo("supabase", "SERVICE_ROLE_KEY")
|
||||
return key or ""
|
||||
|
||||
def supabase_jwt_secret() -> str:
|
||||
"""Retorna JWT_SECRET do Supabase."""
|
||||
secret = get_credential("supabase", "JWT_SECRET")
|
||||
secret = get_gitea_cred("supabase", "JWT_SECRET")
|
||||
if not secret:
|
||||
secret = get_credential("supabase", "JWT_SECRET")
|
||||
if not secret:
|
||||
secret = get_segredo("supabase", "JWT_SECRET")
|
||||
return secret or ""
|
||||
|
||||
def coolify_app_key() -> str:
|
||||
"""Retorna APP_KEY do Coolify."""
|
||||
key = get_credential("coolify", "APP_KEY")
|
||||
key = get_gitea_cred("coolify", "APP_KEY")
|
||||
if not key:
|
||||
key = get_credential("coolify", "APP_KEY")
|
||||
if not key:
|
||||
key = get_segredo("coolify", "APP_KEY")
|
||||
return key or ""
|
||||
@@ -442,6 +511,37 @@ def sync_credentials() -> dict:
|
||||
|
||||
return result
|
||||
|
||||
# ============================================================
|
||||
# GITEA REPO SYNC
|
||||
# ============================================================
|
||||
|
||||
def sync_from_gitea_repo(force: bool = False) -> dict:
|
||||
"""
|
||||
Força sincronização do repo Gitea admtracksteel/Keys.
|
||||
Retorna status do sync.
|
||||
"""
|
||||
global _gitea_creds_cache, _gitea_creds_cache_time
|
||||
|
||||
clear_cache()
|
||||
_gitea_creds_cache_time = 0
|
||||
|
||||
creds = fetch_from_gitea_repo(force=force)
|
||||
|
||||
services = list(creds.keys())
|
||||
|
||||
return {
|
||||
"status": "synced" if creds else "failed",
|
||||
"repo": GITEA_CREDS_REPO,
|
||||
"file": GITEA_CREDS_FILE,
|
||||
"services_count": len(creds),
|
||||
"services": services,
|
||||
"timestamp": time.time()
|
||||
}
|
||||
|
||||
def get_gitea_repo_credentials() -> Dict[str, Dict[str, str]]:
|
||||
"""Retorna todas as credenciais do repo Gitea."""
|
||||
return fetch_from_gitea_repo()
|
||||
|
||||
# ============================================================
|
||||
# STATUS
|
||||
# ============================================================
|
||||
@@ -450,6 +550,7 @@ def get_services_status() -> dict:
|
||||
"""Retorna status de todos os serviços."""
|
||||
status = {}
|
||||
segredos = get_segredos()
|
||||
gitea_creds = get_gitea_repo_credentials()
|
||||
|
||||
for service_id, source in CREDENTIAL_SOURCES.items():
|
||||
path = source["path"]
|
||||
@@ -461,17 +562,27 @@ def get_services_status() -> dict:
|
||||
keys_count = len(creds)
|
||||
|
||||
segredos_keys = len(segredos.get(service_id, {}))
|
||||
from_segredos = segredos_keys > 0
|
||||
gitea_keys = len(gitea_creds.get(service_id, {}))
|
||||
|
||||
status[service_id] = {
|
||||
"description": source["description"],
|
||||
"path": path,
|
||||
"exists": exists,
|
||||
"keys_count": keys_count,
|
||||
"from_segredos": from_segredos,
|
||||
"from_gitea_repo": gitea_keys > 0,
|
||||
"gitea_keys": gitea_keys,
|
||||
"from_segredos": segredos_keys > 0,
|
||||
"segredos_keys": segredos_keys
|
||||
}
|
||||
|
||||
status["gitea_repo"] = {
|
||||
"description": "Repo Git (admtracksteel/Keys)",
|
||||
"repo": GITEA_CREDS_REPO,
|
||||
"file": GITEA_CREDS_FILE,
|
||||
"available": len(gitea_creds) > 0,
|
||||
"services_count": len(gitea_creds)
|
||||
}
|
||||
|
||||
return status
|
||||
|
||||
# ============================================================
|
||||
|
||||
Reference in New Issue
Block a user