ai_qc/backend/localization_processor.py
nickviljoen 1315605576 Fix localization matrix parser: wrong dates for Message B
- Urgency check now runs before dates check in label classifier
  ("urgency messaging (to replace dates)" was matching as "dates")
- Handle unlabeled dates rows (empty column A) by inferring from position
  after tagline rows
- Stop parsing at "Print Day 4" sub-section boundary to prevent
  overwriting main message data

Message B / DE now correctly returns "8. - 11. Juli" instead of
"Nur noch heute" (urgency text from Print Day 4 section).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 21:47:47 +02:00

343 lines
11 KiB
Python

"""
Localization Matrix Processor
Parses Excel localization matrices (e.g., Amazon Prime Day DOOH/Print)
into structured JSON for cross-referencing with media plan data during QC analysis.
The matrix contains expected copy (headline, dates, logo, legal) per message type
(Message A, Message B) and country.
"""
import os
import re
from datetime import datetime
from typing import Dict, Optional
try:
import openpyxl
except ImportError:
openpyxl = None
def parse_localization_matrix(excel_path: str) -> Optional[Dict]:
"""
Parse a localization matrix Excel file into structured JSON.
Expected structure:
- Row with "MESSAGE A" in column A starts Message A section
- Row with "MESSAGE B" in column A starts Message B section
- Country codes in row below the message header (B=first country, C=second, etc.)
- Content rows: Tagline (2/3/4/5 lines), Dates, Logo, CTA, Legal Disclaimer
Returns structured dict or None if the file doesn't look like a localization matrix.
"""
if openpyxl is None:
print("openpyxl not installed, cannot parse localization matrix")
return None
try:
wb = openpyxl.load_workbook(excel_path, data_only=True)
except Exception as e:
print(f"Failed to open localization matrix: {e}")
return None
result = {
'messages': {},
'countries': [],
'source_file': os.path.basename(excel_path),
'parsed_at': datetime.now().isoformat(),
}
# Try each sheet
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
parsed = _parse_sheet(ws)
if parsed and parsed.get('messages'):
result['messages'].update(parsed['messages'])
# Merge countries
for c in parsed.get('countries', []):
if c not in result['countries']:
result['countries'].append(c)
if not result['messages']:
print(f"No message sections found in {excel_path} — not a localization matrix")
return None
result['sheet_names'] = wb.sheetnames
return result
def _parse_sheet(ws) -> Optional[Dict]:
"""Parse a single worksheet for message sections."""
messages = {}
countries = []
# Read all rows into memory
all_rows = []
for row in ws.iter_rows(min_row=1, max_row=ws.max_row, values_only=False):
all_rows.append(row)
# Find message sections by scanning column A
message_starts = []
for idx, row in enumerate(all_rows):
a_val = str(row[0].value or '').strip().upper()
if 'MESSAGE A' in a_val:
message_starts.append((idx, 'Message A'))
elif 'MESSAGE B' in a_val:
message_starts.append((idx, 'Message B'))
if not message_starts:
return None
# Extract country codes from message header rows
# Countries are in columns B onwards in the same row as the message header
first_msg_row = all_rows[message_starts[0][0]]
for cell in first_msg_row[1:]: # Skip column A
val = str(cell.value or '').strip()
if val and val not in ('', 'None'):
countries.append(val)
if not countries:
return None
# Parse each message section
for i, (start_idx, msg_name) in enumerate(message_starts):
# Determine end of this section (next message start or end of sheet)
if i + 1 < len(message_starts):
end_idx = message_starts[i + 1][0]
else:
end_idx = len(all_rows)
msg_data = _parse_message_section(all_rows, start_idx, end_idx, countries)
if msg_data:
messages[msg_name] = msg_data
return {'messages': messages, 'countries': countries}
def _parse_message_section(all_rows, start_idx, end_idx, countries) -> Dict:
"""Parse a single message section (Message A or Message B)."""
msg_data = {}
last_field_type = None # Track what the previous row was
for idx in range(start_idx + 1, end_idx):
if idx >= len(all_rows):
break
row = all_rows[idx]
label = str(row[0].value or '').strip()
label_lower = label.lower()
if not label:
# Handle rows with empty column A — these may be unlabeled dates rows
# that follow tagline rows. Check if prior row was a tagline and this row has data.
if last_field_type and last_field_type.startswith('tagline'):
# Check if this row has values in the country columns
has_values = any(
row[col_idx + 1].value is not None
for col_idx in range(min(len(countries), len(row) - 1))
)
if has_values:
field_name = 'dates'
else:
continue
else:
continue
else:
# Skip sub-section headers (e.g. "Print Day 4") — stop parsing here
# to avoid overwriting main message data with sub-section data
if 'print day' in label_lower or ('print' in label_lower and 'day' in label_lower):
break
# Determine the field type from the label
field_name = _classify_row_label(label_lower)
if not field_name:
continue
last_field_type = field_name
# Extract values per country
for col_idx, country in enumerate(countries):
cell_idx = col_idx + 1 # +1 because column A is the label
if cell_idx < len(row):
val = row[cell_idx].value
if val is not None:
val = str(val).strip()
# Clean up non-breaking spaces
val = val.replace('\xa0', ' ').strip()
if val and val != '-':
if country not in msg_data:
msg_data[country] = {}
msg_data[country][field_name] = val
return msg_data
def _classify_row_label(label: str) -> Optional[str]:
"""Classify a row label into a standardized field name."""
label_lower = label.lower()
if 'tagline' in label_lower or 'headline' in label_lower:
if '2 line' in label_lower:
return 'tagline_2_lines'
elif '3 line' in label_lower:
return 'tagline_3_lines'
elif '4 line' in label_lower:
return 'tagline_4_lines'
elif '5 line' in label_lower:
return 'tagline_5_lines'
else:
return 'tagline'
# Check urgency BEFORE dates — "urgency messaging (to replace dates)" contains "date"
elif 'urgency' in label_lower:
return 'urgency_messaging'
elif label_lower.startswith('date') or label_lower == 'dates':
return 'dates'
elif 'logo' in label_lower:
return 'logo'
elif 'cta' in label_lower:
return 'cta'
elif 'legal' in label_lower or 'disclaimer' in label_lower:
return 'legal_disclaimer'
# Skip section headers like "Print Day 4 (...)"
elif 'print day' in label_lower or 'print only' in label_lower:
return None
elif 'event' in label_lower:
return None # Skip event name row
else:
return None
def build_localization_context(parsed_matrix: Dict, message_type: str, country: str) -> str:
"""
Build a context string for QC prompt injection from the localization matrix.
Args:
parsed_matrix: Parsed localization matrix data (from parse_localization_matrix)
message_type: "Message A" or "Message B" (from media plan creative_name)
country: Country code e.g. "IE", "DE", "IT" (from media plan)
Returns:
Context string to inject into QC prompts, or empty string if no match.
"""
if not parsed_matrix or not message_type or not country:
return ""
messages = parsed_matrix.get('messages', {})
# Normalize message type — handle variations like "message a", "Message A", "MSG A"
msg_key = None
msg_type_upper = message_type.strip().upper()
for key in messages:
if key.upper() == msg_type_upper:
msg_key = key
break
# Also try partial match: "Message A" in "Live1_Message A" etc.
if not msg_key:
for key in messages:
if key.upper() in msg_type_upper or msg_type_upper in key.upper():
msg_key = key
break
if not msg_key:
return ""
msg_data = messages[msg_key]
# Find country data — try exact match, then case-insensitive
country_data = None
country_upper = country.strip().upper()
for c_key, c_data in msg_data.items():
if c_key.upper() == country_upper:
country_data = c_data
break
if not country_data:
return ""
# Build the context string
lines = [
"\n=== LOCALIZATION MATRIX — EXPECTED COPY ===",
f"Message Type: {msg_key}",
f"Country/Market: {country}",
"",
"The following is the EXPECTED copy for this asset based on the localization matrix.",
"Verify the asset contains the correct text matching one of these variants:",
"",
]
# Add tagline variants
tagline_fields = [
('tagline_2_lines', 'Expected Headline (2-line layout)'),
('tagline_3_lines', 'Expected Headline (3-line layout)'),
('tagline_4_lines', 'Expected Headline (4-line layout)'),
('tagline_5_lines', 'Expected Headline (5-line layout)'),
('tagline', 'Expected Headline'),
]
has_tagline = False
for field, label in tagline_fields:
val = country_data.get(field)
if val:
# Show line breaks clearly
display_val = val.replace('\n', ' / ')
lines.append(f"{label}: {display_val}")
has_tagline = True
if has_tagline:
lines.append("")
# Add dates
dates_val = country_data.get('dates')
if dates_val:
lines.append(f"Expected Dates: {dates_val}")
# Add logo
logo_val = country_data.get('logo')
if logo_val:
lines.append(f"Expected Logo: {logo_val}")
# Add CTA
cta_val = country_data.get('cta')
if cta_val:
lines.append(f"Expected CTA: {cta_val}")
# Add legal
legal_val = country_data.get('legal_disclaimer')
if legal_val:
lines.append(f"Expected Legal Disclaimer: {legal_val}")
# Add urgency messaging if present
urgency_val = country_data.get('urgency_messaging')
if urgency_val:
lines.append(f"Urgency Messaging (alternative to dates): {urgency_val}")
lines.append("")
lines.append("IMPORTANT: Cross-check the text visible in the asset against the expected copy above.")
lines.append("The headline should match one of the tagline variants for this market and message type.")
lines.append("Flag any text that doesn't match — wrong language, wrong message version, missing elements, or incorrect copy.")
lines.append("=== END LOCALIZATION MATRIX ===\n")
return "\n".join(lines)
def detect_message_type(creative_name: str) -> Optional[str]:
"""
Detect if a creative_name from the media plan maps to a message type.
Returns "Message A", "Message B", etc., or None if no match.
"""
if not creative_name:
return None
name_upper = creative_name.strip().upper()
if 'MESSAGE A' in name_upper:
return 'Message A'
elif 'MESSAGE B' in name_upper:
return 'Message B'
elif 'MESSAGE C' in name_upper:
return 'Message C'
else:
return None