marriott-box-image-video-ta.../main.py
DJP 9e6a75feb6 Manual-only runs, DB-based skip check, backfill-from-Box
Previously a nightly APScheduler container fired the tagger on every
file in the configured Box folder. With ~5000 files coming, that's
~5000 Box HTTP calls every night just to ask "is this tagged?". Move
to manual-only mode and source the skip decision from the local DB.

- `db.is_file_already_tagged(conn, file_id)` — returns True iff the
  DB has a row with status IN ('success','backfilled'). Used by both
  image and video loops in main.py instead of the previous
  `check_existing_metadata(box_client, file_id)` Box round-trip.
- `fetch_existing_metadata(box_client, file_id)` (main.py) — returns
  the user-defined template fields as a flat dict by stripping the
  Box `$id`/`$type`/etc. attrs from the SDK response.
- `_run_backfill(run_id, db_conn)` (main.py) — walks the Box folder
  and inserts a `status='backfilled'` row for every file Box already
  has marriottUsa metadata for. Read-only against Box; safe to re-run.
  Use this after first deploy, or to repopulate the DB from Box.
- `POST /api/backfill` mirrors `POST /api/runs` (background thread,
  same live-state record).
- SPA: new "Backfill from Box" button next to "Run now" (with a
  confirm dialog and a yellow `.status-backfilled` event treatment).
- docker-compose.yml: removed the `tagger` (scheduler) service.
  Manual triggers via the SPA / `POST /api/runs` only. scheduler.py
  stays in the repo for archival / opt-back-in.
- deploy.sh: readiness now checks the `api` container instead of
  `tagger`; `--logs` tails api logs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 15:41:10 -04:00

1184 lines
51 KiB
Python

"""
Marriott Box Asset Tagger
Processes images and videos in a Box folder, analyzes them with Gemini AI,
and writes structured metadata back to Box using the marriottUsa template.
Videos use Box's 480p MP4 proxy representations to minimize bandwidth.
"""
import io
import json
import os
import re
import sys
import time
import uuid
import requests
from dotenv import load_dotenv
from PIL import Image
import db
from box_sdk_gen import BoxClient, BoxJWTAuth, JWTConfig, BoxAPIError
from box_sdk_gen.managers.file_metadata import (
CreateFileMetadataByIdScope,
UpdateFileMetadataByIdScope,
UpdateFileMetadataByIdRequestBody,
UpdateFileMetadataByIdRequestBodyOpField,
)
from box_sdk_gen.managers.comments import CreateCommentItem, CreateCommentItemTypeField
from google import genai
from google.genai import types
# ── Configuration ────────────────────────────────────────────────────────────
BOX_FOLDER_ID = "380274488839"
METADATA_TEMPLATE_KEY = "marriottUsa"
METADATA_SCOPE = "enterprise"
GEMINI_MODEL = "gemini-2.5-flash"
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".wmv", ".flv", ".webm", ".mpg", ".mpeg", ".m4v", ".3gp"}
# Folder-name prefixes that are never scanned; compared against the lower-cased folder name.
EXCLUDED_FOLDER_PREFIXES = ("z_", "zz_", "zzz_")
MAX_IMAGE_SIZE = 1000  # longest side in pixels
GEMINI_DELAY = 7  # seconds between Gemini calls
GEMINI_VIDEO_DELAY = 10  # longer delay for video (larger token usage)
VIDEO_SIZE_LIMIT_INLINE = 20 * 1024 * 1024  # 20MB — below this, send inline; above, use File API
VIDEO_SOURCE_SIZE_LIMIT = 5 * 1024 * 1024 * 1024  # 5GB — skip videos whose source file exceeds this
VIDEO_PROXY_SIZE_LIMIT = 400 * 1024 * 1024  # 400MB — skip videos whose 480p proxy exceeds this (≈ >60 min runtime)
VIDEO_SKIP = "VIDEO_SKIP"  # sentinel returned by download_video_proxy for intentional (non-error) skips
# When True, files the local DB already records as tagged (db.is_file_already_tagged)
# are skipped instead of being re-sent to Gemini.
SKIP_ALREADY_TAGGED = True
DESCRIPTION_MAX_LENGTH = 255
# Per-run limiters: protect against runaway cost / time on a sudden large upload.
# Counts only NEWLY-tagged files (files skipped as already tagged don't count).
# Shared across images and videos. When either cap is hit, the run exits cleanly
# with a summary; the next run (runs are triggered manually) picks up the
# remaining untagged files.
MAX_FILES_PER_RUN = 200  # hard cap on newly-tagged files per run
MAX_RUN_DURATION = 4 * 3600  # hard cap in seconds (4 hours; stays well under the 6h systemd timeout)
# ── 1. Box Client ────────────────────────────────────────────────────────────
def init_box_client():
    """
    Build an authenticated BoxClient from the JWT app config next to this file.

    Reads ``box_config.json`` from this module's directory, authenticates as the
    app's service account and prints who we are. Exits the process with status 1
    if the config file is missing.
    """
    cfg_file = os.path.join(os.path.dirname(__file__), "box_config.json")
    if not os.path.exists(cfg_file):
        print(f"ERROR: Box JWT config not found at {cfg_file}")
        sys.exit(1)
    jwt_settings = JWTConfig.from_config_file(config_file_path=cfg_file)
    box = BoxClient(auth=BoxJWTAuth(config=jwt_settings))
    # Round-trip once so auth problems surface here rather than mid-run.
    me = box.users.get_user_me()
    print(f"Authenticated as Box service account: {me.name} (ID: {me.id})")
    return box
# ── 2. Gemini Client ─────────────────────────────────────────────────────────
def init_gemini_client():
    """
    Create a Gemini (google-genai) client using GEMINI_API_KEY.

    Loads ``.env`` into the environment first. Exits the process with status 1
    when the key is missing or empty.
    """
    load_dotenv()
    key = os.getenv("GEMINI_API_KEY")
    if key:
        gemini = genai.Client(api_key=key)
        print("Gemini client initialized")
        return gemini
    print("ERROR: GEMINI_API_KEY not set in .env")
    sys.exit(1)
# ── 3. Fetch Template Schema ─────────────────────────────────────────────────
def fetch_template_schema(box_client):
    """
    Load the marriottUsa metadata template definition from Box.

    Returns:
        dict mapping each field key to
        {"display_name": str, "type": str, "options": list[str]}, where
        "options" holds the option keys (empty for fields without options).

    Exits the process with status 1 if Box refuses the template lookup.
    """
    try:
        template = box_client.metadata_templates.get_metadata_template(
            scope=METADATA_SCOPE,
            template_key=METADATA_TEMPLATE_KEY,
        )
    except BoxAPIError as err:
        print(f"ERROR: Could not fetch template '{METADATA_TEMPLATE_KEY}': {err}")
        sys.exit(1)

    def _type_name(raw_type):
        # The SDK may hand back an enum member or a plain string.
        return raw_type.value if hasattr(raw_type, "value") else str(raw_type)

    schema = {
        fld.key: {
            "display_name": fld.display_name,
            "type": _type_name(fld.type),
            "options": [opt.key for opt in fld.options] if fld.options else [],
        }
        for fld in template.fields
    }

    print(f"Template '{METADATA_TEMPLATE_KEY}' loaded — {len(schema)} fields:")
    for meta in schema.values():
        option_note = f" ({len(meta['options'])} options)" if meta["options"] else ""
        print(f"{meta['display_name']} [{meta['type']}]{option_note}")
    return schema
