239 lines
7.3 KiB
Python
239 lines
7.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
Browser Cloud Service — Playwright-based browser automation server
Exposes a simple REST API for spawning/controlling browser sessions.
"""
|
|
|
|
import asyncio
import base64
import json
import uuid
from contextlib import asynccontextmanager
from typing import Literal, Optional

from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from playwright.async_api import (
    BrowserContext,
    Page,
    Playwright,
    TimeoutError as PWTimeoutError,
    async_playwright,
)
|
|
|
|
# Upper bound on simultaneously registered browser sessions.
MAX_BROWSERS = 5
# Timeout handed to Playwright navigation calls, in milliseconds.
BROWSER_TIMEOUT = 30_000

# Per-session registries; all three are keyed by the session id.
active_contexts: dict[str, BrowserContext] = {}
active_pages: dict[str, Page] = {}
active_playwrights: dict[str, Playwright] = {}
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler: log on startup, tear down sessions on shutdown.

    On shutdown every session id found in any of the module registries is
    passed to ``_cleanup`` so that the page, the context AND the session's
    Playwright driver are released. All registries are then cleared.
    """
    # Startup
    print("[BrowserCloud] Iniciando Playwright service...")
    yield
    # Shutdown: close all browsers
    print(f"[BrowserCloud] Fechando {len(active_contexts)} contextos...")
    # Snapshot the ids first: _cleanup mutates the registries while we iterate.
    # dict.fromkeys de-duplicates while keeping a deterministic order.
    session_ids = list(
        dict.fromkeys([*active_contexts, *active_pages, *active_playwrights])
    )
    for ctx_id in session_ids:
        try:
            await _cleanup(ctx_id)
        except Exception as exc:
            # Shutdown is best-effort: report the failure and keep going so
            # one broken session cannot block the release of the others.
            print(f"[BrowserCloud] Falha ao fechar sessão {ctx_id}: {exc}")
    active_contexts.clear()
    active_pages.clear()
    active_playwrights.clear()
    print("[BrowserCloud] Done.")
|
|
|
|
|
|
# FastAPI application object; `lifespan` above is passed as its lifespan handler.
app = FastAPI(title="BrowserCloud", lifespan=lifespan)
|
|
|
|
|
|
# ─── Models ──────────────────────────────────────────────────────────────────
|
|
|
|
# Request model for creating a session. Every field has a default, so an
# empty payload is valid.
class BrowserOptions(BaseModel):
    # Passed to chromium.launch(headless=...).
    headless: bool = True
    # Viewport size handed to browser.new_context(viewport=...).
    viewport_width: int = 1280
    viewport_height: int = 720
    # Optional user-agent override; None lets create_session pick its fallback.
    user_agent: Optional[str] = None
|
|
|
|
|
|
class NavigateOptions(BaseModel):
    """Request body for the navigate endpoint.

    ``wait_until`` is restricted to the lifecycle events Playwright's
    ``page.goto`` accepts, so an unsupported value is rejected during request
    validation instead of surfacing later as a Playwright error.
    """

    # Target URL, passed unchanged to page.goto().
    url: str
    # Lifecycle event page.goto() waits for before returning.
    wait_until: Literal["load", "domcontentloaded", "networkidle", "commit"] = "load"
|
|
|
|
|
|
class ScreenshotOptions(BaseModel):
    """Options for the screenshot endpoint.

    ``format`` is restricted to known image formats. ``"jpg"`` is kept for
    backward compatibility with the originally documented values; note that
    Playwright itself names that format ``"jpeg"``.
    """

    # When True the whole scrollable page is captured, not just the viewport.
    full_page: bool = False
    # Image encoding: png | jpeg (jpg accepted as an alias spelling).
    format: Literal["png", "jpeg", "jpg"] = "png"
|
|
|
|
|
|
# ─── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _get_page(ctx_id: str) -> Page:
    """Look up the page registered under ``ctx_id``.

    Raises:
        HTTPException: 404 when no page is registered for that id.
    """
    try:
        return active_pages[ctx_id]
    except KeyError:
        raise HTTPException(404, f"Sessão {ctx_id} não encontrada") from None
|
|
|
|
|
|
async def _cleanup(ctx_id: str):
    """Close a session and remove it from the active registries.

    The registry entries are popped *before* any ``await`` so the function is
    idempotent and safe to run concurrently for the same id (it is called both
    by the DELETE endpoint and from the context "close" event handler). With
    the previous check / await / ``del`` sequence, a second invocation could
    delete the entry while the first was suspended, making the first one fail
    with ``KeyError``.

    Resources are released in the order page -> context -> Playwright driver.
    Errors raised while closing are ignored on purpose: the resource may
    already be closed, and cleanup is best-effort.

    Args:
        ctx_id: Session id; unknown ids are a no-op.
    """
    page = active_pages.pop(ctx_id, None)
    context = active_contexts.pop(ctx_id, None)
    pw = active_playwrights.pop(ctx_id, None)

    closers = []
    if page is not None:
        closers.append(page.close)
    if context is not None:
        closers.append(context.close)
    if pw is not None:
        closers.append(pw.stop)

    for close in closers:
        try:
            await close()
        except Exception:
            # Best-effort: keep releasing the remaining resources.
            pass
