diff --git a/.env.test b/.env.test new file mode 100644 index 0000000..5c61289 --- /dev/null +++ b/.env.test @@ -0,0 +1,6 @@ +# Configuração Local de Teste +TELEGRAM_BOT_TOKEN=8272877120:AAEKPhLGZPyj8XL9HGSowGLAFzXZPtXHMa4 +TELEGRAM_CHAT_ID=8768212834 +GEMINI_API_KEY=AIzaSyDummyKeyForTesting123456789 +OLLAMA_HOST=http://localhost:11434 +OLLAMA_MODEL=qwen2.5-coder:1.5b diff --git a/ai_agent.py b/ai_agent.py index 111cc4a..ae0d76f 100644 --- a/ai_agent.py +++ b/ai_agent.py @@ -11,21 +11,30 @@ def get_llm_response(prompt: str, provider: str, cfg: dict) -> str: api_key = cfg.get("gemini_api_key") or os.getenv("GEMINI_API_KEY") url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={api_key}" payload = {"contents": [{"parts": [{"text": prompt}]}]} - res = requests.post(url, json=payload) - if res.status_code == 200: - return res.json()["candidates"][0]["content"]["parts"][0]["text"] - return f"Erro Gemini: {res.text}" + try: + res = requests.post(url, json=payload, timeout=30) + if res.status_code == 200: + data = res.json() + if "candidates" in data and len(data["candidates"]) > 0: + return data["candidates"][0]["content"]["parts"][0]["text"] + return f"Erro Gemini (Dados Vazios): {res.text}" + return f"Erro Gemini (Status {res.status_code}): {res.text}" + except Exception as e: + return f"Erro de Conexão Gemini: {str(e)}" elif provider == "ollama": ollama_host = os.getenv("OLLAMA_HOST", "http://ollama-lw4s8g4gc8gss4gkc4gg0wk4:11434") - res = requests.post(f"{ollama_host}/api/generate", json={ - "model": os.getenv("OLLAMA_MODEL", "qwen2.5-coder:1.5b"), - "prompt": prompt, - "stream": False - }) - if res.status_code == 200: - return res.json().get("response", "") - return f"Erro Ollama: {res.text}" + try: + res = requests.post(f"{ollama_host}/api/generate", json={ + "model": os.getenv("OLLAMA_MODEL", "qwen2.5-coder:1.5b"), + "prompt": prompt, + "stream": False + }, timeout=30) + if res.status_code == 200: + return res.json().get("response", "") + return f"Erro Ollama (Status {res.status_code}): {res.text}" + except Exception as e: + return f"Erro de Conexão Ollama: {str(e)}" return "Provedor desconhecido." @@ -92,6 +101,8 @@ Resposta: A memória RAM está operando com 20% de uso. print(f"--- INICIANDO AGENTE ({provider}) ---") for i in range(max_loops): + import time + time.sleep(0.5) # Respiro para a CPU print(f"\n[LOOP {i+1}/{max_loops}]") full_prompt = system_prompt + current_iteration_history response = get_llm_response(full_prompt, provider, cfg) diff --git a/cmd_payload.json b/cmd_payload.json new file mode 100644 index 0000000..0095c2f --- /dev/null +++ b/cmd_payload.json @@ -0,0 +1,4 @@ +{ + "password": "@@Gi05Br;;", + "message": "Agente, o serviço Ollama no Coolify está com status unhealthy. Por favor, rode o comando bash: docker logs ollama-lw4s8g4gc8gss4gkc4gg0wk4 --tail 100 e depois docker logs open-webui-lw4s8g4gc8gss4gkc4gg0wk4 --tail 100. Me diga qual erro você encontrou." +} diff --git a/credential_manager.py b/credential_manager.py new file mode 100644 index 0000000..12d8b00 --- /dev/null +++ b/credential_manager.py @@ -0,0 +1,339 @@ +# ============================================================ +# CREDENTIAL_MANAGER.PY - Gestão de Credenciais +# Lê credenciais da fonte original (.env do Coolify/Docker) +# NÃO ARMAZENA CREDENCIAIS - SEMPRE LÊ DA FONTE +# ============================================================ + +import os +import configparser +import time +from typing import Optional, Dict + +# ============================================================ +# FONTES DE CREDENCIAIS +# ============================================================ + +CREDENTIAL_SOURCES = { + "coolify": { + "path": "/data/coolify/source/.env", + "parser": "env", + "description": "Coolify (Orquestrador)" + }, + "supabase": { + "path": "/data/coolify/services/h0oggskgs0ws0sco8kc4s8ws/.env", + "parser": "env", + "description": "Supabase (BaaS)" + }, + "gitea": { + "path": "/var/lib/docker/volumes/yccsckck4g004gosccwc4kg4_gitea-data/_data/gitea/conf/app.ini", + "parser": "ini", + "section": "security", + "description": "Gitea (Git Server)" + }, + "logto": { + "path": "/data/coolify/services/ea4tt75aeibqtu19hjqqw12f/.env", + "parser": "env", + "description": "Logto (Authentication)" + } +} + +# Coolify API +COOLIFY_API_BASE = "http://localhost:8000/api" + +# ============================================================ +# CACHE +# ============================================================ + +_cache: Dict[str, str] = {} +_cache_time: Dict[str, float] = {} +CACHE_TTL = 300 # 5 minutos + +# ============================================================ +# PARSER FUNCTIONS +# ============================================================ + +def _read_env_file(path: str) -> Dict[str, str]: + """Lê arquivo .env e retorna dict de variáveis.""" + if not os.path.exists(path): + return {} + + result = {} + try: + with open(path) as f: + for line in f: + line = line.strip() + if line and "=" in line and not line.startswith("#"): + key, _, value = line.partition("=") + result[key.strip()] = value.strip() + except Exception as e: + print(f"Erro ao ler {path}: {e}") + + return result + +def _read_ini_file(path: str, section: str = "security") -> Dict[str, str]: + """Lê arquivo INI (tipo Gitea) e retorna dict.""" + if not os.path.exists(path): + return {} + + parser = configparser.ConfigParser() + try: + parser.read(path) + + if parser.has_section(section): + return dict(parser.items(section)) + except Exception as e: + print(f"Erro ao ler INI {path}: {e}") + + return {} + +def _get_cache_key(service: str, key: str) -> str: + return f"{service}:{key}" + +# ============================================================ +# CREDENTIAL FUNCTIONS +# ============================================================ + +def get_credential(service: str, key: str, use_cache: bool = True, force_reload: bool = False) -> Optional[str]: + """ + Busca credencial diretamente da fonte original. + + Args: + service: Nome do serviço (coolify, gitea, supabase, logto) + key: Nome da variável/campo + use_cache: Se True, usa cache em memória (TTL 5 min) + force_reload: Se True, ignora cache e recarrega + + Returns: + Valor da credencial ou None se não encontrada + """ + global _cache, _cache_time + + cache_key = _get_cache_key(service, key) + + # Verifica cache + if use_cache and not force_reload and cache_key in _cache: + if time.time() - _cache_time.get(cache_key, 0) < CACHE_TTL: + return _cache[cache_key] + + # Busca na fonte + source = CREDENTIAL_SOURCES.get(service) + if not source: + return None + + if source["parser"] == "env": + data = _read_env_file(source["path"]) + else: # ini + section = source.get("section", "security") + data = _read_ini_file(source["path"], section) + + value = data.get(key) + + # Atualiza cache + if value is not None: + _cache[cache_key] = value + _cache_time[cache_key] = time.time() + + return value + +def get_all_credentials(service: str, use_cache: bool = True) -> Dict[str, str]: + """Retorna todas as credenciais de um serviço.""" + source = CREDENTIAL_SOURCES.get(service) + if not source: + return {} + + if source["parser"] == "env": + return _read_env_file(source["path"]) + return _read_ini_file(source["path"], source.get("section", "security")) + +def get_multiple(service: str, keys: list, use_cache: bool = True) -> Dict[str, Optional[str]]: + """Busca múltiplas credenciais de um serviço.""" + return {key: get_credential(service, key, use_cache) for key in keys} + +def clear_cache(): + """Limpa cache de credenciais (útil após update no Coolify).""" + global _cache, _cache_time + _cache = {} + _cache_time = {} + +def reload_credential(service: str, key: str) -> Optional[str]: + """Recarrega uma credencial específica, ignorando cache.""" + return get_credential(service, key, use_cache=False, force_reload=True) + +# ============================================================ +# HELPER FUNCTIONS - SERVIÇOS COMUNS +# ============================================================ + +def gitea_token() -> str: + """Retorna token de acesso do Gitea.""" + # Tenta buscar do app.ini (INSTALL_LOCK ou TOKEN) + token = get_credential("gitea", "INSTALL_LOCK") + if not token: + token = get_credential("gitea", "TOKEN") + return token or "" + +def gitea_url() -> str: + """Retorna URL base do Gitea.""" + return "https://git.reifonas.cloud" + +def gitea_api_url() -> str: + """Retorna URL da API do Gitea.""" + return f"{gitea_url()}/api/v1" + +def supabase_url() -> str: + """Retorna URL base do Supabase.""" + return "https://supabase.reifonas.cloud" + +def supabase_anon_key() -> str: + """Retorna ANON_KEY do Supabase.""" + return get_credential("supabase", "ANON_KEY") or "" + +def supabase_service_role_key() -> str: + """Retorna SERVICE_ROLE_KEY do Supabase.""" + return get_credential("supabase", "SERVICE_ROLE_KEY") or "" + +def supabase_jwt_secret() -> str: + """Retorna JWT_SECRET do Supabase.""" + return get_credential("supabase", "JWT_SECRET") or "" + +def coolify_app_key() -> str: + """Retorna APP_KEY do Coolify.""" + return get_credential("coolify", "APP_KEY") or "" + +def coolify_api_base() -> str: + """Retorna URL base da API do Coolify.""" + return COOLIFY_API_BASE + +# ============================================================ +# COOLIFY API HELPERS +# ============================================================ + +def coolify_api(endpoint: str, method: str = "GET", data: dict = None) -> dict: + """ + Faz requisição à API do Coolify. + + Args: + endpoint: Endpoint da API (ex: "/deployments", "/applications") + method: GET, POST, DELETE, etc. + data: Dados para enviar (JSON) + + Returns: + Resposta da API como dict + """ + import requests + + url = f"{COOLIFY_API_BASE}{endpoint}" + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {coolify_app_key()}" + } + + try: + if method == "GET": + res = requests.get(url, headers=headers, timeout=30) + elif method == "POST": + res = requests.post(url, headers=headers, json=data, timeout=30) + elif method == "DELETE": + res = requests.delete(url, headers=headers, timeout=30) + else: + return {"error": f"Método {method} não suportado"} + + if res.status_code in [200, 201]: + return res.json() if res.text else {"success": True} + return {"error": f"Status {res.status_code}", "detail": res.text} + + except Exception as e: + return {"error": str(e)} + +def coolify_list_applications() -> list: + """Lista aplicações no Coolify.""" + result = coolify_api("/applications") + if isinstance(result, dict) and "error" in result: + return [] + return result if isinstance(result, list) else [] + +def coolify_list_deployments() -> list: + """Lista deployments recentes.""" + result = coolify_api("/deployments") + if isinstance(result, dict) and "error" in result: + return [] + return result if isinstance(result, list) else [] + +def coolify_get_status() -> dict: + """Retorna status geral do Coolify.""" + return coolify_api("/status") + +# ============================================================ +# SYNC FUNCTION +# ============================================================ + +def sync_credentials() -> dict: + """ + Força sync de todas as credenciais. + Limpa cache e recarrega. + + Returns: + Status do sync + """ + clear_cache() + + result = { + "status": "synced", + "services": {}, + "timestamp": time.time() + } + + for service in CREDENTIAL_SOURCES: + try: + creds = get_all_credentials(service, use_cache=False) + result["services"][service] = { + "status": "ok", + "keys": len(creds) + } + except Exception as e: + result["services"][service] = { + "status": "error", + "error": str(e) + } + + return result + +# ============================================================ +# STATUS +# ============================================================ + +def get_services_status() -> dict: + """Retorna status de todos os serviços.""" + status = {} + + for service_id, source in CREDENTIAL_SOURCES.items(): + path = source["path"] + exists = os.path.exists(path) + status[service_id] = { + "description": source["description"], + "path": path, + "exists": exists, + "keys_count": 0 + } + + if exists: + creds = get_all_credentials(service_id) + status[service_id]["keys_count"] = len(creds) + + return status + +# ============================================================ +# MAIN TEST +# ============================================================ + +if __name__ == "__main__": + print("=== Credential Manager Test ===") + print(f"\nStatus dos serviços:") + for service, info in get_services_status().items(): + print(f" {service}: {'✅' if info['exists'] else '❌'} ({info['keys_count']} chaves)") + + print(f"\nCredenciais carregadas:") + print(f" Gitea URL: {gitea_url()}") + print(f" Gitea Token: {'***' + gitea_token()[-8:] if gitea_token() else 'N/A'}") + print(f" Supabase URL: {supabase_url()}") + print(f" Supabase Anon Key: {'***' + supabase_anon_key()[-8:] if supabase_anon_key() else 'N/A'}") + print(f" Coolify API: {coolify_api_base()}") diff --git a/llm_providers.py b/llm_providers.py new file mode 100644 index 0000000..8089bdf --- /dev/null +++ b/llm_providers.py @@ -0,0 +1,363 @@ +# ============================================================ +# LLM_PROVIDERS.PY - Abstração de Provedores de LLM +# Suporta: Gemini, OpenAI, Anthropic, Ollama (Local) +# ============================================================ + +import os +import requests +import json +from typing import Optional, Dict, List + +# ============================================================ +# CONFIGURAÇÃO DE PROVIDERS +# ============================================================ + +LLM_PROVIDERS = { + "gemini": { + "name": "Google Gemini", + "type": "api", + "models": ["gemini-2.5-flash", "gemini-2.0-pro", "gemini-1.5-flash"], + "default": "gemini-2.5-flash", + "endpoint": "https://generativelanguage.googleapis.com/v1beta/models" + }, + "openai": { + "name": "OpenAI GPT", + "type": "api", + "models": ["gpt-4o", "gpt-4-turbo", "gpt-3.5-turbo"], + "default": "gpt-4o", + "endpoint": "https://api.openai.com/v1" + }, + "anthropic": { + "name": "Anthropic Claude", + "type": "api", + "models": ["claude-3-5-sonnet-20241022", "claude-3-5-haiku-20241022", "claude-3-opus-20240229"], + "default": "claude-3-5-sonnet-20241022", + "endpoint": "https://api.anthropic.com/v1" + }, + "ollama": { + "name": "Ollama (Local)", + "type": "local", + "endpoint": os.getenv("OLLAMA_HOST", "http://localhost:11434"), + "models": None, + "default": "qwen2.5-coder:1.5b" + } +} + +# ============================================================ +# CONFIG MANAGER +# ============================================================ + +CONFIG_FILE = "/app/data/config.json" + +def get_config() -> dict: + """Carrega configuração do orchestrator.""" + if not os.path.exists("/app/data"): + os.makedirs("/app/data", exist_ok=True) + + if os.path.exists(CONFIG_FILE): + try: + with open(CONFIG_FILE, "r") as f: + return json.load(f) + except Exception: + pass + + return { + "orchestrator": { + "planner": {"provider": "gemini", "model": "gemini-2.5-flash"}, + "executor": {"provider": "ollama", "model": "qwen2.5-coder:1.5b"} + }, + "api_keys": { + "openai": "", + "anthropic": "", + "gemini": "" + } + } + +def save_config(cfg: dict): + """Salva configuração do orchestrator.""" + if not os.path.exists("/app/data"): + os.makedirs("/app/data", exist_ok=True) + with open(CONFIG_FILE, "w") as f: + json.dump(cfg, f, indent=4) + +def get_orchestrator_config() -> dict: + """Retorna config do orchestrator.""" + cfg = get_config() + return cfg.get("orchestrator", { + "planner": {"provider": "gemini", "model": "gemini-2.5-flash"}, + "executor": {"provider": "ollama", "model": "qwen2.5-coder:1.5b"} + }) + +def set_planner(provider: str = None, model: str = None) -> dict: + """Define o provider e modelo do planner.""" + cfg = get_config() + if "orchestrator" not in cfg: + cfg["orchestrator"] = {} + + if provider: + cfg["orchestrator"]["planner"] = { + "provider": provider, + "model": model or LLM_PROVIDERS[provider]["default"] + } + save_config(cfg) + + return cfg["orchestrator"].get("planner", {"provider": "gemini", "model": "gemini-2.5-flash"}) + +def set_executor(provider: str = None, model: str = None) -> dict: + """Define o provider e modelo do executor.""" + cfg = get_config() + if "orchestrator" not in cfg: + cfg["orchestrator"] = {} + + if provider: + cfg["orchestrator"]["executor"] = { + "provider": provider, + "model": model or LLM_PROVIDERS[provider]["default"] + } + save_config(cfg) + + return cfg["orchestrator"].get("executor", {"provider": "ollama", "model": "qwen2.5-coder:1.5b"}) + return cfg["orchestrator"]["executor"] + +def set_api_key(provider: str, key: str): + """Armazena API key de um provider.""" + cfg = get_config() + if "api_keys" not in cfg: + cfg["api_keys"] = {} + cfg["api_keys"][provider] = key + save_config(cfg) + +def get_api_key(provider: str) -> str: + """Busca API key de um provider (config ou env var).""" + cfg = get_config() + + # Primeiro verifica config + api_keys = cfg.get("api_keys", {}) + if api_keys.get(provider): + return api_keys[provider] + + # Fallback para environment variable + env_vars = { + "openai": "OPENAI_API_KEY", + "anthropic": "ANTHROPIC_API_KEY", + "gemini": "GEMINI_API_KEY" + } + + if provider in env_vars: + return os.getenv(env_vars[provider], "") + + return "" + +# ============================================================ +# OLLAMA DISCOVERY +# ============================================================ + +def list_ollama_models() -> List[str]: + """Busca modelos disponíveis no Ollama.""" + try: + endpoint = LLM_PROVIDERS["ollama"]["endpoint"] + response = requests.get(f"{endpoint}/api/tags", timeout=5) + if response.status_code == 200: + models = [m["name"] for m in response.json().get("models", [])] + LLM_PROVIDERS["ollama"]["models"] = models + return models + except Exception as e: + print(f"Erro ao buscar modelos Ollama: {e}") + return [] + +def get_available_models(provider: str = None) -> List[Dict]: + """Retorna modelos disponíveis para um provider ou todos.""" + if provider: + p = LLM_PROVIDERS.get(provider) + if not p: + return [] + + if p["type"] == "local" and provider == "ollama": + models = list_ollama_models() + return [{"provider": provider, "models": models}] + else: + return [{"provider": provider, "models": p.get("models", [p["default"]])}] + + # Todos os providers + result = [] + for prov_id, prov in LLM_PROVIDERS.items(): + if prov_id == "ollama": + models = list_ollama_models() + result.append({"provider": prov_id, "name": prov["name"], "models": models}) + else: + result.append({"provider": prov_id, "name": prov["name"], "models": prov.get("models", [prov["default"]])}) + + return result + +# ============================================================ +# LLM CALL FUNCTIONS +# ============================================================ + +def call_llm(provider: str, model: str, prompt: str, system_prompt: str = None, **kwargs) -> str: + """ + Chama o LLM especificado. + + Args: + provider: Nome do provider (gemini, openai, anthropic, ollama) + model: Nome do modelo + prompt: Prompt do usuário + system_prompt: Prompt de sistema (opcional) + + Returns: + Resposta do LLM como string + """ + if provider == "gemini": + return _call_gemini(model, prompt, system_prompt) + elif provider == "openai": + return _call_openai(model, prompt, system_prompt) + elif provider == "anthropic": + return _call_anthropic(model, prompt, system_prompt) + elif provider == "ollama": + return _call_ollama(model, prompt, system_prompt) + else: + return f"Erro: Provider '{provider}' não suportado." + +# ---------------------------------------- +# GEMINI +# ---------------------------------------- +def _call_gemini(model: str, prompt: str, system_prompt: str = None) -> str: + """Chama API do Google Gemini.""" + api_key = get_api_key("gemini") + if not api_key: + api_key = os.getenv("GEMINI_API_KEY", "") + + url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={api_key}" + + contents = [{"parts": [{"text": prompt}]}] + if system_prompt: + contents.insert(0, {"role": "model", "parts": [{"text": system_prompt}]}) + + payload = {"contents": contents} + + try: + res = requests.post(url, json=payload, timeout=60) + if res.status_code == 200: + return res.json()["candidates"][0]["content"]["parts"][0]["text"] + return f"Erro Gemini: {res.status_code} - {res.text}" + except Exception as e: + return f"Erro Gemini: {str(e)}" + +# ---------------------------------------- +# OPENAI +# ---------------------------------------- +def _call_openai(model: str, prompt: str, system_prompt: str = None) -> str: + """Chama API da OpenAI.""" + api_key = get_api_key("openai") + if not api_key: + api_key = os.getenv("OPENAI_API_KEY", "") + + url = f"https://api.openai.com/v1/chat/completions" + + messages = [] + if system_prompt: + messages.append({"role": "system", "content": system_prompt}) + messages.append({"role": "user", "content": prompt}) + + payload = { + "model": model, + "messages": messages, + "temperature": 0.7 + } + + try: + res = requests.post(url, json=payload, headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json" + }, timeout=60) + + if res.status_code == 200: + return res.json()["choices"][0]["message"]["content"] + return f"Erro OpenAI: {res.status_code} - {res.text}" + except Exception as e: + return f"Erro OpenAI: {str(e)}" + +# ---------------------------------------- +# ANTHROPIC +# ---------------------------------------- +def _call_anthropic(model: str, prompt: str, system_prompt: str = None) -> str: + """Chama API da Anthropic (Claude).""" + api_key = get_api_key("anthropic") + if not api_key: + api_key = os.getenv("ANTHROPIC_API_KEY", "") + + url = "https://api.anthropic.com/v1/messages" + + headers = { + "x-api-key": api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json" + } + + payload = { + "model": model, + "max_tokens": 4096, + "messages": [{"role": "user", "content": prompt}] + } + + if system_prompt: + payload["system"] = system_prompt + + try: + res = requests.post(url, json=payload, headers=headers, timeout=60) + + if res.status_code == 200: + return res.json()["content"][0]["text"] + return f"Erro Anthropic: {res.status_code} - {res.text}" + except Exception as e: + return f"Erro Anthropic: {str(e)}" + +# ---------------------------------------- +# OLLAMA (LOCAL) +# ---------------------------------------- +def _call_ollama(model: str, prompt: str, system_prompt: str = None) -> str: + """Chama Ollama local.""" + endpoint = LLM_PROVIDERS["ollama"]["endpoint"] + + payload = { + "model": model, + "prompt": prompt, + "stream": False + } + + if system_prompt: + payload["system"] = system_prompt + + try: + res = requests.post(f"{endpoint}/api/generate", json=payload, timeout=120) + + if res.status_code == 200: + return res.json().get("response", "") + return f"Erro Ollama: {res.status_code} - {res.text}" + except Exception as e: + return f"Erro Ollama: {str(e)}" + +# ============================================================ +# HELPER FUNCTIONS +# ============================================================ + +def get_planner_llm() -> tuple: + """Retorna provider e modelo do planner configurado.""" + cfg = get_orchestrator_config() + planner = cfg.get("planner", {"provider": "gemini", "model": "gemini-2.5-flash"}) + return planner["provider"], planner["model"] + +def get_executor_llm() -> tuple: + """Retorna provider e modelo do executor configurado.""" + cfg = get_orchestrator_config() + executor = cfg.get("executor", {"provider": "ollama", "model": "qwen2.5-coder:1.5b"}) + return executor["provider"], executor["model"] + +def call_planner(prompt: str, system_prompt: str = None) -> str: + """Chama o LLM do planner com a config atual.""" + provider, model = get_planner_llm() + return call_llm(provider, model, prompt, system_prompt) + +def call_executor(prompt: str, system_prompt: str = None) -> str: + """Chama o LLM do executor com a config atual.""" + provider, model = get_executor_llm() + return call_llm(provider, model, prompt, system_prompt) diff --git a/logs_cmd.json b/logs_cmd.json new file mode 100644 index 0000000..3ba938d --- /dev/null +++ b/logs_cmd.json @@ -0,0 +1,3 @@ +{ + "text": "Agente, o serviço Ollama no Coolify está com status unhealthy. Por favor, rode o comando bash: docker logs ollama-lw4s8g4gc8gss4gkc4gg0wk4 --tail 50. O que está escrito neles?" +} diff --git a/main.py b/main.py index 8934e96..ac6a3e6 100644 --- a/main.py +++ b/main.py @@ -52,7 +52,7 @@ async def check_login(is_auth: bool = Depends(verify_password)): async def get_system_status(is_auth: bool = Depends(verify_password)): """Retorna o status do sistema (CPU, RAM, Disco) sem travar o loop.""" def get_stats(): - cpu_percent = psutil.cpu_percent(interval=0.1) + cpu_percent = psutil.cpu_percent(interval=0.5) vm = psutil.virtual_memory() disk = psutil.disk_usage('/') return { @@ -190,6 +190,133 @@ async def telegram_webhook(request: Request): print("Update recebido do Telegram:", update) return {"ok": True} +# ============================================================ +# NOVOS ENDPOINTS - ORQUESTRADOR +# ============================================================ +from orchestrator import ( + orchestrate, handle_message, get_orchestrator_status, + get_llm_config, set_llm_config, format_confirmation_message, + format_completion_message +) +from llm_providers import get_available_models +from credential_manager import sync_credentials + +@app.post("/api/orchestrate") +async def orchestrate_task(task_data: dict, is_auth: bool = Depends(verify_password)): + """ + Executa tarefa orquestrada. + + POST /api/orchestrate + { + "task": "faz deploy do app X", + "confirmed": false + } + + Response: + { + "status": "needs_confirmation" | "completed", + "plan": {...}, + "confirmation_needed_for": [...], + "message": "..." (para display) + } + """ + task = task_data.get("task", "") + confirmed = task_data.get("confirmed", False) + + if not task: + return JSONResponse(content={"status": "error", "message": "Task vazia"}, status_code=400) + + result = orchestrate(task, user_confirmed=confirmed) + + # Formata mensagem para display + if result["status"] == "needs_confirmation": + message = format_confirmation_message(result) + return JSONResponse(content={ + "status": "needs_confirmation", + "plan": result["plan"], + "confirmation_needed_for": result["confirmation_needed_for"], + "message": message + }) + + return JSONResponse(content={ + "status": "completed", + "plan": result["plan"], + "results": result.get("results", []), + "message": format_completion_message(result) if 'format_completion_message' in dir() else "Concluído" + }) + +@app.get("/api/orchestrator-status") +async def get_orch_status(is_auth: bool = Depends(verify_password)): + """Retorna status do orquestrador.""" + return JSONResponse(content=get_orchestrator_status()) + +@app.get("/api/llm-config") +async def get_llm_configuration(is_auth: bool = Depends(verify_password)): + """Retorna configuração atual de LLMs.""" + return JSONResponse(content=get_llm_config()) + +@app.post("/api/llm-config") +async def update_llm_configuration(config_data: dict, is_auth: bool = Depends(verify_password)): + """Atualiza configuração de LLMs.""" + planner_provider = config_data.get("planner_provider") or None + planner_model = config_data.get("planner_model") or None + executor_provider = config_data.get("executor_provider") or None + executor_model = config_data.get("executor_model") or None + + changes = set_llm_config( + planner_provider=planner_provider, + planner_model=planner_model, + executor_provider=executor_provider, + executor_model=executor_model + ) + + return JSONResponse(content={"status": "success", "changes": changes}) + +@app.get("/api/llm-models") +async def list_llm_models(is_auth: bool = Depends(verify_password)): + """Lista modelos disponíveis para cada provider.""" + models = get_available_models() + return JSONResponse(content={"models": models}) + +@app.post("/api/sync-credentials") +async def sync_creds(is_auth: bool = Depends(verify_password)): + """Força sincronização de credenciais.""" + result = sync_credentials() + return JSONResponse(content=result) + +@app.get("/api/tools") +async def list_tools(is_auth: bool = Depends(verify_password)): + """Lista todas as ferramentas disponíveis.""" + from tools_v2 import get_tools_by_danger + return JSONResponse(content={ + "tools": { + "safe": get_tools_by_danger("safe"), + "medium": get_tools_by_danger("medium"), + "dangerous": get_tools_by_danger("dangerous") + } + }) + +@app.post("/api/handle-message") +async def handle_web_message(message: dict, is_auth: bool = Depends(verify_password)): + """ + Manipula mensagem do usuário (alternativa ao chat normal). + Suporta confirmação de ações perigosas. + + POST /api/handle-message + { + "text": "faz deploy do app", + "confirmed": false + } + """ + text = message.get("text", "") + confirmed = message.get("confirmed", False) + + if not text: + return JSONResponse(content={"reply": "Mensagem vazia"}) + + reply = await run_in_threadpool(handle_message, text=text, confirmed=confirmed) + return JSONResponse(content={"reply": reply}) + if __name__ == "__main__": import uvicorn uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True) diff --git a/orchestrator.py b/orchestrator.py new file mode 100644 index 0000000..93fe012 --- /dev/null +++ b/orchestrator.py @@ -0,0 +1,543 @@ +# ============================================================ +# ORCHESTRATOR.PY - Orquestrador de Tarefas +# Planner (Gemini/OpenAI/Claude/Ollama) + Executor (Qwen/Ollama) +# ============================================================ + +import json +import re +from typing import Dict, List, Optional +from llm_providers import ( + call_planner, call_executor, get_planner_llm, get_executor_llm, + get_available_models, LLM_PROVIDERS, set_planner, set_executor, get_config, save_config +) +from tools_v2 import TOOLS_V2, get_tools_by_danger, get_all_tools_formatted +from credential_manager import sync_credentials, get_services_status + +# ============================================================ +# SYSTEM PROMPTS +# ============================================================ + +PLANNER_SYSTEM_PROMPT = """Você é o PLANNER AGENT do BotVPS. +Seu trabalho é decompor tarefas em passos executáveis. + +### SUAS TAREFAS: +1. Entender a intenção do usuário +2. Decompor em passos menores e claros +3. Classificar cada passo como SAFE, MEDIUM ou DANGEROUS +4. Usar as ferramentas disponíveis listadas abaixo + +### NÍVEIS DE PERIGO: +- SAFE: Pode executar automaticamente (listar, ver status, ler logs) +- MEDIUM: Informa o usuário antes (git pull, build, restart) +- DANGEROUS: REQUER confirmação explícita (delete, reboot, docker down) + +### FERRAMENTAS DISPONÍVEIS: +{TOOLS_LIST} + +### FORMATO DE RESPOSTA: +Responda APENAS com JSON no seguinte formato: +{{ + "task_name": "Nome resumido da tarefa", + "summary": "Resumo do que será feito", + "steps": [ + {{ + "order": 1, + "action": "Descrição clara do que fazer", + "tool": "nome_da_ferramenta (ou null se for bash)", + "command": "comando específico a executar", + "danger": "safe|medium|dangerous" + }} + ] +}} + +### REGRAS: +1. Responda APENAS com JSON válido +2. Cada passo deve ser atômico (uma ação por passo) +3. Considere dependências entre passos +4. Para passos bash complexos, use tool="bash" e command="comando" +5. Os passos devem ser na ordem correta de execução +""" + +EXECUTOR_SYSTEM_PROMPT = """Você é o EXECUTOR AGENT do BotVPS. +Seu trabalho é executar comandos bash com precisão. + +### REGRAS: +1. Execute APENAS o comando passado +2. Retorne o output do comando +3. Se houver erro, descreva o erro claramente +4. Não invente outputs + +### FORMATO DE RESPOSTA: +Responda com JSON: +{{ + "success": true|false, + "output": "output do comando ou erro" +}} + +### IMPORTANTE: +- Use caminhos absolutos quando possível +- Redirecione erros (2>/dev/null) quando apropriado +- Mantenha comandos simples e seguros +""" + +# ============================================================ +# HELPER FUNCTIONS +# ============================================================ + +def _format_tools_for_prompt() -> str: + """Formata lista de ferramentas para o prompt.""" + lines = [] + for name, info in TOOLS_V2.items(): + lines.append(f"- {name}: {info['desc']} [{info['danger']}]") + return "\n".join(lines) + +def _parse_json_response(text: str) -> Optional[Dict]: + """Extrai JSON da resposta do LLM.""" + # Tenta encontrar JSON no texto + json_match = re.search(r'\{[\s\S]*\}', text) + if json_match: + try: + return json.loads(json_match.group()) + except json.JSONDecodeError: + pass + + # Tenta extrair de blocos de código + code_blocks = re.findall(r'```(?:json)?\s*([\s\S]*?)```', text) + for block in code_blocks: + try: + return json.loads(block.strip()) + except json.JSONDecodeError: + continue + + return None + +def _classify_dangerous_steps(steps: List[Dict]) -> List[Dict]: + """Retorna apenas passos perigosos.""" + return [s for s in steps if s.get("danger") in ["medium", "dangerous"]] + +# ============================================================ +# PLANNER AGENT +# ============================================================ + +def plan_task(task: str) -> Dict: + """ + Usa o Planner LLM para decompor uma tarefa. + + Args: + task: Tarefa do usuário + + Returns: + Dicionário com plano de execução: + { + "task_name": str, + "summary": str, + "steps": [ + {"order": int, "action": str, "tool": str, "command": str, "danger": str} + ] + } + """ + provider, model = get_planner_llm() + print(f"[PLANNER] Using: {provider}/{model}") + + system_prompt = PLANNER_SYSTEM_PROMPT.replace("{TOOLS_LIST}", _format_tools_for_prompt()) + + response = call_planner(task, system_prompt) + print(f"[RESPONSE] Planner response:\n{response[:500]}...") + + plan = _parse_json_response(response) + + if not plan or "steps" not in plan: + # Fallback: tenta executar como comando único + return { + "task_name": task[:50], + "summary": f"Tarefa: {task}", + "steps": [{ + "order": 1, + "action": task, + "tool": "bash", + "command": task, + "danger": "medium" + }] + } + + return plan + +# ============================================================ +# EXECUTOR AGENT +# ============================================================ + +def execute_command(command: str) -> Dict: + """ + Executa um comando bash via Executor LLM. + + Args: + command: Comando a executar + + Returns: + {"success": bool, "output": str} + """ + import subprocess + + provider, model = get_executor_llm() + print(f"[EXECUTOR] Using: {provider}/{model}") + + # Para comandos bash simples, executa direto sem LLM + # Usa LLM apenas para comandos complexos + if len(command) < 100 and not any(c in command for c in ["&&", "||", "|", "$"]): + try: + result = subprocess.run( + command, + shell=True, + capture_output=True, + text=True, + timeout=60 + ) + return { + "success": result.returncode == 0, + "output": result.stdout.strip() or result.stderr.strip() or "Sucesso" + } + except Exception as e: + return {"success": False, "output": str(e)} + + # Para comandos complexos, usa LLM + response = call_executor( + f"Execute este comando e retorne o resultado em JSON: {command}", + EXECUTOR_SYSTEM_PROMPT + ) + + result = _parse_json_response(response) + if result: + return result + + return {"success": False, "output": response} + +def execute_step(step: Dict) -> Dict: + """ + Executa um passo do plano. + + Args: + step: Dicionário com dados do passo + + Returns: + {"success": bool, "output": str, "step": int} + """ + tool = step.get("tool") + command = step.get("command", "") + order = step.get("order", 0) + + print(f" -> Step {order}: {step.get('action')[:50]}...") + + if tool and tool in TOOLS_V2: + try: + tool_info = TOOLS_V2[tool] + func = tool_info["func"] + + # Executa a função da ferramenta + if callable(func): + result = func(command) if command else func() + else: + result = str(func) + + return { + "success": True, + "output": result, + "step": order + } + except Exception as e: + return { + "success": False, + "output": f"Erro ao executar {tool}: {str(e)}", + "step": order + } + + # Executa como comando bash + return execute_command(command) + +# ============================================================ +# ORCHESTRATOR MAIN +# ============================================================ + +def orchestrate(task: str, user_confirmed: bool = False) -> Dict: + """ + Orquestra a execução de uma tarefa. + + Args: + task: Tarefa do usuário + user_confirmed: Se True, pula confirmação e executa tudo + + Returns: + { + "status": "needs_confirmation" | "completed" | "error", + "plan": {...}, + "confirmation_needed_for": [steps peligrosos], + "results": [...] (se status == "completed") + } + """ + print(f"\n{'='*50}") + print(f">>> PLANNING: {task}") + print(f"{'='*50}\n") + + # 1. Plana a tarefa + plan = plan_task(task) + + # 2. Identifica passos perigosos + dangerous_steps = _classify_dangerous_steps(plan.get("steps", [])) + + # 3. Se há passos perigosos e não confirmou, pede confirmação + if dangerous_steps and not user_confirmed: + return { + "status": "needs_confirmation", + "plan": plan, + "confirmation_needed_for": [ + {"order": s["order"], "action": s["action"], "danger": s["danger"]} + for s in dangerous_steps + ] + } + + # 4. Executa todos os passos + results = [] + for step in plan.get("steps", []): + result = execute_step(step) + results.append(result) + + # Para em caso de erro crítico + if not result.get("success") and step.get("danger") == "dangerous": + results.append({ + "success": False, + "output": "Execução abortada devido a erro crítico.", + "step": -1 + }) + break + + # 5. Retorna resultado + return { + "status": "completed", + "plan": plan, + "results": results + } + +def format_confirmation_message(result: Dict) -> str: + """ + Formata mensagem de confirmação para o usuário. + + Args: + result: Resultado do orchestrate() + + Returns: + String formatada para envio ao usuário + """ + if result["status"] != "needs_confirmation": + return "" + + plan = result["plan"] + dangerous = result["confirmation_needed_for"] + + msg = f"[PLANO] {plan.get('task_name', 'Tarefa')}\n\n" + msg += f"{plan.get('summary', '')}\n\n" + + msg += "AVISO: Acoes que precisam de confirmacao:\n\n" + + for step in dangerous: + icon = "[CRITICAL]" if step["danger"] == "dangerous" else "[WARNING]" + msg += f"{icon} Passo {step['order']}: {step['action']}\n" + + msg += "\nDeseja continuar? (sim/não)" + + return msg + +def format_completion_message(result: Dict) -> str: + """ + Formata mensagem de conclusão. + + Args: + result: Resultado do orchestrate() + + Returns: + String formatada com os resultados + """ + if result["status"] != "completed": + return "" + + plan = result["plan"] + results = result.get("results", []) + + msg = f"[OK] Concluido: {plan.get('task_name', 'Tarefa')}\n\n" + + success_count = sum(1 for r in results if r.get("success")) + total_count = len([r for r in results if r.get("step", 0) > 0]) + + msg += f"[STAT] Resultado: {success_count}/{total_count} passos executados com sucesso.\n\n" + + for r in results: + step_num = r.get("step", 0) + if step_num > 0: + status = "[OK]" if r.get("success") else "[FAIL]" + output = r.get("output", "")[:500] + msg += f"{status} Passo {step_num}:\n```\n{output}\n```\n\n" + + return msg + +# ============================================================ +# STATUS & CONFIG FUNCTIONS +# ============================================================ + +def get_orchestrator_status() -> Dict: + """Retorna status atual do orquestrador.""" + planner_provider, planner_model = get_planner_llm() + executor_provider, executor_model = get_executor_llm() + + return { + "planner": { + "provider": planner_provider, + "model": planner_model, + "name": LLM_PROVIDERS[planner_provider]["name"] + }, + "executor": { + "provider": executor_provider, + "model": executor_model, + "name": LLM_PROVIDERS[executor_provider]["name"] + }, + "credentials": get_services_status(), + "available_tools": len(TOOLS_V2) + } + +def get_llm_config() -> Dict: + """Retorna configuração de LLMs.""" + planner_provider, planner_model = get_planner_llm() + executor_provider, executor_model = get_executor_llm() + + return { + "planner": { + "provider": planner_provider, + "model": planner_model, + "available_providers": [ + {"id": k, "name": v["name"], "type": v["type"]} + for k, v in LLM_PROVIDERS.items() + ] + }, + "executor": { + "provider": executor_provider, + "model": executor_model, + "available_providers": [ + {"id": k, "name": v["name"], "type": v["type"]} + for k, v in LLM_PROVIDERS.items() + ] + } + } + +def set_llm_config(planner_provider: str = None, planner_model: str = None, + executor_provider: str = None, executor_model: str = None) -> Dict: + """Atualiza configuração de LLMs.""" + changes = {} + + if planner_provider: + result = set_planner(planner_provider, planner_model) + changes["planner"] = result + + if executor_provider: + result = set_executor(executor_provider, executor_model) + changes["executor"] = result + + return changes + +# ============================================================ +# COMMAND PARSER (para Telegram/Web) +# ============================================================ + +def parse_command(text: str) -> Dict: + """ + Interpreta comandos do usuário. + + Args: + text: Texto do usuário + + Returns: + {"type": "orchestrate"|"config"|"status", "data": {...}} + """ + text = text.strip().lower() + + # Comandos de configuração + if text.startswith("/llm"): + parts = text.split() + if len(parts) == 1: + return {"type": "config", "action": "show"} + elif len(parts) >= 3: + if parts[1] == "planner": + return {"type": "config", "action": "set_planner", "provider": parts[2]} + elif parts[1] == "executor": + return {"type": "config", "action": "set_executor", "provider": parts[2]} + return {"type": "config", "action": "help"} + + if text == "/sync": + return {"type": "config", "action": "sync_credentials"} + + if text == "/status": + return {"type": "status"} + + if text == "/tools": + return {"type": "tools"} + + if text.startswith("/"): + return {"type": "unknown", "command": text} + + # Tarefas de orquestração + return {"type": "orchestrate", "task": text} + +# ============================================================ +# MAIN HANDLER +# ============================================================ + +def handle_message(text: str, confirmed: bool = False) -> str: + """ + Manipula mensagem do usuário. + + Args: + text: Mensagem do usuário + confirmed: Se o usuário já confirmou ações perigosas + + Returns: + Resposta para o usuário + """ + parsed = parse_command(text) + + # Status + if parsed["type"] == "status": + status = get_orchestrator_status() + msg = "[BOT] Status do Orquestrador:\n\n" + msg += f"[PLANNER] {status['planner']['name']} ({status['planner']['model']})\n" + msg += f"[EXECUTOR] {status['executor']['name']} ({status['executor']['model']})\n" + msg += f"[TOOLS] Ferramentas: {status['available_tools']}\n" + return msg + + # Config + if parsed["type"] == "config": + if parsed["action"] == "show": + config = get_llm_config() + msg = "[CONFIG] Configuracao de LLMs:\n\n" + msg += f"[PLANNER] {config['planner']['provider']} / {config['planner']['model']}\n" + msg += f"[EXECUTOR] {config['executor']['provider']} / {config['executor']['model']}\n" + msg += "\nPara mudar: /llm planner ou /llm executor " + return msg + + if parsed["action"] == "sync_credentials": + result = sync_credentials() + return f"[SYNC] Credenciais sincronizadas: {result['status']}" + + return "[CONFIG] Use: /llm (mostrar) | /llm planner | /llm executor " + + # Tools + if parsed["type"] == "tools": + return get_all_tools_formatted() + + # Orchestrate + if parsed["type"] == "orchestrate": + task = parsed["task"] + result = orchestrate(task, confirmed) + + if result["status"] == "needs_confirmation": + return format_confirmation_message(result) + + return format_completion_message(result) + + # Unknown + return "[?] Comando nao reconhecido. Tente: /llm, /status, /tools ou descreva uma tarefa." diff --git a/tools_v2.py b/tools_v2.py new file mode 100644 index 0000000..d728890 --- /dev/null +++ b/tools_v2.py @@ -0,0 +1,664 @@ +# ============================================================ +# TOOLS_V2.PY - Ferramentas Expandidas para o Orquestrador +# NÃO SUBSTITUI tools.py - É um módulo adicional +# ============================================================ + +import subprocess +import os +import requests +from typing import Dict, List, Optional +from credential_manager import ( + gitea_api_url, gitea_token, supabase_url, supabase_anon_key, + supabase_service_role_key, coolify_api +) + +# ============================================================ +# CONSTANTS +# ============================================================ + +DANGER_LEVELS = { + "safe": "SAFE - Executa automático", + "medium": "MEDIUM - Informa antes", + "dangerous": "DANGEROUS - Pede confirmação" +} + +# ============================================================ +# UTILITY FUNCTIONS +# ============================================================ + +def run_bash(command: str, timeout: int = 120) -> Dict: + """Executa comando bash e retorna resultado estruturado.""" + try: + result = subprocess.run( + command, + shell=True, + capture_output=True, + text=True, + timeout=timeout + ) + + return { + "success": result.returncode == 0, + "returncode": result.returncode, + "stdout": result.stdout.strip(), + "stderr": result.stderr.strip(), + "output": result.stdout.strip() if result.stdout else result.stderr.strip() + } + except subprocess.TimeoutExpired: + return { + "success": False, + "error": "Comando expirou (timeout)" + } + except Exception as e: + return { + "success": False, + "error": str(e) + } + +def format_output(result: Dict, max_length: int = 2000) -> str: + """Formata resultado para exibição.""" + if not result.get("success"): + return f"[ERROR] Erro: {result.get('error') or result.get('stderr') or 'Desconhecido'}" + + output = result.get("output", "[OK] Sucesso (sem output)") + if len(output) > max_length: + output = output[:max_length] + f"\n... (truncado, {len(output)} chars total)" + + return output + +# ============================================================ +# DOCKER TOOLS +# ============================================================ + +class DockerTools: + """Ferramentas Docker.""" + + @staticmethod + def ps(all_containers: bool = False) -> str: + """Lista containers Docker.""" + flags = "-a" if all_containers else "" + result = run_bash(f"docker ps {flags} --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}'") + return format_output(result) + + @staticmethod + def stats() -> str: + """Mostra estatísticas de recursos dos containers.""" + result = run_bash("docker stats --no-stream --format 'table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.NetIO}}'") + return format_output(result) + + @staticmethod + def logs(container: str, lines: int = 50, follow: bool = False) -> str: + """Mostra logs de um container.""" + follow_flag = "-f" if follow else "" + result = run_bash(f"docker logs {follow_flag} --tail {lines} {container}") + return format_output(result, max_length=5000) + + @staticmethod + def restart(container: str) -> str: + """Reinicia um container.""" + result = run_bash(f"docker restart {container}") + return format_output(result) + + @staticmethod + def stop(container: str) -> str: + """Para um container.""" + result = run_bash(f"docker stop {container}") + return format_output(result) + + @staticmethod + def start(container: str) -> str: + """Inicia um container.""" + result = run_bash(f"docker start {container}") + return format_output(result) + + @staticmethod + def exec(container: str, command: str) -> str: + """Executa comando dentro de um container.""" + result = run_bash(f"docker exec {container} {command}") + return format_output(result) + + @staticmethod + def inspect(container: str) -> str: + """Retorna informações detalhadas de um container.""" + result = run_bash(f"docker inspect {container}") + return format_output(result, max_length=3000) + + @staticmethod + def system_df() -> str: + """Mostra uso de disco do Docker.""" + result = run_bash("docker system df -v") + return format_output(result, max_length=3000) + + @staticmethod + def prune(dangerous: bool = False) -> str: + """Limpa recursos não utilizados do Docker.""" + if dangerous: + result = run_bash("docker system prune -af --volumes") + else: + result = run_bash("docker system prune -f") + return format_output(result) + +# ============================================================ +# GIT TOOLS +# ============================================================ + +class GitTools: + """Ferramentas Git.""" + + @staticmethod + def status(repo_path: str = ".") -> str: + """Mostra status do repositório git.""" + result = run_bash(f"git -C {repo_path} status --short") + return format_output(result) + + @staticmethod + def pull(repo_path: str = ".", remote: str = "origin", branch: str = "main") -> str: + """Faz git pull.""" + result = run_bash(f"git -C {repo_path} pull {remote} {branch}") + return format_output(result) + + @staticmethod + def push(repo_path: str = ".", remote: str = "origin", branch: str = "main") -> str: + """Faz git push.""" + result = run_bash(f"git -C {repo_path} push {remote} {branch}") + return format_output(result) + + @staticmethod + def clone(repo_url: str, target_path: str) -> str: + """Clona um repositório.""" + result = run_bash(f"git clone {repo_url} {target_path}") + return format_output(result) + + @staticmethod + def branch(repo_path: str = ".", list_all: bool = False) -> str: + """Lista branches.""" + flags = "-a" if list_all else "" + result = run_bash(f"git -C {repo_path} branch {flags}") + return format_output(result) + + @staticmethod + def checkout(repo_path: str, branch: str) -> str: + """Muda para outro branch.""" + result = run_bash(f"git -C {repo_path} checkout {branch}") + return format_output(result) + + @staticmethod + def log(repo_path: str = ".", count: int = 10) -> str: + """Mostra histórico de commits.""" + result = run_bash(f"git -C {repo_path} log --oneline -{count}") + return format_output(result) + + @staticmethod + def diff(repo_path: str = ".") -> str: + """Mostra diferenças não commitadas.""" + result = run_bash(f"git -C {repo_path} diff") + return format_output(result) + + @staticmethod + def stash(repo_path: str = ".") -> str: + """Salva alterações temporariamente.""" + result = run_bash(f"git -C {repo_path} stash") + return format_output(result) + + @staticmethod + def fetch(repo_path: str = ".", remote: str = "origin") -> str: + """Busca atualizações sem aplicar.""" + result = run_bash(f"git -C {repo_path} fetch {remote}") + return format_output(result) + +# ============================================================ +# DOCKER COMPOSE TOOLS +# ============================================================ + +class DockerComposeTools: + """Ferramentas Docker Compose.""" + + @staticmethod + def up(path: str, detach: bool = True, build: bool = False) -> str: + """Sobe serviços com docker-compose.""" + flags = "-d " if detach else "" + build_flag = "--build " if build else "" + result = run_bash(f"docker-compose -f {path} up {flags}{build_flag}") + return format_output(result) + + @staticmethod + def down(path: str, volumes: bool = False) -> str: + """Para e remove containers.""" + flags = "-v" if volumes else "" + result = run_bash(f"docker-compose -f {path} down {flags}") + return format_output(result) + + @staticmethod + def build(path: str, no_cache: bool = False) -> str: + """Constrói imagens.""" + flags = "--no-cache" if no_cache else "" + result = run_bash(f"docker-compose -f {path} build {flags}") + return format_output(result, max_length=5000) + + @staticmethod + def ps(path: str) -> str: + """Lista serviços.""" + result = run_bash(f"docker-compose -f {path} ps") + return format_output(result) + + @staticmethod + def logs(path: str, service: str = None, lines: int = 100) -> str: + """Mostra logs dos serviços.""" + service_part = f"{service}" if service else "" + result = run_bash(f"docker-compose -f {path} logs --tail {lines} {service_part}") + return format_output(result, max_length=5000) + + @staticmethod + def restart(path: str, service: str = None) -> str: + """Reinicia serviços.""" + service_part = f"{service}" if service else "" + result = run_bash(f"docker-compose -f {path} restart {service_part}") + return format_output(result) + +# ============================================================ +# GITEA API TOOLS +# ============================================================ + +class GiteaTools: + """Ferramentas via API do Gitea.""" + + @staticmethod + def _get_headers() -> Dict: + """Retorna headers para API do Gitea.""" + token = gitea_token() + return { + "Authorization": f"token {token}", + "Content-Type": "application/json" + } + + @staticmethod + def list_repos() -> str: + """Lista repositórios do usuário.""" + url = f"{gitea_api_url()}/user/repos" + try: + res = requests.get(url, headers=GiteaTools._get_headers(), timeout=10) + if res.status_code == 200: + repos = res.json() + if not repos: + return "Nenhum repositório encontrado." + + output = "[REPO] **Repositórios:**\n\n" + for repo in repos[:10]: + output += f"• `{repo['name']}` - {repo.get('description', 'Sem descrição')[:50]}\n" + output += f" URL: {repo['html_url']}\n\n" + return output + return f"[ERROR] Erro: {res.status_code} - {res.text}" + except Exception as e: + return f"[ERROR] Erro: {str(e)}" + + @staticmethod + def get_repo(owner: str, repo: str) -> str: + """Busca informações de um repositório.""" + url = f"{gitea_api_url()}/repos/{owner}/{repo}" + try: + res = requests.get(url, headers=GiteaTools._get_headers(), timeout=10) + if res.status_code == 200: + data = res.json() + return f"""[REPO] **{data['full_name']}** +- **Descrição:** {data.get('description', 'N/A')} +- **Linguagem:** {data.get('language', 'N/A')} +- **Stars:** {data.get('stars_count', 0)} +- **Forks:** {data.get('forks_count', 0)} +- **Última atualização:** {data.get('updated_at', 'N/A')} +- **URL:** {data['html_url']}""" + return f"[ERROR] Erro: {res.status_code}" + except Exception as e: + return f"[ERROR] Erro: {str(e)}" + + @staticmethod + def list_actions(owner: str, repo: str) -> str: + """Lista workflows/actions do repositório.""" + url = f"{gitea_api_url()}/repos/{owner}/{repo}/actions/workflows" + try: + res = requests.get(url, headers=GiteaTools._get_headers(), timeout=10) + if res.status_code == 200: + workflows = res.json().get("workflows", []) + if not workflows: + return "Nenhum workflow encontrado." + + output = "[WF] **Workflows:**\n\n" + for wf in workflows: + output += f"• `{wf['name']}` - {wf.get('status', 'N/A')}\n" + return output + return f"[ERROR] Erro: {res.status_code}" + except Exception as e: + return f"[ERROR] Erro: {str(e)}" + + @staticmethod + def trigger_workflow(owner: str, repo: str, workflow_id: str, ref: str = "main") -> str: + """Dispara um workflow.""" + url = f"{gitea_api_url()}/repos/{owner}/{repo}/actions/workflows/{workflow_id}/dispatches" + data = {"ref": ref} + try: + res = requests.post(url, headers=GiteaTools._get_headers(), json=data, timeout=10) + if res.status_code == 204: + return f"[OK] Workflow '{workflow_id}' disparado com sucesso!" + return f"[ERROR] Erro: {res.status_code} - {res.text}" + except Exception as e: + return f"[ERROR] Erro: {str(e)}" + +# ============================================================ +# SUPABASE API TOOLS +# ============================================================ + +class SupabaseTools: + """Ferramentas via API REST do Supabase.""" + + @staticmethod + def _get_headers(anon_key: bool = True) -> Dict: + """Retorna headers para API do Supabase.""" + key = supabase_anon_key() if anon_key else supabase_service_role_key() + role = "anon" if anon_key else "service_role" + return { + "apikey": key, + "Authorization": f"Bearer {key}", + "Content-Type": "application/json", + "Prefer": "return=representation" + } + + @staticmethod + def list_tables() -> str: + """Lista tabelas disponíveis (via introspecção).""" + url = f"{supabase_url()}/rest/v1/" + try: + res = requests.get(url, headers=SupabaseTools._get_headers(), timeout=10) + if res.status_code == 200: + tables = res.json() + if not tables: + return "Nenhuma tabela encontrada." + + output = "[DATA] **Tabelas:**\n\n" + for table in tables[:20]: + output += f"• `{table.get('table_name', 'N/A')}`\n" + return output + return f"[ERROR] Erro: {res.status_code}" + except Exception as e: + return f"[ERROR] Erro: {str(e)}" + + @staticmethod + def query(table: str, select: str = "*", filters: str = None, limit: int = 10) -> str: + """Consulta dados de uma tabela.""" + url = f"{supabase_url()}/rest/v1/{table}" + params = f"select={select}&limit={limit}" + if filters: + params += f"&{filters}" + + try: + res = requests.get(url, headers=SupabaseTools._get_headers(), params=params, timeout=10) + if res.status_code == 200: + data = res.json() + if not data: + return f"📭 Nenhum resultado em `{table}`." + + output = f"[DATA] **Resultados de `{table}`** ({len(data)} registros):\n\n" + for row in data[:5]: + output += f"```json\n{str(row)[:200]}\n```\n" + return output + return f"[ERROR] Erro: {res.status_code} - {res.text}" + except Exception as e: + return f"[ERROR] Erro: {str(e)}" + + @staticmethod + def insert(table: str, data: Dict) -> str: + """Insere dados em uma tabela.""" + url = f"{supabase_url()}/rest/v1/{table}" + try: + res = requests.post(url, headers=SupabaseTools._get_headers(anon_key=False), json=data, timeout=10) + if res.status_code in [200, 201]: + return f"[OK] Registro inserido em `{table}`!" + return f"[ERROR] Erro: {res.status_code} - {res.text}" + except Exception as e: + return f"[ERROR] Erro: {str(e)}" + + @staticmethod + def update(table: str, data: Dict, filters: str) -> str: + """Atualiza dados em uma tabela.""" + url = f"{supabase_url()}/rest/v1/{table}?{filters}" + try: + res = requests.patch(url, headers=SupabaseTools._get_headers(anon_key=False), json=data, timeout=10) + if res.status_code in [200, 204]: + return f"[OK] Registro atualizado em `{table}`!" + return f"[ERROR] Erro: {res.status_code} - {res.text}" + except Exception as e: + return f"[ERROR] Erro: {str(e)}" + + @staticmethod + def delete(table: str, filters: str) -> str: + """Deleta dados de uma tabela.""" + url = f"{supabase_url()}/rest/v1/{table}?{filters}" + try: + res = requests.delete(url, headers=SupabaseTools._get_headers(anon_key=False), timeout=10) + if res.status_code in [200, 204]: + return f"[OK] Registro deletado de `{table}`!" + return f"[ERROR] Erro: {res.status_code} - {res.text}" + except Exception as e: + return f"[ERROR] Erro: {str(e)}" + +# ============================================================ +# COOLIFY API TOOLS +# ============================================================ + +class CoolifyTools: + """Ferramentas via API do Coolify.""" + + @staticmethod + def get_status() -> str: + """Retorna status do Coolify.""" + result = coolify_api("/status") + if "error" in result: + return f"[ERROR] Erro: {result['error']}" + + return f"""[COOLIFY] **Coolify Status:** +- **Status:** {result.get('status', 'N/A')} +- **Containers:** {result.get('containers', 'N/A')} +- **Deployments:** {result.get('deployments', 'N/A')}""" + + @staticmethod + def list_applications() -> str: + """Lista aplicações no Coolify.""" + from credential_manager import coolify_list_applications + apps = coolify_list_applications() + if not apps: + return "[REPO] Nenhuma aplicacao encontrada." + + output = "[REPO] Aplicacoes Coolify:\n\n" + for app in apps[:10]: + output += f"- {app.get('name', 'N/A')} - {app.get('status', 'N/A')}\n" + output += f" URL: {app.get('fqdn', 'N/A')}\n\n" + return output + + @staticmethod + def list_deployments(limit: int = 10) -> str: + """Lista deployments recentes.""" + from credential_manager import coolify_list_deployments + deps = coolify_list_deployments() + if not deps: + return "[DEPLOY] Nenhum deployment recente." + + output = "[DEPLOY] Deployments Recentes:\n\n" + for dep in deps[:limit]: + output += f"- {dep.get('application', 'N/A')} - {dep.get('status', 'N/A')}\n" + output += f" {dep.get('created_at', 'N/A')}\n\n" + return output + +# ============================================================ +# FILE TOOLS +# ============================================================ + +class FileTools: + """Ferramentas de manipulação de arquivos.""" + + @staticmethod + def list(path: str) -> str: + """Lista conteúdo de diretório.""" + result = run_bash(f"ls -la {path}") + return format_output(result) + + @staticmethod + def read(path: str, lines: int = 100) -> str: + """Lê conteúdo de arquivo.""" + result = run_bash(f"head -{lines} {path}") + return format_output(result, max_length=5000) + + @staticmethod + def search(path: str, pattern: str) -> str: + """Busca texto em arquivos.""" + result = run_bash(f"grep -rn '{pattern}' {path} 2>/dev/null | head -50") + return format_output(result, max_length=5000) + + @staticmethod + def write(path: str, content: str) -> str: + """Escreve conteúdo em arquivo.""" + # Escapa o conteúdo para evitar injection + import shlex + safe_content = shlex.quote(content) + result = run_bash(f"echo {safe_content} > {path}") + return format_output(result) + + @staticmethod + def exists(path: str) -> str: + """Verifica se arquivo existe.""" + exists = os.path.exists(path) + return f"{'[OK]' if exists else '[ERROR]'} {'Existe' if exists else 'Não existe'}: {path}" + + @staticmethod + def size(path: str) -> str: + """Retorna tamanho de arquivo.""" + result = run_bash(f"du -sh {path} 2>/dev/null || ls -lh {path}") + return format_output(result) + +# ============================================================ +# SYSTEM TOOLS +# ============================================================ + +class SystemTools: + """Ferramentas de sistema.""" + + @staticmethod + def df() -> str: + """Mostra uso de disco.""" + result = run_bash("df -h") + return format_output(result) + + @staticmethod + def free() -> str: + """Mostra uso de memória.""" + result = run_bash("free -h") + return format_output(result) + + @staticmethod + def top(limit: int = 10) -> str: + """Mostra processos mais pesados.""" + result = run_bash(f"ps aux --sort=-%cpu | head -{limit + 1}") + return format_output(result) + + @staticmethod + def uptime() -> str: + """Mostra uptime do sistema.""" + result = run_bash("uptime") + return format_output(result) + + @staticmethod + def services() -> str: + """Lista serviços ativos.""" + result = run_bash("systemctl list-units --type=service --state=running | head -20") + return format_output(result) + + @staticmethod + def ports() -> str: + """Lista portas em uso.""" + result = run_bash("netstat -tlnp 2>/dev/null || ss -tlnp") + return format_output(result, max_length=3000) + +# ============================================================ +# TOOLKIT REGISTRY +# ============================================================ + +TOOLS_V2 = { + # DOCKER + "docker_ps": {"desc": "Lista containers Docker", "func": DockerTools.ps, "danger": "safe"}, + "docker_stats": {"desc": "Estatísticas de containers", "func": DockerTools.stats, "danger": "safe"}, + "docker_logs": {"desc": "Logs de container (use: docker_logs )", "func": lambda n="app", l=50: DockerTools.log(n, int(l)), "danger": "safe"}, + "docker_restart": {"desc": "Reinicia container (use: docker_restart )", "func": DockerTools.restart, "danger": "dangerous"}, + "docker_stop": {"desc": "Para container", "func": DockerTools.stop, "danger": "dangerous"}, + "docker_start": {"desc": "Inicia container", "func": DockerTools.start, "danger": "medium"}, + "docker_exec": {"desc": "Executa comando no container", "func": DockerTools.exec, "danger": "dangerous"}, + "docker_system_df": {"desc": "Uso de disco Docker", "func": DockerTools.system_df, "danger": "safe"}, + "docker_prune": {"desc": "Limpa recursos Docker não usados", "func": lambda: DockerTools.prune(True), "danger": "dangerous"}, + + # GIT + "git_status": {"desc": "Status do repositório git", "func": GitTools.status, "danger": "safe"}, + "git_pull": {"desc": "Pull do git", "func": GitTools.pull, "danger": "medium"}, + "git_push": {"desc": "Push do git", "func": GitTools.push, "danger": "dangerous"}, + "git_clone": {"desc": "Clona repositório", "func": GitTools.clone, "danger": "medium"}, + "git_branch": {"desc": "Lista branches", "func": GitTools.branch, "danger": "safe"}, + "git_log": {"desc": "Histórico de commits", "func": GitTools.log, "danger": "safe"}, + "git_diff": {"desc": "Diferenças não commitadas", "func": GitTools.diff, "danger": "safe"}, + "git_fetch": {"desc": "Busca atualizações", "func": GitTools.fetch, "danger": "safe"}, + + # DOCKER COMPOSE + "dc_up": {"desc": "Sobe serviços (use: dc_up )", "func": DockerComposeTools.up, "danger": "dangerous"}, + "dc_down": {"desc": "Para serviços", "func": DockerComposeTools.down, "danger": "dangerous"}, + "dc_build": {"desc": "Constrói imagens", "func": DockerComposeTools.build, "danger": "medium"}, + "dc_ps": {"desc": "Lista serviços", "func": DockerComposeTools.ps, "danger": "safe"}, + "dc_logs": {"desc": "Logs de serviços", "func": DockerComposeTools.logs, "danger": "safe"}, + "dc_restart": {"desc": "Reinicia serviços", "func": DockerComposeTools.restart, "danger": "dangerous"}, + + # GITEA + "gitea_list_repos": {"desc": "Lista repositórios Gitea", "func": GiteaTools.list_repos, "danger": "safe"}, + "gitea_get_repo": {"desc": "Info de repositório (use: gitea_get_repo )", "func": GiteaTools.get_repo, "danger": "safe"}, + "gitea_list_actions": {"desc": "Lista workflows do repositório", "func": GiteaTools.list_actions, "danger": "safe"}, + "gitea_trigger": {"desc": "Dispara workflow", "func": GiteaTools.trigger_workflow, "danger": "dangerous"}, + + # SUPABASE + "supabase_list_tables": {"desc": "Lista tabelas do Supabase", "func": SupabaseTools.list_tables, "danger": "safe"}, + "supabase_query": {"desc": "Consulta tabela", "func": SupabaseTools.query, "danger": "safe"}, + "supabase_insert": {"desc": "Insere dados", "func": SupabaseTools.insert, "danger": "dangerous"}, + "supabase_update": {"desc": "Atualiza dados", "func": SupabaseTools.update, "danger": "dangerous"}, + + # COOLIFY + "coolify_status": {"desc": "Status do Coolify", "func": CoolifyTools.get_status, "danger": "safe"}, + "coolify_apps": {"desc": "Lista aplicações Coolify", "func": CoolifyTools.list_applications, "danger": "safe"}, + "coolify_deployments": {"desc": "Lista deployments recentes", "func": CoolifyTools.list_deployments, "danger": "safe"}, + + # FILES + "file_list": {"desc": "Lista diretório", "func": FileTools.list, "danger": "safe"}, + "file_read": {"desc": "Lê arquivo", "func": FileTools.read, "danger": "safe"}, + "file_search": {"desc": "Busca em arquivos", "func": FileTools.search, "danger": "safe"}, + "file_exists": {"desc": "Verifica se arquivo existe", "func": FileTools.exists, "danger": "safe"}, + "file_size": {"desc": "Tamanho de arquivo", "func": FileTools.size, "danger": "safe"}, + + # SYSTEM + "sys_df": {"desc": "Uso de disco", "func": SystemTools.df, "danger": "safe"}, + "sys_free": {"desc": "Uso de memória", "func": SystemTools.free, "danger": "safe"}, + "sys_top": {"desc": "Processos mais pesados", "func": SystemTools.top, "danger": "safe"}, + "sys_uptime": {"desc": "Uptime do sistema", "func": SystemTools.uptime, "danger": "safe"}, + "sys_ports": {"desc": "Portas em uso", "func": SystemTools.ports, "danger": "safe"}, +} + +def get_tools_by_danger(level: str) -> List[Dict]: + """Retorna ferramentas por nível de perigo.""" + return [ + {"name": k, **v} + for k, v in TOOLS_V2.items() + if v["danger"] == level + ] + +def get_all_tools_formatted() -> str: + """Retorna lista formatada de todas as ferramentas.""" + output = "[TOOLS] Ferramentas Disponiveis:\n\n" + + for level in ["safe", "medium", "dangerous"]: + tools = get_tools_by_danger(level) + if tools: + icon = {"safe": "[SAFE]", "medium": "[MEDIUM]", "dangerous": "[CRITICAL]"}[level] + output += f"\n{icon} **{level.upper()}**:\n" + for t in tools: + output += f" - `{t['name']}` - {t['desc']}\n" + + return output