# ── 4. Build Gemini Prompt ────────────────────────────────────────────────────
def _build_field_lines(template_schema):
"""Build the FIELDS section lines shared by image and video prompts."""
lines = []
for key, info in template_schema.items():
field_type = info["type"]
display = info["display_name"]
options = info["options"]
if field_type == "enum" and options:
lines.append(f' "{key}" ({display}): enum — choose ONE from: {json.dumps(options)}')
elif field_type == "multiSelect" and options:
lines.append(f' "{key}" ({display}): multiSelect — choose any from: {json.dumps(options)}')
elif field_type == "string":
lines.append(f' "{key}" ({display}): string — free text')
elif field_type == "float":
lines.append(f' "{key}" ({display}): float — numeric value')
else:
lines.append(f' "{key}" ({display}): {field_type}')
return lines
def _build_context_lines(file_name, folder_path):
"""Build FILE CONTEXT lines from filename and folder path."""
lines = [
"",
"FILE CONTEXT (use this to help identify the brand, property, and destination):",
f' Filename: "{file_name}"',
]
if folder_path:
lines.append(f' Folder path: "{folder_path}"')
lines.append(" Use the folder names and filename to infer the hotel brand, property name, and location when not obvious from the visual content alone.")
return lines
def build_gemini_prompt(template_schema, file_name="", folder_path=""):
    """
    Assemble the image-analysis prompt sent to Gemini.

    Layout: role and output rules, one line per template field (from
    _build_field_lines), the spec for the extra "description" field, then the
    FILE CONTEXT section (from _build_context_lines) so the model can use the
    filename / folder path to identify brand, property and destination.

    Args:
        template_schema: field_key -> {"display_name", "type", "options"}, as
            returned by fetch_template_schema.
        file_name: Box file name to include as context.
        folder_path: folder path below the root folder ("" for the root).

    Returns:
        The prompt as one newline-joined string.
    """
    header = [
        "You are an expert image analyst for Marriott's digital asset library.",
        "Analyze this image and return a JSON object with the following fields.",
        "Also include a 'description' field that combines a human-readable summary with a search-keyword tail (see format below).",
        "",
        "RULES:",
        "- Return ONLY valid JSON, no markdown, no explanation.",
        "- For enum fields, pick exactly ONE value from the allowed list, or null if none fit.",
        "- For multiSelect fields, return an array of matching values from the allowed list (can be empty).",
        "- For string fields, write a concise relevant value, or null if not applicable.",
        "- Be accurate. Only tag what you can clearly see in the image.",
        "- Use the file context below to help identify the hotel brand, property, and destination.",
        "",
        "FIELDS:",
    ]
    description_spec = [
        ' "description": string — formatted as "<summary sentence>. <keyword tail>."',
        ' Part 1 (summary): a natural human-readable sentence describing the image (~140 characters max).',
        ' Part 2 (keyword tail): a comma-separated list of search synonyms and broader category terms (~100 characters max).',
        ' The two parts together MUST be 255 characters or fewer total.',
        ' KEYWORD STRATEGY: include synonyms and broader category terms for the main concepts shown — NOT adjacent or contextual terms.',
        ' Example: a buffet image → "buffet, food, dining, eating, meal, breakfast, restaurant" (YES). Do NOT add "hotel, guest, morning" (NO — those are contextual, not synonyms).',
        ' Cover synonym families for whatever is shown: food/dining (food, dining, eating, meal, restaurant, cafe), spaces (room, suite, lobby, bathroom, balcony, terrace, patio), wellness (pool, swimming, gym, fitness, spa, wellness), events (meeting, conference, wedding, ballroom, banquet), settings (beach, oceanfront, waterfront, garden, view, vista), people (couple, family, kids, guest), and time/mood (sunset, evening, morning, night) — only when present in the image.',
        ' Full example: "Couple enjoying breakfast on the terrace overlooking the bay. Dining, eating, food, meal, breakfast, terrace, balcony, patio, view, ocean, waterfront, couple."',
    ]
    sections = (
        header,
        _build_field_lines(template_schema),
        [""],
        description_spec,
        _build_context_lines(file_name, folder_path),
        ["", "Return ONLY the JSON object."],
    )
    return "\n".join(line for section in sections for line in section)
# ── 4b. Build Video Prompt ───────────────────────────────────────────────────
def build_video_prompt(template_schema, file_name="", folder_path=""):
    """
    Assemble the video-analysis prompt sent to Gemini.

    Same layout as the image prompt (rules, per-field lines, "description"
    spec, FILE CONTEXT) but worded for whole-video analysis, and it also asks
    for a "scenes" array of coarse timestamped scene descriptions.

    Args:
        template_schema: field_key -> {"display_name", "type", "options"}, as
            returned by fetch_template_schema.
        file_name: Box file name to include as context.
        folder_path: folder path below the root folder ("" for the root).

    Returns:
        The prompt as one newline-joined string.
    """
    header = [
        "You are an expert video analyst for Marriott's digital asset library.",
        "Analyze this video in its entirety — consider all scenes, transitions, and content throughout the full duration.",
        "Return a JSON object with the following fields.",
        "Also include a 'description' field that combines a human-readable summary with a search-keyword tail (see format below).",
        "",
        "RULES:",
        "- Return ONLY valid JSON, no markdown, no explanation.",
        "- For enum fields, pick exactly ONE value from the allowed list, or null if none fit.",
        "- For multiSelect fields, return an array of matching values from the allowed list (can be empty).",
        "- For string fields, write a concise relevant value, or null if not applicable.",
        "- Be accurate. Only tag what you can clearly see or hear in the video.",
        "- Base your analysis on the overall content and theme of the video, not just a single frame.",
        "- Use the file context below to help identify the hotel brand, property, and destination.",
        "",
        "FIELDS:",
    ]
    description_spec = [
        ' "description": string — formatted as "<summary sentence>. <keyword tail>."',
        ' Part 1 (summary): a natural human-readable sentence describing the video overall (~140 characters max).',
        ' Part 2 (keyword tail): a comma-separated list of search synonyms and broader category terms covering content across the whole video (~100 characters max).',
        ' The two parts together MUST be 255 characters or fewer total.',
        ' KEYWORD STRATEGY: include synonyms and broader category terms for the main concepts shown across the video — NOT adjacent or contextual terms.',
        ' Example: a video featuring a buffet → "buffet, food, dining, eating, meal, breakfast, restaurant" (YES). Do NOT add "hotel, guest, morning" (NO — those are contextual, not synonyms).',
        ' Cover synonym families for what is shown: food/dining (food, dining, eating, meal, restaurant, cafe), spaces (room, suite, lobby, bathroom, balcony, terrace, patio), wellness (pool, swimming, gym, fitness, spa, wellness), events (meeting, conference, wedding, ballroom, banquet), settings (beach, oceanfront, waterfront, garden, view, vista), people (couple, family, kids, guest), and time/mood (sunset, evening, morning, night) — only when present in the video.',
        ' Full example: "Couple enjoys breakfast on the terrace then walks the beach at sunset. Dining, eating, food, breakfast, terrace, beach, oceanfront, sunset, couple, walk."',
    ]
    scenes_spec = [
        ' "scenes": array of strings — HIGH-LEVEL timestamped scene breakdown.',
        ' Each entry format: "M:SS-M:SS keywords describing scene"',
        ' HARD LIMIT: Never exceed 25 scenes. Aim for 5-20 scenes regardless of video length.',
        ' MINIMUM DURATION: Each scene should span at least 30 seconds of content. Merge shorter moments into the surrounding scene.',
        ' WHAT COUNTS AS A NEW SCENE: Only split when the location, subject, OR primary activity fundamentally changes.',
        ' Same person talking in the same place = ONE scene even if topics shift.',
        ' Same activity in the same setting = ONE scene even if camera angles change.',
        ' DO NOT split for: camera cuts, slight movements, continuation of the same interview/activity/tour.',
        ' Be descriptive with keywords — incomplete sentences are fine.',
        ' Focus on searchable terms: locations, activities, people, objects, mood.',
        ' Example for a 20-min video: ["0:00-2:30 hotel breakfast buffet coffee pastries morning routine", "2:31-5:45 hotel gym treadmill weights workout", "5:46-8:20 hotel room tour bed desk bathroom closet", "8:21-14:00 arena floor volleyball practice drills warmup", "14:01-18:30 fan interviews championship banner trophy poses", "18:31-20:00 arena wide shots jumbotron match highlights"]',
    ]
    sections = (
        header,
        _build_field_lines(template_schema),
        [""],
        description_spec,
        scenes_spec,
        _build_context_lines(file_name, folder_path),
        ["", "Return ONLY the JSON object."],
    )
    return "\n".join(line for section in sections for line in section)
