hm_ai_qc_report_tool/modules/campaigns/pricing_parser.py
nickviljoen 39383db95f Pricing refs: Excel support, structured lookup, deterministic price match, video price check
A. Excel upload — /campaigns/pricing/upload now accepts .xlsx/.xls
   alongside .pdf. File picker in the campaigns UI matches.

B. Deterministic Excel parser (openpyxl, no LLM) — looks for H&M-style
   mastersheets:
     - 'MPC Prices' sheet -> flat list of {product_id, language, country,
       price, currency, product_name} entries (this is the gold mine).
     - Regional sheets (AME/CEU/EEU/...) -> formatted prices per locale
       used to derive currency symbol, position, decimal/thousands
       separators. Skips OLD/COPY sheets.
   Verified against the attached 1013A mastersheet: 448 price entries
   across 7 products x 74 locales, 139 locale format entries.

   Parser lives in modules/campaigns/pricing_parser.py alongside the
   existing PDF path (which now also returns the structured form with
   empty _prices).

   New lookup shape stored in PricingReference.parsed_data_json:
     {"_format": {"en-US": {currency_code, symbol, position, ...}, ...},
      "_prices": [{product_id, language, country, price, currency,
                   product_name}, ...]}
   Legacy flat {"<code>": {...}} is still recognised (treated as _format
   only) for backwards compatibility with the legacy global JSON import.

   Model helpers added:
     - PricingReference.get_format_map()
     - PricingReference.get_prices()
   to_dict() now reports price_count alongside entry_count.

C. Upgraded price_currency_check.py — when a pricing reference with
   _prices is attached, the check runs a deterministic comparison:
   detected price(s) -> normalize (_normalize_price handles '$49.99',
   '39,99 €', 'CHF 49.95', '1.234,56', 'Rs. 2,799', '13 995 Ft', '349,-',
   '0.999.000'...) -> compare with tol=0.005 against the expected
   per-locale rows. LLM-based campaign-sheet fallback only runs if no
   _prices are present (legacy PDF reference or has_pricing campaign
   presentation).

D. Video QC price check — new _run_price_check step in the executor.
   Parses filename (Market_lang_CampaignNum_... -> 'lang-Market' locale),
   detects prices across frames via the same Gemini/GPT-4o path the
   other checks use, then deterministic-validates against the attached
   pricing reference. Skipped if no pricing ref, unknown locale, GEN/CEN
   markets, or no price visible in video.

   Overall video score now uses weighted mean of active (non-skipped)
   checks (visual_quality w=50, censorship w=50, price_currency w=30)
   instead of the hardcoded 50/50 split — so skipping any one check
   falls through cleanly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 10:52:39 +02:00

468 lines
15 KiB
Python

