ai_qc/backend/document_mode/diff_engine.py
nickviljoen d327776c70 fix(diff_engine): guard compute_formatting_diff against per-pair failure
If the deterministic formatting comparator raises on any single page-pair
(e.g. unexpected span shape from a future PyMuPDF version), degrade to
zero formatting findings for that pair instead of aborting the whole
52-page diff run. Logged for visibility.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 10:31:16 +02:00

561 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Old-vs-new PDF diff engine for AXA Policy Document Diff.
Two-stage pipeline:
1. Page alignment — fuzzy text matching (difflib.SequenceMatcher) between
old.pages and new.pages. Each old page either pairs with a new page
(above similarity threshold) or is marked as removed; new pages with
no old counterpart are marked as added.
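2. Vision LLM page-pair diff — for each aligned pair, send both rendered
PNGs to Gemini with a structured prompt asking what changed. Output
is parsed JSON: added/removed/modified/moved/style-changes plus a
severity tag. A deterministic formatting comparator
(compute_formatting_diff) also runs on each aligned pair and its findings
are merged into that pair's diff.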
Cost shape: ~1 LLM call per aligned page-pair. For an 80-page policy that's
~$0.40-0.80 with Gemini 2.5 Pro. Pairs run in parallel via ThreadPoolExecutor
(max 8 concurrent — conservative, room to tune).
"""
from __future__ import annotations
import concurrent.futures
import json
import re
from difflib import SequenceMatcher
from typing import Dict, List, Optional, Tuple
from PIL import Image
from document_mode.formatting_diff import compute_formatting_diff
# Similarity threshold for considering two pages "the same page modified"
# vs "an inserted/removed page". Tuned for policy docs where page-level text
# fingerprint is highly distinctive (section headers + body para). 0.4 is
# permissive enough to handle small-paragraph rewrites; below that we
# treat the page as inserted/removed rather than modified. align_pages
# compares with ">=", so a score of exactly 0.4 still counts as a match.
SIMILARITY_THRESHOLD = 0.4
# Window — how many positions ahead of the current new-page cursor to scan
# for the best new-page match for each old page (the scan is forward-only
# and covers ALIGNMENT_WINDOW + 1 candidates). Avoids O(N²) blowup on long
# docs while still accommodating moderate page-shift caused by
# inserts/removes.
ALIGNMENT_WINDOW = 8
# Default number of page-pairs diffed concurrently (used as the thread-pool
# size in run_page_pair_diff). Each pair sends two images — old and new —
# to the vision LLM.
PARALLEL_PAIRS = 8
def _fingerprint(raw_text: str, length: int = 1000) -> str:
"""Normalise text for similarity comparison. Lowercased, whitespace
collapsed, truncated to first N chars (page header + first paragraph
is usually distinctive enough)."""
if not raw_text:
return ''
norm = re.sub(r'\s+', ' ', raw_text.lower()).strip()
return norm[:length]
def _text_similarity(a: str, b: str) -> float:
"""0.01.0 similarity ratio between two normalised page texts."""
if not a or not b:
return 0.0
return SequenceMatcher(None, a, b).ratio()
def align_pages(old_pages: List[Dict], new_pages: List[Dict]) -> List[Dict]:
    """Greedy page alignment with windowed lookahead.

    Returns a list of alignment entries describing how old/new pages map
    onto each other:

        [
            {'old_page': 1, 'new_page': 1, 'similarity': 0.99, 'status': 'matched'},
            {'old_page': None, 'new_page': 5, 'similarity': None, 'status': 'added'},
            {'old_page': 47, 'new_page': None, 'similarity': 0.12, 'status': 'removed'},
            ...
        ]

    Algorithm:
      • Walk old pages in order. For each, scan new pages in
        [new_cursor, new_cursor + ALIGNMENT_WINDOW] (forward only) for the
        best text-similarity match.
      • If best match ≥ SIMILARITY_THRESHOLD: pair them, mark any new pages
        skipped over as 'added', and move the cursor just past the match.
      • If best match < threshold: mark the old page as 'removed' and leave
        the cursor where it is. The entry keeps the best (sub-threshold)
        score, or None when there were no new pages left to scan.
      • New pages still beyond the cursor at the end are marked 'added'.

    Each page dict must carry 'page_num'; 'raw_text' is optional and treated
    as empty when missing.
    """
    old_fps = [_fingerprint(p.get('raw_text') or '') for p in old_pages]
    new_fps = [_fingerprint(p.get('raw_text') or '') for p in new_pages]
    alignment: List[Dict] = []
    # Index of the first new page not yet matched or reported as added.
    # Every matched index is strictly below the cursor and scans never look
    # behind it, so no separate "consumed" set is required.
    new_cursor = 0

    def _added_entry(new_idx: int) -> Dict:
        return {
            'old_page': None,
            'new_page': new_pages[new_idx]['page_num'],
            'similarity': None,
            'status': 'added',
        }

    for old_idx, old_fp in enumerate(old_fps):
        best_score = -1.0
        best_new_idx = -1
        # Search forward from new_cursor up to ALIGNMENT_WINDOW pages ahead.
        scan_end = min(len(new_fps), new_cursor + ALIGNMENT_WINDOW + 1)
        for j in range(new_cursor, scan_end):
            score = _text_similarity(old_fp, new_fps[j])
            # Strict ">" keeps the earliest candidate on ties.
            if score > best_score:
                best_score = score
                best_new_idx = j

        if best_score >= SIMILARITY_THRESHOLD:
            # New pages between the cursor and the match are inserts.
            alignment.extend(
                _added_entry(k) for k in range(new_cursor, best_new_idx)
            )
            alignment.append({
                'old_page': old_pages[old_idx]['page_num'],
                'new_page': new_pages[best_new_idx]['page_num'],
                'similarity': round(best_score, 3),
                'status': 'matched',
            })
            new_cursor = best_new_idx + 1
        else:
            # No good match found — old page was removed.
            alignment.append({
                'old_page': old_pages[old_idx]['page_num'],
                'new_page': None,
                'similarity': round(best_score, 3) if best_score >= 0 else None,
                'status': 'removed',
            })

    # Any new pages left past the cursor are inserts at the end.
    alignment.extend(_added_entry(j) for j in range(new_cursor, len(new_fps)))
    return alignment
# ─────────────────────────────────────────────────────────────────────────────
# Vision LLM page-pair diff
# ─────────────────────────────────────────────────────────────────────────────
# Prompt sent to the vision LLM together with the OLD and NEW page images
# (see _diff_one_pair). The JSON fields it requests — differences_found,
# added, removed, modified, moved, style_changes, severity, summary — are the
# ones _parse_diff_response reads back, so keep the two in sync.
_DIFF_PROMPT = """You are a quality-control reviewer comparing two versions of the same insurance policy document page. The first image is the OLD version. The second image is the NEW version.
Your job: identify every meaningful difference between the two pages. Be specific — quote the actual text where you can.
Respond with ONLY a JSON object in this exact schema (no markdown fences, no commentary):
{
"differences_found": true|false,
"added": ["specific text or element added in NEW that wasn't in OLD"],
"removed": ["specific text or element removed in NEW that was in OLD"],
"modified": ["specific text or element changed in wording, formatting, bolding, color, or font"],
"moved": ["element repositioned (e.g. blue box moved from top-right to bottom-left)"],
"style_changes": ["color, font, size, bold/italic, layout shift not covered above"],
"severity": "high|medium|low|none",
"summary": "one-sentence overview of what changed on this page"
}
Rules:
- "high" severity = content changes that affect cover/exclusions/customer-facing terms (added paragraphs, removed sections, definition changes)
- "medium" = formatting changes that affect readability or compliance (un-bolded defined terms, moved key elements)
- "low" = cosmetic only (slight color tweak, kerning adjustment)
- "none" = pages are visually identical
- If unsure whether something is a difference, INCLUDE it — better false positive than missed defect
- Empty arrays are OK if a category has no findings
- Always return all fields, even if empty"""
def _parse_diff_response(text: str) -> Dict:
"""Extract the JSON object from the LLM response. Handles cases where
the model wraps it in ```json fences despite instructions, and falls
back to a structured-error result if parsing fails."""
if not text:
return _empty_diff('Empty response')
# Strip code fences if present
cleaned = text.strip()
if cleaned.startswith('```'):
cleaned = re.sub(r'^```[a-z]*\n?', '', cleaned)
cleaned = re.sub(r'\n?```\s*$', '', cleaned)
cleaned = cleaned.strip()
# Find the first { and matching last }
first = cleaned.find('{')
last = cleaned.rfind('}')
if first == -1 or last == -1 or last <= first:
return _empty_diff(f'No JSON object in response: {text[:200]}')
try:
data = json.loads(cleaned[first:last + 1])
except json.JSONDecodeError as e:
return _empty_diff(f'JSON parse failed: {e}; raw: {text[:200]}')
return {
'differences_found': bool(data.get('differences_found', False)),
'added': data.get('added') or [],
'removed': data.get('removed') or [],
'modified': data.get('modified') or [],
'moved': data.get('moved') or [],
'style_changes': data.get('style_changes') or [],
'severity': data.get('severity') or 'none',
'summary': data.get('summary') or '',
}
def _empty_diff(error: Optional[str] = None) -> Dict:
return {
'differences_found': False,
'added': [],
'removed': [],
'modified': [],
'moved': [],
'style_changes': [],
'severity': 'none',
'summary': '',
'error': error,
}
def _diff_one_pair(
    old_page: Dict,
    new_page: Dict,
    call_gemini_vision_fn,
    model_version: Optional[str] = None,
) -> Dict:
    """Run the vision LLM on a single page-pair.

    Wraps ``call_gemini_vision_fn`` so the dispatcher doesn't have to know
    the LLM-call signature. The callable is invoked as
    ``call_gemini_vision_fn(_DIFF_PROMPT, old_img, new_img,
    model_version=model_version)`` and must return
    ``(response_text, token_usage)``.

    Args:
        old_page: Page dict with 'page_num' and 'image_path' (old version).
        new_page: Page dict with 'page_num' and 'image_path' (new version).
        call_gemini_vision_fn: The vision-LLM callable described above.
        model_version: Forwarded to the callable unchanged.

    Returns:
        ``{'old_page', 'new_page', 'diff', 'token_usage'}``. This function
        does not raise for image-load or LLM failures: those produce an
        empty diff whose 'error' field describes the failure, with all
        token counts set to zero.
    """
    def _failed(message: str) -> Dict:
        return {
            'old_page': old_page['page_num'],
            'new_page': new_page['page_num'],
            'diff': _empty_diff(message),
            'token_usage': {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0},
        }

    try:
        # The context manager closes the source file even when convert()
        # fails; convert() returns an independent in-memory image.
        with Image.open(old_page['image_path']) as old_src:
            old_img = old_src.convert('RGB')
        with Image.open(new_page['image_path']) as new_src:
            new_img = new_src.convert('RGB')
    except Exception as e:
        return _failed(f'Image load failed: {e}')

    try:
        response_text, token_usage = call_gemini_vision_fn(
            _DIFF_PROMPT, old_img, new_img, model_version=model_version
        )
    except Exception as e:
        return _failed(f'LLM call failed: {e}')

    return {
        'old_page': old_page['page_num'],
        'new_page': new_page['page_num'],
        'diff': _parse_diff_response(response_text),
        'token_usage': token_usage,
    }
def run_page_pair_diff(
    *,
    old_ingest: Dict,
    new_ingest: Dict,
    call_gemini_vision_fn,
    progress_callback=None,
    model_version: Optional[str] = None,
    parallel_pairs: int = PARALLEL_PAIRS,
) -> Dict:
    """Top-level entrypoint. Aligns pages, then diffs each matched pair via
    vision LLM in parallel.

    Args:
        old_ingest: Ingest result for the old PDF; its 'pages' list is used.
        new_ingest: Ingest result for the new PDF; its 'pages' list is used.
        call_gemini_vision_fn: Vision-LLM callable, forwarded to
            _diff_one_pair.
        progress_callback: Optional ``callback(completed, total_pairs)``
            invoked after each pair finishes. Exceptions it raises are
            ignored so progress reporting can never abort the run.
        model_version: Forwarded to the LLM callable.
        parallel_pairs: Thread-pool size. Values below 1 are clamped to 1;
            None lets ThreadPoolExecutor pick its default.

    Returns:
        {
            'alignment': [...],
            'pair_diffs': {'<old>-><new>': diff_result, ...},
            'totals': {pages_added, pages_removed, pages_matched, ...},
            'token_usage': {prompt_tokens, completion_tokens, total_tokens},
        }

    Matched pairs where either page is missing or lacks an 'image_path' are
    skipped: they get no entry in 'pair_diffs' but still count towards
    'pages_matched' (and therefore 'pages_unchanged').
    """
    old_pages = old_ingest.get('pages') or []
    new_pages = new_ingest.get('pages') or []
    alignment = align_pages(old_pages, new_pages)

    # Index pages by page_num for fast lookup in the diff loop.
    old_by_num = {p['page_num']: p for p in old_pages}
    new_by_num = {p['page_num']: p for p in new_pages}

    # Only matched pairs are sent for diffing.
    matched_entries = [e for e in alignment if e['status'] == 'matched']
    total_pairs = len(matched_entries)
    pair_diffs: Dict[str, Dict] = {}
    aggregate_tokens = {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0}
    completed = 0

    def _run(entry):
        old_p = old_by_num.get(entry['old_page'])
        new_p = new_by_num.get(entry['new_page'])
        if not old_p or not new_p or not old_p.get('image_path') or not new_p.get('image_path'):
            return entry, None
        result = _diff_one_pair(old_p, new_p, call_gemini_vision_fn, model_version)

        # Deterministic formatting diff — runs alongside the LLM diff.
        # Guarded so a single bad span (or a malformed comparator result)
        # on one page doesn't abort the whole run.
        try:
            fmt = compute_formatting_diff(
                old_p.get('spans') or [],
                new_p.get('spans') or [],
                old_p['page_num'],
                new_p['page_num'],
            )
            formatting_changes = fmt['formatting_changes']
            finding_count = fmt['finding_count']
        except Exception as fmt_err:
            print(f" [formatting_diff] page {old_p['page_num']}->{new_p['page_num']} failed: {fmt_err}")
            formatting_changes, finding_count = [], 0

        diff = result.setdefault('diff', {})
        diff['formatting_changes'] = formatting_changes
        if finding_count > 0:
            # If the LLM saw the page as identical but the deterministic
            # layer found typographic flips, we still need the report to
            # render the pair as "has changes".
            diff['differences_found'] = True
            # Bump the pair's overall severity to at least 'medium' so the
            # pair-card pill reflects the findings.
            if diff.get('severity') in (None, 'none'):
                diff['severity'] = 'medium'
        return entry, result

    # ThreadPoolExecutor rejects max_workers <= 0, so clamp to at least 1.
    max_workers = parallel_pairs if parallel_pairs is None else max(1, parallel_pairs)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(_run, e) for e in matched_entries]
        for future in concurrent.futures.as_completed(futures):
            entry, result = future.result()
            completed += 1
            if result is not None:
                key = f"{entry['old_page']}->{entry['new_page']}"
                pair_diffs[key] = result
                tu = result.get('token_usage') or {}
                for k in aggregate_tokens:
                    # A usage field may be absent or None; count it as 0.
                    aggregate_tokens[k] += tu.get(k) or 0
            if progress_callback:
                try:
                    progress_callback(completed, total_pairs)
                except Exception:
                    # Deliberate best-effort: progress reporting must never
                    # fail the diff itself.
                    pass

    # Tally up the deltas.
    pages_added = sum(1 for e in alignment if e['status'] == 'added')
    pages_removed = sum(1 for e in alignment if e['status'] == 'removed')
    pages_matched = total_pairs
    pages_modified = sum(
        1 for d in pair_diffs.values()
        if d['diff'].get('differences_found') and d['diff'].get('severity') != 'none'
    )
    pages_unchanged = pages_matched - pages_modified

    severity_counts = {'high': 0, 'medium': 0, 'low': 0, 'none': 0}
    for d in pair_diffs.values():
        sev = d['diff'].get('severity') or 'none'
        if sev in severity_counts:
            severity_counts[sev] += 1
        # A pair with N formatting findings adds N-1 extra 'medium' entries
        # on top of the single entry recorded for its base severity above.
        # When _run bumped the base severity to 'medium' this gives N
        # mediums for the pair; when the LLM reported another severity the
        # pair contributes one entry at that severity plus N-1 mediums.
        fmt_findings = d['diff'].get('formatting_changes') or []
        if fmt_findings:
            severity_counts['medium'] += max(0, len(fmt_findings) - 1)

    return {
        'alignment': alignment,
        'pair_diffs': pair_diffs,
        'totals': {
            'old_page_count': len(old_pages),
            'new_page_count': len(new_pages),
            'pages_matched': pages_matched,
            'pages_added': pages_added,
            'pages_removed': pages_removed,
            'pages_modified': pages_modified,
            'pages_unchanged': pages_unchanged,
            'severity_counts': severity_counts,
        },
        'token_usage': aggregate_tokens,
    }
# ─────────────────────────────────────────────────────────────────────────────
# Top-level orchestrator (called from /api/document/start_diff)
# ─────────────────────────────────────────────────────────────────────────────
def run_document_diff_analysis(
    *,
    old_pdf_path: str,
    new_pdf_path: str,
    old_filename: str,
    new_filename: str,
    profile_config,
    profile_id: str,
    progress_tracker: Dict,
    session_id: str,
    ingest_pdf_fn,
    call_gemini_vision_fn,
    pages_output_dir_old: str,
    pages_output_dir_new: str,
    page_limit: int = 200,
    parallel_pairs: int = PARALLEL_PAIRS,
) -> Dict:
    """Full diff pipeline: ingest both PDFs → align → page-pair vision diff.

    Progress is reported by updating ``progress_tracker[session_id]`` (which
    must already exist) with 'stage', 'percentage' and
    'current_check_display'. The stages and percentage bands are:
    ingesting_old (2-10), ingesting_new (10-18), aligning_pages (18),
    page_pair_diff (20-95) and aggregating (96).

    Args:
        old_pdf_path / new_pdf_path: Paths handed to ``ingest_pdf_fn``.
        old_filename / new_filename: Display names used in progress text and
            in the report.
        profile_config: Object whose ``name`` attribute is copied into the
            report.
        profile_id: Copied into the report.
        progress_tracker: Mapping of session id -> mutable progress dict.
        session_id: Key into ``progress_tracker``.
        ingest_pdf_fn: Called as ``ingest_pdf_fn(path, output_dir,
            page_limit=..., progress_callback=...)``; its result must
            provide 'page_count', 'pages_processed', 'truncated' and
            (optionally) 'pages'.
        call_gemini_vision_fn: Vision-LLM callable forwarded to
            run_page_pair_diff.
        pages_output_dir_old / pages_output_dir_new: Output directories
            passed to ``ingest_pdf_fn``.
        page_limit: Forwarded to ``ingest_pdf_fn``.
        parallel_pairs: Forwarded to run_page_pair_diff.

    Returns:
        The report dict (mode 'document_diff') containing per-PDF summaries,
        the alignment, pair diffs, totals, token usage, the informational
        score/grade, per-page metadata and a local-time timestamp.
    """
    from datetime import datetime

    def _update(fields: Dict) -> None:
        # Look the tracker entry up on every call, as the original code did.
        progress_tracker[session_id].update(fields)

    def _ingest_progress(base_pct, label):
        """Build a per-page progress callback spanning an 8-point band."""
        def _callback(p, t):
            # Skip the update rather than divide by zero when the ingester
            # reports no pages (same guard as _diff_progress below).
            if t <= 0:
                return
            _update({
                'percentage': base_pct + (p / t) * 8,
                'current_check_display': f'{label}: page {p}/{t}',
            })
        return _callback

    def _diff_progress(completed, total):
        if total <= 0:
            return
        _update({
            'stage': 'page_pair_diff',
            'percentage': 20 + (completed / total) * 75,
            'completed_checks': completed,
            'total_checks': total,
            'current_check_display': f'Diffing page pair {completed}/{total}',
        })

    def _pdf_summary(filename, ingest):
        return {
            'filename': filename,
            'page_count': ingest['page_count'],
            'pages_processed': ingest['pages_processed'],
            'truncated': ingest['truncated'],
        }

    def _pages_meta(ingest):
        return [
            {'page_num': p['page_num'], 'fonts_used': p.get('fonts_used', []),
             'image_path': p.get('image_path')}
            for p in ingest.get('pages', [])
        ]

    _update({
        'stage': 'ingesting_old',
        'percentage': 2,
        'current_check_display': f'Rendering old version ({old_filename})...',
    })
    old_ingest = ingest_pdf_fn(
        old_pdf_path, pages_output_dir_old,
        page_limit=page_limit,
        progress_callback=_ingest_progress(2, 'Old version'),
    )

    _update({
        'stage': 'ingesting_new',
        'percentage': 10,
        'current_check_display': f'Rendering new version ({new_filename})...',
    })
    new_ingest = ingest_pdf_fn(
        new_pdf_path, pages_output_dir_new,
        page_limit=page_limit,
        progress_callback=_ingest_progress(10, 'New version'),
    )

    _update({
        'stage': 'aligning_pages',
        'percentage': 18,
        'current_check_display': 'Aligning pages between versions...',
    })
    diff_result = run_page_pair_diff(
        old_ingest=old_ingest,
        new_ingest=new_ingest,
        call_gemini_vision_fn=call_gemini_vision_fn,
        progress_callback=_diff_progress,
        parallel_pairs=parallel_pairs,
    )

    _update({
        'stage': 'aggregating',
        'percentage': 96,
        'current_check_display': 'Compiling diff report...',
    })
    overall_score, grade = _diff_score(diff_result['totals'])

    return {
        'mode': 'document_diff',
        'profile_id': profile_id,
        'profile_name': profile_config.name,
        'old_pdf': _pdf_summary(old_filename, old_ingest),
        'new_pdf': _pdf_summary(new_filename, new_ingest),
        'alignment': diff_result['alignment'],
        'pair_diffs': diff_result['pair_diffs'],
        'totals': diff_result['totals'],
        'token_usage': diff_result['token_usage'],
        'document_summary': {
            'overall_score': overall_score,
            'grade': grade,
        },
        'old_pages_meta': _pages_meta(old_ingest),
        'new_pages_meta': _pages_meta(new_ingest),
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    }
def _diff_score(totals: Dict) -> Tuple[float, str]:
"""Convert diff totals into an informational score.
A diff doesn't really pass/fail the way a QC check does — it's a
*report*. We score as: 100 if no diffs found, scaling down with
severity. Grade is informational ("Clean diff" / "Changes detected").
"""
high = totals['severity_counts']['high']
medium = totals['severity_counts']['medium']
low = totals['severity_counts']['low']
pages_added = totals['pages_added']
pages_removed = totals['pages_removed']
structural_changes = pages_added + pages_removed
if (high + medium + low + structural_changes) == 0:
return 100.0, 'Identical'
# 100 - (10 per high) - (3 per medium) - (1 per low) - (5 per structural)
score = max(0.0, 100.0 - 10 * high - 3 * medium - 1 * low - 5 * structural_changes)
if high > 0 or structural_changes > 2:
grade = 'Major changes'
elif medium > 0 or structural_changes > 0:
grade = 'Notable changes'
else:
grade = 'Minor changes'
return round(score, 2), grade