# ── 5. List Media Files ──────────────────────────────────────────────────────
# Largest page size Box accepts for folder items; bigger pages mean far fewer
# HTTP round-trips when walking a large library.
_FOLDER_PAGE_SIZE = 1000


def _list_folder_media(box_client, folder_id, folder_path=""):
    """
    List the images, videos and scannable subfolders directly inside one Box folder.

    Uses marker-based pagination. Box rejects offset-based folder-items queries
    once the offset passes 10,000, so offset paging would fail on very large
    folders. Subfolders whose lower-cased name starts with one of
    EXCLUDED_FOLDER_PREFIXES are reported and left out. Files are classified by
    lower-cased extension; other files and item types are ignored.

    Args:
        box_client: authenticated BoxClient.
        folder_id: Box folder ID to list.
        folder_path: path of this folder below the root; copied onto every file entry.

    Returns:
        (image_files, video_files, subfolders). Each file list holds
        {"id", "name", "folder_path"} dicts; subfolders holds {"id", "name"} dicts.
    """
    image_files = []
    video_files = []
    subfolders = []
    marker = None
    while True:
        page = box_client.folders.get_folder_items(
            folder_id=folder_id,
            usemarker=True,
            marker=marker,
            limit=_FOLDER_PAGE_SIZE,
        )
        entries = page.entries or []
        for item in entries:
            # The SDK may give an enum member or a plain string for the type.
            item_type = item.type.value if hasattr(item.type, "value") else str(item.type)
            if item_type == "folder":
                if item.name.lower().startswith(EXCLUDED_FOLDER_PREFIXES):
                    print(f" Skipping excluded folder: {item.name}")
                else:
                    subfolders.append({"id": item.id, "name": item.name})
            elif item_type == "file":
                ext = os.path.splitext(item.name)[1].lower()
                file_entry = {"id": item.id, "name": item.name, "folder_path": folder_path}
                if ext in IMAGE_EXTENSIONS:
                    image_files.append(file_entry)
                elif ext in VIDEO_EXTENSIONS:
                    video_files.append(file_entry)
        # An absent/empty next_marker means this was the last page.
        marker = getattr(page, "next_marker", None)
        if not entries or not marker:
            break
    return image_files, video_files, subfolders
def list_all_media(box_client):
    """
    Walk the Box folder tree rooted at BOX_FOLDER_ID and collect media files.

    The walk is depth-first, visiting subfolders in the order Box lists them.
    Exclusion of z_/zz_/zzz_ folders and pagination are handled per folder by
    _list_folder_media. A progress line is printed for every folder.

    Returns:
        (image_files, video_files), each a list of {"id", "name", "folder_path"} dicts.
    """
    collected_images = []
    collected_videos = []
    # Explicit DFS stack of (folder_id, folder_path, depth, folder_name).
    # Children are pushed in reverse so they pop in Box's listing order,
    # reproducing a recursive pre-order walk.
    stack = [(BOX_FOLDER_ID, "", 0, None)]
    while stack:
        folder_id, path, depth, name = stack.pop()
        if depth > 0:
            print(f"{' ' * (depth - 1)} Scanning subfolder: {name}...")
        images, videos, subfolders = _list_folder_media(box_client, folder_id, path)
        summary = f"{len(images)} images, {len(videos)} videos, {len(subfolders)} subfolders"
        if depth > 0:
            print(f"{' ' * depth}Found {summary}")
        else:
            print(f"Root folder: {summary}")
        collected_images.extend(images)
        collected_videos.extend(videos)
        for sub in reversed(subfolders):
            child_path = f"{path}/{sub['name']}" if path else sub["name"]
            stack.append((sub["id"], child_path, depth + 1, sub["name"]))
    print(f"Found {len(collected_images)} total image files, {len(collected_videos)} total video files")
    return collected_images, collected_videos
# ── 6. Download and Resize Image ─────────────────────────────────────────────
def download_and_resize_image(box_client, file_id, file_name):
    """
    Download an image from Box and downscale it to a JPEG for Gemini.

    The image is shrunk in place (aspect ratio kept) so its longest side is at
    most MAX_IMAGE_SIZE pixels. It is then re-encoded as JPEG at quality 85.

    Returns:
        (jpeg_bytes, "image/jpeg"), or None on any failure. Failures (download
        error, undecodable image, etc.) are printed, not raised.
    """
    try:
        stream = box_client.downloads.download_file(file_id=file_id)
        raw_bytes = stream.read()
        with Image.open(io.BytesIO(raw_bytes)) as img:
            img.thumbnail((MAX_IMAGE_SIZE, MAX_IMAGE_SIZE), Image.Resampling.LANCZOS)
            # The JPEG encoder cannot write alpha/palette/high-bit-depth modes
            # (RGBA, P, LA, PA, I, I;16, F). Normalise everything except plain
            # RGB and greyscale to RGB.
            if img.mode not in ("RGB", "L"):
                img = img.convert("RGB")
            buffer = io.BytesIO()
            img.save(buffer, format="JPEG", quality=85)
        return buffer.getvalue(), "image/jpeg"
    except Exception as e:
        print(f" ERROR downloading/resizing '{file_name}': {e}")
        return None
# ── 6b. Download Video Proxy ─────────────────────────────────────────────────
# (connect, read) timeouts in seconds for the raw `requests` calls against the
# representation URL. requests waits forever by default, so one stalled
# connection could otherwise hang the whole run.
_PROXY_HTTP_TIMEOUT = (10, 300)


