marriott-box-image-video-ta.../api.py
DJP 04440d661d Cache Box thumbnails + search blob; render in UI
Search results were text-only, which made them hard to scan visually
with thousands of assets coming. Now every file that Gemini tags or
that backfill mirrors also gets its Box-generated 160x160 JPG thumbnail
(~10-20 KB) pulled and stashed in Postgres, plus a consolidated
`search_terms` blob (file_name + folder + description + flattened
metadata values). Search results render the thumbnail inline; rows
missing one show a striped placeholder. Search SQL now LEFT JOINs
file_assets and also matches against search_terms, so backfilled rows
are properly searchable.

- schema.sql: new `file_assets` table (file_id PK, thumbnail_bytes
  bytea, search_terms text, updated_at). idempotent.
- db.py: `upsert_file_asset` (INSERT … ON CONFLICT preserving
  existing thumbnail bytes if today's fetch failed) and
  `get_thumbnail`. Both swallow exceptions per the established
  defensive pattern.
- main.py: `fetch_thumbnail` (Box SDK get_file_thumbnail_by_id, JPG
  at 160 px, handles BoxAPIError 202/404 as soft misses) and
  `build_search_terms` (lowercase, whitespace-collapsed text blob).
  `_persist_file_asset` wires both into the image+video success
  paths of `_run_pass` and into every iteration of `_run_backfill`.
- Backfill skip logic refined: always upsert file_assets (idempotent
  PK), only skip the tagging_events insert if a good row already
  exists. Re-running Backfill from Box populates thumbnails for
  rows backfilled before this feature shipped.
- api.py: `GET /api/files/{file_id}/thumbnail` streams the bytea
  with Cache-Control max-age=86400. Search SQL gains the LEFT JOIN
  and emits `has_thumbnail` per row. Search also matches against
  fa.search_terms so backfilled rows surface for free-text queries
  that hit their metadata.
- frontend: Event type adds `has_thumbnail`; `thumbnailUrl(fileId)`
  helper builds the prefix-aware URL via Vite's BASE. EventList
  renders the thumbnail (lazy, with onError fallback) or a striped
  placeholder. .thumb styling + .event-head layout in styles.css.

Verified locally: schema applies via lifespan; upsert + get_thumbnail
roundtrip; /api/files/999/thumbnail returns 200 with bytes; /api/events
returns has_thumbnail per row; multi-token "female city" search finds
a row whose validated_metadata contains both tokens.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 16:20:13 -04:00

413 lines
15 KiB
Python

"""
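Marriott Box Tagger — FastAPI backend.
Endpoints (all under /api/; all behind require_auth except /api/health,
and the POST endpoints additionally require require_admin):
GET  /api/health                    — liveness, DB ping, auth info
GET  /api/me                        — who am I (after auth)
GET  /api/events?q=…&limit=…        — search tagging_events (plus cached
                                      file_assets.search_terms) across all
                                      text + JSONB fields
GET  /api/files/{file_id}/thumbnail — cached Box thumbnail bytes for a file
POST /api/runs                      — kick off a tagging pass in a
                                      background thread; returns run_id
POST /api/backfill                  — mirror existing Box metadata into the
                                      DB in a background thread; returns run_id
GET  /api/runs                      — recent runs (run_id + counts)
GET  /api/runs/{run_id}/events      — events for a single run, newest first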
"""
import os
import threading
import uuid
from contextlib import asynccontextmanager, contextmanager
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, Query, Response
from fastapi.middleware.cors import CORSMiddleware
from psycopg.rows import dict_row
import db
from auth import User, maybe_auth_info, require_admin, require_auth
# Deep link to a file in the Box web UI. _event_to_dict formats it with the
# row's file_id and returns it to the frontend as `box_url`.
BOX_FILE_URL = "https://app.box.com/file/{file_id}"
# Thumbnails deliberately get no URL here: the frontend builds the thumbnail
# URL by joining its own API base with file_id. The JSON only signals
# *whether* a thumbnail exists (`has_thumbnail`), so it stays prefix-agnostic.
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """
    FastAPI lifespan hook: apply schema.sql before the first request is served.

    The schema includes the table, its indexes and the pg_trgm extension that
    /api/events fuzzy search relies on. Run threads also call ensure_schema,
    but search queries hit the DB without ever spawning one. Without this
    step, fuzzy search 500s on a fresh database.

    A failure here is logged and tolerated; the app still starts.
    """
    try:
        with _conn() as conn:
            db.ensure_schema(conn)
            print("[api] schema ensured")
    except Exception as e:
        print(f"[api] WARN: ensure_schema on startup failed ({type(e).__name__}: {e}) — continuing")
    yield
app = FastAPI(
    title="Marriott Box Tagger API",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS only matters in dev, when the Vite dev server calls FastAPI from a
# different origin. In prod, Apache serves the SPA and the API from the same
# origin. When CORS_ORIGINS is unset or empty, no CORS middleware is
# installed at all.
_cors_origins = [
    origin
    for origin in map(str.strip, os.getenv("CORS_ORIGINS", "").split(","))
    if origin
]
if _cors_origins:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=_cors_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
# In-memory record of background runs (run_id → state). Survives only as long as the
# api container; durable record of what each run produced is in tagging_events.
_runs: dict[str, dict] = {}
_runs_lock = threading.Lock()
@contextmanager
def _conn():
    """
    Yield a fresh connection from db.get_conn() and close it on exit.

    The close happens in a finally clause, so the connection is released even
    when the body of the `with` statement raises.
    """
    connection = db.get_conn()
    try:
        yield connection
    finally:
        connection.close()
# ── Health / identity ────────────────────────────────────────────────────────
@app.get("/api/health")
def health():
    """
    Liveness probe (no auth dependency).

    Always reports ok=True for the API process itself. The "db" section
    records whether a trivial `SELECT 1` round-trip succeeded, plus the error
    text when it did not. "auth" is whatever maybe_auth_info() returns.
    """
    reachable = False
    failure = None
    try:
        with _conn() as c, c.cursor() as cur:
            cur.execute("SELECT 1")
            cur.fetchone()
            reachable = True
    except Exception as exc:
        failure = f"{type(exc).__name__}: {exc}"
    db_status = {"ok": reachable, "error": failure}
    return {"ok": True, "db": db_status, "auth": maybe_auth_info()}
@app.get("/api/me")
def me(user: User = Depends(require_auth)):
    """Return the authenticated caller, serialised via User.to_dict()."""
    profile = user.to_dict()
    return profile
# ── Search ──────────────────────────────────────────────────────────────────
# Columns the search ILIKE walks over (substring match, case-insensitive).
_SEARCH_COLS = [
"file_name",
"folder_path",
"description",
"status",
"file_id",
"coalesce(validated_metadata::text, '')",
"coalesce(raw_response::text, '')",
"coalesce(scenes::text, '')",
]
# Single concatenated text used for trigram similarity (fuzzy / typo tolerance).
_SEARCH_BLOB = (
"coalesce(file_name,'')||' '||coalesce(folder_path,'')||' '||"
"coalesce(description,'')||' '||coalesce(validated_metadata::text,'')||' '||"
"coalesce(scenes::text,'')"
)
# Fuzzy threshold for trigram similarity. 0.3 catches typos like
# "femalle" → "female" without flooding the results with noise.
_SIM_THRESHOLD = 0.3
# Short tokens (1-2 chars) are too noisy for trigrams — fall back to substring
# match only for those.
_MIN_FUZZY_TOKEN_LEN = 3
def _build_search_sql(q: str, limit: int):
"""
Tokenise the query on whitespace, AND-match every token across the columns,
where each token may match by substring OR by trigram similarity. Results
ranked by summed similarity score, then recency. LEFT JOINs file_assets
so the response can flag whether a thumbnail is available.
"""
tokens = [t for t in q.strip().split() if t]
common_cols = (
"e.id, e.run_id, e.created_at, e.file_id, e.file_name, e.folder_path, e.media_type, "
"e.gemini_model, e.description, e.scenes, e.validated_metadata, e.raw_response, "
"e.metadata_write_success, e.description_write_success, e.scene_comment_write_success, "
"e.status, e.error_message, e.duration_ms, "
"(fa.thumbnail_bytes IS NOT NULL) AS _has_thumbnail"
)
join_clause = "FROM tagging_events e LEFT JOIN file_assets fa ON fa.file_id = e.file_id"
if not tokens:
return (
f"SELECT {common_cols} {join_clause} "
f"ORDER BY e.created_at DESC LIMIT %(limit)s",
{"limit": limit},
)
# Per-token search runs across event columns + the file_assets.search_terms
# blob — gives backfilled rows a richer search surface than just the event
# row's fields.
all_search_cols = [
"e.file_name",
"e.folder_path",
"e.description",
"e.status",
"e.file_id",
"coalesce(e.validated_metadata::text, '')",
"coalesce(e.raw_response::text, '')",
"coalesce(e.scenes::text, '')",
"coalesce(fa.search_terms, '')",
]
fuzzy_blob = (
"coalesce(e.file_name,'')||' '||coalesce(e.folder_path,'')||' '||"
"coalesce(e.description,'')||' '||coalesce(e.validated_metadata::text,'')||' '||"
"coalesce(e.scenes::text,'')||' '||coalesce(fa.search_terms,'')"
)
params: dict = {"limit": limit}
clauses: list[str] = []
score_terms: list[str] = []
for i, tok in enumerate(tokens):
like_key = f"like_{i}"
sim_key = f"sim_{i}"
params[like_key] = f"%{tok}%"
params[sim_key] = tok
col_ors = " OR ".join(f"{c} ILIKE %({like_key})s" for c in all_search_cols)
if len(tok) >= _MIN_FUZZY_TOKEN_LEN:
clauses.append(
f"(({col_ors}) "
f"OR similarity({fuzzy_blob}, %({sim_key})s) > {_SIM_THRESHOLD})"
)
score_terms.append(f"similarity({fuzzy_blob}, %({sim_key})s)")
else:
clauses.append(f"({col_ors})")
where = " AND ".join(clauses)
score_sql = " + ".join(score_terms) if score_terms else "0"
sql = (
f"SELECT {common_cols}, ({score_sql}) AS _score "
f"{join_clause} "
f"WHERE {where} "
f"ORDER BY _score DESC, e.created_at DESC "
f"LIMIT %(limit)s"
)
return sql, params
def _event_to_dict(row):
out = dict(row)
fid = out.get("file_id")
out["box_url"] = BOX_FILE_URL.format(file_id=fid) if fid else None
out["has_thumbnail"] = bool(out.pop("_has_thumbnail", False))
if out.get("run_id") is not None:
out["run_id"] = str(out["run_id"])
if out.get("created_at") is not None:
out["created_at"] = out["created_at"].isoformat()
return out
@app.get("/api/files/{file_id}/thumbnail")
def file_thumbnail(file_id: str, user: User = Depends(require_auth)):
    """
    Serve a previously-cached Box thumbnail (160x160 JPG by default) for a file.

    Bytes live in Postgres bytea on file_assets; see `_persist_file_asset` in
    main.py. db.get_thumbnail is expected to return a (bytes, content_type)
    pair, or something falsy when nothing is cached (assumption based on the
    unpacking below; confirm against db.py).

    The browser may cache the response privately for a day (max-age=86400),
    so a re-fetched thumbnail can take that long to appear in an
    already-open browser.

    Raises:
        HTTPException(404): nothing is cached for this file, or the cached
            bytes are NULL/empty.
    """
    with _conn() as c:
        row = db.get_thumbnail(c, file_id)
    data, content_type = row if row else (None, None)
    # thumbnail_bytes is nullable (the search SQL checks IS NOT NULL), so a
    # file_assets row may exist without usable bytes. Serving that as an
    # empty 200 image/jpeg would render a broken image and be cached for a
    # day, so report it as a miss and let the UI fall back to its placeholder.
    if not data:
        raise HTTPException(status_code=404, detail="no thumbnail")
    return Response(
        content=data,
        media_type=content_type or "image/jpeg",
        headers={"Cache-Control": "private, max-age=86400"},
    )
@app.get("/api/events")
def search_events(
    q: str = Query("", description="Free-text search across all fields"),
    limit: int = Query(100, ge=1, le=500),
    user: User = Depends(require_auth),
):
    """
    Free-text search over tagging events (see _build_search_sql for matching).

    Returns the echoed query, the number of rows found and the rows
    themselves, each serialised through _event_to_dict.
    """
    query_sql, query_params = _build_search_sql(q, limit)
    with _conn() as conn, conn.cursor(row_factory=dict_row) as cur:
        cur.execute(query_sql, query_params)
        raw_rows = cur.fetchall()
    results = list(map(_event_to_dict, raw_rows))
    return {"q": q, "count": len(results), "results": results}
# ── Run-now ─────────────────────────────────────────────────────────────────
def _set_run_state(key: str, state: str, error: Optional[str] = None) -> None:
    """Record *state* (and *error*, when given) for run *key* in `_runs`, under the lock."""
    with _runs_lock:
        entry = _runs[key]
        entry["state"] = state
        if error is not None:
            entry["error"] = error


def _run_job_in_thread(run_id: uuid.UUID, job_name: str, **extra) -> None:
    """
    Shared background-worker body for tagging passes and backfills.

    Steps:
    - Register the run as "running" in `_runs`, appending any *extra* fields
      (e.g. kind="backfill") to the state dict.
    - Open a fresh DB connection and apply the schema.
    - Call `main.<job_name>(run_id, db_conn)`.

    The outcome is recorded as "completed", or as "failed" with a short
    error string. The connection is always closed.
    """
    key = str(run_id)
    with _runs_lock:
        _runs[key] = {"run_id": key, "state": "running", "error": None, **extra}
    db_conn = None
    try:
        # Imported lazily so the API doesn't pay tagger-side init cost at
        # startup. The import sits inside the try so that an import-time
        # failure (an ImportError, or a SystemExit if main exits while
        # initialising) is recorded as a failed run instead of silently
        # killing the thread.
        import main as tagger

        db_conn = db.get_conn()
        db.ensure_schema(db_conn)
        getattr(tagger, job_name)(run_id, db_conn)
        _set_run_state(key, "completed")
    except SystemExit as e:
        _set_run_state(key, "failed", f"SystemExit({e.code})")
    except Exception as e:
        _set_run_state(key, "failed", f"{type(e).__name__}: {e}")
    finally:
        if db_conn is not None:
            try:
                db_conn.close()
            except Exception:
                # Best-effort cleanup; the run's outcome is already recorded.
                pass


def _run_pass_in_thread(run_id: uuid.UUID):
    """Background worker for POST /api/runs: one tagging pass via main._run_pass."""
    _run_job_in_thread(run_id, "_run_pass")


@app.post("/api/runs")
def start_run(user: User = Depends(require_admin)):
    """Start a tagging pass in a daemon thread and return its run_id immediately (admin only)."""
    run_id = uuid.uuid4()
    t = threading.Thread(target=_run_pass_in_thread, args=(run_id,), daemon=True)
    t.start()
    return {"run_id": str(run_id), "state": "running", "started_by": user.email or user.oid}


def _run_backfill_in_thread(run_id: uuid.UUID):
    """Background worker for POST /api/backfill: mirror Box metadata via main._run_backfill."""
    _run_job_in_thread(run_id, "_run_backfill", kind="backfill")


@app.post("/api/backfill")
def start_backfill(user: User = Depends(require_admin)):
    """
    Walk the Box folder and mirror any existing marriottUsa metadata into the
    local DB as `status='backfilled'` rows. Use this after first deploy (or
    after restoring an empty DB) so the per-file skip check doesn't re-tag
    files Box already has metadata for.
    """
    run_id = uuid.uuid4()
    t = threading.Thread(target=_run_backfill_in_thread, args=(run_id,), daemon=True)
    t.start()
    return {"run_id": str(run_id), "state": "running", "kind": "backfill", "started_by": user.email or user.oid}
@app.get("/api/runs")
def list_runs(user: User = Depends(require_auth), limit: int = Query(20, ge=1, le=100)):
    """
    Recent runs recorded in tagging_events, most recent activity first.

    Each entry carries per-run event counts. It also carries `live_state` /
    `live_error` from the in-memory `_runs` registry when this process
    started the run, and None otherwise (e.g. after an api restart).

    The list is driven by tagging_events, so a run that has not written any
    event yet does not appear.
    """
    # `errors` uses a regex suffix match rather than LIKE: in LIKE, `_` is a
    # single-character wildcard, so the previous LIKE pattern did not require
    # a literal underscore before "error". In a POSIX regex `_` is literal.
    with _conn() as c:
        with c.cursor(row_factory=dict_row) as cur:
            cur.execute(
                """
                SELECT run_id,
                       min(created_at) AS started_at,
                       max(created_at) AS last_event_at,
                       count(*) AS events,
                       count(*) FILTER (WHERE status = 'success') AS successes,
                       count(*) FILTER (WHERE status ~ '_error$') AS errors
                FROM tagging_events
                GROUP BY run_id
                ORDER BY max(created_at) DESC
                LIMIT %s
                """,
                (limit,),
            )
            rows = cur.fetchall()
    # Snapshot (state, error) pairs under the lock. Worker threads write state
    # and error as two separate assignments, so an unlocked read could pair a
    # "failed" state with a not-yet-written error.
    with _runs_lock:
        live = {rid: (entry["state"], entry["error"]) for rid, entry in _runs.items()}
    out = []
    for r in rows:
        rid = str(r["run_id"])
        live_state, live_error = live.get(rid, (None, None))
        out.append({
            "run_id": rid,
            "started_at": r["started_at"].isoformat() if r["started_at"] else None,
            "last_event_at": r["last_event_at"].isoformat() if r["last_event_at"] else None,
            "events": r["events"],
            "successes": r["successes"],
            "errors": r["errors"],
            "live_state": live_state,
            "live_error": live_error,
        })
    return {"runs": out}
@app.get("/api/runs/{run_id}/events")
def run_events(run_id: str, user: User = Depends(require_auth), limit: int = Query(500, ge=1, le=2000)):
    """
    Events for a single run, newest first, plus its in-memory live state.

    *run_id* may be any spelling uuid.UUID accepts (e.g. uppercase, or
    without hyphens). It is canonicalised to the lowercase hyphenated form,
    which is the form `_runs` is keyed by. That canonical form is used for
    the lookup and echoed back in the response.

    Raises:
        HTTPException(400): run_id is not a valid UUID.
    """
    try:
        run_key = str(uuid.UUID(run_id))
    except ValueError:
        raise HTTPException(status_code=400, detail="run_id must be a UUID") from None
    with _conn() as c:
        with c.cursor(row_factory=dict_row) as cur:
            cur.execute(
                """
                SELECT e.id, e.run_id, e.created_at, e.file_id, e.file_name, e.folder_path, e.media_type,
                       e.gemini_model, e.description, e.scenes, e.validated_metadata,
                       e.metadata_write_success, e.description_write_success,
                       e.scene_comment_write_success, e.status, e.error_message, e.duration_ms,
                       (fa.thumbnail_bytes IS NOT NULL) AS _has_thumbnail
                FROM tagging_events e
                LEFT JOIN file_assets fa ON fa.file_id = e.file_id
                WHERE e.run_id = %s
                ORDER BY e.created_at DESC
                LIMIT %s
                """,
                (run_key, limit),
            )
            rows = cur.fetchall()
    # Read state and error together under the lock; worker threads update
    # them as two separate writes.
    with _runs_lock:
        live = _runs.get(run_key)
        live_state, live_error = (live["state"], live["error"]) if live else (None, None)
    return {
        "run_id": run_key,
        "live_state": live_state,
        "live_error": live_error,
        "count": len(rows),
        "events": [_event_to_dict(r) for r in rows],
    }