Add asset tagger pipeline with keyword-tail descriptions and large-video gating

- Box JWT + Gemini integration for image and video metadata tagging
- Description format includes search-keyword tail to address synonym gaps
  (e.g. "Food" search now hits assets tagged "Dining")
- Skip videos exceeding 5GB source or 400MB proxy (~60min runtime, beyond
  Gemini context budget) — counted as skipped, not errored
- Hardened None-response handling in Gemini JSON parser
- Per-run limiter: 200 newly-tagged files / 4 hour wall-clock cap, with
  clean exit and resumable progress on next run
- systemd service + timer for daily 2am tagging passes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Simeon.Schecter 2026-05-06 14:09:28 -04:00
parent 9a837a33b9
commit a04e8c1e37
7 changed files with 1098 additions and 9 deletions

22
.gitignore vendored
View file

@ -1,8 +1,17 @@
# These are some examples of commonly ignored file patterns.
# You should customize this list as applicable to your project.
# Learn more about .gitignore:
# https://www.atlassian.com/git/tutorials/saving-changes/gitignore
# ── Project-specific (security-critical, do NOT commit) ──────────────────────
# Box JWT keypair + client secrets
box_config.json
# Gemini API key
.env
# Local virtualenv
env/
venv/
# Python bytecode
__pycache__/
*.pyc
*.py[cod]
# ── Bitbucket boilerplate ────────────────────────────────────────────────────
# Node artifact files
node_modules/
dist/
@ -10,9 +19,6 @@ dist/
# Compiled Java class files
*.class
# Compiled Python bytecode
*.py[cod]
# Log files
*.log
@ -21,7 +27,6 @@ dist/
# Maven
target/
dist/
# JetBrains IDE
.idea/
@ -47,4 +52,3 @@ Thumbs.db
*.flv
*.mov
*.wmv

31
CLAUDE.md Normal file
View file

@ -0,0 +1,31 @@
# CLAUDE.md — Box + Gemini Auto-Tagger (Marriott)
## Project Overview
This tool auto-applies AI-generated tags/descriptions to images and videos
stored in a Box enterprise instance for Marriott. It uses Google Gemini's
vision API to analyze media files and write descriptions back to Box as metadata.
## Tech Stack
- Python 3.11+
- Virtual environment: `python -m venv env` / `source env/bin/activate`
- Box SDK: `box-sdk-gen` (NOT the legacy `boxsdk`)
- Image processing: `Pillow`
- Gemini API: `google-genai` (NOT the deprecated `google-generativeai`)
## Key Package Installs
pip install box-sdk-gen[jwt] google-genai Pillow python-dotenv
## Auth Files (DO NOT COMMIT)
- `box_config.json` — Downloaded from Box Developer Console (JWT config)
- `.env` — Contains GEMINI_API_KEY
## Box Folder IDs
- Source media folder: 3155... (confirm exact ID before running)
- JWT config file is stored within this folder or adjacent
## Code Style
- Use try/except blocks on ALL API calls (Box and Gemini)
- Log all errors with the file name and error type
- Use .env for all secrets — never hardcode keys
- Process files one at a time (not bulk) to respect API rate limits
- Always check if a tag already exists before writing to avoid duplicates

68
README.md Normal file
View file