def _find_mp4_representation(file_info):
    """Return the "mp4" entry from a Box file's representations, or None if absent."""
    representations = getattr(file_info, "representations", None)
    for entry in getattr(representations, "entries", None) or []:
        if entry.representation == "mp4":
            return entry
    return None


def _representation_state(entry):
    """Return a representation's status state as a plain string (None if it has no status)."""
    state = entry.status.state if entry.status else None
    return state.value if hasattr(state, "value") else state


def _print_proxy_too_large(size_bytes):
    """Print the skip message used when the 480p proxy exceeds VIDEO_PROXY_SIZE_LIMIT."""
    print(
        f" Skipping — 480p proxy {size_bytes / (1024*1024):.0f} MB exceeds limit of "
        f"{VIDEO_PROXY_SIZE_LIMIT / (1024*1024):.0f} MB (likely >60 min runtime, beyond Gemini context budget)"
    )


def download_video_proxy(box_client, file_id, file_name):
    """
    Download Box's 480p MP4 representation ("proxy") of a video file.

    The proxy size is used as a stand-in for runtime: more than ~400MB means
    roughly more than 60 minutes, beyond Gemini's context budget.

    Returns:
        (bytes, "video/mp4") on success.
        VIDEO_SKIP when the source exceeds VIDEO_SOURCE_SIZE_LIMIT or the proxy
            exceeds VIDEO_PROXY_SIZE_LIMIT (an intentional, non-error skip).
        None when no MP4 representation exists, it is not ready after ~60s of
            polling, or any other error occurs (printed, not raised).
    """
    try:
        # Source size gate (catches obvious mega-files before requesting representations).
        file_info = box_client.files.get_file_by_id(
            file_id=file_id,
            fields=["size", "representations"],
            x_rep_hints="[mp4]",
        )
        source_size = getattr(file_info, "size", None)
        if source_size and source_size > VIDEO_SOURCE_SIZE_LIMIT:
            print(f" Skipping — source file size {source_size / (1024*1024):.0f} MB exceeds limit of {VIDEO_SOURCE_SIZE_LIMIT / (1024*1024):.0f} MB")
            return VIDEO_SKIP

        mp4_entry = _find_mp4_representation(file_info)
        if mp4_entry is None:
            print(" No MP4 representation available — skipping (re-run later)")
            return None

        # Poll until the representation is generated (up to 12 x 5s = 60s).
        state = _representation_state(mp4_entry)
        retries = 0
        while state in ("pending", "none") and retries < 12:
            print(f" MP4 representation is '{state}' — waiting 5s (attempt {retries + 1}/12)...")
            time.sleep(5)
            retries += 1
            file_info = box_client.files.get_file_by_id(
                file_id=file_id,
                fields=["representations"],
                x_rep_hints="[mp4]",
            )
            # Keep the previous entry if the refreshed listing omits it.
            mp4_entry = _find_mp4_representation(file_info) or mp4_entry
            state = _representation_state(mp4_entry)
        if state not in ("success", "viewable"):
            print(f" MP4 proxy not ready (state: '{state}') — skipping (re-run later)")
            return None

        # The MP4 representation is a single asset, so {+asset_path} expands to "".
        download_url = mp4_entry.content.url_template.replace("{+asset_path}", "")
        headers = {"Authorization": box_client.auth.retrieve_authorization_header()}

        # Proxy size gate via HEAD request (avoids downloading hundreds of MB just to skip).
        head_resp = requests.head(download_url, headers=headers, allow_redirects=True, timeout=_PROXY_HTTP_TIMEOUT)
        proxy_size = int(head_resp.headers.get("Content-Length", 0))
        if proxy_size and proxy_size > VIDEO_PROXY_SIZE_LIMIT:
            _print_proxy_too_large(proxy_size)
            return VIDEO_SKIP

        # Stream the download so the size cap is still enforced when HEAD
        # did not report a usable Content-Length.
        chunks = []
        received = 0
        with requests.get(download_url, headers=headers, timeout=_PROXY_HTTP_TIMEOUT, stream=True) as resp:
            resp.raise_for_status()
            for chunk in resp.iter_content(chunk_size=1024 * 1024):
                chunks.append(chunk)
                received += len(chunk)
                if received > VIDEO_PROXY_SIZE_LIMIT:
                    _print_proxy_too_large(received)
                    return VIDEO_SKIP
        content = b"".join(chunks)
        print(f" Downloaded MP4 proxy: {len(content) / (1024*1024):.1f} MB")
        return content, "video/mp4"
    except Exception as e:
        print(f" ERROR downloading video proxy for '{file_name}': {e}")
        return None
# ── 7. Analyze Image with Gemini ──────────────────────────────────────────────
def analyze_image_with_gemini(gemini_client, image_bytes, mime_type, prompt):
    """
    Ask Gemini to tag one image and parse its JSON reply.

    Args:
        gemini_client: google-genai Client.
        image_bytes: encoded image data.
        mime_type: MIME type of image_bytes (e.g. "image/jpeg").
        prompt: text prompt from build_gemini_prompt.

    Returns:
        The value produced by _parse_gemini_json, or None if the request or
        parsing raised (the error is printed).
    """
    try:
        image_part = types.Part.from_bytes(data=image_bytes, mime_type=mime_type)
        reply = gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[prompt, image_part],
        )
        # Parsing stays inside the try so parse errors are reported the same way.
        return _parse_gemini_json(reply.text)
    except Exception as exc:
        print(f" ERROR from Gemini: {exc}")
        return None
# ── 7b. Parse Gemini JSON / Analyze Video with Gemini ─────────────────────────
def _parse_gemini_json(text):
"""Parse JSON from Gemini response text, stripping markdown fences if present."""
if not text:
print(f" WARNING: Gemini returned no response (likely too large or processing failed)")
return None
text = text.strip()
if text.startswith("```"):
text = re.sub(r"^```(?:json)?\s*", "", text)
text = re.sub(r"\s*```$", "", text)
try:
return json.loads(text)
except json.JSONDecodeError:
match = re.search(r"\{[\s\S]*\}", text)
if match:
return json.loads(match.group())
print(f" WARNING: Could not parse Gemini response as JSON")
return None
def _gemini_file_state(uploaded_file):
    """Return the Gemini File API state name (e.g. "PROCESSING", "ACTIVE"), or None if unset."""
    state = getattr(uploaded_file, "state", None)
    return getattr(state, "name", None)


