151 lines
5.8 KiB
Python
151 lines
5.8 KiB
Python
import os
|
|
import re
|
|
import json
|
|
import configparser
|
|
import time
|
|
import httpx
|
|
import asyncio
|
|
from typing import Optional, Dict
|
|
|
|
# ============================================================
|
|
# CONFIGURATIONS & PATHS
|
|
# ============================================================
|
|
|
|
SEGREDOS_PATH = "/data/segredos.md"
|
|
BOTVPS_HOST_PATH = "/app"
|
|
CACHE_TTL = 300 # 5 minutos
|
|
GITEA_API_URL = "https://git.reifonas.cloud/api/v1"
|
|
|
|
# CACHES
|
|
_gitea_creds_cache: Dict[str, Dict] = {}
|
|
_gitea_creds_cache_time: float = 0
|
|
_local_cache: Dict[str, str] = {}
|
|
_local_cache_time: Dict[str, float] = {}
|
|
|
|
# ============================================================
|
|
# GITEA CORE (FONTE PRINCIPAL)
|
|
# ============================================================
|
|
|
|
async def fetch_from_gitea_repo_async(force: bool = False) -> Dict:
|
|
global _gitea_creds_cache, _gitea_creds_cache_time
|
|
|
|
if not force and time.time() - _gitea_creds_cache_time < CACHE_TTL and _gitea_creds_cache:
|
|
return _gitea_creds_cache
|
|
|
|
try:
|
|
from credential_manager import gitea_token
|
|
token = gitea_token()
|
|
url = f"{GITEA_API_URL}/repos/admtracksteel/Keys/contents/credentials.json"
|
|
headers = {"Authorization": f"token {token}"} if token else {}
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
res = await client.get(url, headers=headers, timeout=20)
|
|
if res.status_code == 200:
|
|
import base64
|
|
content_b64 = res.json().get("content", "").replace("\n", "")
|
|
_gitea_creds_cache = json.loads(base64.b64decode(content_b64).decode())
|
|
_gitea_creds_cache_time = time.time()
|
|
return _gitea_creds_cache
|
|
except Exception as e:
|
|
print(f"Error fetching Gitea creds: {e}")
|
|
return _gitea_creds_cache
|
|
|
|
def gitea_token() -> str:
|
|
# Ordem de prioridade: Gitea INI -> segredos.md -> Env
|
|
token = get_credential("gitea", "INTERNAL_TOKEN") # Exemplo
|
|
if not token: token = get_segredo("gitea", "PAT")
|
|
return token or os.getenv("GITEA_TOKEN", "")
|
|
|
|
# ============================================================
|
|
# FALLBACK: SEGREDOS.MD PARSER
|
|
# ============================================================
|
|
|
|
def get_segredos() -> Dict:
|
|
paths = [SEGREDOS_PATH, "/root/segredos.md", "/app/segredos.md"]
|
|
for p in paths:
|
|
if os.path.exists(p):
|
|
try:
|
|
with open(p, 'r') as f:
|
|
content = f.read()
|
|
return _parse_content(content)
|
|
except: pass
|
|
return {}
|
|
|
|
def _parse_content(content: str) -> Dict:
|
|
# Parser simplificado por regex
|
|
res = {"coolify": {}, "supabase": {}, "gitea": {}, "telegram": {}}
|
|
patterns = {
|
|
"coolify": [("APP_KEY", r"APP_KEY[:\s]+[`']?([^\s`']+)")],
|
|
"supabase": [("ANON_KEY", r"ANON_KEY[:\s]+[`']?([^\s`']+)")],
|
|
"telegram": [("BOT_TOKEN", r"Bot Token[:\s]+[`']?([^\s`']+)")],
|
|
"gitea": [("PAT", r"Token de Acesso Pessoal[:\s]+[`']?([^\s`']+)")],
|
|
}
|
|
for svc, pairs in patterns.items():
|
|
for key, pat in pairs:
|
|
m = re.search(pat, content, re.I)
|
|
if m: res[svc][key] = m.group(1)
|
|
return res
|
|
|
|
def get_segredo(service: str, key: str) -> Optional[str]:
|
|
return get_segredos().get(service, {}).get(key)
|
|
|
|
# ============================================================
|
|
# LOCAL FILES (.ENV / .INI)
|
|
# ============================================================
|
|
|
|
CREDENTIAL_SOURCES = {
|
|
"coolify": {"path": "/data/coolify/source/.env", "type": "env"},
|
|
"supabase": {"path": "/data/coolify/services/h0oggskgs0ws0sco8kc4s8ws/.env", "type": "env"},
|
|
"gitea": {"path": "/var/lib/docker/volumes/yccsckck4g004gosccwc4kg4_gitea-data/_data/gitea/conf/app.ini", "type": "ini", "section": "security"}
|
|
}
|
|
|
|
def get_credential(service: str, key: str) -> Optional[str]:
|
|
source = CREDENTIAL_SOURCES.get(service)
|
|
if not source or not os.path.exists(source["path"]): return None
|
|
|
|
try:
|
|
if source["type"] == "env":
|
|
with open(source["path"]) as f:
|
|
for line in f:
|
|
if line.startswith(f"{key}="): return line.split("=")[1].strip()
|
|
elif source["type"] == "ini":
|
|
cp = configparser.ConfigParser()
|
|
cp.read(source["path"])
|
|
return cp.get(source.get("section", "DEFAULT"), key, fallback=None)
|
|
except: pass
|
|
return None
|
|
|
|
# ============================================================
|
|
# API HELPERS (ASYNC)
|
|
# ============================================================
|
|
|
|
async def coolify_api_async(endpoint: str, method: str = "GET", data: dict = None) -> dict:
|
|
from credential_manager import coolify_app_key
|
|
url = f"http://localhost:8000/api{endpoint}"
|
|
headers = {"Authorization": f"Bearer {coolify_app_key()}"}
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
if method == "GET": res = await client.get(url, headers=headers)
|
|
else: res = await client.request(method, url, headers=headers, json=data)
|
|
return res.json() if res.status_code == 200 else {"error": res.status_code}
|
|
except Exception as e: return {"error": str(e)}
|
|
|
|
def coolify_app_key():
|
|
return asyncio.run(fetch_from_gitea_repo_async()).get("coolify", {}).get("APP_KEY") or get_segredo("coolify", "APP_KEY")
|
|
|
|
# --- SYNC WRAPPERS ---
|
|
def sync_credentials():
|
|
return asyncio.run(fetch_from_gitea_repo_async(force=True))
|
|
|
|
def sync_from_gitea_repo(force=False):
|
|
return asyncio.run(fetch_from_gitea_repo_async(force=force))
|
|
|
|
def get_services_status():
|
|
return {"gitea_repo": "active", "local_files": "checked", "segredos": "available"}
|
|
|
|
def gitea_api_url(): return GITEA_API_URL
|
|
def supabase_url(): return "https://supabase.reifonas.cloud"
|
|
def supabase_anon_key(): return get_segredo("supabase", "ANON_KEY")
|
|
def supabase_service_role_key(): return get_segredo("supabase", "SERVICE_ROLE_KEY")
|