Search results were text-only — hard to scan visually with thousands
of assets coming. Now every file Gemini-tags or backfill mirrors also
gets its Box-generated 160x160 JPG thumbnail (~10-20 KB) pulled and
stashed in Postgres, plus a consolidated `search_terms` blob
(file_name + folder + description + flattened metadata values).
Search results render the thumbnail inline; rows missing one show a
striped placeholder. Search SQL now LEFT JOINs file_assets and hits
search_terms too, so backfilled rows are properly searchable.
- schema.sql: new `file_assets` table (file_id PK, thumbnail_bytes
bytea, search_terms text, updated_at). idempotent.
- db.py: `upsert_file_asset` (INSERT … ON CONFLICT, preserving the
existing thumbnail bytes if the current fetch failed) and
`get_thumbnail`. Both swallow exceptions per the established
defensive pattern.
- main.py: `fetch_thumbnail` (Box SDK get_file_thumbnail_by_id, JPG
at 160 px, handles BoxAPIError 202/404 as soft misses) and
`build_search_terms` (lowercase, whitespace-collapsed text blob).
`_persist_file_asset` wires both into the image+video success
paths of `_run_pass` and into every iteration of `_run_backfill`.
- Backfill skip logic refined: always upsert file_assets (idempotent
PK), only skip the tagging_events insert if a good row already
exists. Re-running Backfill from Box populates thumbnails for
rows backfilled before this feature shipped.
- api.py: `GET /api/files/{file_id}/thumbnail` serves the bytea
with `Cache-Control: private, max-age=86400`. Search SQL gains the LEFT JOIN
and emits `has_thumbnail` per row. Search also matches against
fa.search_terms so backfilled rows surface for free-text queries
that hit their metadata.
- frontend: Event type adds `has_thumbnail`; `thumbnailUrl(fileId)`
helper builds the prefix-aware URL via Vite's BASE. EventList
renders the thumbnail (lazy, with onError fallback) or a striped
placeholder. .thumb styling + .event-head layout in styles.css.
Verified locally: schema applies via lifespan; upsert + get_thumbnail
roundtrip; /api/files/999/thumbnail returns 200 with bytes; /api/events
returns has_thumbnail per row; multi-token "female city" search finds
a row whose validated_metadata contains both tokens.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
413 lines
15 KiB
Python
413 lines
15 KiB
Python
"""
|
|
Marriott Box Tagger — FastAPI backend.

Endpoints (all under /api/, all behind require_auth except /api/health;
the two POST endpoints additionally require admin via require_admin):

  GET  /api/health                     — liveness + DB/auth status
  GET  /api/me                         — who am I (after auth)
  GET  /api/events?q=…&limit=…         — search tagging_events (LEFT JOINed
                                         with file_assets) across text + JSONB
                                         fields
  GET  /api/files/{file_id}/thumbnail  — cached Box thumbnail bytes
  POST /api/runs                       — kick off a tagging pass in a
                                         background thread; returns run_id
  POST /api/backfill                   — mirror existing Box metadata into the
                                         DB in a background thread; returns
                                         run_id
  GET  /api/runs                       — recent runs (run_id + counts)
  GET  /api/runs/{run_id}/events       — events for a single run, newest first
|
|
"""
|
|
|
|
import os
|
|
import threading
|
|
import uuid
|
|
from contextlib import asynccontextmanager, contextmanager
|
|
from typing import Optional
|
|
|
|
from fastapi import Depends, FastAPI, HTTPException, Query, Response
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from psycopg.rows import dict_row
|
|
|
|
import db
|
|
from auth import User, maybe_auth_info, require_admin, require_auth
|
|
|
|
# Box web-app link template for a file; `_event_to_dict` fills in {file_id}
# to produce each event's `box_url`.
BOX_FILE_URL = "https://app.box.com/file/{file_id}"
|
|
# Thumbnails get no URL in the API's JSON: the frontend builds the actual URL
# by joining its own API base with file_id, and we only signal *whether* a
# thumbnail exists (`has_thumbnail`) so the JSON stays prefix-agnostic.
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """
    FastAPI startup hook: apply schema.sql before the first request is served.

    Why here: the run-thread paths call ensure_schema themselves, but search
    queries the DB without ever spawning one. Without this, anything the search
    depends on (tables, indexes, the pg_trgm extension behind fuzzy matching)
    could be missing on a fresh DB and search would 500.

    A failure is logged and swallowed so the API still starts.
    """
    try:
        with _conn() as conn:
            db.ensure_schema(conn)
            print("[api] schema ensured")
    except Exception as exc:
        print(f"[api] WARN: ensure_schema on startup failed ({type(exc).__name__}: {exc}) — continuing")
    yield