def analyze_video_with_gemini(gemini_client, video_bytes, mime_type, prompt):
    """
    Ask Gemini to tag one video and parse its JSON reply.

    Videos smaller than VIDEO_SIZE_LIMIT_INLINE are sent inline. Larger ones are
    uploaded through the Gemini File API, polled until ACTIVE (up to 30 x 5s),
    then referenced in the request. An uploaded file is always deleted
    afterwards, including when polling or generation fails.

    Returns:
        The value produced by _parse_gemini_json, or None on any failure
        (the error is printed).
    """
    uploaded_file = None
    try:
        size_mb = len(video_bytes) / (1024 * 1024)
        if len(video_bytes) < VIDEO_SIZE_LIMIT_INLINE:
            # Small video — send inline.
            print(f" Sending video inline ({size_mb:.1f} MB)")
            video_part = types.Part.from_bytes(data=video_bytes, mime_type=mime_type)
        else:
            # Large video — use the File API.
            print(f" Uploading video via File API ({size_mb:.1f} MB)...")
            uploaded_file = gemini_client.files.upload(
                file=io.BytesIO(video_bytes),
                config=types.UploadFileConfig(mime_type=mime_type),
            )
            retries = 0
            while _gemini_file_state(uploaded_file) == "PROCESSING" and retries < 30:
                print(f" File API processing... (attempt {retries + 1}/30)")
                time.sleep(5)
                uploaded_file = gemini_client.files.get(name=uploaded_file.name)
                retries += 1
            state = _gemini_file_state(uploaded_file)
            if state != "ACTIVE":
                print(f" ERROR: File API state is '{state}' — aborting")
                return None
            video_part = uploaded_file
        response = gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[prompt, video_part],
        )
        return _parse_gemini_json(response.text)
    except Exception as e:
        print(f" ERROR from Gemini (video): {e}")
        return None
    finally:
        # Best-effort cleanup of the uploaded file on every exit path. A failed
        # delete is reported but must not mask the result.
        if uploaded_file is not None:
            try:
                gemini_client.files.delete(name=uploaded_file.name)
            except Exception as cleanup_err:
                print(f" WARNING: could not delete Gemini upload '{uploaded_file.name}': {cleanup_err}")
# ── 8. Validate and Clean Metadata ────────────────────────────────────────────
def validate_and_clean_metadata(raw_metadata, template_schema):
    """
    Validate Gemini output against the template schema.

    Per field type:
    - enum: must match an allowed option (case-insensitive); the template's
      own spelling of the option is kept.
    - multiSelect: filtered to allowed options (case-insensitive), with
      duplicates removed in first-seen order. A bare string is treated as a
      one-element list.
    - string: whitespace-stripped; blank strings are dropped.
    - float: coerced with float(); non-numeric and non-finite (NaN / inf)
      values are dropped, since they cannot be written as JSON.

    Keys not in the template and None values are dropped. raw_metadata is not
    modified. A non-dict raw_metadata (e.g. a JSON list) yields {}.

    Returns:
        A new dict of field_key -> cleaned value.
    """
    import math  # local import: only needed for the float finiteness check

    if not isinstance(raw_metadata, dict):
        return {}
    cleaned = {}
    for key, info in template_schema.items():
        value = raw_metadata.get(key)
        if value is None:
            continue
        field_type = info["type"]
        options = info["options"]
        if field_type == "enum" and options:
            options_lower = {o.lower(): o for o in options}
            if isinstance(value, str) and value.lower() in options_lower:
                cleaned[key] = options_lower[value.lower()]
            else:
                print(f" WARNING: Invalid enum value '{value}' for field '{key}' — dropped")
        elif field_type == "multiSelect" and options:
            if isinstance(value, str):
                value = [value]
            if not isinstance(value, list):
                print(f" WARNING: Expected list for multiSelect '{key}', got {type(value).__name__} — dropped")
                continue
            options_lower = {o.lower(): o for o in options}
            valid = []
            invalid = []
            for candidate in value:
                if not isinstance(candidate, str):
                    continue
                canonical = options_lower.get(candidate.lower())
                if canonical is None:
                    invalid.append(candidate)
                elif canonical not in valid:
                    valid.append(canonical)
            if valid:
                cleaned[key] = valid
            if invalid:
                print(f" WARNING: Invalid multiSelect values for '{key}': {invalid} — dropped")
        elif field_type == "string":
            if isinstance(value, str) and value.strip():
                cleaned[key] = value.strip()
        elif field_type == "float":
            try:
                number = float(value)
            except (ValueError, TypeError):
                number = None
            if number is not None and math.isfinite(number):
                cleaned[key] = number
            else:
                print(f" WARNING: Invalid float value '{value}' for field '{key}' — dropped")
    return cleaned
# ── 9. Check Existing Metadata ────────────────────────────────────────────────
# Box-managed keys returned alongside template fields in a metadata response.
# After `from_dict`, these become regular attribute names on the response object.
_BOX_META_SYSTEM_ATTRS = {
"id", "type", "type_version", "parent", "template", "scope", "version",
"can_edit",
}
def check_existing_metadata(box_client, file_id):
    """
    Report whether the file already carries a marriottUsa metadata instance.

    Kept for the backfill path, which still asks Box directly. The per-pass
    skip check in the main loops uses the local DB instead
    (db.is_file_already_tagged).

    Returns:
        True if Box returns the instance, False on a 404.
    Raises:
        BoxAPIError: for any other Box error.
    """
    try:
        box_client.file_metadata.get_file_metadata_by_id(
            file_id=file_id,
            scope=CreateFileMetadataByIdScope.ENTERPRISE,
            template_key=METADATA_TEMPLATE_KEY,
        )
    except BoxAPIError as err:
        if err.response_info.status_code != 404:
            raise
        return False
    return True
def fetch_existing_metadata(box_client, file_id):
    """
    Return the file's marriottUsa template values as a flat dict.

    Box-managed attributes (named in _BOX_META_SYSTEM_ATTRS) and private,
    underscore-prefixed SDK attributes are filtered out, leaving only the
    template's own fields.

    Returns:
        dict of field -> value ({} when the instance has no user fields), or
        None when Box answers 404 (no marriottUsa metadata on the file).
    Raises:
        BoxAPIError: for any non-404 Box error.
    """
    try:
        instance = box_client.file_metadata.get_file_metadata_by_id(
            file_id=file_id,
            scope=CreateFileMetadataByIdScope.ENTERPRISE,
            template_key=METADATA_TEMPLATE_KEY,
        )
    except BoxAPIError as err:
        if err.response_info.status_code != 404:
            raise
        return None
    return {
        attr: val
        for attr, val in vars(instance).items()
        if not attr.startswith("_") and attr not in _BOX_META_SYSTEM_ATTRS
    }
def fetch_file_description(box_client, file_id):
    """
    Return the file's Box description.

    Returns None when Box raises a BoxAPIError, or when the description is
    missing, not a string, or empty.
    """
    try:
        file_obj = box_client.files.get_file_by_id(file_id=file_id, fields=["description"])
        text = getattr(file_obj, "description", None)
    except BoxAPIError:
        return None
    if isinstance(text, str) and text:
        return text
    return None
