#!/usr/bin/env python3
"""
Media Plan Processor

Parses Excel media plans, matches uploaded assets by filename,
and validates asset specs (dimensions, file type) against the plan.
"""

import os
import re
import json
import difflib
from datetime import datetime
from typing import Dict, List, Optional, Tuple

# The imaging backends are only used by get_asset_dimensions(). Guard the
# imports so that plan parsing and filename matching keep working when they
# are not installed; get_asset_dimensions() reports the missing backend.
try:
    from PIL import Image
except ImportError:
    Image = None

try:
    import fitz  # PyMuPDF
except ImportError:
    fitz = None

# Sheets to extract asset data from (channel sheets)
DATA_SHEETS = ['Display', '(P)OLV', '(D)OOH', 'TV', 'Print', 'Audio']

# Common column name mappings (normalized key -> possible header substrings)
COLUMN_MAP = {
    'asset_id': ['asset id'],
    'status': ['asset status'],
    'campaign_project': ['campaign project'],
    'campaign_phase': ['campaign phase'],
    'creative_name': ['creative name'],
    'country': ['country'],
    'language': ['language'],
    'media_type': ['media type'],
    'dimensions': ['resolution', 'dimensions'],
    'vendor': ['media property', 'vendor'],
    'placement': ['placement'],
    'file_type': ['file type'],
    'max_file_size': ['max file size'],
    'delivery_deadline': ['delivery deadline'],
    'media_start_date': ['media start'],
    'media_end_date': ['media end'],
    'unit_description': ['unit description'],
    'asin': ['asin'],
    'comments': ['comments'],
}

# "<width> x <height>"; the separator may be 'x', 'X' or the multiplication
# sign (U+00D7), written as an escape so it survives re-encoding of this file.
_DIMENSIONS_RE = re.compile(r'(\d+)\s*[xX\u00d7]\s*(\d+)')

# File extensions that name the same format.
_FILE_TYPE_ALIASES = {'jpeg': 'jpg', 'tif': 'tiff'}

# (spec key, label) pairs emitted by build_media_plan_context(), in order.
_CONTEXT_FIELDS = (
    ('country', 'Country'),
    ('language', 'Language'),
    ('campaign_phase', 'Campaign Phase'),
    ('creative_name', 'Creative'),
    ('channel', 'Channel'),
    ('vendor', 'Media Vendor'),
    ('placement', 'Placement'),
    ('dimensions', 'Expected Dimensions'),
    ('file_type', 'Expected File Type'),
)


def _find_column_indices(header_row: list) -> Dict[str, int]:
    """
    Map normalized column names to their indices based on the header row.

    A header matches a key when any of the key's COLUMN_MAP patterns is a
    substring of the lower-cased header text. For each key the first matching
    column wins; a single header may satisfy more than one key.
    """
    indices: Dict[str, int] = {}
    for col_idx, header in enumerate(header_row):
        if not header:
            continue
        header_lower = str(header).lower().strip()
        for key, patterns in COLUMN_MAP.items():
            if key in indices:
                continue  # an earlier column already claimed this key
            if any(pattern in header_lower for pattern in patterns):
                indices[key] = col_idx
    return indices


def _parse_dimensions(dim_str: str) -> Tuple[Optional[int], Optional[int]]:
    """
    Parse a dimension string like '970x250' into (width, height).

    Returns (None, None) when the value is empty or contains no
    '<digits> x <digits>' pair.
    """
    if not dim_str:
        return None, None
    match = _DIMENSIONS_RE.search(str(dim_str))
    if match is None:
        return None, None
    return int(match.group(1)), int(match.group(2))


def _format_date(val) -> str:
    """
    Convert a cell value to a date string.

    datetime values become 'YYYY-MM-DD', None becomes '', and anything else
    is returned as its stripped string form.
    """
    if val is None:
        return ''
    if isinstance(val, datetime):
        return val.strftime('%Y-%m-%d')
    return str(val).strip()


def _cell_value(row, idx: Optional[int]):
    """Return row[idx], or None when the column is unmapped or out of range."""
    if idx is None or idx >= len(row):
        return None
    return row[idx]


def _cell_text(row, idx: Optional[int]) -> str:
    """
    Return the stripped text of row[idx].

    Falsy cells (None, '', 0) and unmapped/out-of-range columns yield ''.
    """
    value = _cell_value(row, idx)
    if not value:
        return ''
    return str(value).strip()


