"""Document-scope deterministic checks for AXA policy QC. Each check is a function that takes (ingest_result, scope_args) and returns a result dict. None of them call an LLM — they operate on the structured text + font data already produced by ingest.py, so they're $0 and run in milliseconds. Registered in CHECK_REGISTRY for the dispatcher to look up. Result schema: { 'check_name': str, 'scope': str, 'score': float, # 0-10, used for the overall doc score 'pass': bool, # findings-driven 'summary': str, # one-line headline for the report 'findings': dict, # structured payload (lists, counts, etc.) 'response': str, # human-readable longform for the report } For now, "list-only" checks (font_inventory, phone_inventory) score 10/10 — they're informational. Once approved-list configs land, they'll flip to compliance scoring. """ import json import os import re from collections import Counter, defaultdict from typing import Any, Dict, List, Optional _DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data') def _load_bold_words_seed() -> Dict[str, Any]: path = os.path.join(_DATA_DIR, 'axa_bold_words_seed.json') if not os.path.exists(path): return {'terms': [], 'source': 'missing'} with open(path, 'r', encoding='utf-8') as f: return json.load(f) # ───────────────────────────────────────────────────────────────────────────── # Checks # ───────────────────────────────────────────────────────────────────────────── def axa_font_inventory(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict: """List every unique font found in the document, with per-page distribution. Until AXA provides an approved Monotype font list, this is informational (score 10). When the list arrives, this check becomes axa_font_compliance and flags non-approved fonts. """ font_pages: Dict[str, List[int]] = defaultdict(list) for page in ingest_result.get('pages', []): for font in page.get('fonts_used') or []: font_pages[font].append(page['page_num']) fonts_sorted = sorted(font_pages.keys()) distribution = [ {'font': f, 'page_count': len(font_pages[f]), 'pages': font_pages[f]} for f in fonts_sorted ] summary = f"Found {len(fonts_sorted)} unique fonts across {ingest_result.get('pages_processed', 0)} pages." response_lines = [summary, ''] for d in distribution: response_lines.append(f" • {d['font']} — {d['page_count']} pages") response = '\n'.join(response_lines) return { 'check_name': 'axa_font_inventory', 'scope': 'document', 'score': 10.0, 'pass': True, 'summary': summary, 'findings': { 'unique_fonts': fonts_sorted, 'total_unique': len(fonts_sorted), 'distribution': distribution, }, 'response': response, } def axa_phone_inventory(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict: """Extract every phone number found in the document, deduplicated. Targets common Irish/UK formats: 1800/1850 freephones, +353 international, 01 / 021 etc. landlines. Until an approved list is supplied, this is informational (score 10). Becomes axa_phone_compliance when list lands. """ # Capture: optional + country code, optional spaces/dashes/parens between groups # Conservative regex — Irish/UK shapes with at least 8 total digits. pattern = re.compile( r"\(?\+?\d{1,3}\)?[\s\-.]?(?:\(?\d{2,5}\)?[\s\-.]?){1,3}\d{2,4}" ) raw_finds: Dict[str, List[int]] = defaultdict(list) for page in ingest_result.get('pages', []): text = page.get('raw_text') or '' for match in pattern.finditer(text): number = re.sub(r'[\s\-.()]+', ' ', match.group()).strip() digits = re.sub(r'\D', '', number) # Filter: must have ≥ 8 digits to count as a phone number if len(digits) < 8 or len(digits) > 15: continue raw_finds[number].append(page['page_num']) numbers_sorted = sorted(raw_finds.keys()) distribution = [ {'number': n, 'occurrences': len(raw_finds[n]), 'pages': sorted(set(raw_finds[n]))} for n in numbers_sorted ] summary = f"Found {len(numbers_sorted)} unique phone-like numbers in the document." response_lines = [summary, ''] for d in distribution: response_lines.append(f" • {d['number']} — {d['occurrences']} occurrences (pages {d['pages']})") response = '\n'.join(response_lines) if numbers_sorted else f"{summary}\n(No phone-like numbers detected.)" return { 'check_name': 'axa_phone_inventory', 'scope': 'document', 'score': 10.0, 'pass': True, 'summary': summary, 'findings': { 'unique_numbers': numbers_sorted, 'total_unique': len(numbers_sorted), 'distribution': distribution, }, 'response': response, } def axa_bold_words_definitions(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict: """For each defined term in the seed dictionary, scan all pages: every occurrence outside the definitions section should be rendered bold. Flag any non-bold occurrences — these are the Example-2-class misses (70+ bold definitions that shipped not bolded). """ seed = _load_bold_words_seed() terms = seed.get('terms', []) if not terms: return { 'check_name': 'axa_bold_words_definitions', 'scope': 'document', 'score': 0.0, 'pass': False, 'summary': 'No bold-words seed dictionary found — check disabled.', 'findings': {'error': 'seed_missing'}, 'response': 'Cannot run — backend/document_mode/data/axa_bold_words_seed.json missing or empty.', } # Pre-compile case-insensitive whole-word patterns term_patterns = [ (term, re.compile(r'\b' + re.escape(term) + r'\b', re.IGNORECASE)) for term in terms ] # Pages where the definitions section itself lives — by convention exclude # them from the violation count (the term is defined there, not used). definitions_pages = set( s.get('source_page') for s in seed.get('sources', []) if s.get('source_page') ) if scope_args and scope_args.get('exclude_pages'): definitions_pages.update(scope_args['exclude_pages']) violations: List[Dict] = [] bold_occurrences = 0 non_bold_occurrences = 0 for page in ingest_result.get('pages', []): page_num = page['page_num'] if page_num in definitions_pages: continue spans = page.get('spans') or [] for span in spans: span_text = span.get('text') or '' if not span_text: continue for term, pattern in term_patterns: if pattern.search(span_text): if span.get('bold'): bold_occurrences += 1 else: non_bold_occurrences += 1 violations.append({ 'page': page_num, 'term': term, 'context': span_text, 'font': span.get('font'), 'size': span.get('size'), }) total = bold_occurrences + non_bold_occurrences pass_flag = non_bold_occurrences == 0 if total == 0: score = 10.0 summary = f"No occurrences of {len(terms)} defined terms found outside the definitions section." elif pass_flag: score = 10.0 summary = f"All {bold_occurrences} occurrences of {len(terms)} defined terms are correctly rendered bold." else: ratio = bold_occurrences / total score = round(max(0.0, min(10.0, ratio * 10)), 2) summary = ( f"{non_bold_occurrences} non-bold occurrences of defined terms found " f"(across {len({v['page'] for v in violations})} pages). " f"{bold_occurrences} occurrences correctly bold." ) response_lines = [summary, ''] if violations: response_lines.append('Non-bold violations (first 50 shown):') for v in violations[:50]: ctx = v['context'][:80] + ('…' if len(v['context']) > 80 else '') response_lines.append(f" • Page {v['page']}: '{v['term']}' in: \"{ctx}\"") if len(violations) > 50: response_lines.append(f' ...and {len(violations) - 50} more.') response = '\n'.join(response_lines) return { 'check_name': 'axa_bold_words_definitions', 'scope': 'document', 'score': score, 'pass': pass_flag, 'summary': summary, 'findings': { 'dictionary_size': len(terms), 'definitions_pages_excluded': sorted(definitions_pages), 'bold_occurrences': bold_occurrences, 'non_bold_occurrences': non_bold_occurrences, 'violations': violations, 'pages_with_violations': sorted({v['page'] for v in violations}), }, 'response': response, } def axa_page_numbering(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict: """Verify the document's printed page numbering is continuous (1, 2, 3 …) by parsing the first integer found near the top or bottom of each page's raw text. Catches Example-2's 'missing page' defect. NB: this is heuristic — relies on the page number being a standalone digit on its own line. Skips pages where no candidate is found. """ found_numbers: List[Optional[int]] = [] candidates_by_page: List[Dict] = [] for page in ingest_result.get('pages', []): raw = page.get('raw_text') or '' # Only inspect the first and last 200 chars (where page numbers usually live) head = raw[:200] tail = raw[-200:] if len(raw) > 400 else raw candidate = None # Look for standalone-line integers for chunk in (tail, head): # tail first — footer numbering is more common for line in chunk.splitlines(): line_clean = line.strip() if line_clean.isdigit(): n = int(line_clean) if 0 < n < 1000: candidate = n break if candidate is not None: break found_numbers.append(candidate) candidates_by_page.append({'page_index': page['page_num'], 'detected_number': candidate}) # Walk the sequence: expect each detected number to equal previous + 1 issues = [] expected = None for entry in candidates_by_page: n = entry['detected_number'] if n is None: continue # skip pages with no detectable number if expected is not None and n != expected: issues.append({ 'page_index': entry['page_index'], 'expected': expected, 'detected': n, }) expected = n + 1 detected_count = sum(1 for e in candidates_by_page if e['detected_number'] is not None) # Insurance docs often have unnumbered TOC / divider pages, so isolated # discontinuities are normal. Score gently — surface the data, let the # reviewer judge whether a gap is a real missing-page defect or a # legitimate unnumbered section divider. if detected_count == 0: score = 5.0 summary = 'No page numbers detected — cannot validate continuity.' pass_flag = False elif issues: # Cap penalty: 1 discontinuity → 8/10, 5+ → 5/10 score = round(max(5.0, 10 - len(issues) * 0.8), 2) summary = ( f'{len(issues)} page-number discontinuit{"y" if len(issues) == 1 else "ies"} ' f'detected (heuristic — review against the doc to confirm).' ) pass_flag = False else: score = 10.0 summary = f'Page numbering continuous across {detected_count} pages with detectable numbers.' pass_flag = True response_lines = [summary, ''] if issues: response_lines.append('Discontinuities:') for i in issues: response_lines.append( f" • Page index {i['page_index']}: expected {i['expected']}, found {i['detected']}" ) response = '\n'.join(response_lines) return { 'check_name': 'axa_page_numbering', 'scope': 'document', 'score': score, 'pass': pass_flag, 'summary': summary, 'findings': { 'pages_total': ingest_result.get('pages_processed', 0), 'pages_with_detected_number': detected_count, 'discontinuities': issues, }, 'response': response, } # ───────────────────────────────────────────────────────────────────────────── # Targeted checks (specific page or page set) # ───────────────────────────────────────────────────────────────────────────── def _resolve_pages(scope_args: Optional[Dict], ingest_result: Dict) -> List[int]: """Resolve a scope_args.pages spec to actual page numbers. Supported specs: "first", "last", "first-N", "last-N", or an explicit list of ints. """ pages_processed = ingest_result.get('pages_processed', 0) if pages_processed == 0: return [] if not scope_args or 'pages' not in scope_args: return [pages_processed] # default: last page spec = scope_args['pages'] if isinstance(spec, list): return [p for p in spec if 1 <= p <= pages_processed] if spec == 'first': return [1] if spec == 'last': return [pages_processed] if isinstance(spec, str) and spec.startswith('first-'): n = int(spec.split('-', 1)[1]) return list(range(1, min(n, pages_processed) + 1)) if isinstance(spec, str) and spec.startswith('last-'): n = int(spec.split('-', 1)[1]) return list(range(max(1, pages_processed - n + 1), pages_processed + 1)) return [pages_processed] def _collect_text_for_pages(ingest_result: Dict, page_nums: List[int]) -> str: text_chunks = [] for page in ingest_result.get('pages', []): if page['page_num'] in page_nums: text_chunks.append(page.get('raw_text') or '') return '\n'.join(text_chunks) def axa_print_code(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict: """Find and report the print code on the targeted page(s) — usually back page only. AXA print codes look like 'AXA-XX-NNNN' or '1234-5678' in documented practice; we'll surface anything that matches a code-like pattern in the page footer/back text and let the user confirm. """ pages = _resolve_pages(scope_args, ingest_result) text = _collect_text_for_pages(ingest_result, pages) # AXA Ireland back-page print line — observed real format on Example 1: # "AG400 11/25 6317047 V8" # Pattern: 2-4 letter prefix + 2-5 digits, optionally followed by date + ref + version code_pattern = re.compile(r'\b[A-Z]{2,4}\d{2,5}\b') date_pattern = re.compile(r'\b\d{1,2}[-/]\d{2,4}\b') version_pattern = re.compile(r'\bV\d{1,3}\b') ref_pattern = re.compile(r'\b\d{6,8}\b') code_matches = list(dict.fromkeys(m.group() for m in code_pattern.finditer(text))) date_matches = list(dict.fromkeys(m.group() for m in date_pattern.finditer(text))) version_matches = list(dict.fromkeys(m.group() for m in version_pattern.finditer(text))) ref_matches = list(dict.fromkeys(m.group() for m in ref_pattern.finditer(text))) matches = code_matches + ref_matches + date_matches + version_matches has_code = bool(code_matches) has_date = bool(date_matches) has_version = bool(version_matches) component_count = sum([has_code, has_date, has_version]) if component_count >= 2: score = 10.0 pass_flag = True summary = ( f'Print/version line found on page(s) {pages}: code={code_matches}, ' f'date={date_matches}, version={version_matches}.' ) elif component_count == 1: score = 6.0 pass_flag = False summary = f'Partial print/version line on page(s) {pages} — some components missing.' else: score = 3.0 pass_flag = False summary = f'No print-code-shaped content found on page(s) {pages}.' response_lines = [summary, ''] response_lines.append(f'Code candidates: {code_matches or "(none)"}') response_lines.append(f'Document refs: {ref_matches or "(none)"}') response_lines.append(f'Date candidates: {date_matches or "(none)"}') response_lines.append(f'Version candidates: {version_matches or "(none)"}') response = '\n'.join(response_lines) return { 'check_name': 'axa_print_code', 'scope': 'targeted', 'score': score, 'pass': pass_flag, 'summary': summary, 'findings': { 'pages_inspected': pages, 'code_candidates': code_matches, 'doc_refs': ref_matches, 'date_candidates': date_matches, 'version_candidates': version_matches, }, 'response': response, } def axa_omg_versioning(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict: """OMG number + date format check on the targeted page(s) (back page). OMG codes — per AXA convention — look like 'OMG-XXXXX' or 'OMG XXXXX'. Date formats expected: dd/mm/yyyy or 'Month YYYY' on back page. """ pages = _resolve_pages(scope_args, ingest_result) text = _collect_text_for_pages(ingest_result, pages) omg_pattern = re.compile(r'\bOMG[\s-]?[A-Z0-9]{2,8}\b', re.IGNORECASE) date_patterns = [ re.compile(r'\b\d{1,2}/\d{1,2}/\d{2,4}\b'), re.compile(r'\b\d{1,2}-\d{1,2}-\d{2,4}\b'), re.compile(r'\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{4}\b'), ] omg_matches = list({m.group() for m in omg_pattern.finditer(text)}) date_matches = [] for p in date_patterns: date_matches.extend(m.group() for m in p.finditer(text)) date_matches = list(dict.fromkeys(date_matches)) # dedup, preserve order if omg_matches and date_matches: score = 10.0 pass_flag = True summary = f'OMG code + date format both present on page(s) {pages}.' elif omg_matches: score = 7.0 pass_flag = False summary = 'OMG code found, but no recognisable date format on the targeted page(s).' elif date_matches: score = 5.0 pass_flag = False summary = 'Date format present, but no OMG code found on the targeted page(s).' else: score = 3.0 pass_flag = False summary = 'Neither OMG code nor date format detected on the targeted page(s).' response_lines = [summary, ''] response_lines.append(f'OMG matches: {omg_matches or "(none)"}') response_lines.append(f'Date matches: {date_matches or "(none)"}') response = '\n'.join(response_lines) return { 'check_name': 'axa_omg_versioning', 'scope': 'targeted', 'score': score, 'pass': pass_flag, 'summary': summary, 'findings': { 'pages_inspected': pages, 'omg_matches': omg_matches, 'date_matches': date_matches, }, 'response': response, } # ───────────────────────────────────────────────────────────────────────────── # Registry # ───────────────────────────────────────────────────────────────────────────── from .accessibility_checks import axa_pdf_accessibility from .print_preflight_checks import axa_print_preflight CHECK_REGISTRY = { 'axa_font_inventory': {'fn': axa_font_inventory, 'scope': 'document'}, 'axa_phone_inventory': {'fn': axa_phone_inventory, 'scope': 'document'}, 'axa_bold_words_definitions': {'fn': axa_bold_words_definitions, 'scope': 'document'}, 'axa_page_numbering': {'fn': axa_page_numbering, 'scope': 'document'}, 'axa_print_code': {'fn': axa_print_code, 'scope': 'targeted'}, 'axa_omg_versioning': {'fn': axa_omg_versioning, 'scope': 'targeted'}, 'axa_pdf_accessibility': {'fn': axa_pdf_accessibility, 'scope': 'document'}, 'axa_print_preflight': {'fn': axa_print_preflight, 'scope': 'document'}, } def get_check(check_name: str): """Return registry entry for a check name, or None if unknown.""" return CHECK_REGISTRY.get(check_name) def is_document_scope_check(check_name: str) -> bool: """True if this check is one of our document-mode deterministic checks.""" return check_name in CHECK_REGISTRY