# ── 10. Write Metadata to Box ─────────────────────────────────────────────────
def write_metadata_to_box(box_client, file_id, metadata, file_name):
    """
    Write `metadata` to the file's marriottUsa template instance.

    First tries to create the instance. If Box answers 409 (an instance already
    exists), it falls back to a JSON-Patch update with one "add" operation per
    key. "add" on an existing member replaces it, so new values overwrite old
    ones; keys not in `metadata` are left as they are.

    Returns:
        True on create or update success, False on any BoxAPIError (printed).
    """
    try:
        box_client.file_metadata.create_file_metadata_by_id(
            file_id=file_id,
            scope=CreateFileMetadataByIdScope.ENTERPRISE,
            template_key=METADATA_TEMPLATE_KEY,
            request_body=metadata,
        )
    except BoxAPIError as create_err:
        if create_err.response_info.status_code != 409:
            print(f" ERROR creating metadata on '{file_name}': {create_err}")
            return False
    else:
        print(f" Metadata CREATED on '{file_name}'")
        return True

    # 409 Conflict: the instance exists, so patch it in place.
    patch_ops = [
        UpdateFileMetadataByIdRequestBody(
            op=UpdateFileMetadataByIdRequestBodyOpField.ADD,
            path=f"/{field}",
            value=val,
        )
        for field, val in metadata.items()
    ]
    try:
        box_client.file_metadata.update_file_metadata_by_id(
            file_id=file_id,
            scope=UpdateFileMetadataByIdScope.ENTERPRISE,
            template_key=METADATA_TEMPLATE_KEY,
            request_body=patch_ops,
        )
    except BoxAPIError as update_err:
        print(f" ERROR updating metadata on '{file_name}': {update_err}")
        return False
    print(f" Metadata UPDATED on '{file_name}'")
    return True
# ── 11. Write Description to Box ──────────────────────────────────────────────
def write_description_to_box(box_client, file_id, description, file_name):
    """
    Write the AI summary to the Box file's description field.

    Surrounding whitespace is stripped before truncating to
    DESCRIPTION_MAX_LENGTH characters, so padding doesn't eat into the limit.
    A non-string description is rejected up front. It comes straight from
    Gemini's JSON and could be a list/dict, which would otherwise raise
    TypeError or be sent to Box as-is.

    Returns:
        True on success; False on a non-string description or a BoxAPIError
        (both printed).
    """
    if not isinstance(description, str):
        print(f" ERROR writing description on '{file_name}': expected a string, got {type(description).__name__}")
        return False
    truncated = description.strip()[:DESCRIPTION_MAX_LENGTH].rstrip()
    try:
        box_client.files.update_file_by_id(
            file_id=file_id,
            description=truncated,
        )
    except BoxAPIError as e:
        print(f" ERROR writing description on '{file_name}': {e}")
        return False
    print(f" Description written on '{file_name}'")
    return True
# ── 12. Write Scene Breakdown Comment ────────────────────────────────────────
def write_scene_comment_to_box(box_client, file_id, scenes, file_name):
    """
    Post the timestamped scene breakdown as a comment on the Box file.

    Accepts a list/tuple of scene entries (one per line; non-string entries
    are str()-ed and None entries skipped) or a single "; "-separated string.
    Any other type is rejected rather than raising, because the value comes
    straight from Gemini's JSON.

    Returns:
        True on success; False on an unsupported `scenes` type or a
        BoxAPIError (both printed).
    """
    if isinstance(scenes, str):
        scene_lines = scenes.replace("; ", "\n")
    elif isinstance(scenes, (list, tuple)):
        scene_lines = "\n".join(str(scene) for scene in scenes if scene is not None)
    else:
        print(f" ERROR writing scene comment on '{file_name}': unsupported scenes type {type(scenes).__name__}")
        return False
    message = f"Scene breakdown:\n{scene_lines}"
    try:
        box_client.comments.create_comment(
            message=message,
            item=CreateCommentItem(
                id=file_id,
                type=CreateCommentItemTypeField.FILE,
            ),
        )
    except BoxAPIError as e:
        print(f" ERROR writing scene comment on '{file_name}': {e}")
        return False
    print(f" Scene comment written on '{file_name}'")
    return True
# ── 13. Main Pipeline ─────────────────────────────────────────────────────────
def _close_db_quietly(conn):
    """Close a DB connection if one was opened, ignoring errors (best-effort cleanup)."""
    if conn is None:
        return
    try:
        conn.close()
    except Exception:
        pass


def main():
    """
    Run one tagging pass over the configured Box folder.

    Tries to open a Postgres connection (db.get_conn + db.ensure_schema). If
    either step fails, the pass still runs with db_conn=None. Any connection
    that was opened is closed when the pass finishes or raises.
    """
    print("=" * 60)
    print("Marriott Box Asset Tagger")
    print("=" * 60)
    run_id = uuid.uuid4()
    print(f"Run ID: {run_id}")
    # Open DB connection (best-effort: a DB failure never blocks the pass).
    db_conn = None
    try:
        db_conn = db.get_conn()
        db.ensure_schema(db_conn)
        print("Postgres logging enabled.")
    except Exception as e:
        print(f"WARN: Postgres unavailable ({type(e).__name__}: {e}) — continuing without DB logging.")
        # get_conn() may have succeeded before ensure_schema() failed. Don't
        # hand that half-initialised connection to the pass after announcing
        # "without DB logging".
        _close_db_quietly(db_conn)
        db_conn = None
    # NOTE(review): the per-file skip check now reads this connection
    # (db.is_file_already_tagged). How it treats db_conn=None is not visible
    # here. If it answers "not tagged", a DB outage means already-tagged files
    # are re-sent to Gemini (bounded by MAX_FILES_PER_RUN). Confirm.
    try:
        _run_pass(run_id, db_conn)
    finally:
        _close_db_quietly(db_conn)
def _tag_single_image(box_client, gemini_client, template_schema, run_id, db_conn,
                      file_id, file_name, folder_path):
    """Download, analyze and tag one image, logging the outcome to the DB.

    Returns:
        ``(outcome, gemini_called)`` where ``outcome`` is ``"tagged"`` or
        ``"errored"`` and ``gemini_called`` is True once a Gemini request has
        been made (so the caller knows to apply the rate-limit delay).
    """
    # Download and resize
    result = download_and_resize_image(box_client, file_id, file_name)
    if result is None:
        return "errored", False
    image_bytes, mime_type = result
    # Build per-file prompt with context
    image_prompt = build_gemini_prompt(template_schema, file_name, folder_path)
    # Analyze with Gemini
    gemini_started = time.monotonic()
    raw_metadata = analyze_image_with_gemini(gemini_client, image_bytes, mime_type, image_prompt)
    gemini_elapsed_ms = int((time.monotonic() - gemini_started) * 1000)

    def log(**fields):
        # Columns shared by every tagging_events row written for this file.
        db.log_event(
            db_conn, run_id=run_id, file_id=file_id, file_name=file_name,
            folder_path=folder_path, media_type="image", gemini_model=GEMINI_MODEL,
            prompt=image_prompt, duration_ms=gemini_elapsed_ms, **fields,
        )

    if raw_metadata is None:
        log(status="gemini_error",
            error_message="analyze_image_with_gemini returned None")
        return "errored", True
    # Extract description before validation; snapshot the raw Gemini
    # response (including description) for the DB.
    description = raw_metadata.pop("description", None)
    raw_for_log = dict(raw_metadata)
    if description is not None:
        raw_for_log["description"] = description
    description_text = description if isinstance(description, str) else None
    # Validate and clean
    cleaned_metadata = validate_and_clean_metadata(raw_metadata, template_schema)
    if not cleaned_metadata:
        print("  WARNING: No valid metadata fields after validation — skipping")
        log(raw_response=raw_for_log, description=description_text,
            validated_metadata=cleaned_metadata,
            status="validation_error",
            error_message="No valid metadata fields after validation")
        return "errored", True
    print(f"  Metadata: {json.dumps(cleaned_metadata, indent=2)}")
    # Write metadata to Box
    if not write_metadata_to_box(box_client, file_id, cleaned_metadata, file_name):
        log(raw_response=raw_for_log, description=description_text,
            validated_metadata=cleaned_metadata,
            metadata_write_success=False,
            status="metadata_write_error",
            error_message="write_metadata_to_box returned False")
        return "errored", True
    # Write description to Box
    description_write_success = None
    if description and isinstance(description, str):
        description_write_success = write_description_to_box(box_client, file_id, description, file_name)
    log(raw_response=raw_for_log, description=description_text,
        validated_metadata=cleaned_metadata,
        metadata_write_success=True,
        description_write_success=description_write_success,
        status="success")
    return "tagged", True