@ -0,0 +1,68 @@
# Marriott Box Asset Tagger
Batch-processes images in a Box folder, analyzes them with Gemini AI, and writes structured metadata back to Box using the `marriottUsa` metadata template.
## Setup
### 1. Clone and create virtual environment
```bash
cd Marriott_Box_Asset_Tagging
python3 -m venv env
source env/bin/activate
pip install -r requirements.txt
```
### 2. Box JWT credentials
Download your Box app's JWT config from the [Box Developer Console](https://app.box.com/developers/console) and save it as `box_config.json` in the project root.
The service account must have:
- Access to folder `370595013246`
- Permission to read/write metadata using the `marriottUsa` template
### 3. Gemini API key
Add your key to `.env`:
```
GEMINI_API_KEY=your_key_here
```
Get a key at [Google AI Studio](https://aistudio.google.com/apikey).
## Usage
```bash
source env/bin/activate
python main.py
```
The script will:
1. Authenticate with Box and Gemini
2. Fetch the `marriottUsa` template schema (fields, types, allowed values)
3. Build a dynamic Gemini prompt from the schema
4. List all image files in the target folder
5. For each image: download, resize, analyze with Gemini, validate metadata, write to Box
6. Print a summary of results
## Configuration
Edit the constants at the top of `main.py`:
| Setting | Default | Description |
|---------|---------|-------------|
| `BOX_FOLDER_ID` | `370595013246` | Box folder to process |
| `METADATA_TEMPLATE_KEY` | `marriottUsa` | Box metadata template key |
| `GEMINI_MODEL` | `gemini-2.5-flash` | Gemini model for analysis |
| `EXCLUDED_FOLDERS` | `{"zz_Working Retouch"}` | Subfolder names to skip |
| `GEMINI_DELAY` | `7` | Seconds between Gemini calls |
| `SKIP_ALREADY_TAGGED` | `True` | Skip files with existing metadata |
| `MAX_IMAGE_SIZE` | `1000` | Max pixel dimension for resize |
## How It Works
- **Dynamic prompt**: The Gemini prompt is built at runtime from the actual Box template definition. If Marriott adds/changes fields or options in Box Admin, the script adapts automatically.
- **Metadata + description**: Each file gets structured metadata (for filtered search) and a short description (visible in Box list views).
- **Resumable**: Files with existing metadata are skipped by default, so the script can be re-run after interruptions or when new images are added.
- **Validation**: Gemini output is validated against the template schema — invalid enum values are dropped, multiSelect arrays are filtered to allowed options only.

914
main.py Normal file
View file

@ -0,0 +1,914 @@
"""
Marriott Box Asset Tagger
Processes images and videos in a Box folder, analyzes them with Gemini AI,
and writes structured metadata back to Box using the marriottUsa template.
Videos use Box's 480p MP4 proxy representations to minimize bandwidth.
"""
import io
import json
import os
import re
import sys
import time
import requests
from dotenv import load_dotenv
from PIL import Image
from box_sdk_gen import BoxClient, BoxJWTAuth, JWTConfig, BoxAPIError
from box_sdk_gen.managers.file_metadata import (
CreateFileMetadataByIdScope,
UpdateFileMetadataByIdScope,
UpdateFileMetadataByIdRequestBody,
UpdateFileMetadataByIdRequestBodyOpField,
)
from box_sdk_gen.managers.comments import CreateCommentItem, CreateCommentItemTypeField
from google import genai
from google.genai import types
# ── Configuration ────────────────────────────────────────────────────────────
BOX_FOLDER_ID = "380274488839"
METADATA_TEMPLATE_KEY = "marriottUsa"
METADATA_SCOPE = "enterprise"
GEMINI_MODEL = "gemini-2.5-flash"
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".wmv", ".flv", ".webm", ".mpg", ".mpeg", ".m4v", ".3gp"}
EXCLUDED_FOLDER_PREFIXES = ("z_", "zz_", "zzz_")
MAX_IMAGE_SIZE = 1000 # longest side in pixels
GEMINI_DELAY = 7 # seconds between Gemini calls
GEMINI_VIDEO_DELAY = 10 # longer delay for video (larger token usage)
VIDEO_SIZE_LIMIT_INLINE = 20 * 1024 * 1024 # 20MB — below this, send inline; above, use File API
VIDEO_SOURCE_SIZE_LIMIT = 5 * 1024 * 1024 * 1024 # 5GB — skip videos whose source file exceeds this
VIDEO_PROXY_SIZE_LIMIT = 400 * 1024 * 1024 # 400MB — skip videos whose 480p proxy exceeds this (≈ >60 min runtime)
VIDEO_SKIP = "VIDEO_SKIP" # sentinel returned by download_video_proxy for intentional (non-error) skips
SKIP_ALREADY_TAGGED = True
DESCRIPTION_MAX_LENGTH = 255
# Per-run limiters: protect against runaway cost / time on a sudden large upload.
# Counts only NEWLY-tagged files (skipped-as-already-tagged is free and doesn't count).
# Shared across images and videos. When either cap is hit, the run exits cleanly
# with a summary; the next scheduled run picks up the remaining untagged files.
MAX_FILES_PER_RUN = 200 # hard cap on newly-tagged files per run
MAX_RUN_DURATION = 4 * 3600 # hard cap in seconds (4 hours; stays well under the 6h systemd timeout)
# ── 1. Box Client ────────────────────────────────────────────────────────────
def init_box_client():
"""Authenticate with Box via JWT and return a BoxClient."""
config_path = os.path.join(os.path.dirname(__file__), "box_config.json")
if not os.path.exists(config_path):
print(f"ERROR: Box JWT config not found at {config_path}")
sys.exit(1)
jwt_config = JWTConfig.from_config_file(config_file_path=config_path)
auth = BoxJWTAuth(config=jwt_config)
client = BoxClient(auth=auth)
user = client.users.get_user_me()
print(f"Authenticated as Box service account: {user.name} (ID: {user.id})")
return client
# ── 2. Gemini Client ─────────────────────────────────────────────────────────
def init_gemini_client():
"""Load API key from .env and return a genai Client."""
load_dotenv()
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
print("ERROR: GEMINI_API_KEY not set in .env")
sys.exit(1)
client = genai.Client(api_key=api_key)
print("Gemini client initialized")
return client
# ── 3. Fetch Template Schema ─────────────────────────────────────────────────
def fetch_template_schema(box_client):
"""
Fetch the marriottUsa metadata template definition from Box.
Returns dict: field_key -> {display_name, type, options}
"""
try:
template = box_client.metadata_templates.get_metadata_template(
scope=METADATA_SCOPE,
template_key=METADATA_TEMPLATE_KEY,
)
except BoxAPIError as e:
print(f"ERROR: Could not fetch template '{METADATA_TEMPLATE_KEY}': {e}")
sys.exit(1)
schema = {}
for field in template.fields:
field_info = {
"display_name": field.display_name,
"type": field.type.value if hasattr(field.type, "value") else str(field.type),
"options": [],
}
if field.options:
field_info["options"] = [opt.key for opt in field.options]
schema[field.key] = field_info
print(f"Template '{METADATA_TEMPLATE_KEY}' loaded — {len(schema)} fields:")
for key, info in schema.items():
opts = f" ({len(info['options'])} options)" if info["options"] else ""
print(f"{info['display_name']} [{info['type']}]{opts}")
return schema
# ── 4. Build Gemini Prompt ────────────────────────────────────────────────────
def _build_field_lines(template_schema):
"""Build the FIELDS section lines shared by image and video prompts."""
lines = []
for key, info in template_schema.items():
field_type = info["type"]
display = info["display_name"]
options = info["options"]
if field_type == "enum" and options:
lines.append(f' "{key}" ({display}): enum — choose ONE from: {json.dumps(options)}')
elif field_type == "multiSelect" and options:
lines.append(f' "{key}" ({display}): multiSelect — choose any from: {json.dumps(options)}')
elif field_type == "string":
lines.append(f' "{key}" ({display}): string — free text')
elif field_type == "float":
lines.append(f' "{key}" ({display}): float — numeric value')
else:
lines.append(f' "{key}" ({display}): {field_type}')
return lines
def _build_context_lines(file_name, folder_path):
"""Build FILE CONTEXT lines from filename and folder path."""
lines = [
"",
"FILE CONTEXT (use this to help identify the brand, property, and destination):",
f' Filename: "{file_name}"',
]
if folder_path:
lines.append(f' Folder path: "{folder_path}"')
lines.append(" Use the folder names and filename to infer the hotel brand, property name, and location when not obvious from the visual content alone.")
return lines
def build_gemini_prompt(template_schema, file_name="", folder_path=""):
"""
Dynamically build a Gemini prompt from the template schema.
Includes file context (filename/folder) to help identify brand/property.
"""
lines = [
"You are an expert image analyst for Marriott's digital asset library.",
"Analyze this image and return a JSON object with the following fields.",
"Also include a 'description' field that combines a human-readable summary with a search-keyword tail (see format below).",
"",
"RULES:",
"- Return ONLY valid JSON, no markdown, no explanation.",
"- For enum fields, pick exactly ONE value from the allowed list, or null if none fit.",
"- For multiSelect fields, return an array of matching values from the allowed list (can be empty).",
"- For string fields, write a concise relevant value, or null if not applicable.",
"- Be accurate. Only tag what you can clearly see in the image.",
"- Use the file context below to help identify the hotel brand, property, and destination.",
"",
"FIELDS:",
]
lines.extend(_build_field_lines(template_schema))
lines.append("")
lines.append(' "description": string — formatted as "<summary sentence>. <keyword tail>."')
lines.append(' Part 1 (summary): a natural human-readable sentence describing the image (~140 characters max).')
lines.append(' Part 2 (keyword tail): a comma-separated list of search synonyms and broader category terms (~100 characters max).')
lines.append(' The two parts together MUST be 255 characters or fewer total.')
lines.append(' KEYWORD STRATEGY: include synonyms and broader category terms for the main concepts shown — NOT adjacent or contextual terms.')
lines.append(' Example: a buffet image → "buffet, food, dining, eating, meal, breakfast, restaurant" (YES). Do NOT add "hotel, guest, morning" (NO — those are contextual, not synonyms).')
lines.append(' Cover synonym families for whatever is shown: food/dining (food, dining, eating, meal, restaurant, cafe), spaces (room, suite, lobby, bathroom, balcony, terrace, patio), wellness (pool, swimming, gym, fitness, spa, wellness), events (meeting, conference, wedding, ballroom, banquet), settings (beach, oceanfront, waterfront, garden, view, vista), people (couple, family, kids, guest), and time/mood (sunset, evening, morning, night) — only when present in the image.')
lines.append(' Full example: "Couple enjoying breakfast on the terrace overlooking the bay. Dining, eating, food, meal, breakfast, terrace, balcony, patio, view, ocean, waterfront, couple."')
lines.extend(_build_context_lines(file_name, folder_path))
lines.append("")
lines.append("Return ONLY the JSON object.")
return "\n".join(lines)
# ── 4b. Build Video Prompt ───────────────────────────────────────────────────
def build_video_prompt(template_schema, file_name="", folder_path=""):
"""
Dynamically build a Gemini prompt for video analysis from the template schema.
Includes file context and instructs Gemini to consider the full video.
"""
lines = [
"You are an expert video analyst for Marriott's digital asset library.",
"Analyze this video in its entirety — consider all scenes, transitions, and content throughout the full duration.",
"Return a JSON object with the following fields.",
"Also include a 'description' field that combines a human-readable summary with a search-keyword tail (see format below).",
"",
"RULES:",
"- Return ONLY valid JSON, no markdown, no explanation.",
"- For enum fields, pick exactly ONE value from the allowed list, or null if none fit.",
"- For multiSelect fields, return an array of matching values from the allowed list (can be empty).",
"- For string fields, write a concise relevant value, or null if not applicable.",
"- Be accurate. Only tag what you can clearly see or hear in the video.",
"- Base your analysis on the overall content and theme of the video, not just a single frame.",
"- Use the file context below to help identify the hotel brand, property, and destination.",
"",
"FIELDS:",
]
lines.extend(_build_field_lines(template_schema))
lines.append("")
lines.append(' "description": string — formatted as "<summary sentence>. <keyword tail>."')
lines.append(' Part 1 (summary): a natural human-readable sentence describing the video overall (~140 characters max).')
lines.append(' Part 2 (keyword tail): a comma-separated list of search synonyms and broader category terms covering content across the whole video (~100 characters max).')
lines.append(' The two parts together MUST be 255 characters or fewer total.')
lines.append(' KEYWORD STRATEGY: include synonyms and broader category terms for the main concepts shown across the video — NOT adjacent or contextual terms.')
lines.append(' Example: a video featuring a buffet → "buffet, food, dining, eating, meal, breakfast, restaurant" (YES). Do NOT add "hotel, guest, morning" (NO — those are contextual, not synonyms).')
lines.append(' Cover synonym families for what is shown: food/dining (food, dining, eating, meal, restaurant, cafe), spaces (room, suite, lobby, bathroom, balcony, terrace, patio), wellness (pool, swimming, gym, fitness, spa, wellness), events (meeting, conference, wedding, ballroom, banquet), settings (beach, oceanfront, waterfront, garden, view, vista), people (couple, family, kids, guest), and time/mood (sunset, evening, morning, night) — only when present in the video.')
lines.append(' Full example: "Couple enjoys breakfast on the terrace then walks the beach at sunset. Dining, eating, food, breakfast, terrace, beach, oceanfront, sunset, couple, walk."')
lines.append(' "scenes": array of strings — HIGH-LEVEL timestamped scene breakdown.')
lines.append(' Each entry format: "M:SS-M:SS keywords describing scene"')
lines.append(' HARD LIMIT: Never exceed 25 scenes. Aim for 5-20 scenes regardless of video length.')
lines.append(' MINIMUM DURATION: Each scene should span at least 30 seconds of content. Merge shorter moments into the surrounding scene.')
lines.append(' WHAT COUNTS AS A NEW SCENE: Only split when the location, subject, OR primary activity fundamentally changes.')
lines.append(' Same person talking in the same place = ONE scene even if topics shift.')
lines.append(' Same activity in the same setting = ONE scene even if camera angles change.')
lines.append(' DO NOT split for: camera cuts, slight movements, continuation of the same interview/activity/tour.')
lines.append(' Be descriptive with keywords — incomplete sentences are fine.')
lines.append(' Focus on searchable terms: locations, activities, people, objects, mood.')
lines.append(' Example for a 20-min video: ["0:00-2:30 hotel breakfast buffet coffee pastries morning routine", "2:31-5:45 hotel gym treadmill weights workout", "5:46-8:20 hotel room tour bed desk bathroom closet", "8:21-14:00 arena floor volleyball practice drills warmup", "14:01-18:30 fan interviews championship banner trophy poses", "18:31-20:00 arena wide shots jumbotron match highlights"]')
lines.extend(_build_context_lines(file_name, folder_path))
lines.append("")
lines.append("Return ONLY the JSON object.")
return "\n".join(lines)
# ── 5. List Image Files ──────────────────────────────────────────────────────
def _list_folder_media(box_client, folder_id, folder_path=""):
"""List all image and video files in a single Box folder (with pagination)."""
image_files = []
video_files = []
subfolders = []
offset = 0
limit = 100
while True:
items = box_client.folders.get_folder_items(
folder_id=folder_id,
limit=limit,
offset=offset,
)
if not items.entries:
break
for item in items.entries:
if hasattr(item.type, 'value'):
item_type = item.type.value
else:
item_type = str(item.type)
if item_type == "folder":
if item.name.lower().startswith(EXCLUDED_FOLDER_PREFIXES):
print(f" Skipping excluded folder: {item.name}")
else:
subfolders.append({"id": item.id, "name": item.name})
elif item_type == "file":
ext = os.path.splitext(item.name)[1].lower()
file_entry = {"id": item.id, "name": item.name, "folder_path": folder_path}
if ext in IMAGE_EXTENSIONS:
image_files.append(file_entry)
elif ext in VIDEO_EXTENSIONS:
video_files.append(file_entry)
if len(items.entries) < limit:
break
offset += limit
return image_files, video_files, subfolders
def list_all_media(box_client):
"""
Recursively list all image and video files in the target Box folder tree.
Skips folders whose names start with excluded prefixes. Handles pagination.
Returns (image_files, video_files) each a list of {id, name, folder_path} dicts.
"""
all_images = []
all_videos = []
def _recurse(folder_id, folder_path="", depth=0):
indent = " " * depth
images, videos, subfolders = _list_folder_media(box_client, folder_id, folder_path)
if depth == 0:
print(f"Root folder: {len(images)} images, {len(videos)} videos, {len(subfolders)} subfolders")
else:
print(f"{indent}Found {len(images)} images, {len(videos)} videos, {len(subfolders)} subfolders")
all_images.extend(images)
all_videos.extend(videos)
for folder in subfolders:
print(f"{indent} Scanning subfolder: {folder['name']}...")
child_path = f"{folder_path}/{folder['name']}" if folder_path else folder['name']
_recurse(folder["id"], child_path, depth + 1)
_recurse(BOX_FOLDER_ID)
print(f"Found {len(all_images)} total image files, {len(all_videos)} total video files")
return all_images, all_videos
# ── 6. Download and Resize Image ─────────────────────────────────────────────
def download_and_resize_image(box_client, file_id, file_name):
"""
Download file from Box, resize with Pillow (max 1000px longest side).
Returns (bytes, mime_type) or None on failure.
"""
try:
stream = box_client.downloads.download_file(file_id=file_id)
raw_bytes = stream.read()
img = Image.open(io.BytesIO(raw_bytes))
img.thumbnail((MAX_IMAGE_SIZE, MAX_IMAGE_SIZE), Image.Resampling.LANCZOS)
# Convert to RGB if necessary (e.g. RGBA PNGs)
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=85)
resized_bytes = buffer.getvalue()
return resized_bytes, "image/jpeg"
except Exception as e:
print(f" ERROR downloading/resizing '{file_name}': {e}")
return None
# ── 6b. Download Video Proxy ─────────────────────────────────────────────────
def download_video_proxy(box_client, file_id, file_name):
"""
Download Box's 480p MP4 representation of a video file.
Returns (bytes, "video/mp4") or None if proxy not ready, source/proxy too large, or other failure.
Skips videos whose source exceeds VIDEO_SOURCE_SIZE_LIMIT or whose proxy exceeds VIDEO_PROXY_SIZE_LIMIT
(proxy size is a proxy-no-pun-intended for runtime; >400MB >60 min, beyond Gemini's context budget).
"""
try:
# Source size gate (catches obvious mega-files before requesting representations)
file_info = box_client.files.get_file_by_id(
file_id=file_id,
fields=["size", "representations"],
x_rep_hints="[mp4]",
)
source_size = getattr(file_info, "size", None)
if source_size and source_size > VIDEO_SOURCE_SIZE_LIMIT:
print(f" Skipping — source file size {source_size / (1024*1024):.0f} MB exceeds limit of {VIDEO_SOURCE_SIZE_LIMIT / (1024*1024):.0f} MB")
return VIDEO_SKIP
mp4_entry = None
if file_info.representations and file_info.representations.entries:
for entry in file_info.representations.entries:
if entry.representation == "mp4":
mp4_entry = entry
break
if mp4_entry is None:
print(f" No MP4 representation available — skipping (re-run later)")
return None
# Poll until representation is ready (up to 60s)
state = mp4_entry.status.state if mp4_entry.status else None
if hasattr(state, "value"):
state = state.value
retries = 0
while state in ("pending", "none") and retries < 12:
print(f" MP4 representation is '{state}' — waiting 5s (attempt {retries + 1}/12)...")
time.sleep(5)
retries += 1
file_info = box_client.files.get_file_by_id(
file_id=file_id,
fields=["representations"],
x_rep_hints="[mp4]",
)
for entry in file_info.representations.entries:
if entry.representation == "mp4":
mp4_entry = entry
break
state = mp4_entry.status.state if mp4_entry.status else None
if hasattr(state, "value"):
state = state.value
if state not in ("success", "viewable"):
print(f" MP4 proxy not ready (state: '{state}') — skipping (re-run later)")
return None
# Build download URL from template
url_template = mp4_entry.content.url_template
download_url = url_template.replace("{+asset_path}", "")
# Proxy size gate via HEAD request (avoids downloading hundreds of MB just to skip)
auth_header = box_client.auth.retrieve_authorization_header()
head_resp = requests.head(download_url, headers={"Authorization": auth_header}, allow_redirects=True)
proxy_size = int(head_resp.headers.get("Content-Length", 0))
if proxy_size and proxy_size > VIDEO_PROXY_SIZE_LIMIT:
print(f" Skipping — 480p proxy {proxy_size / (1024*1024):.0f} MB exceeds limit of {VIDEO_PROXY_SIZE_LIMIT / (1024*1024):.0f} MB (likely >60 min runtime, beyond Gemini context budget)")
return VIDEO_SKIP
# Download with auth
resp = requests.get(download_url, headers={"Authorization": auth_header})
resp.raise_for_status()
print(f" Downloaded MP4 proxy: {len(resp.content) / (1024*1024):.1f} MB")
return resp.content, "video/mp4"
except Exception as e:
print(f" ERROR downloading video proxy for '{file_name}': {e}")
return None
# ── 7. Analyze Image with Gemini ──────────────────────────────────────────────
def analyze_image_with_gemini(gemini_client, image_bytes, mime_type, prompt):
"""
Send image + prompt to Gemini, parse JSON response.
Returns dict or None.
"""
try:
response = gemini_client.models.generate_content(
model=GEMINI_MODEL,
contents=[
prompt,
types.Part.from_bytes(data=image_bytes, mime_type=mime_type),
],
)
return _parse_gemini_json(response.text)
except Exception as e:
print(f" ERROR from Gemini: {e}")
return None
# ── 7b. Analyze Video with Gemini ─────────────────────────────────────────────
def _parse_gemini_json(text):
"""Parse JSON from Gemini response text, stripping markdown fences if present."""
if not text:
print(f" WARNING: Gemini returned no response (likely too large or processing failed)")
return None
text = text.strip()
if text.startswith("```"):
text = re.sub(r"^```(?:json)?\s*", "", text)
text = re.sub(r"\s*```$", "", text)
try:
return json.loads(text)
except json.JSONDecodeError:
match = re.search(r"\{[\s\S]*\}", text)
if match:
return json.loads(match.group())
print(f" WARNING: Could not parse Gemini response as JSON")
return None
def analyze_video_with_gemini(gemini_client, video_bytes, mime_type, prompt):
"""
Send video + prompt to Gemini, parse JSON response.
Uses inline bytes for videos < 20MB, File API for larger ones.
Returns dict or None.
"""
try:
if len(video_bytes) < VIDEO_SIZE_LIMIT_INLINE:
# Small video — send inline
print(f" Sending video inline ({len(video_bytes) / (1024*1024):.1f} MB)")
response = gemini_client.models.generate_content(
model=GEMINI_MODEL,
contents=[
prompt,
types.Part.from_bytes(data=video_bytes, mime_type=mime_type),
],
)
else:
# Large video — use File API
print(f" Uploading video via File API ({len(video_bytes) / (1024*1024):.1f} MB)...")
uploaded_file = gemini_client.files.upload(
file=io.BytesIO(video_bytes),
config=types.UploadFileConfig(mime_type=mime_type),
)
# Poll until file is ACTIVE
retries = 0
while uploaded_file.state.name == "PROCESSING" and retries < 30:
print(f" File API processing... (attempt {retries + 1}/30)")
time.sleep(5)
uploaded_file = gemini_client.files.get(name=uploaded_file.name)
retries += 1
if uploaded_file.state.name != "ACTIVE":
print(f" ERROR: File API state is '{uploaded_file.state.name}' — aborting")
try:
gemini_client.files.delete(name=uploaded_file.name)
except Exception:
pass
return None
response = gemini_client.models.generate_content(
model=GEMINI_MODEL,
contents=[prompt, uploaded_file],
)
# Clean up uploaded file
try:
gemini_client.files.delete(name=uploaded_file.name)
except Exception:
pass
return _parse_gemini_json(response.text)
except Exception as e:
print(f" ERROR from Gemini (video): {e}")
return None
# ── 8. Validate and Clean Metadata ────────────────────────────────────────────
def validate_and_clean_metadata(raw_metadata, template_schema):
"""
Validate Gemini output against the template schema.
- Enum: must match an allowed option (case-insensitive)
- MultiSelect: filter to only allowed options
- String: pass through
- Drop unknown keys
"""
cleaned = {}
for key, info in template_schema.items():
if key not in raw_metadata:
continue
value = raw_metadata[key]
field_type = info["type"]
options = info["options"]
if value is None:
continue
if field_type == "enum" and options:
# Case-insensitive match
options_lower = {o.lower(): o for o in options}
if isinstance(value, str) and value.lower() in options_lower:
cleaned[key] = options_lower[value.lower()]
else:
print(f" WARNING: Invalid enum value '{value}' for field '{key}' — dropped")
elif field_type == "multiSelect" and options:
if isinstance(value, list):
options_lower = {o.lower(): o for o in options}
valid = [options_lower[v.lower()] for v in value if isinstance(v, str) and v.lower() in options_lower]
if valid:
cleaned[key] = valid
invalid = [v for v in value if isinstance(v, str) and v.lower() not in options_lower]
if invalid:
print(f" WARNING: Invalid multiSelect values for '{key}': {invalid} — dropped")
else:
print(f" WARNING: Expected list for multiSelect '{key}', got {type(value).__name__} — dropped")
elif field_type == "string":
if isinstance(value, str) and value.strip():
cleaned[key] = value.strip()
elif field_type == "float":
try:
cleaned[key] = float(value)
except (ValueError, TypeError):
print(f" WARNING: Invalid float value '{value}' for field '{key}' — dropped")
return cleaned
# ── 9. Check Existing Metadata ────────────────────────────────────────────────
def check_existing_metadata(box_client, file_id):
"""Check if file already has marriottUsa metadata. Returns True/False."""
try:
box_client.file_metadata.get_file_metadata_by_id(
file_id=file_id,
scope=CreateFileMetadataByIdScope.ENTERPRISE,
template_key=METADATA_TEMPLATE_KEY,
)
return True
except BoxAPIError as e:
if e.response_info.status_code == 404:
return False
raise
# ── 10. Write Metadata to Box ─────────────────────────────────────────────────
def write_metadata_to_box(box_client, file_id, metadata, file_name):
"""
Create metadata on file. On 409 conflict, fall back to update
using JSON Patch ADD operations.
"""
try:
box_client.file_metadata.create_file_metadata_by_id(
file_id=file_id,
scope=CreateFileMetadataByIdScope.ENTERPRISE,
template_key=METADATA_TEMPLATE_KEY,
request_body=metadata,
)
print(f" Metadata CREATED on '{file_name}'")
return True
except BoxAPIError as e:
if e.response_info.status_code == 409:
# Metadata already exists — update with JSON Patch
try:
box_client.file_metadata.update_file_metadata_by_id(
file_id=file_id,
scope=UpdateFileMetadataByIdScope.ENTERPRISE,
template_key=METADATA_TEMPLATE_KEY,
request_body=[
UpdateFileMetadataByIdRequestBody(
op=UpdateFileMetadataByIdRequestBodyOpField.ADD,
path=f"/{key}",
value=value,
)
for key, value in metadata.items()
],
)
print(f" Metadata UPDATED on '{file_name}'")
return True
except BoxAPIError as update_err:
print(f" ERROR updating metadata on '{file_name}': {update_err}")
return False
else:
print(f" ERROR creating metadata on '{file_name}': {e}")
return False
# ── 11. Write Description to Box ──────────────────────────────────────────────
def write_description_to_box(box_client, file_id, description, file_name):
"""Write a short AI summary to the Box file description field (max 255 chars)."""
try:
truncated = description[:DESCRIPTION_MAX_LENGTH]
box_client.files.update_file_by_id(
file_id=file_id,
description=truncated,
)
print(f" Description written on '{file_name}'")
return True
except BoxAPIError as e:
print(f" ERROR writing description on '{file_name}': {e}")
return False
# ── 12. Write Scene Breakdown Comment ────────────────────────────────────────
def write_scene_comment_to_box(box_client, file_id, scenes, file_name):
"""Write timestamped scene breakdown as a comment on the Box file.
Accepts a list of scene strings or a single semicolon-separated string.
"""
try:
if isinstance(scenes, list):
scene_lines = "\n".join(scenes)
else:
scene_lines = scenes.replace("; ", "\n")
message = f"Scene breakdown:\n{scene_lines}"
box_client.comments.create_comment(
message=message,
item=CreateCommentItem(
id=file_id,
type=CreateCommentItemTypeField.FILE,
),
)
print(f" Scene comment written on '{file_name}'")
return True
except BoxAPIError as e:
print(f" ERROR writing scene comment on '{file_name}': {e}")
return False
# ── 13. Main Pipeline ─────────────────────────────────────────────────────────
def main():
print("=" * 60)
print("Marriott Box Asset Tagger")
print("=" * 60)
# Initialize clients
box_client = init_box_client()
gemini_client = init_gemini_client()
# Fetch template schema (prompts are built per-file to include context)
template_schema = fetch_template_schema(box_client)
# List all media files
image_files, video_files = list_all_media(box_client)
if not image_files and not video_files:
print("No media files found. Exiting.")
return
# Per-run limiter state
run_start = time.monotonic()
cap_hit_reason = None # set to a string when cap is hit; loops break cleanly
def cap_check(newly_tagged_count):
"""Return a reason string if a cap is hit, else None."""
if newly_tagged_count >= MAX_FILES_PER_RUN:
return f"file cap reached ({newly_tagged_count}/{MAX_FILES_PER_RUN} newly-tagged)"
elapsed = time.monotonic() - run_start
if elapsed >= MAX_RUN_DURATION:
return f"time cap reached ({elapsed/3600:.1f}h / {MAX_RUN_DURATION/3600:.1f}h)"
return None
# ── Process Images ───────────────────────────────────────────────────────
img_total = len(image_files)
img_tagged = 0
img_skipped = 0
img_errored = 0
img_unprocessed = 0 # remaining when cap hit
if image_files:
print(f"\n{'' * 60}")
print(f"PROCESSING {img_total} IMAGES")
print(f"{'' * 60}")
for i, file_info in enumerate(image_files, 1):
# Cap check — exit cleanly before doing any new work
cap_hit_reason = cap_check(img_tagged)
if cap_hit_reason:
img_unprocessed = img_total - i + 1
print(f"\nRun cap hit ({cap_hit_reason}) — {img_unprocessed} images remain. Will resume on next run.")
break
file_id = file_info["id"]
file_name = file_info["name"]
folder_path = file_info.get("folder_path", "")
print(f"\n[Image {i}/{img_total}] Processing: {file_name} (ID: {file_id})")
if folder_path:
print(f" Folder: {folder_path}")
# Check if already tagged
if SKIP_ALREADY_TAGGED:
if check_existing_metadata(box_client, file_id):
print(f" Already tagged — skipping")
img_skipped += 1
continue
# Download and resize
result = download_and_resize_image(box_client, file_id, file_name)
if result is None:
img_errored += 1
continue
image_bytes, mime_type = result
# Build per-file prompt with context
image_prompt = build_gemini_prompt(template_schema, file_name, folder_path)
# Analyze with Gemini
raw_metadata = analyze_image_with_gemini(gemini_client, image_bytes, mime_type, image_prompt)
if raw_metadata is None:
img_errored += 1
continue
# Extract description before validation
description = raw_metadata.pop("description", None)
# Validate and clean
cleaned_metadata = validate_and_clean_metadata(raw_metadata, template_schema)
if not cleaned_metadata:
print(f" WARNING: No valid metadata fields after validation — skipping")
img_errored += 1
continue
print(f" Metadata: {json.dumps(cleaned_metadata, indent=2)}")
# Write metadata to Box
if not write_metadata_to_box(box_client, file_id, cleaned_metadata, file_name):
img_errored += 1
continue
# Write description to Box
if description and isinstance(description, str):
write_description_to_box(box_client, file_id, description, file_name)
img_tagged += 1
# Rate limit delay (skip after last file)
if i < img_total or video_files:
print(f" Waiting {GEMINI_DELAY}s (rate limit)...")
time.sleep(GEMINI_DELAY)
# ── Process Videos ───────────────────────────────────────────────────────
vid_total = len(video_files)
vid_tagged = 0
vid_skipped = 0
vid_errored = 0
vid_unprocessed = 0
if video_files:
print(f"\n{'' * 60}")
print(f"PROCESSING {vid_total} VIDEOS")
print(f"{'' * 60}")
for i, file_info in enumerate(video_files, 1):
# Cap check (shared with images) — exit cleanly before doing any new work
cap_hit_reason = cap_check(img_tagged + vid_tagged)
if cap_hit_reason:
vid_unprocessed = vid_total - i + 1
print(f"\nRun cap hit ({cap_hit_reason}) — {vid_unprocessed} videos remain. Will resume on next run.")
break
file_id = file_info["id"]
file_name = file_info["name"]
folder_path = file_info.get("folder_path", "")
print(f"\n[Video {i}/{vid_total}] Processing: {file_name} (ID: {file_id})")
if folder_path:
print(f" Folder: {folder_path}")
# Check if already tagged
if SKIP_ALREADY_TAGGED:
if check_existing_metadata(box_client, file_id):
print(f" Already tagged — skipping")
vid_skipped += 1
continue
# Download video proxy (480p MP4)
result = download_video_proxy(box_client, file_id, file_name)
if result == VIDEO_SKIP:
vid_skipped += 1
continue
if result is None:
vid_errored += 1
continue
video_bytes, mime_type = result
# Build per-file prompt with context
video_prompt = build_video_prompt(template_schema, file_name, folder_path)
# Analyze with Gemini
raw_metadata = analyze_video_with_gemini(gemini_client, video_bytes, mime_type, video_prompt)
if raw_metadata is None:
vid_errored += 1
continue
# Extract description and scenes before validation
description = raw_metadata.pop("description", None)
scenes = raw_metadata.pop("scenes", None)
# Validate and clean
cleaned_metadata = validate_and_clean_metadata(raw_metadata, template_schema)
if not cleaned_metadata:
print(f" WARNING: No valid metadata fields after validation — skipping")
vid_errored += 1
continue
print(f" Metadata: {json.dumps(cleaned_metadata, indent=2)}")
if scenes:
print(f" Scenes: {scenes}")
# Write metadata to Box
if not write_metadata_to_box(box_client, file_id, cleaned_metadata, file_name):
vid_errored += 1
continue
# Write description to Box
if description and isinstance(description, str):
write_description_to_box(box_client, file_id, description, file_name)
# Write scene breakdown as comment
if scenes and isinstance(scenes, (str, list)):
write_scene_comment_to_box(box_client, file_id, scenes, file_name)
vid_tagged += 1
# Rate limit delay (skip after last video)
if i < vid_total:
print(f" Waiting {GEMINI_VIDEO_DELAY}s (rate limit)...")
time.sleep(GEMINI_VIDEO_DELAY)
# ── Combined Summary ─────────────────────────────────────────────────────
print("\n" + "=" * 60)
print("SUMMARY")
print("=" * 60)
print(f" Images: {img_total} total | {img_tagged} tagged | {img_skipped} skipped | {img_errored} errors | {img_unprocessed} unprocessed (cap)")
print(f" Videos: {vid_total} total | {vid_tagged} tagged | {vid_skipped} skipped | {vid_errored} errors | {vid_unprocessed} unprocessed (cap)")
print(f" Overall: {img_total + vid_total} total | {img_tagged + vid_tagged} tagged | {img_skipped + vid_skipped} skipped | {img_errored + vid_errored} errors | {img_unprocessed + vid_unprocessed} unprocessed (cap)")
elapsed = time.monotonic() - run_start
print(f" Run time: {elapsed/60:.1f} min")
if img_unprocessed + vid_unprocessed > 0:
print(f" NOTE: Run cap was reached. Remaining files will be processed on the next scheduled run.")
print("=" * 60)
if __name__ == "__main__":
main()

