""" Pricing Reference Parser. Parses a pricing reference document (PDF or Excel) into a structured lookup: { "_format": { "en-US": {"currency_code": "USD", "symbol": "$", "position": "before", "decimal_separator": ".", "thousands_separator": ",", "format_example": "$49.99", "country": "United States"}, ... }, "_prices": [ {"product_id": "1334912002", "language": "en-US", "country": "US", "price": "49.99", "currency": "USD", "formatted": "$49.99", "product_name": "Top"}, ... ] } Excel parsing is deterministic (openpyxl) and prefers a flat "MPC Prices" sheet. Regional sheets provide currency format enrichment. PDF parsing falls back to LlamaParse + LLM and returns _format only (no _prices). Legacy flat shape {"": {...}} is still accepted downstream and treated as _format by PricingReference.get_format_map(). """ import os import re import json import logging from datetime import datetime logger = logging.getLogger(__name__) # ---------- format-from-formatted-price helpers ---------- _NUMBER_RE = re.compile(r'[\d][\d\s.,]*\d|\d') _SKIP_VALUES = { 'PRICE NOT PRESENT IN REPORT', 'PRODUCT NOT PRESENT IN REPORT', '#N/A', 'N/A', } def _is_skip(v) -> bool: if v is None: return True s = str(v).strip() if not s: return True return s in _SKIP_VALUES def _derive_format(formatted: str, currency_code: str = None) -> dict: """Extract symbol, position, and separators from a formatted price string. Examples: "$49.99" -> symbol='$', position='before', decimal='.', thousands=',' "39,99 €" -> symbol='€', position='after', decimal=',', thousands='.' "CHF 49.95" -> symbol='CHF', position='before', decimal='.', thousands=',' "349,-" -> symbol='-', position='after', decimal=',', thousands='.' """ out = {'format_example': formatted} if not formatted: return out s = str(formatted).strip() m = _NUMBER_RE.search(s) if not m: return out before = s[:m.start()].strip() after = s[m.end():].strip() number_span = m.group() if before and not after: position = 'before' symbol = before elif after and not before: position = 'after' symbol = after elif before and after: # Rare: pick the shorter side as the currency symbol if len(before) <= len(after): position, symbol = 'before', before else: position, symbol = 'after', after else: position = 'before' symbol = currency_code or '' # Separator detection on the number part decimal_sep = '' thousands_sep = '' last_dot = number_span.rfind('.') last_comma = number_span.rfind(',') last_sep_pos = max(last_dot, last_comma) if last_sep_pos != -1: last_sep = number_span[last_sep_pos] tail = number_span[last_sep_pos + 1:].replace(' ', '') if 1 <= len(tail) <= 2 and tail.isdigit(): decimal_sep = last_sep other = '.' if last_sep == ',' else ',' if other in number_span: thousands_sep = other else: thousands_sep = last_sep out.update({ 'symbol': symbol, 'position': position, 'decimal_separator': decimal_sep, 'thousands_separator': thousands_sep, }) if currency_code: out['currency_code'] = currency_code return out def _merge_format(existing: dict, new: dict) -> dict: """Merge two format dicts, preferring existing non-empty fields.""" if not existing: return new merged = dict(existing) for k, v in new.items(): if v and not merged.get(k): merged[k] = v return merged # ---------- Excel parsing ---------- def _parse_mpc_prices(ws) -> list: """Parse the 'MPC Prices' sheet into a flat list of price entries. Expected columns (row 1 header): 0: 'Product id 1' (composite e.g. "1334912002 ar-JO") 1: 'Product id' (int) 2: 'Article id' (int) 3: 'Language' (e.g. "en-US") 4: 'Product name' (localized) 5: 'Country' (2-letter) 6: 'Price' (numeric string) 7: 'Currency' (ISO code) 8: 'Campaign' 9: 'Season' 10: 'Filename' """ prices = [] for row_idx, row in enumerate(ws.iter_rows(min_row=2, values_only=True), 2): # Defensive — tuple may be shorter if trailing empties def col(i): return row[i] if i < len(row) else None language = col(3) price = col(6) currency = col(7) article_id = col(2) if col(2) is not None else col(1) if not language or _is_skip(price) or _is_skip(currency): continue prices.append({ 'product_id': str(article_id) if article_id is not None else '', 'language': str(language).strip(), 'country': str(col(5)).strip() if col(5) else '', 'price': str(price).strip(), 'currency': str(currency).strip(), 'product_name': str(col(4)).strip() if col(4) else '', }) return prices def _parse_regional_sheet_format(ws) -> dict: """Scan a regional sheet for formatted prices and derive per-locale format. Regional sheets use a repeating 6-row block per product: row 0: Product ID in col 4, 'Top' label in col 5 row 1: '' col 4, 'TOP' uppercase in col 5 row 2: 'Data: Price' in col 5, formatted prices in cols 6+ row 3-5: validation markers (ignored) Locale codes are in row 2 cols 6+ (header row). We find each 'Data: Price' row, read its cols 6+, and zip with the header locale. """ locale_by_col = {} header_row = None for row_idx, row in enumerate(ws.iter_rows(min_row=2, max_row=2, values_only=True), 2): header_row = row break if not header_row: return {} for i, v in enumerate(header_row): if v and isinstance(v, str) and re.match(r'^[a-z]{2,3}-[A-Z]{2}$', v.strip()): locale_by_col[i] = v.strip() if not locale_by_col: return {} fmt = {} for row in ws.iter_rows(min_row=3, values_only=True): if len(row) < 6: continue # Detect "Data: Price" marker in col 5 marker = row[5] if len(row) > 5 else None if not marker or 'price' not in str(marker).lower(): continue for col_idx, locale in locale_by_col.items(): if col_idx >= len(row): continue formatted = row[col_idx] if _is_skip(formatted): continue derived = _derive_format(str(formatted)) if derived.get('symbol'): fmt[locale] = _merge_format(fmt.get(locale), derived) return fmt def parse_pricing_excel_to_dict(xlsx_path: str): """Parse a pricing Excel workbook into the {_format, _prices} structure. Returns: tuple (data: dict, description: str) """ import openpyxl if not os.path.isfile(xlsx_path): raise FileNotFoundError(f"Excel file not found at {xlsx_path}") logger.info(f"Parsing pricing Excel: {xlsx_path}") wb = openpyxl.load_workbook(xlsx_path, data_only=True, read_only=True) prices = [] format_map = {} sheets_used = [] sheets_skipped = [] for sheet_name in wb.sheetnames: name_upper = sheet_name.upper() if 'OLD' in name_upper or 'COPY' in name_upper: sheets_skipped.append(sheet_name) continue ws = wb[sheet_name] if sheet_name.strip().lower() == 'mpc prices': try: sheet_prices = _parse_mpc_prices(ws) prices.extend(sheet_prices) sheets_used.append(f"{sheet_name} ({len(sheet_prices)} rows)") logger.info(f"MPC Prices: extracted {len(sheet_prices)} price entries") except Exception as e: logger.warning(f"Failed to parse MPC Prices sheet: {e}") continue # Any other sheet with a row-2 header of locale codes is a candidate # for format enrichment. try: sheet_format = _parse_regional_sheet_format(ws) if sheet_format: for locale, fmt in sheet_format.items(): format_map[locale] = _merge_format(format_map.get(locale), fmt) sheets_used.append(f"{sheet_name} ({len(sheet_format)} locales)") except Exception as e: logger.warning(f"Failed to parse sheet {sheet_name}: {e}") # Enrich _format with currency codes derived from MPC Prices lang_to_currency = {} for entry in prices: lang = entry['language'] cur = entry['currency'] if lang and cur and lang not in lang_to_currency: lang_to_currency[lang] = cur for lang, cur in lang_to_currency.items(): format_map[lang] = _merge_format(format_map.get(lang), {'currency_code': cur}) # 2-letter country code aliases (for file-naming conventions that use only country) for lang, fmt in list(format_map.items()): if '-' in lang: country = lang.split('-')[-1] if country and country not in format_map: format_map[country] = fmt data = {'_format': format_map, '_prices': prices} description_parts = [ f"Pricing Excel parsed from {os.path.basename(xlsx_path)}:", f" {len(prices)} price entries", f" {len(format_map)} locale formats", f" Sheets used: {', '.join(sheets_used) if sheets_used else 'none'}", ] if sheets_skipped: description_parts.append(f" Sheets skipped: {', '.join(sheets_skipped)}") description = "\n".join(description_parts) logger.info(description.replace('\n', ' | ')) return data, description # ---------- PDF parsing (legacy, returns _format only) ---------- def parse_pricing_pdf_to_dict(pdf_path: str): """Parse a pricing reference PDF into a {_format: {...}} structure. Uses LlamaParse to extract text and an LLM to structure it. No _prices are produced from PDFs — they are format-reference only. Returns: tuple (data: dict, extracted_text: str) """ if not os.path.isfile(pdf_path): raise FileNotFoundError(f"Pricing PDF not found at {pdf_path}") logger.info(f"Parsing pricing PDF: {pdf_path}") import nest_asyncio nest_asyncio.apply() from llama_parse import LlamaParse from core.services.llm_config import LLMConfig parser = LlamaParse( result_type="text", add_page_breaks=False, parsing_instruction=( "Extract all text from this pricing reference document. " "Pay special attention to country names, country codes, " "language codes, currency symbols, currency codes, and price formats." ), premium_mode=False, ) documents = parser.load_data(pdf_path) if not documents: raise RuntimeError("No text extracted from pricing PDF") extracted_text = "\n".join(doc.text for doc in documents) logger.info(f"Extracted {len(extracted_text)} chars from pricing PDF") prompt = f"""Parse this pricing reference document and extract a structured lookup table mapping each country/region language code to its CURRENCY FORMAT information. IMPORTANT: This document is a FORMAT REFERENCE ONLY. Ignore the actual price values — they are outdated. We only need to know: - What currency symbol each country uses - Whether the symbol goes before or after the price - What decimal and thousands separators are used Use language-country code format (e.g., "en-GB", "de-DE", "es-ES", "el-GR", "tr-TR", "bg-BG") Also include the 2-letter country code as an alternate key (e.g., "GB", "DE", "ES", "BG") "position" should be "before" if symbol comes before the price (e.g., $29.99) or "after" if symbol comes after the price (e.g., 29,99 EUR or 97,99 лв.) Return ONLY valid JSON (no markdown fences, no explanation) in this exact format: {{ "en-GB": {{ "country": "United Kingdom", "currency_code": "GBP", "symbol": "\\u00a3", "position": "before", "decimal_separator": ".", "thousands_separator": ",", "format_example": "\\u00a329.99" }} }} Include ALL countries/regions mentioned in the document. DOCUMENT TEXT: --- {extracted_text} ---""" client = LLMConfig.get_client('openai', 'gpt-4o') api_response = client.chat.completions.create( model='gpt-4o', messages=[{"role": "user", "content": prompt}], max_tokens=8192 ) response_text = api_response.choices[0].message.content or '' try: from core.models.usage_log import UsageLog usage = api_response.usage if api_response.usage else None UsageLog.log_call( provider='openai', model='gpt-4o', input_tokens=getattr(usage, 'prompt_tokens', None), output_tokens=getattr(usage, 'completion_tokens', None), tokens=getattr(usage, 'total_tokens', None), module='campaigns', check_name='pricing_parser', success=True ) except Exception as log_err: logger.warning(f"Failed to log usage: {log_err}") try: text = response_text.strip() if text.startswith('```'): text = text.split('\n', 1)[1] if '\n' in text else text[3:] if text.endswith('```'): text = text[:-3] text = text.strip() format_map = json.loads(text) except json.JSONDecodeError as e: logger.error(f"Failed to parse LLM response as JSON: {e}") raise RuntimeError(f"LLM response was not valid JSON: {e}") data = {'_format': format_map, '_prices': []} logger.info(f"Pricing PDF parsed: {len(format_map)} format entries") return data, extracted_text # ---------- Entry point used by the route ---------- def parse_pricing_reference(pricing_reference_id: int) -> bool: """Parse the PricingReference row's file (PDF or Excel), populate parsed_data_json and parsed_content, set status ready/error.""" from core.models.database import db from core.models.pricing_reference import PricingReference ref = PricingReference.query.get(pricing_reference_id) if not ref: logger.error(f"PricingReference {pricing_reference_id} not found") return False try: ref.status = 'parsing' db.session.commit() path = ref.pdf_path ext = os.path.splitext(path)[1].lower() if ext in ('.xlsx', '.xls'): data, description = parse_pricing_excel_to_dict(path) else: data, description = parse_pricing_pdf_to_dict(path) ref.parsed_data_json = json.dumps(data, ensure_ascii=False) ref.parsed_content = description ref.parsed_at = datetime.utcnow() ref.status = 'ready' ref.error_message = None db.session.commit() fmt_count = len(data.get('_format', {})) prices_count = len(data.get('_prices', [])) logger.info( f"PricingReference {ref.id} ready: " f"{fmt_count} formats, {prices_count} prices" ) return True except Exception as e: logger.error(f"Failed to parse PricingReference {pricing_reference_id}: {e}", exc_info=True) try: ref.status = 'error' ref.error_message = str(e)[:500] db.session.commit() except Exception: db.session.rollback() return False