def _tag_single_video(box_client, gemini_client, template_schema, run_id, db_conn,
                      file_id, file_name, folder_path):
    """Download the video proxy, analyze and tag it, logging the outcome.

    Returns:
        ``(outcome, gemini_called)`` where ``outcome`` is ``"tagged"``,
        ``"skipped"`` (download_video_proxy returned VIDEO_SKIP) or
        ``"errored"``, and ``gemini_called`` is True once a Gemini request
        has been made.
    """
    # Download video proxy (480p MP4)
    result = download_video_proxy(box_client, file_id, file_name)
    if result == VIDEO_SKIP:
        return "skipped", False
    if result is None:
        return "errored", False
    video_bytes, mime_type = result
    # Build per-file prompt with context
    video_prompt = build_video_prompt(template_schema, file_name, folder_path)
    # Analyze with Gemini
    gemini_started = time.monotonic()
    raw_metadata = analyze_video_with_gemini(gemini_client, video_bytes, mime_type, video_prompt)
    gemini_elapsed_ms = int((time.monotonic() - gemini_started) * 1000)

    def log(**fields):
        # Columns shared by every tagging_events row written for this file.
        db.log_event(
            db_conn, run_id=run_id, file_id=file_id, file_name=file_name,
            folder_path=folder_path, media_type="video", gemini_model=GEMINI_MODEL,
            prompt=video_prompt, duration_ms=gemini_elapsed_ms, **fields,
        )

    if raw_metadata is None:
        log(status="gemini_error",
            error_message="analyze_video_with_gemini returned None")
        return "errored", True
    # Extract description and scenes before validation; keep them in the
    # raw snapshot logged to the DB.
    description = raw_metadata.pop("description", None)
    scenes = raw_metadata.pop("scenes", None)
    raw_for_log = dict(raw_metadata)
    if description is not None:
        raw_for_log["description"] = description
    if scenes is not None:
        raw_for_log["scenes"] = scenes
    description_text = description if isinstance(description, str) else None
    # Validate and clean
    cleaned_metadata = validate_and_clean_metadata(raw_metadata, template_schema)
    if not cleaned_metadata:
        print("  WARNING: No valid metadata fields after validation — skipping")
        log(raw_response=raw_for_log, description=description_text,
            scenes=scenes,
            validated_metadata=cleaned_metadata,
            status="validation_error",
            error_message="No valid metadata fields after validation")
        return "errored", True
    print(f"  Metadata: {json.dumps(cleaned_metadata, indent=2)}")
    if scenes:
        print(f"  Scenes: {scenes}")
    # Write metadata to Box
    if not write_metadata_to_box(box_client, file_id, cleaned_metadata, file_name):
        log(raw_response=raw_for_log, description=description_text,
            scenes=scenes,
            validated_metadata=cleaned_metadata,
            metadata_write_success=False,
            status="metadata_write_error",
            error_message="write_metadata_to_box returned False")
        return "errored", True
    # Write description to Box
    description_write_success = None
    if description and isinstance(description, str):
        description_write_success = write_description_to_box(box_client, file_id, description, file_name)
    # Write scene breakdown as comment
    scene_comment_write_success = None
    if scenes and isinstance(scenes, (str, list)):
        scene_comment_write_success = write_scene_comment_to_box(box_client, file_id, scenes, file_name)
    log(raw_response=raw_for_log, description=description_text,
        scenes=scenes,
        validated_metadata=cleaned_metadata,
        metadata_write_success=True,
        description_write_success=description_write_success,
        scene_comment_write_success=scene_comment_write_success,
        status="success")
    return "tagged", True