49
marriott-tagger.service Normal file
View file

@ -0,0 +1,49 @@
[Unit]
Description=Marriott Box Asset Tagger - one-shot tagging pass
Documentation=https://bitbucket.org/zlalani/marriott-box-image-video-tagging
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
User=marriott-tagger
Group=marriott-tagger
# Working directory and venv-aware Python
WorkingDirectory=/opt/marriott-box-image-video-tagging
ExecStart=/opt/marriott-box-image-video-tagging/env/bin/python -u /opt/marriott-box-image-video-tagging/main.py
# Credentials live outside the code directory; main.py reads box_config.json
# from its own dir and .env via dotenv. Symlink or bind-mount these in place,
# or adjust paths in main.py if you prefer /etc/marriott-tagger/.
# Example: ln -s /etc/marriott-tagger/box_config.json /opt/marriott-box-image-video-tagging/box_config.json
# Output goes to the systemd journal — view with: journalctl -u marriott-tagger
StandardOutput=journal
StandardError=journal
# A full tagging pass on a large folder can take a long time (rate limits +
# video processing). Allow up to 6 hours before systemd kills it.
TimeoutStartSec=6h
# Don't restart on failure for a one-shot run — the timer will pick it up next cycle.
Restart=no
# ── Hardening ─────────────────────────────────────────────────────────────────
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ProtectKernelTunables=true
ProtectKernelModules=true
ProtectControlGroups=true
RestrictNamespaces=true
RestrictRealtime=true
RestrictSUIDSGID=true
LockPersonality=true
MemoryDenyWriteExecute=false
# Allow writes only to the app's own directory (for any temp files / caches)
ReadWritePaths=/opt/marriott-box-image-video-tagging
[Install]
WantedBy=multi-user.target

19
marriott-tagger.timer Normal file
View file

@ -0,0 +1,19 @@
[Unit]
Description=Schedule daily Marriott Box asset tagging pass
Documentation=https://bitbucket.org/zlalani/marriott-box-image-video-tagging
[Timer]
# Run every day at 2:00 AM server local time
OnCalendar=*-*-* 02:00:00
# If the server was off when the trigger should have fired, run as soon as it boots
Persistent=true
# Add a small randomized delay so multiple servers (if ever scaled) don't hit
# Box and Gemini APIs at the exact same instant
RandomizedDelaySec=5min
Unit=marriott-tagger.service
[Install]
WantedBy=timers.target

4
requirements.txt Normal file
View file

@ -0,0 +1,4 @@
box-sdk-gen[jwt]
google-genai
Pillow
python-dotenv