openpyxl's default (read/write) loader deserializes pivot cache records, which hangs for minutes on Amazon media plans that use pivot tables. The GCP LB then cuts the request off with "upstream request timeout" / "stream timeout". read_only=True skips pivot cache parsing entirely, and our code only uses iter_rows / sheetnames which are both supported in that mode. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
351 lines
12 KiB
Python
351 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Media Plan Processor
|
||
Parses Excel media plans, matches uploaded assets by filename,
|
||
and validates asset specs (dimensions, file type) against the plan.
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import json
|
||
import difflib
|
||
from datetime import datetime
|
||
from typing import Dict, List, Optional, Tuple
|
||
from PIL import Image
|
||
import fitz # PyMuPDF
|
||
|
||
|
||
# Sheets to extract asset data from (channel sheets)
|
||
DATA_SHEETS = ['Display', '(P)OLV', '(D)OOH', 'TV', 'Print', 'Audio']
|
||
|
||
# Common column name mappings (normalized key -> possible header substrings)
|
||
COLUMN_MAP = {
|
||
'asset_id': ['asset id'],
|
||
'status': ['asset status'],
|
||
'campaign_project': ['campaign project'],
|
||
'campaign_phase': ['campaign phase'],
|
||
'creative_name': ['creative name'],
|
||
'country': ['country'],
|
||
'language': ['language'],
|
||
'media_type': ['media type'],
|
||
'dimensions': ['resolution', 'dimensions'],
|
||
'vendor': ['media property', 'vendor'],
|
||
'placement': ['placement'],
|
||
'file_type': ['file type'],
|
||
'max_file_size': ['max file size'],
|
||
'delivery_deadline': ['delivery deadline'],
|
||
'media_start_date': ['media start'],
|
||
'media_end_date': ['media end'],
|
||
'unit_description': ['unit description'],
|
||
'asin': ['asin'],
|
||
'comments': ['comments'],
|
||
}
|
||
|
||
|
||
def _find_column_indices(header_row: list) -> Dict[str, int]:
|
||
"""Map normalized column names to their indices based on header row."""
|
||
indices = {}
|
||
for col_idx, header in enumerate(header_row):
|
||
if not header:
|
||
continue
|
||
header_lower = str(header).lower().strip()
|
||
for key, patterns in COLUMN_MAP.items():
|
||
if key not in indices: # first match wins
|
||
for pattern in patterns:
|
||
if pattern in header_lower:
|
||
indices[key] = col_idx
|
||
break
|
||
return indices
|
||
|
||
|
||
def _parse_dimensions(dim_str: str) -> Tuple[Optional[int], Optional[int]]:
|
||
"""Parse dimension string like '970x250' into (width, height)."""
|
||
if not dim_str:
|
||
return None, None
|
||
match = re.search(r'(\d+)\s*[xX×]\s*(\d+)', str(dim_str))
|
||
if match:
|
||
return int(match.group(1)), int(match.group(2))
|
||
return None, None
|
||
|
||
|
||
def _format_date(val) -> str:
|
||
"""Convert date value to string."""
|
||
if val is None:
|
||
return ''
|
||
if isinstance(val, datetime):
|
||
return val.strftime('%Y-%m-%d')
|
||
return str(val).strip()
|
||
|
||
|
||
def parse_media_plan(excel_path: str) -> Dict:
|
||
"""
|
||
Parse an Excel media plan and extract all asset specifications.
|
||
|
||
Args:
|
||
excel_path: Path to the .xlsx file
|
||
|
||
Returns:
|
||
Dict with 'assets' list, 'channels' summary, and metadata
|
||
"""
|
||
import openpyxl
|
||
|
||
# read_only=True skips pivot-cache deserialization, which hangs this loader
|
||
# on workbooks with pivot tables (Amazon media plans use them extensively).
|
||
wb = openpyxl.load_workbook(excel_path, data_only=True, read_only=True)
|
||
all_assets = []
|
||
channel_counts = {}
|
||
|
||
for sheet_name in wb.sheetnames:
|
||
# Only process data sheets
|
||
if sheet_name not in DATA_SHEETS:
|
||
# Also try case-insensitive match
|
||
matched = False
|
||
for ds in DATA_SHEETS:
|
||
if ds.lower() == sheet_name.lower():
|
||
matched = True
|
||
break
|
||
if not matched:
|
||
continue
|
||
|
||
ws = wb[sheet_name]
|
||
# In read_only mode max_row may be None until dimensions are read; treat
|
||
# that as "has data" and let iter_rows decide.
|
||
if ws.max_row is not None and ws.max_row < 2:
|
||
continue
|
||
|
||
# Get header row (row 1)
|
||
header_row = [cell for cell in next(ws.iter_rows(min_row=1, max_row=1, values_only=True))]
|
||
col_map = _find_column_indices(header_row)
|
||
|
||
if 'asset_id' not in col_map:
|
||
continue # Skip sheets without asset ID column
|
||
|
||
channel_count = 0
|
||
|
||
# Process data rows
|
||
for row in ws.iter_rows(min_row=2, values_only=True):
|
||
row_list = list(row)
|
||
|
||
# Get asset ID - skip empty rows
|
||
asset_id_idx = col_map.get('asset_id')
|
||
if asset_id_idx is None or asset_id_idx >= len(row_list):
|
||
continue
|
||
asset_id = row_list[asset_id_idx]
|
||
if not asset_id or not str(asset_id).strip():
|
||
continue
|
||
|
||
asset_id = str(asset_id).strip()
|
||
|
||
# Skip removed assets
|
||
status_idx = col_map.get('status')
|
||
status = ''
|
||
if status_idx is not None and status_idx < len(row_list) and row_list[status_idx]:
|
||
status = str(row_list[status_idx]).strip()
|
||
if 'removed' in status.lower():
|
||
continue
|
||
|
||
def _get(key):
|
||
idx = col_map.get(key)
|
||
if idx is not None and idx < len(row_list) and row_list[idx]:
|
||
return str(row_list[idx]).strip()
|
||
return ''
|
||
|
||
dim_str = _get('dimensions')
|
||
width, height = _parse_dimensions(dim_str)
|
||
|
||
asset = {
|
||
'asset_id': asset_id,
|
||
'channel': sheet_name,
|
||
'status': status,
|
||
'campaign_project': _get('campaign_project'),
|
||
'campaign_phase': _get('campaign_phase'),
|
||
'creative_name': _get('creative_name'),
|
||
'country': _get('country'),
|
||
'language': _get('language'),
|
||
'media_type': _get('media_type'),
|
||
'dimensions': dim_str,
|
||
'width': width,
|
||
'height': height,
|
||
'file_type': _get('file_type').upper(),
|
||
'vendor': _get('vendor'),
|
||
'placement': _get('placement'),
|
||
'unit_description': _get('unit_description'),
|
||
'max_file_size': _get('max_file_size'),
|
||
'delivery_deadline': _format_date(row_list[col_map['delivery_deadline']] if 'delivery_deadline' in col_map and col_map['delivery_deadline'] < len(row_list) else None),
|
||
'media_start_date': _format_date(row_list[col_map['media_start_date']] if 'media_start_date' in col_map and col_map['media_start_date'] < len(row_list) else None),
|
||
'media_end_date': _format_date(row_list[col_map['media_end_date']] if 'media_end_date' in col_map and col_map['media_end_date'] < len(row_list) else None),
|
||
}
|
||
|
||
all_assets.append(asset)
|
||
channel_count += 1
|
||
|
||
if channel_count > 0:
|
||
channel_counts[sheet_name] = channel_count
|
||
|
||
wb.close()
|
||
|
||
return {
|
||
'assets': all_assets,
|
||
'total_assets': len(all_assets),
|
||
'channels': channel_counts,
|
||
'parsed_at': datetime.now().isoformat(),
|
||
}
|
||
|
||
|
||
def find_matching_asset(filename: str, media_plan_data: Dict) -> Optional[Dict]:
|
||
"""
|
||
Find the best matching asset in the media plan for a given filename.
|
||
|
||
Args:
|
||
filename: Uploaded file's name (e.g., 'XCER25IE001_Azzurri_Live1_IE-1-1_RTE_970x250_08-07-2025.jpg')
|
||
media_plan_data: Parsed media plan dict with 'assets' list
|
||
|
||
Returns:
|
||
Dict with 'match', 'confidence', and matched asset data, or None
|
||
"""
|
||
if not filename or not media_plan_data or not media_plan_data.get('assets'):
|
||
return None
|
||
|
||
# Strip extension from filename for matching
|
||
name_no_ext = os.path.splitext(filename)[0].strip()
|
||
assets = media_plan_data['assets']
|
||
|
||
# Try exact match first
|
||
for asset in assets:
|
||
if asset['asset_id'] == name_no_ext:
|
||
return {'match': asset, 'confidence': 1.0, 'match_type': 'exact'}
|
||
|
||
# Try case-insensitive exact match
|
||
name_lower = name_no_ext.lower()
|
||
for asset in assets:
|
||
if asset['asset_id'].lower() == name_lower:
|
||
return {'match': asset, 'confidence': 0.98, 'match_type': 'case_insensitive'}
|
||
|
||
# Try starts-with match (filename might have extra suffixes)
|
||
for asset in assets:
|
||
if name_lower.startswith(asset['asset_id'].lower()):
|
||
return {'match': asset, 'confidence': 0.90, 'match_type': 'starts_with'}
|
||
|
||
# Try contains match (asset_id contained in filename)
|
||
for asset in assets:
|
||
if asset['asset_id'].lower() in name_lower:
|
||
return {'match': asset, 'confidence': 0.85, 'match_type': 'contains'}
|
||
|
||
# Fuzzy match as last resort
|
||
asset_ids = [a['asset_id'] for a in assets]
|
||
matches = difflib.get_close_matches(name_no_ext, asset_ids, n=1, cutoff=0.7)
|
||
if matches:
|
||
matched_id = matches[0]
|
||
for asset in assets:
|
||
if asset['asset_id'] == matched_id:
|
||
ratio = difflib.SequenceMatcher(None, name_no_ext.lower(), matched_id.lower()).ratio()
|
||
return {'match': asset, 'confidence': round(ratio, 2), 'match_type': 'fuzzy'}
|
||
|
||
return None
|
||
|
||
|
||
def get_asset_dimensions(file_path: str) -> Tuple[Optional[int], Optional[int]]:
|
||
"""
|
||
Get the actual dimensions of an image or PDF file.
|
||
|
||
Returns:
|
||
(width, height) tuple, or (None, None) if unable to determine
|
||
"""
|
||
try:
|
||
ext = os.path.splitext(file_path)[1].lower()
|
||
if ext == '.pdf':
|
||
doc = fitz.open(file_path)
|
||
if doc.page_count > 0:
|
||
page = doc.load_page(0)
|
||
rect = page.rect
|
||
doc.close()
|
||
return int(rect.width), int(rect.height)
|
||
doc.close()
|
||
else:
|
||
img = Image.open(file_path)
|
||
w, h = img.size
|
||
img.close()
|
||
return w, h
|
||
except Exception as e:
|
||
print(f"Error getting dimensions for {file_path}: {e}")
|
||
return None, None
|
||
|
||
|
||
def validate_asset_specs(file_path: str, matched_spec: Dict) -> Dict:
|
||
"""
|
||
Validate an asset's dimensions and file type against the media plan spec.
|
||
|
||
Args:
|
||
file_path: Path to the uploaded asset file
|
||
matched_spec: The matched asset entry from the media plan
|
||
|
||
Returns:
|
||
Validation results dict
|
||
"""
|
||
actual_w, actual_h = get_asset_dimensions(file_path)
|
||
actual_ext = os.path.splitext(file_path)[1].lower().lstrip('.')
|
||
|
||
expected_w = matched_spec.get('width')
|
||
expected_h = matched_spec.get('height')
|
||
expected_type = matched_spec.get('file_type', '').lower()
|
||
|
||
# Dimensions check
|
||
dims_match = None
|
||
if actual_w and actual_h and expected_w and expected_h:
|
||
dims_match = (actual_w == expected_w and actual_h == expected_h)
|
||
|
||
# File type check
|
||
type_match = None
|
||
if expected_type and actual_ext:
|
||
# Normalize common equivalences
|
||
type_map = {'jpeg': 'jpg', 'tif': 'tiff'}
|
||
norm_actual = type_map.get(actual_ext, actual_ext)
|
||
norm_expected = type_map.get(expected_type, expected_type)
|
||
type_match = (norm_actual == norm_expected)
|
||
|
||
return {
|
||
'dimensions_match': dims_match,
|
||
'expected_dimensions': matched_spec.get('dimensions', ''),
|
||
'actual_dimensions': f"{actual_w}x{actual_h}" if actual_w and actual_h else 'unknown',
|
||
'actual_width': actual_w,
|
||
'actual_height': actual_h,
|
||
'file_type_match': type_match,
|
||
'expected_file_type': matched_spec.get('file_type', ''),
|
||
'actual_file_type': actual_ext.upper(),
|
||
}
|
||
|
||
|
||
def build_media_plan_context(matched_spec: Dict) -> str:
|
||
"""
|
||
Build a text context string from a matched media plan entry
|
||
for inclusion in QC check prompts.
|
||
"""
|
||
lines = [
|
||
"\n=== MEDIA PLAN CONTEXT ===",
|
||
"This asset is specified in the media plan as:",
|
||
f"- Asset ID: {matched_spec.get('asset_id', '')}",
|
||
]
|
||
|
||
if matched_spec.get('country'):
|
||
lines.append(f"- Country: {matched_spec['country']}")
|
||
if matched_spec.get('language'):
|
||
lines.append(f"- Language: {matched_spec['language']}")
|
||
if matched_spec.get('campaign_phase'):
|
||
lines.append(f"- Campaign Phase: {matched_spec['campaign_phase']}")
|
||
if matched_spec.get('creative_name'):
|
||
lines.append(f"- Creative: {matched_spec['creative_name']}")
|
||
if matched_spec.get('channel'):
|
||
lines.append(f"- Channel: {matched_spec['channel']}")
|
||
if matched_spec.get('vendor'):
|
||
lines.append(f"- Media Vendor: {matched_spec['vendor']}")
|
||
if matched_spec.get('placement'):
|
||
lines.append(f"- Placement: {matched_spec['placement']}")
|
||
if matched_spec.get('dimensions'):
|
||
lines.append(f"- Expected Dimensions: {matched_spec['dimensions']}")
|
||
if matched_spec.get('file_type'):
|
||
lines.append(f"- Expected File Type: {matched_spec['file_type']}")
|
||
|
||
lines.append("Please verify the asset content is appropriate for this placement and market.")
|
||
lines.append("=== END MEDIA PLAN CONTEXT ===\n")
|
||
|
||
return "\n".join(lines)
|