|
|
|
|
|
|
# ─── Endpoints ─────────────────────────────────────────────────────────────────
|
|
|
|
@app.get("/health")
async def health():
    """Report service status plus current and maximum session counts."""
    session_count = len(active_contexts)
    return dict(
        status="ok",
        active_sessions=session_count,
        max_sessions=MAX_BROWSERS,
    )
|
|
|
|
|
|
@app.post("/session")
async def create_session(opts: BrowserOptions = BrowserOptions()):
    """Create a new browser session (Playwright driver + context + page).

    Args:
        opts: Launch and viewport options; defaults are used when omitted.

    Returns:
        A dict with the new ``session_id`` and the page's initial ``url``.

    Raises:
        HTTPException: 503 when ``MAX_BROWSERS`` sessions are already
            registered; 500 when Playwright fails to start or launch.
    """
    # NOTE(review): this check is not atomic with the registration below;
    # requests arriving while a launch is in flight can overshoot the limit.
    if len(active_contexts) >= MAX_BROWSERS:
        raise HTTPException(503, f"Limite de {MAX_BROWSERS} sessões atingido")

    ctx_id = uuid.uuid4().hex[:12]

    # Bound before the try block so the error path can tell whether the
    # driver was actually started (start() itself may be what failed).
    pw = None
    try:
        pw = await async_playwright().start()
        browser = await pw.chromium.launch(headless=opts.headless)
        context = await browser.new_context(
            viewport={"width": opts.viewport_width, "height": opts.viewport_height},
            user_agent=opts.user_agent or "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
            accept_downloads=True,
        )
        page = await context.new_page()
    except Exception as e:
        if pw is not None:
            try:
                await pw.stop()
            except Exception:
                # Do not let a failing stop() hide the original error.
                pass
        raise HTTPException(500, f"Falha ao iniciar browser: {e}") from e

    active_contexts[ctx_id] = context
    active_pages[ctx_id] = page
    active_playwrights[ctx_id] = pw

    # Auto-cleanup on context close. The handler accepts (and ignores) any
    # positional arguments because the event emitter passes the closed
    # context to the listener; a zero-argument lambda would raise TypeError.
    context.on("close", lambda *_args: asyncio.create_task(_cleanup(ctx_id)))

    return {"session_id": ctx_id, "url": page.url}
|
|
|
|
|
|
@app.get("/session/{ctx_id}")
async def get_session(ctx_id: str):
    """Return the session id together with the page's current URL and title.

    Responds with 404 (via ``_get_page``) when the session is unknown.
    """
    page = _get_page(ctx_id)
    current_url = page.url
    current_title = await page.title()
    return {
        "session_id": ctx_id,
        "url": current_url,
        "title": current_title,
    }
|
|
|
|
|
|
@app.delete("/session/{ctx_id}")
async def close_session(ctx_id: str):
    """Tear down the session via ``_cleanup`` and acknowledge the request.

    The same acknowledgement is returned whether or not the id was registered.
    """
    await _cleanup(ctx_id)
    return dict(status="closed", session_id=ctx_id)
|
|
|
|
|
|
@app.post("/session/{ctx_id}/navigate")
async def navigate(ctx_id: str, opts: NavigateOptions):
    """Navigate the session's page to ``opts.url``.

    Returns:
        ``ok`` flag plus the page's URL and title after navigation.

    Raises:
        HTTPException: 404 for an unknown session, 408 when navigation
            exceeds ``BROWSER_TIMEOUT``, 500 for any other navigation error.
    """
    page = _get_page(ctx_id)
    goto_kwargs = {"wait_until": opts.wait_until, "timeout": BROWSER_TIMEOUT}
    try:
        await page.goto(opts.url, **goto_kwargs)
    except PWTimeoutError:
        raise HTTPException(408, "Navegação deu timeout")
    except Exception as exc:
        raise HTTPException(500, str(exc))
    else:
        # Runs outside the handlers above, exactly like code after the try.
        final_url = page.url
        final_title = await page.title()
        return {"ok": True, "url": final_url, "title": final_title}
|
|
|
|
|
|
@app.post("/session/{ctx_id}/click")
async def click(ctx_id: str, selector: str, timeout: int = 10000):
    """Click the element matching ``selector`` on the session's page.

    Args:
        ctx_id: Session id.
        selector: Selector forwarded to ``page.click``.
        timeout: Value forwarded as ``page.click``'s ``timeout``.

    Raises:
        HTTPException: 404 for an unknown session, 408 on a Playwright
            timeout, 500 for any other error.
    """
    page = _get_page(ctx_id)
    try:
        await page.click(selector, timeout=timeout)
    except PWTimeoutError:
        not_found = f"Elemento não encontrado: {selector}"
        raise HTTPException(408, not_found)
    except Exception as exc:
        raise HTTPException(500, str(exc))
    else:
        return {"ok": True}
