Upload Excel media plans per client. On QC analysis, automatically match the uploaded file's name against the media plan's Asset IDs to validate dimensions and file type, and include the media plan context (country, language, placement, vendor) in QC check prompts. - New backend/media_plan_processor.py: Excel parsing, fuzzy filename matching, dimension/file-type validation, prompt context builder - New backend/media_plans/ directory for storage - API endpoints: POST/GET/DELETE /api/media_plan - Settings modal: new "Media Plan" tab for upload/manage - Analysis flow: auto-match + validation in response + context in prompts Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
347 lines
12 KiB
Python
347 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Media Plan Processor
|
||
Parses Excel media plans, matches uploaded assets by filename,
|
||
and validates asset specs (dimensions, file type) against the plan.
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import json
|
||
import difflib
|
||
from datetime import datetime
|
||
from typing import Dict, List, Optional, Tuple
|
||
from PIL import Image
|
||
import fitz # PyMuPDF
|
||
|
||
|
||
# Sheets to extract asset data from (channel sheets)
|
||
DATA_SHEETS = ['Display', '(P)OLV', '(D)OOH', 'TV', 'Print', 'Audio']
|
||
|
||
# Common column name mappings (normalized key -> possible header substrings)
|
||
COLUMN_MAP = {
|
||
'asset_id': ['asset id'],
|
||
'status': ['asset status'],
|
||
'campaign_project': ['campaign project'],
|
||
'campaign_phase': ['campaign phase'],
|
||
'creative_name': ['creative name'],
|
||
'country': ['country'],
|
||
'language': ['language'],
|
||
'media_type': ['media type'],
|
||
'dimensions': ['resolution', 'dimensions'],
|
||
'vendor': ['media property', 'vendor'],
|
||
'placement': ['placement'],
|
||
'file_type': ['file type'],
|
||
'max_file_size': ['max file size'],
|
||
'delivery_deadline': ['delivery deadline'],
|
||
'media_start_date': ['media start'],
|
||
'media_end_date': ['media end'],
|
||
'unit_description': ['unit description'],
|
||
'asin': ['asin'],
|
||
'comments': ['comments'],
|
||
}
|
||
|
||
|
||
def _find_column_indices(header_row: list) -> Dict[str, int]:
|
||
"""Map normalized column names to their indices based on header row."""
|
||
indices = {}
|
||
for col_idx, header in enumerate(header_row):
|
||
if not header:
|
||
continue
|
||
header_lower = str(header).lower().strip()
|
||
for key, patterns in COLUMN_MAP.items():
|
||
if key not in indices: # first match wins
|
||
for pattern in patterns:
|
||
if pattern in header_lower:
|
||
indices[key] = col_idx
|
||
break
|
||
return indices
|
||
|
||
|
||
def _parse_dimensions(dim_str: str) -> Tuple[Optional[int], Optional[int]]:
|
||
"""Parse dimension string like '970x250' into (width, height)."""
|
||
if not dim_str:
|
||
return None, None
|
||
match = re.search(r'(\d+)\s*[xX×]\s*(\d+)', str(dim_str))
|
||
if match:
|
||
return int(match.group(1)), int(match.group(2))
|
||
return None, None
|
||
|
||
|
||
def _format_date(val) -> str:
|
||
"""Convert date value to string."""
|
||
if val is None:
|
||
return ''
|
||
if isinstance(val, datetime):
|
||
return val.strftime('%Y-%m-%d')
|
||
return str(val).strip()
|
||
|
||
|
||
def parse_media_plan(excel_path: str) -> Dict:
|
||
"""
|
||
Parse an Excel media plan and extract all asset specifications.
|
||
|
||
Args:
|
||
excel_path: Path to the .xlsx file
|
||
|
||
Returns:
|
||
Dict with 'assets' list, 'channels' summary, and metadata
|
||
"""
|
||
import openpyxl
|
||
|
||
wb = openpyxl.load_workbook(excel_path, data_only=True)
|
||
all_assets = []
|
||
channel_counts = {}
|
||
|
||
for sheet_name in wb.sheetnames:
|
||
# Only process data sheets
|
||
if sheet_name not in DATA_SHEETS:
|
||
# Also try case-insensitive match
|
||
matched = False
|
||
for ds in DATA_SHEETS:
|
||
if ds.lower() == sheet_name.lower():
|
||
matched = True
|
||
break
|
||
if not matched:
|
||
continue
|
||
|
||
ws = wb[sheet_name]
|
||
if ws.max_row < 2:
|
||
continue
|
||
|
||
# Get header row (row 1)
|
||
header_row = [cell for cell in next(ws.iter_rows(min_row=1, max_row=1, values_only=True))]
|
||
col_map = _find_column_indices(header_row)
|
||
|
||
if 'asset_id' not in col_map:
|
||
continue # Skip sheets without asset ID column
|
||
|
||
channel_count = 0
|
||
|
||
# Process data rows
|
||
for row in ws.iter_rows(min_row=2, values_only=True):
|
||
row_list = list(row)
|
||
|
||
# Get asset ID - skip empty rows
|
||
asset_id_idx = col_map.get('asset_id')
|
||
if asset_id_idx is None or asset_id_idx >= len(row_list):
|
||
continue
|
||
asset_id = row_list[asset_id_idx]
|
||
if not asset_id or not str(asset_id).strip():
|
||
continue
|
||
|
||
asset_id = str(asset_id).strip()
|
||
|
||
# Skip removed assets
|
||
status_idx = col_map.get('status')
|
||
status = ''
|
||
if status_idx is not None and status_idx < len(row_list) and row_list[status_idx]:
|
||
status = str(row_list[status_idx]).strip()
|
||
if 'removed' in status.lower():
|
||
continue
|
||
|
||
def _get(key):
|
||
idx = col_map.get(key)
|
||
if idx is not None and idx < len(row_list) and row_list[idx]:
|
||
return str(row_list[idx]).strip()
|
||
return ''
|
||
|
||
dim_str = _get('dimensions')
|
||
width, height = _parse_dimensions(dim_str)
|
||
|
||
asset = {
|
||
'asset_id': asset_id,
|
||
'channel': sheet_name,
|
||
'status': status,
|
||
'campaign_project': _get('campaign_project'),
|
||
'campaign_phase': _get('campaign_phase'),
|
||
'creative_name': _get('creative_name'),
|
||
'country': _get('country'),
|
||
'language': _get('language'),
|
||
'media_type': _get('media_type'),
|
||
'dimensions': dim_str,
|
||
'width': width,
|
||
'height': height,
|
||
'file_type': _get('file_type').upper(),
|
||
'vendor': _get('vendor'),
|
||
'placement': _get('placement'),
|
||
'unit_description': _get('unit_description'),
|
||
'max_file_size': _get('max_file_size'),
|
||
'delivery_deadline': _format_date(row_list[col_map['delivery_deadline']] if 'delivery_deadline' in col_map and col_map['delivery_deadline'] < len(row_list) else None),
|
||
'media_start_date': _format_date(row_list[col_map['media_start_date']] if 'media_start_date' in col_map and col_map['media_start_date'] < len(row_list) else None),
|
||
'media_end_date': _format_date(row_list[col_map['media_end_date']] if 'media_end_date' in col_map and col_map['media_end_date'] < len(row_list) else None),
|
||
}
|
||
|
||
all_assets.append(asset)
|
||
channel_count += 1
|
||
|
||
if channel_count > 0:
|
||
channel_counts[sheet_name] = channel_count
|
||
|
||
wb.close()
|
||
|
||
return {
|
||
'assets': all_assets,
|
||
'total_assets': len(all_assets),
|
||
'channels': channel_counts,
|
||
'parsed_at': datetime.now().isoformat(),
|
||
}
|
||
|
||
|
||
def find_matching_asset(filename: str, media_plan_data: Dict) -> Optional[Dict]:
|
||
"""
|
||
Find the best matching asset in the media plan for a given filename.
|
||
|
||
Args:
|
||
filename: Uploaded file's name (e.g., 'XCER25IE001_Azzurri_Live1_IE-1-1_RTE_970x250_08-07-2025.jpg')
|
||
media_plan_data: Parsed media plan dict with 'assets' list
|
||
|
||
Returns:
|
||
Dict with 'match', 'confidence', and matched asset data, or None
|
||
"""
|
||
if not filename or not media_plan_data or not media_plan_data.get('assets'):
|
||
return None
|
||
|
||
# Strip extension from filename for matching
|
||
name_no_ext = os.path.splitext(filename)[0].strip()
|
||
assets = media_plan_data['assets']
|
||
|
||
# Try exact match first
|
||
for asset in assets:
|
||
if asset['asset_id'] == name_no_ext:
|
||
return {'match': asset, 'confidence': 1.0, 'match_type': 'exact'}
|
||
|
||
# Try case-insensitive exact match
|
||
name_lower = name_no_ext.lower()
|
||
for asset in assets:
|
||
if asset['asset_id'].lower() == name_lower:
|
||
return {'match': asset, 'confidence': 0.98, 'match_type': 'case_insensitive'}
|
||
|
||
# Try starts-with match (filename might have extra suffixes)
|
||
for asset in assets:
|
||
if name_lower.startswith(asset['asset_id'].lower()):
|
||
return {'match': asset, 'confidence': 0.90, 'match_type': 'starts_with'}
|
||
|
||
# Try contains match (asset_id contained in filename)
|
||
for asset in assets:
|
||
if asset['asset_id'].lower() in name_lower:
|
||
return {'match': asset, 'confidence': 0.85, 'match_type': 'contains'}
|
||
|
||
# Fuzzy match as last resort
|
||
asset_ids = [a['asset_id'] for a in assets]
|
||
matches = difflib.get_close_matches(name_no_ext, asset_ids, n=1, cutoff=0.7)
|
||
if matches:
|
||
matched_id = matches[0]
|
||
for asset in assets:
|
||
if asset['asset_id'] == matched_id:
|
||
ratio = difflib.SequenceMatcher(None, name_no_ext.lower(), matched_id.lower()).ratio()
|
||
return {'match': asset, 'confidence': round(ratio, 2), 'match_type': 'fuzzy'}
|
||
|
||
return None
|
||
|
||
|
||
def get_asset_dimensions(file_path: str) -> Tuple[Optional[int], Optional[int]]:
|
||
"""
|
||
Get the actual dimensions of an image or PDF file.
|
||
|
||
Returns:
|
||
(width, height) tuple, or (None, None) if unable to determine
|
||
"""
|
||
try:
|
||
ext = os.path.splitext(file_path)[1].lower()
|
||
if ext == '.pdf':
|
||
doc = fitz.open(file_path)
|
||
if doc.page_count > 0:
|
||
page = doc.load_page(0)
|
||
rect = page.rect
|
||
doc.close()
|
||
return int(rect.width), int(rect.height)
|
||
doc.close()
|
||
else:
|
||
img = Image.open(file_path)
|
||
w, h = img.size
|
||
img.close()
|
||
return w, h
|
||
except Exception as e:
|
||
print(f"Error getting dimensions for {file_path}: {e}")
|
||
return None, None
|
||
|
||
|
||
def validate_asset_specs(file_path: str, matched_spec: Dict) -> Dict:
|
||
"""
|
||
Validate an asset's dimensions and file type against the media plan spec.
|
||
|
||
Args:
|
||
file_path: Path to the uploaded asset file
|
||
matched_spec: The matched asset entry from the media plan
|
||
|
||
Returns:
|
||
Validation results dict
|
||
"""
|
||
actual_w, actual_h = get_asset_dimensions(file_path)
|
||
actual_ext = os.path.splitext(file_path)[1].lower().lstrip('.')
|
||
|
||
expected_w = matched_spec.get('width')
|
||
expected_h = matched_spec.get('height')
|
||
expected_type = matched_spec.get('file_type', '').lower()
|
||
|
||
# Dimensions check
|
||
dims_match = None
|
||
if actual_w and actual_h and expected_w and expected_h:
|
||
dims_match = (actual_w == expected_w and actual_h == expected_h)
|
||
|
||
# File type check
|
||
type_match = None
|
||
if expected_type and actual_ext:
|
||
# Normalize common equivalences
|
||
type_map = {'jpeg': 'jpg', 'tif': 'tiff'}
|
||
norm_actual = type_map.get(actual_ext, actual_ext)
|
||
norm_expected = type_map.get(expected_type, expected_type)
|
||
type_match = (norm_actual == norm_expected)
|
||
|
||
return {
|
||
'dimensions_match': dims_match,
|
||
'expected_dimensions': matched_spec.get('dimensions', ''),
|
||
'actual_dimensions': f"{actual_w}x{actual_h}" if actual_w and actual_h else 'unknown',
|
||
'actual_width': actual_w,
|
||
'actual_height': actual_h,
|
||
'file_type_match': type_match,
|
||
'expected_file_type': matched_spec.get('file_type', ''),
|
||
'actual_file_type': actual_ext.upper(),
|
||
}
|
||
|
||
|
||
def build_media_plan_context(matched_spec: Dict) -> str:
|
||
"""
|
||
Build a text context string from a matched media plan entry
|
||
for inclusion in QC check prompts.
|
||
"""
|
||
lines = [
|
||
"\n=== MEDIA PLAN CONTEXT ===",
|
||
"This asset is specified in the media plan as:",
|
||
f"- Asset ID: {matched_spec.get('asset_id', '')}",
|
||
]
|
||
|
||
if matched_spec.get('country'):
|
||
lines.append(f"- Country: {matched_spec['country']}")
|
||
if matched_spec.get('language'):
|
||
lines.append(f"- Language: {matched_spec['language']}")
|
||
if matched_spec.get('campaign_phase'):
|
||
lines.append(f"- Campaign Phase: {matched_spec['campaign_phase']}")
|
||
if matched_spec.get('creative_name'):
|
||
lines.append(f"- Creative: {matched_spec['creative_name']}")
|
||
if matched_spec.get('channel'):
|
||
lines.append(f"- Channel: {matched_spec['channel']}")
|
||
if matched_spec.get('vendor'):
|
||
lines.append(f"- Media Vendor: {matched_spec['vendor']}")
|
||
if matched_spec.get('placement'):
|
||
lines.append(f"- Placement: {matched_spec['placement']}")
|
||
if matched_spec.get('dimensions'):
|
||
lines.append(f"- Expected Dimensions: {matched_spec['dimensions']}")
|
||
if matched_spec.get('file_type'):
|
||
lines.append(f"- Expected File Type: {matched_spec['file_type']}")
|
||
|
||
lines.append("Please verify the asset content is appropriate for this placement and market.")
|
||
lines.append("=== END MEDIA PLAN CONTEXT ===\n")
|
||
|
||
return "\n".join(lines)
|