"""
Postgres logging for the Marriott Box tagger.

One row per file Gemini was called on (success or error).

The DB is auxiliary — the per-file helpers (upsert_file_asset, get_thumbnail,
is_file_already_tagged, log_event) swallow exceptions and print to stderr so a
Postgres outage cannot stop the tagging pass; they also return immediately
when handed conn=None. Connection setup (get_conn, ensure_schema) does raise.
Box remains the source of truth.
"""

import json
import os
import sys
from pathlib import Path

import psycopg
from psycopg.types.json import Jsonb

# schema.sql is expected to sit next to this module.
SCHEMA_PATH = Path(__file__).parent / "schema.sql"

# Named-parameter insert used by log_event(); keys match the params dict there.
INSERT_SQL = """
INSERT INTO tagging_events (
    run_id, file_id, file_name, folder_path, media_type, gemini_model,
    prompt, raw_response, description, scenes, validated_metadata,
    metadata_write_success, description_write_success, scene_comment_write_success,
    status, error_message, duration_ms
) VALUES (
    %(run_id)s, %(file_id)s, %(file_name)s, %(folder_path)s, %(media_type)s, %(gemini_model)s,
    %(prompt)s, %(raw_response)s, %(description)s, %(scenes)s, %(validated_metadata)s,
    %(metadata_write_success)s, %(description_write_success)s, %(scene_comment_write_success)s,
    %(status)s, %(error_message)s, %(duration_ms)s
)
"""


def _dsn() -> str:
    """Return the connection string from DATABASE_URL.

    Raises:
        RuntimeError: if DATABASE_URL is unset or empty.
    """
    dsn = os.getenv("DATABASE_URL")
    if not dsn:
        raise RuntimeError("DATABASE_URL not set")
    return dsn


def get_conn():
    """Open a Postgres connection in autocommit mode. Caller owns close().

    Unlike the per-file helpers, this raises on failure (missing
    DATABASE_URL or a psycopg connection error).
    """
    return psycopg.connect(_dsn(), autocommit=True)


def ensure_schema(conn) -> None:
    """Apply schema.sql idempotently.

    Reads the whole file and sends it in a single execute() call. Errors
    (missing file, SQL failure) propagate to the caller.
    """
    # Pin the encoding so decoding does not depend on the host locale.
    sql = SCHEMA_PATH.read_text(encoding="utf-8")
    with conn.cursor() as cur:
        cur.execute(sql)


_UPSERT_FILE_ASSET_SQL = """
INSERT INTO file_assets (
    file_id, thumbnail_bytes, thumbnail_content_type, thumbnail_size,
    search_terms, updated_at
) VALUES (
    %(file_id)s, %(thumbnail_bytes)s, %(thumbnail_content_type)s, %(thumbnail_size)s,
    %(search_terms)s, now()
)
ON CONFLICT (file_id) DO UPDATE SET
    -- only overwrite the thumbnail when we have new bytes; preserves
    -- previously-captured thumbs across runs where the fetch failed.
    thumbnail_bytes = COALESCE(EXCLUDED.thumbnail_bytes, file_assets.thumbnail_bytes),
    thumbnail_content_type = COALESCE(EXCLUDED.thumbnail_content_type, file_assets.thumbnail_content_type),
    thumbnail_size = COALESCE(EXCLUDED.thumbnail_size, file_assets.thumbnail_size),
    search_terms = COALESCE(EXCLUDED.search_terms, file_assets.search_terms),
    updated_at = now()
"""


def upsert_file_asset(
    conn,
    *,
    file_id,
    thumbnail_bytes=None,
    thumbnail_content_type=None,
    thumbnail_size=None,
    search_terms=None,
) -> None:
    """
    Idempotently insert/update the per-file row in file_assets.

    Any argument left as None keeps the stored value on conflict (the SQL
    COALESCEs against the existing row). Does nothing when conn is None or
    file_id is falsy.

    Failures are swallowed — a thumbnail or search-blob hiccup must never
    stop a tagging pass.
    """
    if conn is None or not file_id:
        return
    try:
        with conn.cursor() as cur:
            cur.execute(_UPSERT_FILE_ASSET_SQL, {
                "file_id": str(file_id),
                "thumbnail_bytes": thumbnail_bytes,
                "thumbnail_content_type": thumbnail_content_type,
                "thumbnail_size": thumbnail_size,
                "search_terms": search_terms,
            })
    except Exception as e:
        print(
            f" WARN: DB upsert_file_asset failed ({type(e).__name__}: {e}) — continuing",
            file=sys.stderr,
        )