def _run_pass(run_id, db_conn):
    """Tag every not-yet-tagged image and video in the configured Box folder.

    Images are processed first, then videos. Each file already marked
    success/backfilled in the DB is skipped (when SKIP_ALREADY_TAGGED).

    The run stops cleanly once MAX_FILES_PER_RUN files have been newly
    tagged or MAX_RUN_DURATION seconds have elapsed. Remaining files are
    reported as "unprocessed".

    A rate-limit delay follows every Gemini request, including failed ones,
    except after the last file of the run.
    """
    # Initialize clients
    box_client = init_box_client()
    gemini_client = init_gemini_client()
    # Fetch template schema (prompts are built per-file to include context)
    template_schema = fetch_template_schema(box_client)
    # List all media files
    image_files, video_files = list_all_media(box_client)
    if not image_files and not video_files:
        print("No media files found. Exiting.")
        return
    # Per-run limiter state
    run_start = time.monotonic()

    def cap_check(newly_tagged_count):
        """Return a reason string if a cap is hit, else None."""
        if newly_tagged_count >= MAX_FILES_PER_RUN:
            return f"file cap reached ({newly_tagged_count}/{MAX_FILES_PER_RUN} newly-tagged)"
        elapsed = time.monotonic() - run_start
        if elapsed >= MAX_RUN_DURATION:
            return f"time cap reached ({elapsed/3600:.1f}h / {MAX_RUN_DURATION/3600:.1f}h)"
        return None

    section_rule = "─" * 60

    # ── Process Images ───────────────────────────────────────────────────────
    img_total = len(image_files)
    img_tagged = 0
    img_skipped = 0
    img_errored = 0
    img_unprocessed = 0  # remaining when cap hit
    if image_files:
        print(f"\n{section_rule}")
        print(f"PROCESSING {img_total} IMAGES")
        print(section_rule)
    for i, file_info in enumerate(image_files, 1):
        # Cap check — exit cleanly before doing any new work
        cap_hit_reason = cap_check(img_tagged)
        if cap_hit_reason:
            img_unprocessed = img_total - i + 1
            print(f"\nRun cap hit ({cap_hit_reason}) — {img_unprocessed} images remain. Will resume on next run.")
            break
        file_id = file_info["id"]
        file_name = file_info["name"]
        folder_path = file_info.get("folder_path", "")
        print(f"\n[Image {i}/{img_total}] Processing: {file_name} (ID: {file_id})")
        if folder_path:
            print(f"  Folder: {folder_path}")
        # Skip if the DB already has a success/backfilled row for this file.
        # If the DB is empty/lost, run the backfill flow first to repopulate
        # from Box — that avoids re-tagging files Box already has metadata for.
        if SKIP_ALREADY_TAGGED and db.is_file_already_tagged(db_conn, file_id):
            print("  Already in DB — skipping")
            img_skipped += 1
            continue
        outcome, gemini_called = _tag_single_image(
            box_client, gemini_client, template_schema, run_id, db_conn,
            file_id, file_name, folder_path,
        )
        if outcome == "tagged":
            img_tagged += 1
        else:
            img_errored += 1
        # Rate limit after every Gemini request — failed ones count against
        # the quota too. Skip only after the last file of the run.
        if gemini_called and (i < img_total or video_files):
            print(f"  Waiting {GEMINI_DELAY}s (rate limit)...")
            time.sleep(GEMINI_DELAY)

    # ── Process Videos ───────────────────────────────────────────────────────
    vid_total = len(video_files)
    vid_tagged = 0
    vid_skipped = 0
    vid_errored = 0
    vid_unprocessed = 0
    if video_files:
        print(f"\n{section_rule}")
        print(f"PROCESSING {vid_total} VIDEOS")
        print(section_rule)
    for i, file_info in enumerate(video_files, 1):
        # Cap check (shared with images) — exit cleanly before doing any new work
        cap_hit_reason = cap_check(img_tagged + vid_tagged)
        if cap_hit_reason:
            vid_unprocessed = vid_total - i + 1
            print(f"\nRun cap hit ({cap_hit_reason}) — {vid_unprocessed} videos remain. Will resume on next run.")
            break
        file_id = file_info["id"]
        file_name = file_info["name"]
        folder_path = file_info.get("folder_path", "")
        print(f"\n[Video {i}/{vid_total}] Processing: {file_name} (ID: {file_id})")
        if folder_path:
            print(f"  Folder: {folder_path}")
        # Skip if the DB already has a success/backfilled row for this file.
        if SKIP_ALREADY_TAGGED and db.is_file_already_tagged(db_conn, file_id):
            print("  Already in DB — skipping")
            vid_skipped += 1
            continue
        outcome, gemini_called = _tag_single_video(
            box_client, gemini_client, template_schema, run_id, db_conn,
            file_id, file_name, folder_path,
        )
        if outcome == "tagged":
            vid_tagged += 1
        elif outcome == "skipped":
            vid_skipped += 1
        else:
            vid_errored += 1
        # Rate limit after every Gemini request (skip after last video)
        if gemini_called and i < vid_total:
            print(f"  Waiting {GEMINI_VIDEO_DELAY}s (rate limit)...")
            time.sleep(GEMINI_VIDEO_DELAY)

    # ── Combined Summary ─────────────────────────────────────────────────────
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"  Images:  {img_total} total | {img_tagged} tagged | {img_skipped} skipped | {img_errored} errors | {img_unprocessed} unprocessed (cap)")
    print(f"  Videos:  {vid_total} total | {vid_tagged} tagged | {vid_skipped} skipped | {vid_errored} errors | {vid_unprocessed} unprocessed (cap)")
    print(f"  Overall: {img_total + vid_total} total | {img_tagged + vid_tagged} tagged | {img_skipped + vid_skipped} skipped | {img_errored + vid_errored} errors | {img_unprocessed + vid_unprocessed} unprocessed (cap)")
    elapsed = time.monotonic() - run_start
    print(f"  Run time: {elapsed/60:.1f} min")
    if img_unprocessed + vid_unprocessed > 0:
        # Runs are triggered manually now, so don't promise a scheduled run.
        print("  NOTE: Run cap was reached. Remaining files will be processed on the next run.")
    print("=" * 60)
# ── 14. Backfill from Box ─────────────────────────────────────────────────────
def _run_backfill(run_id, db_conn):
    """
    Walk the Box folder and, for every file that ALREADY has marriottUsa
    metadata, insert a `status='backfilled'` row into tagging_events. No
    Gemini calls, no Box writes — purely reads from Box and mirrors into
    the local DB so the per-file skip check (which is now DB-based) won't
    re-tag files Box has already tagged.

    Idempotent: a file that already has a success/backfilled row is left
    alone. Safe to re-run after a partial backfill.

    Box read failures (metadata or description) are counted as errors and
    the file is left for a later backfill or tagging pass. Without a DB
    connection there is nothing to write to, so the walk is not attempted.
    """
    print("=" * 60)
    print("Marriott Box Asset Tagger — BACKFILL")
    print("=" * 60)
    print(f"Run ID: {run_id}")
    if db_conn is None:
        # Walking Box without a DB would make thousands of reads and insert
        # nothing, while still reporting an "Inserted" count.
        print("ERROR: No database connection — backfill has nothing to write to. Exiting.")
        return
    box_client = init_box_client()
    image_files, video_files = list_all_media(box_client)
    total = len(image_files) + len(video_files)
    if not total:
        print("No media files found. Exiting.")
        return
    inserted = 0
    no_metadata = 0
    already_in_db = 0
    errored = 0
    combined = [("image", f) for f in image_files] + [("video", f) for f in video_files]
    for i, (media_type, file_info) in enumerate(combined, 1):
        file_id = file_info["id"]
        file_name = file_info["name"]
        folder_path = file_info.get("folder_path", "")
        print(f"\n[{i}/{total}] {media_type}: {file_name} (ID: {file_id})")
        if folder_path:
            print(f"  Folder: {folder_path}")
        if db.is_file_already_tagged(db_conn, file_id):
            print("  Already in DB — skipping.")
            already_in_db += 1
            continue
        try:
            existing = fetch_existing_metadata(box_client, file_id)
        except BoxAPIError as e:
            print(f"  ERROR reading metadata from Box: {e}")
            errored += 1
            continue
        if existing is None:
            print("  No marriottUsa metadata in Box — leaving for normal tagging pass.")
            no_metadata += 1
            continue
        # A transient Box failure here must not abort the whole backfill;
        # count it like a metadata read error and move on.
        try:
            description = fetch_file_description(box_client, file_id)
        except BoxAPIError as e:
            print(f"  ERROR reading description from Box: {e}")
            errored += 1
            continue
        print(f"  Fields from Box: {list(existing.keys()) or '(none)'}")
        if description:
            # Truncate long descriptions in the console and mark the cut.
            print(f"  Description: {description[:80]}{'…' if len(description) > 80 else ''}")
        db.log_event(
            db_conn,
            run_id=run_id,
            file_id=file_id,
            file_name=file_name,
            folder_path=folder_path,
            media_type=media_type,
            gemini_model=GEMINI_MODEL,
            status="backfilled",
            prompt=None,
            raw_response=None,
            description=description,
            scenes=None,
            validated_metadata=existing or {},
            metadata_write_success=True,
            description_write_success=bool(description),
            scene_comment_write_success=None,
            error_message=None,
            duration_ms=None,
        )
        inserted += 1
    print("\n" + "=" * 60)
    print("BACKFILL SUMMARY")
    print("=" * 60)
    print(f"  Total files seen:          {total}")
    print(f"  Inserted into DB:          {inserted}")
    print(f"  Already in DB (skipped):   {already_in_db}")
    print(f"  No metadata in Box:        {no_metadata}")
    print(f"  Errors reading Box:        {errored}")
    print("=" * 60)
if __name__ == "__main__":
    # Direct execution runs a single tagging pass via main().
    main()