ai_qc/backend/document_mode/checks.py
nickviljoen 90563b8cf2 Add AXA document-mode QC pipeline (Phases 1, 3, 4, 5)
Multi-page PDF QC for AXA Ireland policy documents. Runs as a third mode
alongside static + video, gated on profile.mode. New code isolated under
backend/document_mode/ with new endpoints under /api/document/*.

Phase 1 — Spine + 6 deterministic doc-scope checks ($0, runs in seconds):
- Scope-aware dispatcher (document/targeted/page_sample/page_pair/page_each)
- axa_font_inventory, axa_phone_inventory, axa_bold_words_definitions,
  axa_page_numbering, axa_print_code, axa_omg_versioning
- Bootstrap bold-words dictionary extracted from Example 1 General Definitions

Phase 3 — Old-vs-new diff (~$0.50/run, 3-5 min):
- Page alignment via difflib SequenceMatcher (windowed fuzzy match)
- Vision-LLM page-pair diff via Gemini 2.5 Pro (8 concurrent)
- Two-slot upload UX, axa_policy_document_diff profile, mode=document_diff

Phase 4 — PDF accessibility (PyMuPDF, $0):
- 9 PDF/UA-1 aligned criteria (tagged structure, /MarkInfo, title, /Lang,
  encryption, font embedding, PDF version, XMP UA-conformance, alt-text)
- _run_verapdf() stub for optional Java-based veraPDF integration later

Phase 5 — Print preflight (PyMuPDF, $0):
- 7 criteria (page geometry, bleed, image colour spaces, image DPI,
  transparency, PDF/X conformance, spot colours)

Profile additions:
- axa_policy_document — 8 deterministic checks, $0 cost
- axa_policy_document_diff — 1 page-pair LLM check, ~$0.50/run

API additions:
- POST /api/document/start_analysis (single PDF)
- POST /api/document/start_diff (old + new PDFs)

Frontend additions:
- Third profile.mode value (document_diff) in applyProfileMode()
- Two-slot upload UX with PDF-only file pickers
- checkFormValidity() branches by mode for the analyse-button gate

Smoke-tested locally against Example 1 (Home Insurance V8, 86pp) and
Example 2 (Landlord V1 vs V10, 68→74pp) with real findings caught
including bold-words gaps, missing PDF/UA flag, transparency on press,
V1→V10 bold-formatting fixes. Plan + integration map + gotchas in
backend/AXA_DOCUMENT_MODE_PLAN.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 18:38:14 +02:00

531 lines
21 KiB
Python

"""Document-scope deterministic checks for AXA policy QC.

Each check is a function that takes (ingest_result, scope_args) and returns
a result dict. None of them call an LLM — they operate on the structured
text + font data already produced by ingest.py, so they're $0 and run in
milliseconds. Registered in CHECK_REGISTRY for the dispatcher to look up.

Result schema:
    {
        'check_name': str,
        'scope': str,
        'score': float,    # 0-10, used for the overall doc score
        'pass': bool,      # findings-driven
        'summary': str,    # one-line headline for the report
        'findings': dict,  # structured payload (lists, counts, etc.)
        'response': str,   # human-readable longform for the report
    }

For now, "list-only" checks (font_inventory, phone_inventory) score 10/10 —
they're informational. Once approved-list configs land, they'll flip to
compliance scoring.
"""
import json
import os
import re
from collections import Counter, defaultdict
from typing import Any, Dict, List, Optional
_DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data')
def _load_bold_words_seed() -> Dict[str, Any]:
path = os.path.join(_DATA_DIR, 'axa_bold_words_seed.json')
if not os.path.exists(path):
return {'terms': [], 'source': 'missing'}
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
# ─────────────────────────────────────────────────────────────────────────────
# Checks
# ─────────────────────────────────────────────────────────────────────────────
def axa_font_inventory(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict:
    """List every unique font found in the document, with per-page distribution.

    Until AXA provides an approved Monotype font list, this is informational
    (score 10). When the list arrives, this check becomes axa_font_compliance
    and flags non-approved fonts.

    Args:
        ingest_result: Ingest output. Reads ``pages`` (each entry must carry
            ``page_num`` and may carry a ``fonts_used`` iterable) and
            ``pages_processed`` (used only for the summary line).
        scope_args: Accepted for a uniform check signature; not used here.

    Returns:
        A result dict following the module's result schema. ``findings``
        holds ``unique_fonts``, ``total_unique`` and a per-font
        ``distribution`` list.
    """
    font_pages: Dict[str, List[int]] = defaultdict(list)
    for page in ingest_result.get('pages', []):
        page_num = page['page_num']
        for font in page.get('fonts_used') or []:
            pages_seen = font_pages[font]
            # A font listed more than once for the same page must not inflate
            # its page_count, so each page is recorded at most once per font.
            if page_num not in pages_seen:
                pages_seen.append(page_num)

    fonts_sorted = sorted(font_pages)
    distribution = [
        {'font': f, 'page_count': len(font_pages[f]), 'pages': font_pages[f]}
        for f in fonts_sorted
    ]
    summary = (
        f"Found {len(fonts_sorted)} unique fonts across "
        f"{ingest_result.get('pages_processed', 0)} pages."
    )

    response_lines = [summary, '']
    # Bullet + separator keep the font name and its count readable (they were
    # previously concatenated with nothing in between).
    response_lines.extend(
        f"  • {d['font']} — {d['page_count']} pages" for d in distribution
    )

    return {
        'check_name': 'axa_font_inventory',
        'scope': 'document',
        'score': 10.0,
        'pass': True,
        'summary': summary,
        'findings': {
            'unique_fonts': fonts_sorted,
            'total_unique': len(fonts_sorted),
            'distribution': distribution,
        },
        'response': '\n'.join(response_lines),
    }
def axa_phone_inventory(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict:
    """Extract every phone number found in the document, deduplicated.

    Targets common Irish/UK formats: 1800/1850 freephones, +353 international,
    01 / 021 etc. landlines. Until an approved list is supplied, this is
    informational (score 10). Becomes axa_phone_compliance when list lands.

    Numbers are deduplicated after collapsing spaces, dashes, dots and
    parentheses to single spaces, so "1800-123-456" and "1800 123 456" count
    as the same number (differently *grouped* digits still count separately).

    Args:
        ingest_result: Ingest output. Reads ``pages``; each entry must carry
            ``page_num`` and may carry ``raw_text``.
        scope_args: Accepted for a uniform check signature; not used here.

    Returns:
        A result dict following the module's result schema. ``findings``
        holds ``unique_numbers``, ``total_unique`` and a per-number
        ``distribution`` list.
    """
    # Optional + / country code, then 1-3 digit groups separated by optional
    # spaces/dashes/dots/parens, then a closing 2-4 digit group.
    pattern = re.compile(
        r"\(?\+?\d{1,3}\)?[\s\-.]?(?:\(?\d{2,5}\)?[\s\-.]?){1,3}\d{2,4}"
    )
    raw_finds: Dict[str, List[int]] = defaultdict(list)
    for page in ingest_result.get('pages', []):
        text = page.get('raw_text') or ''
        for match in pattern.finditer(text):
            number = re.sub(r'[\s\-.()]+', ' ', match.group()).strip()
            digits = re.sub(r'\D', '', number)
            # Keep only plausible phone lengths: at least 8 digits, and no
            # more than 15 (longer runs are refs/account numbers, not phones).
            if len(digits) < 8 or len(digits) > 15:
                continue
            raw_finds[number].append(page['page_num'])

    numbers_sorted = sorted(raw_finds)
    distribution = [
        {'number': n, 'occurrences': len(raw_finds[n]), 'pages': sorted(set(raw_finds[n]))}
        for n in numbers_sorted
    ]
    summary = f"Found {len(numbers_sorted)} unique phone-like numbers in the document."

    if numbers_sorted:
        response_lines = [summary, '']
        # Bullet + separator keep the number and its count apart (they were
        # previously concatenated, which made the digits run together).
        response_lines.extend(
            f"  • {d['number']} — {d['occurrences']} occurrences (pages {d['pages']})"
            for d in distribution
        )
        response = '\n'.join(response_lines)
    else:
        response = f"{summary}\n(No phone-like numbers detected.)"

    return {
        'check_name': 'axa_phone_inventory',
        'scope': 'document',
        'score': 10.0,
        'pass': True,
        'summary': summary,
        'findings': {
            'unique_numbers': numbers_sorted,
            'total_unique': len(numbers_sorted),
            'distribution': distribution,
        },
        'response': response,
    }
def axa_bold_words_definitions(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict:
    """Flag defined terms used outside the definitions section without bold.

    For each defined term in the seed dictionary, scan all pages: every
    occurrence outside the definitions section should be rendered bold. Flag
    any non-bold occurrences — these are the Example-2-class misses (70+
    bold definitions that shipped not bolded).

    Matching is done per text span: a span that contains a term counts once
    for that term (bold or non-bold according to the span's ``bold`` flag),
    however many times the term repeats inside the span.

    Args:
        ingest_result: Ingest output. Reads ``pages``; each entry must carry
            ``page_num`` and may carry ``spans`` (dicts with ``text``,
            ``bold``, ``font``, ``size``).
        scope_args: Optional. ``exclude_pages`` adds page numbers to skip on
            top of the seed's own ``sources[*].source_page`` entries.

    Returns:
        A result dict following the module's result schema. The score is the
        bold share of all matched spans scaled to 0-10; an empty/missing seed
        yields score 0 with ``findings == {'error': 'seed_missing'}``.
    """
    seed = _load_bold_words_seed()
    terms = seed.get('terms', [])
    if not terms:
        return {
            'check_name': 'axa_bold_words_definitions',
            'scope': 'document',
            'score': 0.0,
            'pass': False,
            'summary': 'No bold-words seed dictionary found — check disabled.',
            'findings': {'error': 'seed_missing'},
            'response': 'Cannot run — backend/document_mode/data/axa_bold_words_seed.json missing or empty.',
        }

    # Pre-compile case-insensitive whole-word patterns
    term_patterns = [
        (term, re.compile(r'\b' + re.escape(term) + r'\b', re.IGNORECASE))
        for term in terms
    ]

    # Pages where the definitions section itself lives — by convention exclude
    # them from the violation count (the term is defined there, not used).
    definitions_pages = set(
        s.get('source_page') for s in seed.get('sources', []) if s.get('source_page')
    )
    if scope_args and scope_args.get('exclude_pages'):
        definitions_pages.update(scope_args['exclude_pages'])

    violations: List[Dict] = []
    bold_occurrences = 0
    non_bold_occurrences = 0
    for page in ingest_result.get('pages', []):
        page_num = page['page_num']
        if page_num in definitions_pages:
            continue
        for span in page.get('spans') or []:
            span_text = span.get('text') or ''
            if not span_text:
                continue
            for term, pattern in term_patterns:
                if not pattern.search(span_text):
                    continue
                if span.get('bold'):
                    bold_occurrences += 1
                else:
                    non_bold_occurrences += 1
                    violations.append({
                        'page': page_num,
                        'term': term,
                        'context': span_text,
                        'font': span.get('font'),
                        'size': span.get('size'),
                    })

    pages_with_violations = sorted({v['page'] for v in violations})
    total = bold_occurrences + non_bold_occurrences
    pass_flag = non_bold_occurrences == 0
    if total == 0:
        score = 10.0
        summary = f"No occurrences of {len(terms)} defined terms found outside the definitions section."
    elif pass_flag:
        score = 10.0
        summary = f"All {bold_occurrences} occurrences of {len(terms)} defined terms are correctly rendered bold."
    else:
        ratio = bold_occurrences / total
        score = round(max(0.0, min(10.0, ratio * 10)), 2)
        summary = (
            f"{non_bold_occurrences} non-bold occurrences of defined terms found "
            f"(across {len(pages_with_violations)} pages). "
            f"{bold_occurrences} occurrences correctly bold."
        )

    response_lines = [summary, '']
    if violations:
        response_lines.append('Non-bold violations (first 50 shown):')
        for v in violations[:50]:
            # Signpost truncation with an ellipsis; previously both branches
            # appended '', so cut-off contexts looked like complete text.
            ctx = v['context'][:80] + ('…' if len(v['context']) > 80 else '')
            response_lines.append(f" • Page {v['page']}: '{v['term']}' in: \"{ctx}\"")
        if len(violations) > 50:
            response_lines.append(f' ...and {len(violations) - 50} more.')

    return {
        'check_name': 'axa_bold_words_definitions',
        'scope': 'document',
        'score': score,
        'pass': pass_flag,
        'summary': summary,
        'findings': {
            'dictionary_size': len(terms),
            'definitions_pages_excluded': sorted(definitions_pages),
            'bold_occurrences': bold_occurrences,
            'non_bold_occurrences': non_bold_occurrences,
            'violations': violations,
            'pages_with_violations': pages_with_violations,
        },
        'response': '\n'.join(response_lines),
    }
# Number of characters inspected at each end of a page's raw text.
_PAGE_NUMBER_WINDOW = 200


def _is_line_break(char: str) -> bool:
    """True when the single character *char* is a separator for str.splitlines."""
    return char.splitlines() != [char]


def _edge_lines(raw: str) -> List[str]:
    """Return the whole lines near the bottom, then the top, of *raw*.

    Lines that the character window would cut in half are dropped, so a digit
    fragment of a longer line can never pass for a standalone page number.
    """
    window = _PAGE_NUMBER_WINDOW

    head_lines = raw[:window].splitlines()
    if len(raw) > window and head_lines:
        # The cut sits between raw[window - 1] and raw[window]; the last head
        # line is partial unless a line break sits on either side of the cut.
        if not (_is_line_break(raw[window - 1]) or _is_line_break(raw[window])):
            head_lines.pop()

    if len(raw) > 2 * window:
        start = len(raw) - window
        tail_lines = raw[start:].splitlines()
        if tail_lines and not (_is_line_break(raw[start - 1]) or _is_line_break(raw[start])):
            tail_lines.pop(0)
    else:
        # Short page: head and tail windows would overlap, so scan everything.
        tail_lines = raw.splitlines()

    # Tail first — footer numbering is more common.
    return tail_lines + head_lines


def _detect_page_number(raw: str) -> Optional[int]:
    """Return the first standalone-line integer in 1..999 near the page edges."""
    for line in _edge_lines(raw):
        token = line.strip()
        # isdecimal(), not isdigit(): isdigit() also accepts characters such
        # as '²' that int() rejects, which would raise ValueError.
        if token.isdecimal():
            value = int(token)
            if 0 < value < 1000:
                return value
    return None


def axa_page_numbering(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict:
    """Verify the document's printed page numbering is continuous (1, 2, 3 …)
    by parsing the first integer found near the bottom or top of each page's
    raw text. Catches Example-2's 'missing page' defect.

    NB: this is heuristic — relies on the page number being a standalone
    number on its own line. Skips pages where no candidate is found.

    Args:
        ingest_result: Ingest output. Reads ``pages`` (each entry must carry
            ``page_num`` and may carry ``raw_text``) and ``pages_processed``.
        scope_args: Accepted for a uniform check signature; not used here.

    Returns:
        A result dict following the module's result schema. ``findings``
        holds ``pages_total``, ``pages_with_detected_number`` and the list of
        ``discontinuities`` (page_index / expected / detected).
    """
    candidates_by_page: List[Dict] = [
        {
            'page_index': page['page_num'],
            'detected_number': _detect_page_number(page.get('raw_text') or ''),
        }
        for page in ingest_result.get('pages', [])
    ]

    # Walk the sequence: expect each detected number to equal previous + 1
    issues = []
    expected = None
    for entry in candidates_by_page:
        n = entry['detected_number']
        if n is None:
            continue  # skip pages with no detectable number
        if expected is not None and n != expected:
            issues.append({
                'page_index': entry['page_index'],
                'expected': expected,
                'detected': n,
            })
        # Re-anchor on what was actually printed so one gap is reported once.
        expected = n + 1

    detected_count = sum(1 for e in candidates_by_page if e['detected_number'] is not None)

    # Insurance docs often have unnumbered TOC / divider pages, so isolated
    # discontinuities are normal. Score gently — surface the data, let the
    # reviewer judge whether a gap is a real missing-page defect or a
    # legitimate unnumbered section divider.
    if detected_count == 0:
        score = 5.0
        summary = 'No page numbers detected — cannot validate continuity.'
        pass_flag = False
    elif issues:
        # 0.8 points per discontinuity (1 → 9.2/10), floored at 5/10 (7+).
        score = round(max(5.0, 10 - len(issues) * 0.8), 2)
        summary = (
            f'{len(issues)} page-number discontinuit{"y" if len(issues) == 1 else "ies"} '
            f'detected (heuristic — review against the doc to confirm).'
        )
        pass_flag = False
    else:
        score = 10.0
        summary = f'Page numbering continuous across {detected_count} pages with detectable numbers.'
        pass_flag = True

    response_lines = [summary, '']
    if issues:
        response_lines.append('Discontinuities:')
        for i in issues:
            response_lines.append(
                f" • Page index {i['page_index']}: expected {i['expected']}, found {i['detected']}"
            )

    return {
        'check_name': 'axa_page_numbering',
        'scope': 'document',
        'score': score,
        'pass': pass_flag,
        'summary': summary,
        'findings': {
            'pages_total': ingest_result.get('pages_processed', 0),
            'pages_with_detected_number': detected_count,
            'discontinuities': issues,
        },
        'response': '\n'.join(response_lines),
    }
# ─────────────────────────────────────────────────────────────────────────────
# Targeted checks (specific page or page set)
# ─────────────────────────────────────────────────────────────────────────────
def _resolve_pages(scope_args: Optional[Dict], ingest_result: Dict) -> List[int]:
"""Resolve a scope_args.pages spec to actual page numbers.
Supported specs: "first", "last", "first-N", "last-N", or an explicit list of ints.
"""
pages_processed = ingest_result.get('pages_processed', 0)
if pages_processed == 0:
return []
if not scope_args or 'pages' not in scope_args:
return [pages_processed] # default: last page
spec = scope_args['pages']
if isinstance(spec, list):
return [p for p in spec if 1 <= p <= pages_processed]
if spec == 'first':
return [1]
if spec == 'last':
return [pages_processed]
if isinstance(spec, str) and spec.startswith('first-'):
n = int(spec.split('-', 1)[1])
return list(range(1, min(n, pages_processed) + 1))
if isinstance(spec, str) and spec.startswith('last-'):
n = int(spec.split('-', 1)[1])
return list(range(max(1, pages_processed - n + 1), pages_processed + 1))
return [pages_processed]
def _collect_text_for_pages(ingest_result: Dict, page_nums: List[int]) -> str:
text_chunks = []
for page in ingest_result.get('pages', []):
if page['page_num'] in page_nums:
text_chunks.append(page.get('raw_text') or '')
return '\n'.join(text_chunks)
def axa_print_code(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict:
    """Find and report the print/version line on the targeted page(s) — usually
    the back page only.

    The patterns follow the AXA Ireland back-page line observed on Example 1,
    "AG400 11/25 6317047 V8": a letter+digit code, a short date, a 6-8 digit
    document reference and a version tag. Everything that matches is surfaced
    so the user can confirm it.

    Args:
        ingest_result: Ingest output, passed to the page-resolution and
            text-collection helpers.
        scope_args: Optional; its ``pages`` spec selects the page(s) to
            inspect.

    Returns:
        A result dict following the module's result schema. Scoring counts
        three components (code, date, version): two or more → 10 and pass,
        exactly one → 6, none → 3. Document refs are reported but not scored.
    """
    pages = _resolve_pages(scope_args, ingest_result)
    text = _collect_text_for_pages(ingest_result, pages)

    # Pattern: 2-4 letter prefix + 2-5 digits, optionally followed by date + ref + version
    code_pattern = re.compile(r'\b[A-Z]{2,4}\d{2,5}\b')
    date_pattern = re.compile(r'\b\d{1,2}[-/]\d{2,4}\b')
    version_pattern = re.compile(r'\bV\d{1,3}\b')
    ref_pattern = re.compile(r'\b\d{6,8}\b')

    # dict.fromkeys() de-duplicates while keeping first-seen order.
    code_matches = list(dict.fromkeys(m.group() for m in code_pattern.finditer(text)))
    date_matches = list(dict.fromkeys(m.group() for m in date_pattern.finditer(text)))
    version_matches = list(dict.fromkeys(m.group() for m in version_pattern.finditer(text)))
    ref_matches = list(dict.fromkeys(m.group() for m in ref_pattern.finditer(text)))

    component_count = sum([bool(code_matches), bool(date_matches), bool(version_matches)])
    if component_count >= 2:
        score = 10.0
        pass_flag = True
        summary = (
            f'Print/version line found on page(s) {pages}: code={code_matches}, '
            f'date={date_matches}, version={version_matches}.'
        )
    elif component_count == 1:
        score = 6.0
        pass_flag = False
        summary = f'Partial print/version line on page(s) {pages} — some components missing.'
    else:
        score = 3.0
        pass_flag = False
        summary = f'No print-code-shaped content found on page(s) {pages}.'

    response_lines = [
        summary,
        '',
        f'Code candidates: {code_matches or "(none)"}',
        f'Document refs: {ref_matches or "(none)"}',
        f'Date candidates: {date_matches or "(none)"}',
        f'Version candidates: {version_matches or "(none)"}',
    ]

    return {
        'check_name': 'axa_print_code',
        'scope': 'targeted',
        'score': score,
        'pass': pass_flag,
        'summary': summary,
        'findings': {
            'pages_inspected': pages,
            'code_candidates': code_matches,
            'doc_refs': ref_matches,
            'date_candidates': date_matches,
            'version_candidates': version_matches,
        },
        'response': '\n'.join(response_lines),
    }
def axa_omg_versioning(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict:
    """OMG number + date format check on the targeted page(s) (back page).

    OMG codes — per AXA convention — look like 'OMG-XXXXX' or 'OMG XXXXX'.
    Date formats expected: dd/mm/yyyy or 'Month YYYY' on back page
    (dd-mm-yyyy is accepted as well).

    Args:
        ingest_result: Ingest output, passed to the page-resolution and
            text-collection helpers.
        scope_args: Optional; its ``pages`` spec selects the page(s) to
            inspect.

    Returns:
        A result dict following the module's result schema. Both present →
        10 and pass; OMG only → 7; date only → 5; neither → 3.
    """
    pages = _resolve_pages(scope_args, ingest_result)
    text = _collect_text_for_pages(ingest_result, pages)

    omg_pattern = re.compile(r'\bOMG[\s-]?[A-Z0-9]{2,8}\b', re.IGNORECASE)
    date_patterns = [
        re.compile(r'\b\d{1,2}/\d{1,2}/\d{2,4}\b'),
        re.compile(r'\b\d{1,2}-\d{1,2}-\d{2,4}\b'),
        re.compile(r'\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{4}\b'),
    ]

    # dict.fromkeys() de-duplicates while keeping first-seen order. A plain
    # set was used for the OMG matches before, which made the reported order
    # vary between runs and disagree with how date matches are handled.
    omg_matches = list(dict.fromkeys(m.group() for m in omg_pattern.finditer(text)))
    date_matches = list(dict.fromkeys(
        m.group() for p in date_patterns for m in p.finditer(text)
    ))

    if omg_matches and date_matches:
        score = 10.0
        pass_flag = True
        summary = f'OMG code + date format both present on page(s) {pages}.'
    elif omg_matches:
        score = 7.0
        pass_flag = False
        summary = 'OMG code found, but no recognisable date format on the targeted page(s).'
    elif date_matches:
        score = 5.0
        pass_flag = False
        summary = 'Date format present, but no OMG code found on the targeted page(s).'
    else:
        score = 3.0
        pass_flag = False
        summary = 'Neither OMG code nor date format detected on the targeted page(s).'

    response_lines = [
        summary,
        '',
        f'OMG matches: {omg_matches or "(none)"}',
        f'Date matches: {date_matches or "(none)"}',
    ]

    return {
        'check_name': 'axa_omg_versioning',
        'scope': 'targeted',
        'score': score,
        'pass': pass_flag,
        'summary': summary,
        'findings': {
            'pages_inspected': pages,
            'omg_matches': omg_matches,
            'date_matches': date_matches,
        },
        'response': '\n'.join(response_lines),
    }
# ─────────────────────────────────────────────────────────────────────────────
# Registry
# ─────────────────────────────────────────────────────────────────────────────
from .accessibility_checks import axa_pdf_accessibility
from .print_preflight_checks import axa_print_preflight
# Maps each check name to its implementation ('fn') and dispatch scope
# ('scope'). Entries keep the order in which they are listed here.
CHECK_REGISTRY = {
    name: {'fn': fn, 'scope': scope}
    for name, fn, scope in (
        ('axa_font_inventory', axa_font_inventory, 'document'),
        ('axa_phone_inventory', axa_phone_inventory, 'document'),
        ('axa_bold_words_definitions', axa_bold_words_definitions, 'document'),
        ('axa_page_numbering', axa_page_numbering, 'document'),
        ('axa_print_code', axa_print_code, 'targeted'),
        ('axa_omg_versioning', axa_omg_versioning, 'targeted'),
        ('axa_pdf_accessibility', axa_pdf_accessibility, 'document'),
        ('axa_print_preflight', axa_print_preflight, 'document'),
    )
}
def get_check(check_name: str):
    """Look up *check_name* in CHECK_REGISTRY.

    Returns the registry entry (a dict with 'fn' and 'scope'), or None when
    the name is not registered.
    """
    entry = CHECK_REGISTRY.get(check_name)
    return entry
def is_document_scope_check(check_name: str) -> bool:
    """Report whether *check_name* is registered in CHECK_REGISTRY."""
    registered = check_name in CHECK_REGISTRY
    return registered