"""
Pricing Reference Parser.
Parses a pricing reference document (PDF or Excel) into a structured lookup:
{
"_format": {
"en-US": {"currency_code": "USD", "symbol": "$", "position": "before",
"decimal_separator": ".", "thousands_separator": ",",
"format_example": "$49.99", "country": "United States"},
...
},
"_prices": [
{"product_id": "1334912002", "language": "en-US", "country": "US",
"price": "49.99", "currency": "USD", "formatted": "$49.99",
"product_name": "Top"},
...
]
}
Excel parsing is deterministic (openpyxl) and prefers a flat "MPC Prices"
sheet. Regional sheets provide currency format enrichment. PDF parsing
falls back to LlamaParse + LLM and returns _format only (no _prices).
Legacy flat shape {"<code>": {...}} is still accepted downstream and
treated as _format by PricingReference.get_format_map().
"""
import os
import re
import json
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
# ---------- format-from-formatted-price helpers ----------
_NUMBER_RE = re.compile(r'[\d][\d\s.,]*\d|\d')
_SKIP_VALUES = {
'PRICE NOT PRESENT IN REPORT',
'PRODUCT NOT PRESENT IN REPORT',
'#N/A',
'N/A',
}
def _is_skip(v) -> bool:
if v is None:
return True
s = str(v).strip()
if not s:
return True
return s in _SKIP_VALUES
def _derive_format(formatted: str, currency_code: str = None) -> dict:
"""Extract symbol, position, and separators from a formatted price string.
Examples:
"$49.99" -> symbol='$', position='before', decimal='.', thousands=','
"39,99 €" -> symbol='', position='after', decimal=',', thousands='.'
"CHF 49.95" -> symbol='CHF', position='before', decimal='.', thousands=','
"349,-" -> symbol='-', position='after', decimal=',', thousands='.'
"""
out = {'format_example': formatted}
if not formatted:
return out
s = str(formatted).strip()
m = _NUMBER_RE.search(s)
if not m:
return out
before = s[:m.start()].strip()
after = s[m.end():].strip()
number_span = m.group()
if before and not after:
position = 'before'
symbol = before
elif after and not before:
position = 'after'
symbol = after
elif before and after:
# Rare: pick the shorter side as the currency symbol
if len(before) <= len(after):
position, symbol = 'before', before
else:
position, symbol = 'after', after
else:
position = 'before'
symbol = currency_code or ''
# Separator detection on the number part
decimal_sep = ''
thousands_sep = ''
last_dot = number_span.rfind('.')
last_comma = number_span.rfind(',')
last_sep_pos = max(last_dot, last_comma)
if last_sep_pos != -1:
last_sep = number_span[last_sep_pos]
tail = number_span[last_sep_pos + 1:].replace(' ', '')
if 1 <= len(tail) <= 2 and tail.isdigit():
decimal_sep = last_sep
other = '.' if last_sep == ',' else ','
if other in number_span:
thousands_sep = other
else:
thousands_sep = last_sep
out.update({
'symbol': symbol,
'position': position,
'decimal_separator': decimal_sep,
'thousands_separator': thousands_sep,
})
if currency_code:
out['currency_code'] = currency_code
return out
def _merge_format(existing: dict, new: dict) -> dict:
"""Merge two format dicts, preferring existing non-empty fields."""
if not existing:
return new
merged = dict(existing)
for k, v in new.items():
if v and not merged.get(k):
merged[k] = v
return merged
# ---------- Excel parsing ----------
def _parse_mpc_prices(ws) -> list:
"""Parse the 'MPC Prices' sheet into a flat list of price entries.
Expected columns (row 1 header):
0: 'Product id 1' (composite e.g. "1334912002 ar-JO")
1: 'Product id' (int)
2: 'Article id' (int)
3: 'Language' (e.g. "en-US")
4: 'Product name' (localized)
5: 'Country' (2-letter)
6: 'Price' (numeric string)
7: 'Currency' (ISO code)
8: 'Campaign'
9: 'Season'
10: 'Filename'
"""
prices = []
for row_idx, row in enumerate(ws.iter_rows(min_row=2, values_only=True), 2):
# Defensive — tuple may be shorter if trailing empties
def col(i):
return row[i] if i < len(row) else None
language = col(3)
price = col(6)
currency = col(7)
article_id = col(2) if col(2) is not None else col(1)
if not language or _is_skip(price) or _is_skip(currency):
continue
prices.append({
'product_id': str(article_id) if article_id is not None else '',
'language': str(language).strip(),
'country': str(col(5)).strip() if col(5) else '',
'price': str(price).strip(),
'currency': str(currency).strip(),
'product_name': str(col(4)).strip() if col(4) else '',
})
return prices
def _parse_regional_sheet_format(ws) -> dict:
"""Scan a regional sheet for formatted prices and derive per-locale format.
Regional sheets use a repeating 6-row block per product:
row 0: Product ID in col 4, 'Top' label in col 5
row 1: '' col 4, 'TOP' uppercase in col 5
row 2: 'Data: Price' in col 5, formatted prices in cols 6+
row 3-5: validation markers (ignored)
Locale codes are in row 2 cols 6+ (header row). We find each
'Data: Price' row, read its cols 6+, and zip with the header locale.
"""
locale_by_col = {}
header_row = None
for row_idx, row in enumerate(ws.iter_rows(min_row=2, max_row=2, values_only=True), 2):
header_row = row
break
if not header_row:
return {}
for i, v in enumerate(header_row):
if v and isinstance(v, str) and re.match(r'^[a-z]{2,3}-[A-Z]{2}$', v.strip()):
locale_by_col[i] = v.strip()
if not locale_by_col:
return {}
fmt = {}
for row in ws.iter_rows(min_row=3, values_only=True):
if len(row) < 6:
continue
# Detect "Data: Price" marker in col 5
marker = row[5] if len(row) > 5 else None
if not marker or 'price' not in str(marker).lower():
continue
for col_idx, locale in locale_by_col.items():
if col_idx >= len(row):
continue
formatted = row[col_idx]
if _is_skip(formatted):
continue
derived = _derive_format(str(formatted))
if derived.get('symbol'):
fmt[locale] = _merge_format(fmt.get(locale), derived)
return fmt
def parse_pricing_excel_to_dict(xlsx_path: str):
"""Parse a pricing Excel workbook into the {_format, _prices} structure.
Returns:
tuple (data: dict, description: str)
"""
import openpyxl
if not os.path.isfile(xlsx_path):
raise FileNotFoundError(f"Excel file not found at {xlsx_path}")
logger.info(f"Parsing pricing Excel: {xlsx_path}")
wb = openpyxl.load_workbook(xlsx_path, data_only=True, read_only=True)
prices = []
format_map = {}
sheets_used = []
sheets_skipped = []
for sheet_name in wb.sheetnames:
name_upper = sheet_name.upper()
if 'OLD' in name_upper or 'COPY' in name_upper:
sheets_skipped.append(sheet_name)
continue
ws = wb[sheet_name]
if sheet_name.strip().lower() == 'mpc prices':
try:
sheet_prices = _parse_mpc_prices(ws)
prices.extend(sheet_prices)
sheets_used.append(f"{sheet_name} ({len(sheet_prices)} rows)")
logger.info(f"MPC Prices: extracted {len(sheet_prices)} price entries")
except Exception as e:
logger.warning(f"Failed to parse MPC Prices sheet: {e}")
continue
# Any other sheet with a row-2 header of locale codes is a candidate
# for format enrichment.
try:
sheet_format = _parse_regional_sheet_format(ws)
if sheet_format:
for locale, fmt in sheet_format.items():
format_map[locale] = _merge_format(format_map.get(locale), fmt)
sheets_used.append(f"{sheet_name} ({len(sheet_format)} locales)")
except Exception as e:
logger.warning(f"Failed to parse sheet {sheet_name}: {e}")
# Enrich _format with currency codes derived from MPC Prices
lang_to_currency = {}
for entry in prices:
lang = entry['language']
cur = entry['currency']
if lang and cur and lang not in lang_to_currency:
lang_to_currency[lang] = cur
for lang, cur in lang_to_currency.items():
format_map[lang] = _merge_format(format_map.get(lang), {'currency_code': cur})
# 2-letter country code aliases (for file-naming conventions that use only country)
for lang, fmt in list(format_map.items()):
if '-' in lang:
country = lang.split('-')[-1]
if country and country not in format_map:
format_map[country] = fmt
data = {'_format': format_map, '_prices': prices}
description_parts = [
f"Pricing Excel parsed from {os.path.basename(xlsx_path)}:",
f" {len(prices)} price entries",
f" {len(format_map)} locale formats",
f" Sheets used: {', '.join(sheets_used) if sheets_used else 'none'}",
]
if sheets_skipped:
description_parts.append(f" Sheets skipped: {', '.join(sheets_skipped)}")
description = "\n".join(description_parts)
logger.info(description.replace('\n', ' | '))
return data, description
# ---------- PDF parsing (legacy, returns _format only) ----------
def parse_pricing_pdf_to_dict(pdf_path: str):
"""Parse a pricing reference PDF into a {_format: {...}} structure.
Uses LlamaParse to extract text and an LLM to structure it. No _prices
are produced from PDFs — they are format-reference only.
Returns:
tuple (data: dict, extracted_text: str)
"""
if not os.path.isfile(pdf_path):
raise FileNotFoundError(f"Pricing PDF not found at {pdf_path}")
logger.info(f"Parsing pricing PDF: {pdf_path}")
import nest_asyncio
nest_asyncio.apply()
from llama_parse import LlamaParse
from core.services.llm_config import LLMConfig
parser = LlamaParse(
result_type="text",
add_page_breaks=False,
parsing_instruction=(
"Extract all text from this pricing reference document. "
"Pay special attention to country names, country codes, "
"language codes, currency symbols, currency codes, and price formats."
),
premium_mode=False,
)
documents = parser.load_data(pdf_path)
if not documents:
raise RuntimeError("No text extracted from pricing PDF")
extracted_text = "\n".join(doc.text for doc in documents)
logger.info(f"Extracted {len(extracted_text)} chars from pricing PDF")
prompt = f"""Parse this pricing reference document and extract a structured lookup table
mapping each country/region language code to its CURRENCY FORMAT information.
IMPORTANT: This document is a FORMAT REFERENCE ONLY. Ignore the actual price values —
they are outdated. We only need to know:
- What currency symbol each country uses
- Whether the symbol goes before or after the price
- What decimal and thousands separators are used
Use language-country code format (e.g., "en-GB", "de-DE", "es-ES", "el-GR", "tr-TR", "bg-BG")
Also include the 2-letter country code as an alternate key (e.g., "GB", "DE", "ES", "BG")
"position" should be "before" if symbol comes before the price (e.g., $29.99)
or "after" if symbol comes after the price (e.g., 29,99 EUR or 97,99 лв.)
Return ONLY valid JSON (no markdown fences, no explanation) in this exact format:
{{
"en-GB": {{
"country": "United Kingdom",
"currency_code": "GBP",
"symbol": "\\u00a3",
"position": "before",
"decimal_separator": ".",
"thousands_separator": ",",
"format_example": "\\u00a329.99"
}}
}}
Include ALL countries/regions mentioned in the document.
DOCUMENT TEXT:
---
{extracted_text}
---"""
client = LLMConfig.get_client('openai', 'gpt-4o')
api_response = client.chat.completions.create(
model='gpt-4o',
messages=[{"role": "user", "content": prompt}],
max_tokens=8192
)
response_text = api_response.choices[0].message.content or ''
try:
from core.models.usage_log import UsageLog
usage = api_response.usage if api_response.usage else None
UsageLog.log_call(
provider='openai',
model='gpt-4o',
input_tokens=getattr(usage, 'prompt_tokens', None),
output_tokens=getattr(usage, 'completion_tokens', None),
tokens=getattr(usage, 'total_tokens', None),
module='campaigns',
check_name='pricing_parser',
success=True
)
except Exception as log_err:
logger.warning(f"Failed to log usage: {log_err}")
try:
text = response_text.strip()
if text.startswith('```'):
text = text.split('\n', 1)[1] if '\n' in text else text[3:]
if text.endswith('```'):
text = text[:-3]
text = text.strip()
format_map = json.loads(text)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse LLM response as JSON: {e}")
raise RuntimeError(f"LLM response was not valid JSON: {e}")
data = {'_format': format_map, '_prices': []}
logger.info(f"Pricing PDF parsed: {len(format_map)} format entries")
return data, extracted_text
# ---------- Entry point used by the route ----------
def parse_pricing_reference(pricing_reference_id: int) -> bool:
"""Parse the PricingReference row's file (PDF or Excel), populate
parsed_data_json and parsed_content, set status ready/error."""
from core.models.database import db
from core.models.pricing_reference import PricingReference
ref = PricingReference.query.get(pricing_reference_id)
if not ref:
logger.error(f"PricingReference {pricing_reference_id} not found")
return False
try:
ref.status = 'parsing'
db.session.commit()
path = ref.pdf_path
ext = os.path.splitext(path)[1].lower()
if ext in ('.xlsx', '.xls'):
data, description = parse_pricing_excel_to_dict(path)
else:
data, description = parse_pricing_pdf_to_dict(path)
ref.parsed_data_json = json.dumps(data, ensure_ascii=False)
ref.parsed_content = description
ref.parsed_at = datetime.utcnow()
ref.status = 'ready'
ref.error_message = None
db.session.commit()
fmt_count = len(data.get('_format', {}))
prices_count = len(data.get('_prices', []))
logger.info(
f"PricingReference {ref.id} ready: "
f"{fmt_count} formats, {prices_count} prices"
)
return True
except Exception as e:
logger.error(f"Failed to parse PricingReference {pricing_reference_id}: {e}", exc_info=True)
try:
ref.status = 'error'
ref.error_message = str(e)[:500]
db.session.commit()
except Exception:
db.session.rollback()
return False