|
|
|
|
|
|
app = FastAPI(
    title="Marriott Box Tagger API",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS matters only in dev, where the Vite dev server calls FastAPI from another
# origin. In prod Apache serves the SPA and the API from the same origin, so the
# middleware is only installed when CORS_ORIGINS lists at least one origin.
_cors_origins = [
    origin
    for origin in map(str.strip, os.getenv("CORS_ORIGINS", "").split(","))
    if origin
]
if _cors_origins:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=_cors_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

# Background-run registry: run_id (str) → state dict, guarded by _runs_lock.
# It lives only as long as this API process; the durable record of what each
# run produced is the tagging_events table.
_runs: dict[str, dict] = {}
_runs_lock = threading.Lock()
|
|
|
|
|
|
@contextmanager
def _conn():
    """
    Yield a fresh connection from `db.get_conn()` and always close it on exit.

    This helper neither commits nor rolls back; it only guarantees the close.
    """
    connection = db.get_conn()
    try:
        yield connection
    finally:
        connection.close()
|
|
|
|
|
|
# ── Health / identity ────────────────────────────────────────────────────────
|
|
|
|
|
|
@app.get("/api/health")
def health():
    """
    Liveness probe. Always reports ok=True; DB reachability and auth
    configuration are reported separately so a DB outage doesn't fail the probe.
    """
    db_status = {"ok": False, "error": None}
    try:
        with _conn() as conn, conn.cursor() as cur:
            cur.execute("SELECT 1")
            cur.fetchone()
            db_status["ok"] = True
    except Exception as exc:
        db_status["error"] = f"{type(exc).__name__}: {exc}"
    return {"ok": True, "db": db_status, "auth": maybe_auth_info()}
|
|
|
|
|
|
@app.get("/api/me")
def me(user: User = Depends(require_auth)):
    """Return the authenticated caller's identity as serialized by `User.to_dict()`."""
    identity = user.to_dict()
    return identity
|
|
|
|
|
|
# ── Search ──────────────────────────────────────────────────────────────────
|
|
|
|
|
|
# Columns the search ILIKE walks over (substring match, case-insensitive).
|
|
_SEARCH_COLS = [
|
|
"file_name",
|
|
"folder_path",
|
|
"description",
|
|
"status",
|
|
"file_id",
|
|
"coalesce(validated_metadata::text, '')",
|
|
"coalesce(raw_response::text, '')",
|
|
"coalesce(scenes::text, '')",
|
|
]
|
|
|
|
# Single concatenated text used for trigram similarity (fuzzy / typo tolerance).
|
|
_SEARCH_BLOB = (
|
|
"coalesce(file_name,'')||' '||coalesce(folder_path,'')||' '||"
|
|
"coalesce(description,'')||' '||coalesce(validated_metadata::text,'')||' '||"
|
|
"coalesce(scenes::text,'')"
|
|
)
|
|
|
|
# Fuzzy threshold for trigram similarity. 0.3 catches typos like
|
|
# "femalle" → "female" without flooding the results with noise.
|
|
_SIM_THRESHOLD = 0.3
|
|
|
|
# Short tokens (1-2 chars) are too noisy for trigrams — fall back to substring
|
|
# match only for those.
|
|
_MIN_FUZZY_TOKEN_LEN = 3
|
|
|
|
|
|
def _build_search_sql(q: str, limit: int):
|
|
"""
|
|
Tokenise the query on whitespace, AND-match every token across the columns,
|
|
where each token may match by substring OR by trigram similarity. Results
|
|
ranked by summed similarity score, then recency. LEFT JOINs file_assets
|
|
so the response can flag whether a thumbnail is available.
|
|
"""
|
|
tokens = [t for t in q.strip().split() if t]
|
|
common_cols = (
|
|
"e.id, e.run_id, e.created_at, e.file_id, e.file_name, e.folder_path, e.media_type, "
|
|
"e.gemini_model, e.description, e.scenes, e.validated_metadata, e.raw_response, "
|
|
"e.metadata_write_success, e.description_write_success, e.scene_comment_write_success, "
|
|
"e.status, e.error_message, e.duration_ms, "
|
|
"(fa.thumbnail_bytes IS NOT NULL) AS _has_thumbnail"
|
|
)
|
|
join_clause = "FROM tagging_events e LEFT JOIN file_assets fa ON fa.file_id = e.file_id"
|
|
|
|
if not tokens:
|
|
return (
|
|
f"SELECT {common_cols} {join_clause} "
|
|
f"ORDER BY e.created_at DESC LIMIT %(limit)s",
|
|
{"limit": limit},
|
|
)
|
|
|
|
# Per-token search runs across event columns + the file_assets.search_terms
|
|
# blob — gives backfilled rows a richer search surface than just the event
|
|
# row's fields.
|
|
all_search_cols = [
|
|
"e.file_name",
|
|
"e.folder_path",
|
|
"e.description",
|
|
"e.status",
|
|
"e.file_id",
|
|
"coalesce(e.validated_metadata::text, '')",
|
|
"coalesce(e.raw_response::text, '')",
|
|
"coalesce(e.scenes::text, '')",
|
|
"coalesce(fa.search_terms, '')",
|
|
]
|
|
fuzzy_blob = (
|
|
"coalesce(e.file_name,'')||' '||coalesce(e.folder_path,'')||' '||"
|
|
"coalesce(e.description,'')||' '||coalesce(e.validated_metadata::text,'')||' '||"
|
|
"coalesce(e.scenes::text,'')||' '||coalesce(fa.search_terms,'')"
|
|
)
|
|
|
|
params: dict = {"limit": limit}
|
|
clauses: list[str] = []
|
|
score_terms: list[str] = []
|
|
for i, tok in enumerate(tokens):
|
|
like_key = f"like_{i}"
|
|
sim_key = f"sim_{i}"
|
|
params[like_key] = f"%{tok}%"
|
|
params[sim_key] = tok
|
|
col_ors = " OR ".join(f"{c} ILIKE %({like_key})s" for c in all_search_cols)
|
|
if len(tok) >= _MIN_FUZZY_TOKEN_LEN:
|
|
clauses.append(
|
|
f"(({col_ors}) "
|
|
f"OR similarity({fuzzy_blob}, %({sim_key})s) > {_SIM_THRESHOLD})"
|
|
)
|
|
score_terms.append(f"similarity({fuzzy_blob}, %({sim_key})s)")
|
|
else:
|
|
clauses.append(f"({col_ors})")
|
|
|
|
where = " AND ".join(clauses)
|
|
score_sql = " + ".join(score_terms) if score_terms else "0"
|
|
sql = (
|
|
f"SELECT {common_cols}, ({score_sql}) AS _score "
|
|
f"{join_clause} "
|
|
f"WHERE {where} "
|
|
f"ORDER BY _score DESC, e.created_at DESC "
|
|
f"LIMIT %(limit)s"
|
|
)
|
|
return sql, params
|
|
|
|
|
|
def _event_to_dict(row):
|
|
out = dict(row)
|
|
fid = out.get("file_id")
|
|
out["box_url"] = BOX_FILE_URL.format(file_id=fid) if fid else None
|
|
out["has_thumbnail"] = bool(out.pop("_has_thumbnail", False))
|
|
if out.get("run_id") is not None:
|
|
out["run_id"] = str(out["run_id"])
|
|
if out.get("created_at") is not None:
|
|
out["created_at"] = out["created_at"].isoformat()
|
|
return out
|
|
|
|
|
|
@app.get("/api/files/{file_id}/thumbnail")
def file_thumbnail(file_id: str, user: User = Depends(require_auth)):
    """
    Serve a previously-cached Box thumbnail (160x160 JPG by default) for a file.

    Bytes live in Postgres bytea on file_assets — see `_persist_file_asset`
    in main.py. The browser is told to cache for a day; the asset is stable
    until a re-fetch overwrites the row.

    Raises:
        HTTPException(404): no file_assets row, or a row whose thumbnail bytes
        are NULL/empty (e.g. only search_terms were stored).
    """
    with _conn() as c:
        row = db.get_thumbnail(c, file_id)
    if not row:
        raise HTTPException(status_code=404, detail="no thumbnail")
    data, content_type = row
    # A row can exist without bytes. Returning 200 with an empty body would be
    # cached by the browser for a day, hiding a thumbnail stored later.
    if not data:
        raise HTTPException(status_code=404, detail="no thumbnail")
    return Response(
        content=data,
        media_type=content_type or "image/jpeg",
        headers={"Cache-Control": "private, max-age=86400"},
    )
|
|
|
|
|
|
@app.get("/api/events")
def search_events(
    q: str = Query("", description="Free-text search across all fields"),
    limit: int = Query(100, ge=1, le=500),
    user: User = Depends(require_auth),
):
    """Free-text event search (see `_build_search_sql`); returns {q, count, results}."""
    sql, params = _build_search_sql(q, limit)
    with _conn() as conn, conn.cursor(row_factory=dict_row) as cur:
        cur.execute(sql, params)
        results = [_event_to_dict(record) for record in cur.fetchall()]
    return {"q": q, "count": len(results), "results": results}
|
|
|
|
|
|
# ── Run-now ─────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _mark_run_running(run_id: uuid.UUID, **extra) -> None:
    """Register *run_id* as running in the in-memory `_runs` registry (extra keys appended)."""
    key = str(run_id)
    with _runs_lock:
        _runs[key] = {"run_id": key, "state": "running", "error": None, **extra}


def _set_run_state(run_id: uuid.UUID, state: str, error: Optional[str] = None) -> None:
    """Update the in-memory state and error text of an already-registered run."""
    with _runs_lock:
        entry = _runs[str(run_id)]
        entry["state"] = state
        entry["error"] = error


def _record_run_failure(run_id: uuid.UUID, error: str) -> None:
    """Mark *run_id* failed and log why (otherwise visible only via the API)."""
    print(f"[api] WARN: run {run_id} failed: {error}")
    _set_run_state(run_id, "failed", error)


def _run_job_in_thread(run_id: uuid.UUID, runner_name: str, **extra) -> None:
    """
    Background worker shared by tagging passes and backfills.

    Registers the run as running, opens a dedicated DB connection, applies the
    schema, then calls `main.<runner_name>(run_id, conn)`. The in-memory state
    ends as "completed", or as "failed" with an error string. The connection is
    always closed.
    """
    _mark_run_running(run_id, **extra)
    db_conn = None
    try:
        # Import inside the thread so we don't pay tagger-side init cost at API
        # startup. Doing it inside the try means an import failure marks the run
        # failed instead of killing the thread with the run stuck "running".
        import main as tagger

        runner = getattr(tagger, runner_name)
        db_conn = db.get_conn()
        db.ensure_schema(db_conn)
        runner(run_id, db_conn)
    except SystemExit as e:
        # The tagger can raise SystemExit (e.g. sys.exit()); record it as a
        # failure rather than letting it escape the thread.
        _record_run_failure(run_id, f"SystemExit({e.code})")
    except Exception as e:
        _record_run_failure(run_id, f"{type(e).__name__}: {e}")
    else:
        _set_run_state(run_id, "completed")
    finally:
        if db_conn is not None:
            try:
                db_conn.close()
            except Exception:
                pass  # best-effort close; the run's outcome is already recorded


def _run_pass_in_thread(run_id: uuid.UUID):
    """Background worker: open a fresh DB conn and run one tagging pass (`main._run_pass`)."""
    _run_job_in_thread(run_id, "_run_pass")


@app.post("/api/runs")
def start_run(user: User = Depends(require_admin)):
    """Start a tagging pass in a daemon thread; returns immediately with its run_id."""
    run_id = uuid.uuid4()
    # Register before the thread starts so a client polling right away already
    # sees live_state="running".
    _mark_run_running(run_id)
    threading.Thread(target=_run_pass_in_thread, args=(run_id,), daemon=True).start()
    return {"run_id": str(run_id), "state": "running", "started_by": user.email or user.oid}


def _run_backfill_in_thread(run_id: uuid.UUID):
    """Background worker: mirror existing Box metadata into the DB (`main._run_backfill`)."""
    _run_job_in_thread(run_id, "_run_backfill", kind="backfill")


@app.post("/api/backfill")
def start_backfill(user: User = Depends(require_admin)):
    """
    Walk the Box folder and mirror any existing marriottUsa metadata into the
    local DB as `status='backfilled'` rows. Use this after first deploy (or
    after restoring an empty DB) so the per-file skip check doesn't re-tag
    files Box already has metadata for.
    """
    run_id = uuid.uuid4()
    # Register before the thread starts so an immediate poll sees "running".
    _mark_run_running(run_id, kind="backfill")
    threading.Thread(target=_run_backfill_in_thread, args=(run_id,), daemon=True).start()
    return {"run_id": str(run_id), "state": "running", "kind": "backfill", "started_by": user.email or user.oid}
|
|
|
|
|
|
@app.get("/api/runs")
def list_runs(user: User = Depends(require_auth), limit: int = Query(20, ge=1, le=100)):
    """
    Recent runs from tagging_events, newest activity first, with live state.

    Runs are grouped from tagging_events, so a run appears only once it has
    written at least one event. `successes` counts status='success'; `errors`
    counts statuses ending in "_error". live_state/live_error come from this
    process's in-memory `_runs` registry (which also keeps completed/failed
    runs). They are None for runs not started through this API process.
    """
    with _conn() as c:
        with c.cursor(row_factory=dict_row) as cur:
            cur.execute(
                """
                SELECT run_id,
                       min(created_at) AS started_at,
                       max(created_at) AS last_event_at,
                       count(*) AS events,
                       count(*) FILTER (WHERE status = 'success') AS successes,
                       -- exact "_error" suffix; LIKE '%_error' would treat _ as a wildcard
                       count(*) FILTER (WHERE right(status, 6) = '_error') AS errors
                FROM tagging_events
                GROUP BY run_id
                ORDER BY max(created_at) DESC
                LIMIT %s
                """,
                (limit,),
            )
            rows = cur.fetchall()

    # Copy live entries under the lock so state and error are read as a
    # consistent pair while worker threads may be updating them.
    run_ids = [str(r["run_id"]) for r in rows]
    with _runs_lock:
        live_by_id = {rid: dict(_runs[rid]) for rid in run_ids if rid in _runs}

    out = []
    for rid, r in zip(run_ids, rows):
        live = live_by_id.get(rid)
        out.append({
            "run_id": rid,
            "started_at": r["started_at"].isoformat() if r["started_at"] else None,
            "last_event_at": r["last_event_at"].isoformat() if r["last_event_at"] else None,
            "events": r["events"],
            "successes": r["successes"],
            "errors": r["errors"],
            "live_state": live["state"] if live else None,
            "live_error": live["error"] if live else None,
        })
    return {"runs": out}
|
|
|
|
|
|
@app.get("/api/runs/{run_id}/events")
def run_events(run_id: str, user: User = Depends(require_auth), limit: int = Query(500, ge=1, le=2000)):
    """
    Events for a single run, newest first, plus the run's in-memory live state.

    run_id is validated and normalised to canonical str(uuid.UUID) form, the
    same form `_runs` is keyed by. The live state is therefore found even when
    the client sends upper case, braces or an unhyphenated UUID.

    Raises:
        HTTPException(400): run_id is not a valid UUID.
    """
    try:
        rid = str(uuid.UUID(run_id))
    except ValueError:
        raise HTTPException(status_code=400, detail="run_id must be a UUID") from None
    with _conn() as c:
        with c.cursor(row_factory=dict_row) as cur:
            cur.execute(
                """
                SELECT e.id, e.run_id, e.created_at, e.file_id, e.file_name, e.folder_path, e.media_type,
                       e.gemini_model, e.description, e.scenes, e.validated_metadata,
                       e.metadata_write_success, e.description_write_success,
                       e.scene_comment_write_success, e.status, e.error_message, e.duration_ms,
                       (fa.thumbnail_bytes IS NOT NULL) AS _has_thumbnail
                FROM tagging_events e
                LEFT JOIN file_assets fa ON fa.file_id = e.file_id
                WHERE e.run_id = %s
                ORDER BY e.created_at DESC
                LIMIT %s
                """,
                (rid, limit),
            )
            rows = cur.fetchall()
    # Snapshot under the lock so state/error are read as a consistent pair.
    with _runs_lock:
        live = dict(_runs[rid]) if rid in _runs else None
    return {
        "run_id": rid,
        "live_state": live["state"] if live else None,
        "live_error": live["error"] if live else None,
        "count": len(rows),
        "events": [_event_to_dict(r) for r in rows],
    }
|