ai_qc/backend/media_plan_processor.py
nickviljoen ffbec7e457 Load media-plan workbooks in read_only mode to skip pivot caches
openpyxl's default (read/write) loader deserializes pivot cache
records, which hangs for minutes on Amazon media plans that use pivot
tables. The GCP LB then cuts the request off with "upstream request
timeout" / "stream timeout".

read_only=True skips pivot cache parsing entirely, and our code only
uses iter_rows / sheetnames which are both supported in that mode.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 21:23:08 +02:00

351 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Media Plan Processor
Parses Excel media plans, matches uploaded assets by filename,
and validates asset specs (dimensions, file type) against the plan.
"""
import os
import re
import json
import difflib
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from PIL import Image
import fitz # PyMuPDF
# Channel sheets whose rows describe deliverable assets. Workbook sheets
# whose names are not listed here are skipped by parse_media_plan.
DATA_SHEETS = [
    'Display',
    '(P)OLV',
    '(D)OOH',
    'TV',
    'Print',
    'Audio',
]

# Normalized field name -> substrings that identify the field's header cell.
# Header text is lowercased and stripped before these substrings are tested,
# so every pattern here must be lowercase.
COLUMN_MAP = {
    # Identity / lifecycle
    'asset_id': ['asset id'],
    'status': ['asset status'],
    # Campaign metadata
    'campaign_project': ['campaign project'],
    'campaign_phase': ['campaign phase'],
    'creative_name': ['creative name'],
    'country': ['country'],
    'language': ['language'],
    # Technical specification
    'media_type': ['media type'],
    'dimensions': ['resolution', 'dimensions'],
    'vendor': ['media property', 'vendor'],
    'placement': ['placement'],
    'file_type': ['file type'],
    'max_file_size': ['max file size'],
    # Scheduling
    'delivery_deadline': ['delivery deadline'],
    'media_start_date': ['media start'],
    'media_end_date': ['media end'],
    # Free-form extras
    'unit_description': ['unit description'],
    'asin': ['asin'],
    'comments': ['comments'],
}
def _find_column_indices(header_row: list) -> Dict[str, int]:
"""Map normalized column names to their indices based on header row."""
indices = {}
for col_idx, header in enumerate(header_row):
if not header:
continue
header_lower = str(header).lower().strip()
for key, patterns in COLUMN_MAP.items():
if key not in indices: # first match wins
for pattern in patterns:
if pattern in header_lower:
indices[key] = col_idx
break
return indices
def _parse_dimensions(dim_str: str) -> Tuple[Optional[int], Optional[int]]:
"""Parse dimension string like '970x250' into (width, height)."""
if not dim_str:
return None, None
match = re.search(r'(\d+)\s*[xX×]\s*(\d+)', str(dim_str))
if match:
return int(match.group(1)), int(match.group(2))
return None, None
def _format_date(val) -> str:
"""Convert date value to string."""
if val is None:
return ''
if isinstance(val, datetime):
return val.strftime('%Y-%m-%d')
return str(val).strip()
def _is_data_sheet(sheet_name: str) -> bool:
    """Return True if sheet_name matches an entry of DATA_SHEETS, ignoring case."""
    lowered = sheet_name.lower()
    return any(lowered == known.lower() for known in DATA_SHEETS)


def _cell_value(row, idx: Optional[int]):
    """Return row[idx], or None when the column is unmapped or the row is too short."""
    if idx is None or idx >= len(row):
        return None
    return row[idx]


def _cell_text(row, idx: Optional[int]) -> str:
    """Return the cell at idx as stripped text; missing or falsy cells become ''."""
    value = _cell_value(row, idx)
    return str(value).strip() if value else ''


def _row_to_asset(row, col_map: Dict[str, int], sheet_name: str,
                  asset_id: str, status: str) -> Dict:
    """Build the asset dict for one data row of a channel sheet."""
    def text(key: str) -> str:
        return _cell_text(row, col_map.get(key))

    def date(key: str) -> str:
        # Dates are passed raw so datetime cells can be formatted as such.
        return _format_date(_cell_value(row, col_map.get(key)))

    dim_str = text('dimensions')
    width, height = _parse_dimensions(dim_str)
    return {
        'asset_id': asset_id,
        'channel': sheet_name,
        'status': status,
        'campaign_project': text('campaign_project'),
        'campaign_phase': text('campaign_phase'),
        'creative_name': text('creative_name'),
        'country': text('country'),
        'language': text('language'),
        'media_type': text('media_type'),
        'dimensions': dim_str,
        'width': width,
        'height': height,
        'file_type': text('file_type').upper(),
        'vendor': text('vendor'),
        'placement': text('placement'),
        'unit_description': text('unit_description'),
        'max_file_size': text('max_file_size'),
        'delivery_deadline': date('delivery_deadline'),
        'media_start_date': date('media_start_date'),
        'media_end_date': date('media_end_date'),
    }


def parse_media_plan(excel_path: str) -> Dict:
    """
    Parse an Excel media plan and extract all asset specifications.
    Only sheets named in DATA_SHEETS (case-insensitive) that have an asset ID
    column are read. Rows without an asset ID, and rows whose status contains
    'removed', are skipped.
    Args:
        excel_path: Path to the .xlsx file
    Returns:
        Dict with 'assets' (list of asset dicts), 'total_assets',
        'channels' (sheet name -> number of assets kept) and 'parsed_at'
        (ISO timestamp of the parse)
    """
    import openpyxl
    # read_only=True skips pivot-cache deserialization, which hangs this loader
    # on workbooks with pivot tables (Amazon media plans use them extensively).
    wb = openpyxl.load_workbook(excel_path, data_only=True, read_only=True)
    all_assets: List[Dict] = []
    channel_counts: Dict[str, int] = {}
    try:
        for sheet_name in wb.sheetnames:
            if not _is_data_sheet(sheet_name):
                continue
            ws = wb[sheet_name]
            # In read_only mode max_row may be None until dimensions are read;
            # treat that as "has data" and let iter_rows decide.
            if ws.max_row is not None and ws.max_row < 2:
                continue
            # Header is row 1. A sheet with no rows at all yields nothing, so
            # use a default instead of letting next() raise StopIteration.
            header_row = next(
                ws.iter_rows(min_row=1, max_row=1, values_only=True), None
            )
            if header_row is None:
                continue
            col_map = _find_column_indices(list(header_row))
            if 'asset_id' not in col_map:
                continue  # Skip sheets without asset ID column
            channel_count = 0
            for row in ws.iter_rows(min_row=2, values_only=True):
                # Rows may be shorter than the header; the helpers treat
                # out-of-range columns as empty.
                asset_id = _cell_text(row, col_map['asset_id'])
                if not asset_id:
                    continue
                status = _cell_text(row, col_map.get('status'))
                if 'removed' in status.lower():
                    continue
                all_assets.append(
                    _row_to_asset(row, col_map, sheet_name, asset_id, status)
                )
                channel_count += 1
            if channel_count > 0:
                channel_counts[sheet_name] = channel_count
    finally:
        # Read-only workbooks keep the source file open until closed, so
        # release it even when a sheet fails to parse.
        wb.close()
    return {
        'assets': all_assets,
        'total_assets': len(all_assets),
        'channels': channel_counts,
        'parsed_at': datetime.now().isoformat(),
    }
def find_matching_asset(filename: str, media_plan_data: Dict) -> Optional[Dict]:
    """
    Find the best matching asset in the media plan for a given filename.
    Matching stages, tried in order on the filename without its extension:
    exact (1.0), case-insensitive (0.98), filename starts with the asset ID
    (0.90), filename contains the asset ID (0.85), then a case-insensitive
    fuzzy match with a 0.7 similarity cutoff (confidence = similarity ratio).
    When several asset IDs qualify in the starts-with or contains stage, the
    longest ID wins so that 'ABC_1' does not shadow 'ABC_10'.
    Args:
        filename: Uploaded file's name (e.g., 'XCER25IE001_Azzurri_Live1_IE-1-1_RTE_970x250_08-07-2025.jpg')
        media_plan_data: Parsed media plan dict with 'assets' list
    Returns:
        Dict with 'match', 'confidence' and 'match_type', or None
    """
    if not filename or not media_plan_data or not media_plan_data.get('assets'):
        return None
    # Strip extension from filename for matching
    name_no_ext = os.path.splitext(filename)[0].strip()
    name_lower = name_no_ext.lower()
    assets = media_plan_data['assets']

    # Try exact match first
    for asset in assets:
        if asset['asset_id'] == name_no_ext:
            return {'match': asset, 'confidence': 1.0, 'match_type': 'exact'}

    # Try case-insensitive exact match
    for asset in assets:
        if asset['asset_id'].lower() == name_lower:
            return {'match': asset, 'confidence': 0.98, 'match_type': 'case_insensitive'}

    def _most_specific(candidates: List[Dict]) -> Optional[Dict]:
        # max() keeps the first of equally long IDs, i.e. plan order on ties.
        return max(candidates, key=lambda a: len(a['asset_id']), default=None)

    # Empty IDs are excluded: every string starts with and contains ''.
    named = [a for a in assets if a['asset_id']]

    # Try starts-with match (filename might have extra suffixes)
    best = _most_specific(
        [a for a in named if name_lower.startswith(a['asset_id'].lower())]
    )
    if best is not None:
        return {'match': best, 'confidence': 0.90, 'match_type': 'starts_with'}

    # Try contains match (asset_id contained in filename)
    best = _most_specific(
        [a for a in named if a['asset_id'].lower() in name_lower]
    )
    if best is not None:
        return {'match': best, 'confidence': 0.85, 'match_type': 'contains'}

    # Fuzzy match as last resort. Compare lowercase on both sides so the
    # cutoff and the reported confidence use the same similarity measure.
    by_lower: Dict[str, Dict] = {}
    for asset in assets:
        by_lower.setdefault(asset['asset_id'].lower(), asset)
    close = difflib.get_close_matches(name_lower, list(by_lower), n=1, cutoff=0.7)
    if close:
        matched_lower = close[0]
        ratio = difflib.SequenceMatcher(None, name_lower, matched_lower).ratio()
        return {
            'match': by_lower[matched_lower],
            'confidence': round(ratio, 2),
            'match_type': 'fuzzy',
        }
    return None
def get_asset_dimensions(file_path: str) -> Tuple[Optional[int], Optional[int]]:
    """
    Get the actual dimensions of an image or PDF file.
    Files ending in '.pdf' are opened with PyMuPDF and measured by the
    rectangle of their first page (values truncated to int); every other
    file is opened with Pillow. Any failure is printed and reported as
    unknown rather than raised.
    Args:
        file_path: Path to the asset file
    Returns:
        (width, height) tuple, or (None, None) if unable to determine
    """
    try:
        ext = os.path.splitext(file_path)[1].lower()
        if ext == '.pdf':
            # Context manager closes the document even if load_page raises.
            with fitz.open(file_path) as doc:
                if doc.page_count > 0:
                    rect = doc.load_page(0).rect
                    return int(rect.width), int(rect.height)
        else:
            # Context manager releases the file handle even if reading the
            # image header fails part-way.
            with Image.open(file_path) as img:
                width, height = img.size
                return width, height
    except Exception as e:
        # Best-effort probe: callers treat (None, None) as "unknown".
        print(f"Error getting dimensions for {file_path}: {e}")
    return None, None
def validate_asset_specs(file_path: str, matched_spec: Dict) -> Dict:
    """
    Validate an asset's dimensions and file type against the media plan spec.
    Each check yields True/False when both the actual and the expected value
    are known, and None when either side is missing. The expected file type
    may list several formats ('JPG/PNG', 'jpg, gif', '.jpeg'); the asset
    passes if its extension equals any of them, treating jpeg/jpg and
    tif/tiff as the same format.
    Args:
        file_path: Path to the uploaded asset file
        matched_spec: The matched asset entry from the media plan
    Returns:
        Validation results dict
    """
    actual_w, actual_h = get_asset_dimensions(file_path)
    actual_ext = os.path.splitext(file_path)[1].lower().lstrip('.')
    expected_w = matched_spec.get('width')
    expected_h = matched_spec.get('height')
    # `or ''` guards against an explicit None stored under 'file_type'.
    expected_type = str(matched_spec.get('file_type') or '').lower()

    # Dimensions check
    dims_match = None
    if actual_w and actual_h and expected_w and expected_h:
        dims_match = (actual_w == expected_w and actual_h == expected_h)

    # File type check
    type_match = None
    if expected_type and actual_ext:
        # Normalize common equivalences
        type_map = {'jpeg': 'jpg', 'tif': 'tiff'}
        norm_actual = type_map.get(actual_ext, actual_ext)
        # Split the spec on anything that is not a letter or digit so that
        # separators, dots and surrounding words do not defeat the match.
        accepted = {
            type_map.get(token, token)
            for token in re.split(r'[^a-z0-9]+', expected_type)
            if token
        }
        type_match = norm_actual in accepted

    return {
        'dimensions_match': dims_match,
        'expected_dimensions': matched_spec.get('dimensions', ''),
        'actual_dimensions': f"{actual_w}x{actual_h}" if actual_w and actual_h else 'unknown',
        'actual_width': actual_w,
        'actual_height': actual_h,
        'file_type_match': type_match,
        'expected_file_type': matched_spec.get('file_type', ''),
        'actual_file_type': actual_ext.upper(),
    }
def build_media_plan_context(matched_spec: Dict) -> str:
    """
    Render a matched media plan entry as a text block for QC check prompts.
    The block always carries the asset ID; each further field is listed only
    when the entry holds a non-empty value for it.
    """
    # (spec key, label shown in the prompt), in output order.
    optional_fields = (
        ('country', 'Country'),
        ('language', 'Language'),
        ('campaign_phase', 'Campaign Phase'),
        ('creative_name', 'Creative'),
        ('channel', 'Channel'),
        ('vendor', 'Media Vendor'),
        ('placement', 'Placement'),
        ('dimensions', 'Expected Dimensions'),
        ('file_type', 'Expected File Type'),
    )
    parts = [
        "\n=== MEDIA PLAN CONTEXT ===",
        "This asset is specified in the media plan as:",
        f"- Asset ID: {matched_spec.get('asset_id', '')}",
    ]
    parts.extend(
        f"- {label}: {matched_spec[key]}"
        for key, label in optional_fields
        if matched_spec.get(key)
    )
    parts += [
        "Please verify the asset content is appropriate for this placement and market.",
        "=== END MEDIA PLAN CONTEXT ===\n",
    ]
    return "\n".join(parts)