def _row_to_asset(row, col_map: Dict[str, int], channel: str) -> Optional[Dict]:
    """
    Build one asset dict from a data row.

    Returns None for rows that must be skipped: rows without an asset ID and
    rows whose status contains 'removed'.
    """
    asset_id = _cell_text(row, col_map.get('asset_id'))
    if not asset_id:
        return None

    status = _cell_text(row, col_map.get('status'))
    if 'removed' in status.lower():
        return None

    def text(key: str) -> str:
        return _cell_text(row, col_map.get(key))

    def date(key: str) -> str:
        # Dates are formatted from the raw cell value, not its text form.
        return _format_date(_cell_value(row, col_map.get(key)))

    dim_str = text('dimensions')
    width, height = _parse_dimensions(dim_str)

    return {
        'asset_id': asset_id,
        'channel': channel,
        'status': status,
        'campaign_project': text('campaign_project'),
        'campaign_phase': text('campaign_phase'),
        'creative_name': text('creative_name'),
        'country': text('country'),
        'language': text('language'),
        'media_type': text('media_type'),
        'dimensions': dim_str,
        'width': width,
        'height': height,
        'file_type': text('file_type').upper(),
        'vendor': text('vendor'),
        'placement': text('placement'),
        'unit_description': text('unit_description'),
        'max_file_size': text('max_file_size'),
        'delivery_deadline': date('delivery_deadline'),
        'media_start_date': date('media_start_date'),
        'media_end_date': date('media_end_date'),
    }


def parse_media_plan(excel_path: str) -> Dict:
    """
    Parse an Excel media plan and extract all asset specifications.

    Only sheets whose name matches an entry of DATA_SHEETS (case-insensitive)
    and whose header row has an asset ID column are read.

    Args:
        excel_path: Path to the .xlsx file

    Returns:
        Dict with 'assets' list, 'total_assets', 'channels' summary
        (sheet name -> asset count) and a 'parsed_at' timestamp
    """
    import openpyxl

    # read_only=True skips pivot-cache deserialization, which hangs this loader
    # on workbooks with pivot tables (Amazon media plans use them extensively).
    wb = openpyxl.load_workbook(excel_path, data_only=True, read_only=True)

    known_sheets = {name.lower() for name in DATA_SHEETS}
    all_assets = []
    channel_counts = {}

    try:
        for sheet_name in wb.sheetnames:
            # Only process data sheets (case-insensitive name match)
            if sheet_name.lower() not in known_sheets:
                continue

            ws = wb[sheet_name]
            # In read_only mode max_row may be None until dimensions are read;
            # treat that as "has data" and let iter_rows decide.
            if ws.max_row is not None and ws.max_row < 2:
                continue

            # Header row (row 1). A sheet with no rows at all yields nothing,
            # so use a default instead of letting next() raise StopIteration.
            header_row = next(
                ws.iter_rows(min_row=1, max_row=1, values_only=True), None
            )
            if header_row is None:
                continue

            col_map = _find_column_indices(list(header_row))
            if 'asset_id' not in col_map:
                continue  # Skip sheets without asset ID column

            channel_count = 0
            for row in ws.iter_rows(min_row=2, values_only=True):
                asset = _row_to_asset(row, col_map, sheet_name)
                if asset is None:
                    continue
                all_assets.append(asset)
                channel_count += 1

            if channel_count > 0:
                channel_counts[sheet_name] = channel_count
    finally:
        # Read-only workbooks keep the file open; release it even on errors.
        wb.close()

    return {
        'assets': all_assets,
        'total_assets': len(all_assets),
        'channels': channel_counts,
        'parsed_at': datetime.now().isoformat(),
    }


def find_matching_asset(filename: str, media_plan_data: Dict) -> Optional[Dict]:
    """
    Find the best matching asset in the media plan for a given filename.

    Strategies are tried in order of decreasing confidence: exact ID match
    (1.0), case-insensitive match (0.98), filename starts with the ID (0.90),
    filename contains the ID (0.85), then a fuzzy match whose confidence is
    the similarity ratio. For the prefix and substring strategies the longest
    matching ID wins, so that 'ABC10_x.jpg' resolves to 'ABC10' and not to
    'ABC1'; ties keep plan order.

    Args:
        filename: Uploaded file's name
            (e.g., 'XCER25IE001_Azzurri_Live1_IE-1-1_RTE_970x250_08-07-2025.jpg')
        media_plan_data: Parsed media plan dict with 'assets' list

    Returns:
        Dict with 'match' (the asset), 'confidence' and 'match_type',
        or None when nothing matches
    """
    if not filename or not media_plan_data or not media_plan_data.get('assets'):
        return None

    # Strip extension from filename for matching
    name_no_ext = os.path.splitext(filename)[0].strip()
    name_lower = name_no_ext.lower()

    # An empty ID would be a prefix/substring of every filename; ignore those.
    candidates = [
        (asset['asset_id'].lower(), asset)
        for asset in media_plan_data['assets']
        if asset.get('asset_id')
    ]

    # Exact match first
    for _, asset in candidates:
        if asset['asset_id'] == name_no_ext:
            return {'match': asset, 'confidence': 1.0, 'match_type': 'exact'}

    # Case-insensitive exact match
    for id_lower, asset in candidates:
        if id_lower == name_lower:
            return {'match': asset, 'confidence': 0.98, 'match_type': 'case_insensitive'}

    # Starts-with match (filename might have extra suffixes)
    prefixed = [pair for pair in candidates if name_lower.startswith(pair[0])]
    if prefixed:
        _, asset = max(prefixed, key=lambda pair: len(pair[0]))
        return {'match': asset, 'confidence': 0.90, 'match_type': 'starts_with'}

    # Contains match (asset_id contained in filename)
    contained = [pair for pair in candidates if pair[0] in name_lower]
    if contained:
        _, asset = max(contained, key=lambda pair: len(pair[0]))
        return {'match': asset, 'confidence': 0.85, 'match_type': 'contains'}

    # Fuzzy match as last resort. Compare lower-cased strings so the cutoff
    # and the reported confidence are computed on the same basis.
    by_lower_id = {}
    for id_lower, asset in candidates:
        by_lower_id.setdefault(id_lower, asset)
    close = difflib.get_close_matches(name_lower, list(by_lower_id), n=1, cutoff=0.7)
    if close:
        matched_lower = close[0]
        ratio = difflib.SequenceMatcher(None, name_lower, matched_lower).ratio()
        return {
            'match': by_lower_id[matched_lower],
            'confidence': round(ratio, 2),
            'match_type': 'fuzzy',
        }

    return None