def get_thumbnail(conn, file_id):
    """Return (bytes, content_type) for the file's stored thumbnail, or None.

    None is returned when conn is None, file_id is falsy, no row with a
    non-NULL thumbnail exists, or the query fails (the failure is reported
    on stderr).
    """
    if conn is None or not file_id:
        return None
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT thumbnail_bytes, thumbnail_content_type "
                "FROM file_assets WHERE file_id = %s AND thumbnail_bytes IS NOT NULL",
                (str(file_id),),
            )
            row = cur.fetchone()
            if not row:
                return None
            # Normalise whatever buffer type the driver hands back to bytes.
            return bytes(row[0]), row[1]
    except Exception as e:
        print(
            f" WARN: DB get_thumbnail failed ({type(e).__name__}: {e})",
            file=sys.stderr,
        )
        return None


def is_file_already_tagged(conn, file_id) -> bool:
    """
    Skip-check oracle.

    A file counts as "already tagged" if we have any row in tagging_events
    for it with a terminal-good status — either a real Gemini-driven success
    or a backfilled row that mirrors Box's existing metadata. Error/validation
    rows do NOT count, so a previously failed file gets retried on the next
    pass.

    Returns False when conn is None, file_id is falsy, or the query fails,
    so a DB problem results in the file being treated as not yet tagged.
    """
    if conn is None or not file_id:
        return False
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT 1 FROM tagging_events "
                "WHERE file_id = %s AND status IN ('success','backfilled') LIMIT 1",
                (str(file_id),),
            )
            return cur.fetchone() is not None
    except Exception as e:
        print(
            f" WARN: DB is_file_already_tagged failed ({type(e).__name__}: {e}) — assuming NOT tagged",
            file=sys.stderr,
        )
        return False


def _jsonable(value):
    """Wrap value in psycopg's Jsonb adapter; pass None through as SQL NULL."""
    if value is None:
        return None
    return Jsonb(value)


def log_event(
    conn,
    *,
    run_id,
    file_id,
    file_name,
    folder_path,
    media_type,
    gemini_model,
    status,
    prompt=None,
    raw_response=None,
    description=None,
    scenes=None,
    validated_metadata=None,
    metadata_write_success=None,
    description_write_success=None,
    scene_comment_write_success=None,
    error_message=None,
    duration_ms=None,
) -> None:
    """
    Insert one tagging_events row.

    run_id is stringified; file_id is stringified unless it is None.
    raw_response, scenes and validated_metadata are sent as JSONB (None
    stays NULL). Does nothing when conn is None.

    Never raises — DB problems are reported to stderr and the tagger
    continues.
    """
    if conn is None:
        return
    try:
        # Build the parameters inside the try so that a failure while
        # converting a value is reported like any other DB problem instead
        # of escaping and breaking the "never raises" contract.
        params = {
            "run_id": str(run_id),
            "file_id": str(file_id) if file_id is not None else None,
            "file_name": file_name,
            "folder_path": folder_path,
            "media_type": media_type,
            "gemini_model": gemini_model,
            "prompt": prompt,
            "raw_response": _jsonable(raw_response),
            "description": description,
            "scenes": _jsonable(scenes),
            "validated_metadata": _jsonable(validated_metadata),
            "metadata_write_success": metadata_write_success,
            "description_write_success": description_write_success,
            "scene_comment_write_success": scene_comment_write_success,
            "status": status,
            "error_message": error_message,
            "duration_ms": duration_ms,
        }
        with conn.cursor() as cur:
            cur.execute(INSERT_SQL, params)
    except Exception as e:
        print(
            f" WARN: DB log_event failed ({type(e).__name__}: {e}) — continuing",
            file=sys.stderr,
        )