|
|
|
|
|
|
@app.post("/session/{ctx_id}/fill")
async def fill(ctx_id: str, selector: str, value: str, submit: bool = False):
    """Fill the element matching ``selector`` with ``value``.

    Args:
        ctx_id: Session id.
        selector: Selector forwarded to ``page.fill`` (and ``page.press``).
        value: Text to enter.
        submit: When true, press Enter on the same selector after filling.

    Raises:
        HTTPException: 404 for an unknown session; 408 on a Playwright
            timeout (same mapping the click endpoint uses); 500 for any
            other error.
    """
    page = _get_page(ctx_id)
    try:
        await page.fill(selector, value, timeout=10000)
        if submit:
            await page.press(selector, "Enter")
    except PWTimeoutError as e:
        # Consistent with click(): a timeout is reported as 408, not as a
        # generic 500.
        raise HTTPException(408, f"Elemento não encontrado: {selector}") from e
    except Exception as e:
        raise HTTPException(500, str(e)) from e
    return {"ok": True}
|
|
|
|
|
|
@app.get("/session/{ctx_id}/screenshot")
async def screenshot(ctx_id: str, opts: ScreenshotOptions = ScreenshotOptions()):
    """Capture the session's page and return it base64-encoded.

    Args:
        ctx_id: Session id.
        opts: ``full_page`` flag and image ``format``.

    Returns:
        A dict with the requested ``format`` (echoed as given) and the
        base64-encoded image under ``data``.

    Raises:
        HTTPException: 404 for an unknown session, 400 for an unsupported
            format, 500 when Playwright fails to take the screenshot.
    """
    page = _get_page(ctx_id)

    # Playwright's screenshot type is "png" or "jpeg"; the API documents
    # "jpg", so map that spelling instead of letting the call fail.
    playwright_types = {"png": "png", "jpg": "jpeg", "jpeg": "jpeg"}
    image_type = playwright_types.get(opts.format.lower())
    if image_type is None:
        raise HTTPException(400, f"Formato não suportado: {opts.format}")

    try:
        img = await page.screenshot(full_page=opts.full_page, type=image_type)
    except Exception as e:
        raise HTTPException(500, str(e)) from e

    b64 = base64.b64encode(img).decode()
    return {"format": opts.format, "data": b64}
|
|
|
|
|
|
@app.get("/session/{ctx_id}/html")
async def get_html(ctx_id: str):
    """Return the result of ``page.content()`` for the session's page.

    Responds with 404 (via ``_get_page``) when the session is unknown.
    """
    page = _get_page(ctx_id)
    markup = await page.content()
    return {"html": markup}
|
|
|
|
|
|
@app.get("/session/{ctx_id}/text")
async def get_text(ctx_id: str, selector: str = "body"):
    """Return the inner text of the first element matching ``selector``.

    Raises:
        HTTPException: 404 for an unknown session or when no element
            matches; 500 when the query or text extraction fails.
    """
    page = _get_page(ctx_id)
    try:
        element = await page.query_selector(selector)
        if element:
            content = await element.inner_text()
            return {"selector": selector, "text": content}
    except Exception as exc:
        raise HTTPException(500, str(exc))
    # Reached only when the query succeeded but matched nothing.
    raise HTTPException(404, f"Elemento {selector} não encontrado")
|
|
|
|
|
|
@app.post("/session/{ctx_id}/evaluate")
async def evaluate(ctx_id: str, script: str):
    """Run ``script`` through ``page.evaluate`` and return its result.

    NOTE(review): the script comes straight from the request and is executed
    in the page as-is; confirm this endpoint is only reachable by trusted
    callers.

    Raises:
        HTTPException: 404 for an unknown session, 500 when evaluation fails.
    """
    page = _get_page(ctx_id)
    try:
        outcome = await page.evaluate(script)
    except Exception as exc:
        raise HTTPException(500, str(exc))
    return {"result": outcome}
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on $PORT (default 8088).
    import os

    import uvicorn

    port = int(os.getenv("PORT", "8088"))
    # Pass the app object rather than the "browser_cloud:app" import string:
    # the string only resolves if this file is named browser_cloud.py, and it
    # makes uvicorn import the module a second time, duplicating the
    # module-level session registries. reload is off, so no import string is
    # required.
    # NOTE(review): 0.0.0.0 listens on every interface — confirm the service
    # is not exposed to untrusted networks.
    uvicorn.run(app, host="0.0.0.0", port=port, reload=False)
|