def get_asset_dimensions(file_path: str) -> Tuple[Optional[int], Optional[int]]:
    """
    Get the actual dimensions of an image or PDF file.

    For '.pdf' files the size of the first page is used, rounded to the
    nearest integer; every other extension is opened as an image. Any failure
    (unreadable file, missing imaging backend) is printed and reported as
    (None, None) rather than raised.

    Returns:
        (width, height) tuple, or (None, None) if unable to determine
    """
    try:
        ext = os.path.splitext(file_path)[1].lower()

        if ext == '.pdf':
            if fitz is None:
                raise RuntimeError('PyMuPDF (fitz) is not installed')
            doc = fitz.open(file_path)
            try:
                if doc.page_count > 0:
                    rect = doc.load_page(0).rect
                    # Page sizes are floats; round so that 249.9999 reads as
                    # 250 instead of being truncated to 249.
                    return round(rect.width), round(rect.height)
            finally:
                doc.close()
        else:
            if Image is None:
                raise RuntimeError('Pillow (PIL) is not installed')
            with Image.open(file_path) as img:
                w, h = img.size
            return w, h
    except Exception as e:
        print(f"Error getting dimensions for {file_path}: {e}")

    return None, None


def validate_asset_specs(file_path: str, matched_spec: Dict) -> Dict:
    """
    Validate an asset's dimensions and file type against the media plan spec.

    Each check yields True/False, or None when it could not be performed
    because the actual or the expected value is unavailable. The expected
    file type may list several formats ('JPG, PNG', 'jpg/png', '.jpg'); the
    asset passes when its extension matches any of them, with 'jpeg'/'jpg'
    and 'tif'/'tiff' treated as equal.

    Args:
        file_path: Path to the uploaded asset file
        matched_spec: The matched asset entry from the media plan

    Returns:
        Validation results dict
    """
    actual_w, actual_h = get_asset_dimensions(file_path)
    actual_ext = os.path.splitext(file_path)[1].lower().lstrip('.')

    expected_w = matched_spec.get('width')
    expected_h = matched_spec.get('height')
    # 'file_type' may be present but None; treat that like a missing value.
    expected_type = str(matched_spec.get('file_type') or '').lower()

    # Dimensions check
    dims_match = None
    if actual_w and actual_h and expected_w and expected_h:
        dims_match = (actual_w == expected_w and actual_h == expected_h)

    # File type check
    type_match = None
    if expected_type and actual_ext:
        norm_actual = _FILE_TYPE_ALIASES.get(actual_ext, actual_ext)
        accepted = {
            _FILE_TYPE_ALIASES.get(token, token)
            for token in re.findall(r'[a-z0-9]+', expected_type)
        }
        type_match = norm_actual in accepted

    return {
        'dimensions_match': dims_match,
        'expected_dimensions': matched_spec.get('dimensions', ''),
        'actual_dimensions': f"{actual_w}x{actual_h}" if actual_w and actual_h else 'unknown',
        'actual_width': actual_w,
        'actual_height': actual_h,
        'file_type_match': type_match,
        'expected_file_type': matched_spec.get('file_type', ''),
        'actual_file_type': actual_ext.upper(),
    }


def build_media_plan_context(matched_spec: Dict) -> str:
    """
    Build a text context string from a matched media plan entry
    for inclusion in QC check prompts.

    The asset ID line is always present; every other field is listed only
    when the spec has a non-empty value for it.
    """
    lines = [
        "\n=== MEDIA PLAN CONTEXT ===",
        "This asset is specified in the media plan as:",
        f"- Asset ID: {matched_spec.get('asset_id', '')}",
    ]
    lines.extend(
        f"- {label}: {matched_spec[key]}"
        for key, label in _CONTEXT_FIELDS
        if matched_spec.get(key)
    )
    lines.append("Please verify the asset content is appropriate for this placement and market.")
    lines.append("=== END MEDIA PLAN CONTEXT ===\n")
    return "\n".join(lines)