If the deterministic formatting comparator raises on any single page-pair (e.g. unexpected span shape from a future PyMuPDF version), degrade to zero formatting findings for that pair instead of aborting the whole 52-page diff run. Logged for visibility. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
561 lines
21 KiB
Python
561 lines
21 KiB
Python
"""Old-vs-new PDF diff engine for AXA Policy Document Diff.
|
||
|
||
Two-stage pipeline:
|
||
|
||
1. Page alignment — fuzzy text matching (difflib.SequenceMatcher) between
|
||
old.pages and new.pages. Each old page either pairs with a new page
|
||
(above similarity threshold) or is marked as removed; new pages with
|
||
no old counterpart are marked as added.
|
||
|
||
2. Vision LLM page-pair diff — for each aligned pair, send both rendered
|
||
PNGs to Gemini with a structured prompt asking what changed. Output
|
||
is parsed JSON: added/removed/moved/style-changes plus a severity tag.
|
||
|
||
Cost shape: ~1 LLM call per aligned page-pair. For an 80-page policy that's
|
||
~$0.40-0.80 with Gemini 2.5 Pro. Pairs run in parallel via ThreadPoolExecutor
|
||
(max 8 concurrent — conservative, room to tune).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import concurrent.futures
|
||
import json
|
||
import re
|
||
from difflib import SequenceMatcher
|
||
from typing import Dict, List, Optional, Tuple
|
||
|
||
from PIL import Image
|
||
|
||
from document_mode.formatting_diff import compute_formatting_diff
|
||
|
||
|
||
# Similarity threshold for considering two pages "the same page modified"
|
||
# vs "an inserted/removed page". Tuned for policy docs where page-level text
|
||
# fingerprint is highly distinctive (section headers + body para). 0.4 is
|
||
# permissive enough to handle small-paragraph rewrites; below that we
|
||
# treat the page as inserted/removed rather than modified.
|
||
SIMILARITY_THRESHOLD = 0.4
|
||
|
||
# Window — how many new-page positions past the alignment cursor to scan for
# the best match for each old page (the scan is forward-only). Avoids O(N²)
# blowup on long docs while still accommodating moderate page-shift caused
# by inserts/removes.
|
||
ALIGNMENT_WINDOW = 8
|
||
|
||
# Max page pairs diffed concurrently (each pair sends two images, old + new, to the vision LLM)
|
||
PARALLEL_PAIRS = 8
|
||
|
||
|
||
def _fingerprint(raw_text: str, length: int = 1000) -> str:
|
||
"""Normalise text for similarity comparison. Lowercased, whitespace
|
||
collapsed, truncated to first N chars (page header + first paragraph
|
||
is usually distinctive enough)."""
|
||
if not raw_text:
|
||
return ''
|
||
norm = re.sub(r'\s+', ' ', raw_text.lower()).strip()
|
||
return norm[:length]
|
||
|
||
|
||
def _text_similarity(a: str, b: str) -> float:
|
||
"""0.0–1.0 similarity ratio between two normalised page texts."""
|
||
if not a or not b:
|
||
return 0.0
|
||
return SequenceMatcher(None, a, b).ratio()
|
||
|
||
|
||
def align_pages(old_pages: List[Dict], new_pages: List[Dict]) -> List[Dict]:
    """Pair old pages with new pages using a greedy, forward-only windowed scan.

    The result holds one entry per old page (``'matched'`` or ``'removed'``)
    plus one ``'added'`` entry for every new page that was never paired::

        [
            {'old_page': 1, 'new_page': 1, 'similarity': 0.99, 'status': 'matched'},
            {'old_page': None, 'new_page': 5, 'similarity': None, 'status': 'added'},
            {'old_page': 47, 'new_page': None, 'similarity': 0.12, 'status': 'removed'},
            ...
        ]

    Page numbers are read from each page dict's ``'page_num'``; the text used
    for matching comes from ``'raw_text'``.

    How it works:
      • A cursor marks the first new page that has not been paired yet.
      • For each old page, the new pages in ``[cursor, cursor + ALIGNMENT_WINDOW]``
        are scored and the first highest-scoring one is the candidate.
      • Candidate score >= SIMILARITY_THRESHOLD: new pages between the cursor
        and the candidate are emitted as 'added', the pair is emitted as
        'matched', and the cursor moves just past the candidate.
      • Otherwise the old page is emitted as 'removed' (carrying the best
        score seen, or ``None`` when the window was empty) and the cursor
        stays where it is.
      • New pages still at or beyond the cursor after the last old page are
        emitted as 'added'.
    """
    old_keys = [_fingerprint(page.get('raw_text') or '') for page in old_pages]
    new_keys = [_fingerprint(page.get('raw_text') or '') for page in new_pages]
    new_total = len(new_keys)

    def _added(new_idx: int) -> Dict:
        # Entry for a new page that has no old counterpart.
        return {
            'old_page': None,
            'new_page': new_pages[new_idx]['page_num'],
            'similarity': None,
            'status': 'added',
        }

    entries: List[Dict] = []
    # Index of the first unpaired new page. Every paired index is strictly
    # below it, so no separate "already consumed" bookkeeping is required.
    cursor = 0

    for old_page, old_key in zip(old_pages, old_keys):
        top_score, top_idx = -1.0, -1
        window_end = min(new_total, cursor + ALIGNMENT_WINDOW + 1)
        for candidate in range(cursor, window_end):
            candidate_score = _text_similarity(old_key, new_keys[candidate])
            # Strict '>' keeps the earliest candidate when scores tie.
            if candidate_score > top_score:
                top_score, top_idx = candidate_score, candidate

        if top_score < SIMILARITY_THRESHOLD:
            # Nothing in the window is close enough: the old page was removed.
            entries.append({
                'old_page': old_page['page_num'],
                'new_page': None,
                'similarity': None if top_score < 0 else round(top_score, 3),
                'status': 'removed',
            })
            continue

        # New pages jumped over to reach the match are insertions.
        entries.extend(_added(skipped) for skipped in range(cursor, top_idx))
        entries.append({
            'old_page': old_page['page_num'],
            'new_page': new_pages[top_idx]['page_num'],
            'similarity': round(top_score, 3),
            'status': 'matched',
        })
        cursor = top_idx + 1

    # Whatever is left after the last old page was appended to the document.
    entries.extend(_added(rest) for rest in range(cursor, new_total))
    return entries
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Vision LLM page-pair diff
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
_DIFF_PROMPT = """You are a quality-control reviewer comparing two versions of the same insurance policy document page. The first image is the OLD version. The second image is the NEW version.
|
||
|
||
Your job: identify every meaningful difference between the two pages. Be specific — quote the actual text where you can.
|
||
|
||
Respond with ONLY a JSON object in this exact schema (no markdown fences, no commentary):
|
||
|
||
{
|
||
"differences_found": true|false,
|
||
"added": ["specific text or element added in NEW that wasn't in OLD"],
|
||
"removed": ["specific text or element removed in NEW that was in OLD"],
|
||
"modified": ["specific text or element changed in wording, formatting, bolding, color, or font"],
|
||
"moved": ["element repositioned (e.g. blue box moved from top-right to bottom-left)"],
|
||
"style_changes": ["color, font, size, bold/italic, layout shift not covered above"],
|
||
"severity": "high|medium|low|none",
|
||
"summary": "one-sentence overview of what changed on this page"
|
||
}
|
||
|
||
Rules:
|
||
- "high" severity = content changes that affect cover/exclusions/customer-facing terms (added paragraphs, removed sections, definition changes)
|
||
- "medium" = formatting changes that affect readability or compliance (un-bolded defined terms, moved key elements)
|
||
- "low" = cosmetic only (slight color tweak, kerning adjustment)
|
||
- "none" = pages are visually identical
|
||
- If unsure whether something is a difference, INCLUDE it — better false positive than missed defect
|
||
- Empty arrays are OK if a category has no findings
|
||
- Always return all fields, even if empty"""
|
||
|
||
|
||
def _parse_diff_response(text: str) -> Dict:
|
||
"""Extract the JSON object from the LLM response. Handles cases where
|
||
the model wraps it in ```json fences despite instructions, and falls
|
||
back to a structured-error result if parsing fails."""
|
||
if not text:
|
||
return _empty_diff('Empty response')
|
||
|
||
# Strip code fences if present
|
||
cleaned = text.strip()
|
||
if cleaned.startswith('```'):
|
||
cleaned = re.sub(r'^```[a-z]*\n?', '', cleaned)
|
||
cleaned = re.sub(r'\n?```\s*$', '', cleaned)
|
||
cleaned = cleaned.strip()
|
||
|
||
# Find the first { and matching last }
|
||
first = cleaned.find('{')
|
||
last = cleaned.rfind('}')
|
||
if first == -1 or last == -1 or last <= first:
|
||
return _empty_diff(f'No JSON object in response: {text[:200]}')
|
||
|
||
try:
|
||
data = json.loads(cleaned[first:last + 1])
|
||
except json.JSONDecodeError as e:
|
||
return _empty_diff(f'JSON parse failed: {e}; raw: {text[:200]}')
|
||
|
||
return {
|
||
'differences_found': bool(data.get('differences_found', False)),
|
||
'added': data.get('added') or [],
|
||
'removed': data.get('removed') or [],
|
||
'modified': data.get('modified') or [],
|
||
'moved': data.get('moved') or [],
|
||
'style_changes': data.get('style_changes') or [],
|
||
'severity': data.get('severity') or 'none',
|
||
'summary': data.get('summary') or '',
|
||
}
|
||
|
||
|
||
def _empty_diff(error: Optional[str] = None) -> Dict:
|
||
return {
|
||
'differences_found': False,
|
||
'added': [],
|
||
'removed': [],
|
||
'modified': [],
|
||
'moved': [],
|
||
'style_changes': [],
|
||
'severity': 'none',
|
||
'summary': '',
|
||
'error': error,
|
||
}
|
||
|
||
|
||
def _diff_one_pair(
    old_page: Dict,
    new_page: Dict,
    call_gemini_vision_fn,
    model_version: Optional[str] = None,
) -> Dict:
    """Diff one aligned page pair with the vision LLM.

    Both pages' ``image_path`` files are opened and converted to RGB, then
    handed to ``call_gemini_vision_fn(_DIFF_PROMPT, old_img, new_img,
    model_version=...)``, which must return ``(response_text, token_usage)``.

    Returns a dict with ``old_page`` / ``new_page`` (the pages' ``page_num``),
    ``diff`` (the parsed response) and ``token_usage``. A failure while
    loading the images or calling the LLM does not raise: ``diff`` is then an
    empty diff whose ``error`` describes the failure and ``token_usage`` is
    all zeros.
    """
    def _package(diff: Dict, usage) -> Dict:
        # Common result envelope for success and failure alike.
        return {
            'old_page': old_page['page_num'],
            'new_page': new_page['page_num'],
            'diff': diff,
            'token_usage': usage,
        }

    def _failure(message: str) -> Dict:
        no_tokens = {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0}
        return _package(_empty_diff(message), no_tokens)

    try:
        old_img, new_img = [
            Image.open(page['image_path']).convert('RGB')
            for page in (old_page, new_page)
        ]
    except Exception as e:
        return _failure(f'Image load failed: {e}')

    try:
        reply = call_gemini_vision_fn(
            _DIFF_PROMPT, old_img, new_img, model_version=model_version
        )
        response_text, token_usage = reply
    except Exception as e:
        return _failure(f'LLM call failed: {e}')

    return _package(_parse_diff_response(response_text), token_usage)
|
||
|
||
|
||
def run_page_pair_diff(
    *,
    old_ingest: Dict,
    new_ingest: Dict,
    call_gemini_vision_fn,
    progress_callback=None,
    model_version: Optional[str] = None,
    parallel_pairs: int = PARALLEL_PAIRS,
) -> Dict:
    """Align the two documents' pages, then diff every matched pair in parallel.

    Each matched pair is handled on a worker thread: the vision LLM diff
    (``_diff_one_pair``) followed by the deterministic formatting diff
    (``compute_formatting_diff``). A pair whose page dict or ``image_path`` is
    missing is skipped and gets no ``pair_diffs`` entry.

    Args:
        old_ingest: Ingest result for the old PDF; its ``'pages'`` list is used.
        new_ingest: Ingest result for the new PDF; its ``'pages'`` list is used.
        call_gemini_vision_fn: Callable forwarded to ``_diff_one_pair``.
        progress_callback: Optional ``callback(completed, total_pairs)`` invoked
            after each pair finishes. Exceptions it raises are ignored.
        model_version: Forwarded to the vision call.
        parallel_pairs: Maximum number of worker threads. Values below 1 are
            raised to 1.

    Returns:
        {
            'alignment': [...],                      # output of align_pages
            'pair_diffs': {'<old>-><new>': result},  # one per diffed pair
            'totals': {old_page_count, new_page_count, pages_matched,
                       pages_added, pages_removed, pages_modified,
                       pages_unchanged, severity_counts},
            'token_usage': {prompt_tokens, completion_tokens, total_tokens},
        }

        ``pages_unchanged`` is ``pages_matched - pages_modified``, so skipped
        pairs and pairs whose diff failed are counted as unchanged.
    """
    old_pages = old_ingest.get('pages') or []
    new_pages = new_ingest.get('pages') or []

    alignment = align_pages(old_pages, new_pages)

    # Index pages by page_num for fast lookup in the diff loop.
    old_by_num = {p['page_num']: p for p in old_pages}
    new_by_num = {p['page_num']: p for p in new_pages}

    # Only matched pairs are diffed; added/removed pages have nothing to compare.
    matched_entries = [e for e in alignment if e['status'] == 'matched']
    total_pairs = len(matched_entries)

    pair_diffs: Dict[str, Dict] = {}
    aggregate_tokens = {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0}
    completed = 0

    def _run(entry):
        old_p = old_by_num.get(entry['old_page'])
        new_p = new_by_num.get(entry['new_page'])
        if not old_p or not new_p or not old_p.get('image_path') or not new_p.get('image_path'):
            return entry, None
        result = _diff_one_pair(old_p, new_p, call_gemini_vision_fn, model_version)

        # Deterministic formatting diff — runs alongside the LLM diff.
        # Guard so a single bad span on one page doesn't abort the whole run.
        try:
            fmt = compute_formatting_diff(
                old_p.get('spans') or [],
                new_p.get('spans') or [],
                old_p['page_num'],
                new_p['page_num'],
            )
        except Exception as fmt_err:
            print(f" [formatting_diff] page {old_p['page_num']}->{new_p['page_num']} failed: {fmt_err}")
            fmt = {'formatting_changes': [], 'finding_count': 0}
        diff = result.setdefault('diff', {})
        diff['formatting_changes'] = fmt['formatting_changes']
        if fmt['finding_count'] > 0:
            # If the LLM saw the page as identical but the deterministic
            # layer found typographic flips, the report must still render
            # the pair as "has changes".
            diff['differences_found'] = True
            # Raise a 'none' severity to 'medium' so the pair is not shown
            # as clean; a severity the LLM already assigned is kept.
            if diff.get('severity') in (None, 'none'):
                diff['severity'] = 'medium'
        return entry, result

    # ThreadPoolExecutor raises ValueError for max_workers <= 0.
    workers = parallel_pairs if parallel_pairs is None else max(1, parallel_pairs)

    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        future_to_entry = {pool.submit(_run, e): e for e in matched_entries}
        for future in concurrent.futures.as_completed(future_to_entry):
            entry = future_to_entry[future]
            try:
                _, result = future.result()
            except Exception as pair_err:
                # Anything unexpected inside the worker degrades this one pair
                # to an error diff instead of aborting the whole run.
                print(f" [page_pair_diff] page {entry['old_page']}->{entry['new_page']} failed: {pair_err}")
                failed_diff = _empty_diff(f'Pair diff failed: {pair_err}')
                failed_diff['formatting_changes'] = []
                result = {
                    'old_page': entry['old_page'],
                    'new_page': entry['new_page'],
                    'diff': failed_diff,
                    'token_usage': {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0},
                }
            completed += 1
            if result is not None:
                key = f"{entry['old_page']}->{entry['new_page']}"
                pair_diffs[key] = result
                tu = result.get('token_usage')
                if not isinstance(tu, dict):
                    tu = {}
                for k in aggregate_tokens:
                    # A provider may report a count as None; treat it as 0.
                    aggregate_tokens[k] += tu.get(k) or 0
            if progress_callback:
                try:
                    progress_callback(completed, total_pairs)
                except Exception:
                    # Progress reporting is best-effort and must never fail the diff.
                    pass

    # Tally up the deltas.
    pages_added = sum(1 for e in alignment if e['status'] == 'added')
    pages_removed = sum(1 for e in alignment if e['status'] == 'removed')
    pages_matched = total_pairs
    pages_modified = sum(
        1 for d in pair_diffs.values()
        if d['diff'].get('differences_found') and d['diff'].get('severity') != 'none'
    )
    pages_unchanged = pages_matched - pages_modified

    severity_counts = {'high': 0, 'medium': 0, 'low': 0, 'none': 0}
    for d in pair_diffs.values():
        sev = d['diff'].get('severity') or 'none'
        if sev in severity_counts:
            severity_counts[sev] += 1
        # The pair's own severity entry (counted just above) stands in for the
        # first formatting finding; every further finding adds one 'medium'.
        # A pair with N >= 1 findings therefore contributes its base severity
        # plus N - 1 mediums.
        fmt_findings = d['diff'].get('formatting_changes') or []
        if fmt_findings:
            severity_counts['medium'] += max(0, len(fmt_findings) - 1)

    return {
        'alignment': alignment,
        'pair_diffs': pair_diffs,
        'totals': {
            'old_page_count': len(old_pages),
            'new_page_count': len(new_pages),
            'pages_matched': pages_matched,
            'pages_added': pages_added,
            'pages_removed': pages_removed,
            'pages_modified': pages_modified,
            'pages_unchanged': pages_unchanged,
            'severity_counts': severity_counts,
        },
        'token_usage': aggregate_tokens,
    }
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Top-level orchestrator (called from /api/document/start_diff)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def run_document_diff_analysis(
    *,
    old_pdf_path: str,
    new_pdf_path: str,
    old_filename: str,
    new_filename: str,
    profile_config,
    profile_id: str,
    progress_tracker: Dict,
    session_id: str,
    ingest_pdf_fn,
    call_gemini_vision_fn,
    pages_output_dir_old: str,
    pages_output_dir_new: str,
    page_limit: int = 200,
    parallel_pairs: int = PARALLEL_PAIRS,
) -> Dict:
    """Full diff pipeline: ingest both PDFs → align → page-pair vision diff.

    Progress is reported by updating ``progress_tracker[session_id]`` (which
    must already exist) with ``stage``, ``percentage`` and
    ``current_check_display``: 2–10 % while ingesting the old PDF, 10–18 % for
    the new PDF, 18 % for alignment, 20–95 % across the page-pair diffs and
    96 % while the report is assembled.

    Args:
        old_pdf_path / new_pdf_path: PDF files to compare.
        old_filename / new_filename: Display names used in progress text and
            in the report.
        profile_config: Object whose ``name`` attribute is copied to the report.
        profile_id: Copied to the report.
        progress_tracker: Mapping of session id to a mutable progress dict.
        session_id: Key into ``progress_tracker``.
        ingest_pdf_fn: Called as ``ingest_pdf_fn(pdf_path, output_dir,
            page_limit=..., progress_callback=...)``. Its result is read for
            ``'pages'``, ``'page_count'``, ``'pages_processed'`` and
            ``'truncated'``.
        call_gemini_vision_fn: Forwarded to ``run_page_pair_diff``.
        pages_output_dir_old / pages_output_dir_new: Output directories passed
            to ``ingest_pdf_fn``.
        page_limit: Forwarded to ``ingest_pdf_fn``.
        parallel_pairs: Forwarded to ``run_page_pair_diff``.

    Returns:
        The report dict: mode, profile info, per-PDF summaries, alignment,
        pair diffs, totals, token usage, document summary (score and grade),
        per-page metadata for both versions and a local-time timestamp.
    """
    from datetime import datetime

    def _set_progress(**fields) -> None:
        progress_tracker[session_id].update(fields)

    def _ingest_progress(base_pct: int, label: str):
        # Map page p of t onto an 8-point slice of the bar starting at base_pct.
        def _callback(p, t):
            if t <= 0:
                # Same guard as the diff-stage callback: nothing to report
                # and no division by zero.
                return
            _set_progress(
                percentage=base_pct + (p / t) * 8,
                current_check_display=f'{label}: page {p}/{t}',
            )
        return _callback

    def _pdf_summary(filename: str, ingest: Dict) -> Dict:
        return {
            'filename': filename,
            'page_count': ingest['page_count'],
            'pages_processed': ingest['pages_processed'],
            'truncated': ingest['truncated'],
        }

    def _pages_meta(ingest: Dict) -> List[Dict]:
        return [
            {'page_num': p['page_num'], 'fonts_used': p.get('fonts_used', []),
             'image_path': p.get('image_path')}
            for p in ingest.get('pages', [])
        ]

    _set_progress(
        stage='ingesting_old',
        percentage=2,
        current_check_display=f'Rendering old version ({old_filename})...',
    )
    old_ingest = ingest_pdf_fn(
        old_pdf_path, pages_output_dir_old,
        page_limit=page_limit, progress_callback=_ingest_progress(2, 'Old version'),
    )

    _set_progress(
        stage='ingesting_new',
        percentage=10,
        current_check_display=f'Rendering new version ({new_filename})...',
    )
    new_ingest = ingest_pdf_fn(
        new_pdf_path, pages_output_dir_new,
        page_limit=page_limit, progress_callback=_ingest_progress(10, 'New version'),
    )

    _set_progress(
        stage='aligning_pages',
        percentage=18,
        current_check_display='Aligning pages between versions...',
    )

    def _diff_progress(completed, total):
        if total <= 0:
            return
        _set_progress(
            stage='page_pair_diff',
            percentage=20 + (completed / total) * 75,
            completed_checks=completed,
            total_checks=total,
            current_check_display=f'Diffing page pair {completed}/{total}',
        )

    diff_result = run_page_pair_diff(
        old_ingest=old_ingest,
        new_ingest=new_ingest,
        call_gemini_vision_fn=call_gemini_vision_fn,
        progress_callback=_diff_progress,
        parallel_pairs=parallel_pairs,
    )

    _set_progress(
        stage='aggregating',
        percentage=96,
        current_check_display='Compiling diff report...',
    )

    overall_score, grade = _diff_score(diff_result['totals'])

    return {
        'mode': 'document_diff',
        'profile_id': profile_id,
        'profile_name': profile_config.name,
        'old_pdf': _pdf_summary(old_filename, old_ingest),
        'new_pdf': _pdf_summary(new_filename, new_ingest),
        'alignment': diff_result['alignment'],
        'pair_diffs': diff_result['pair_diffs'],
        'totals': diff_result['totals'],
        'token_usage': diff_result['token_usage'],
        'document_summary': {
            'overall_score': overall_score,
            'grade': grade,
        },
        'old_pages_meta': _pages_meta(old_ingest),
        'new_pages_meta': _pages_meta(new_ingest),
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    }
|
||
|
||
|
||
def _diff_score(totals: Dict) -> Tuple[float, str]:
|
||
"""Convert diff totals into an informational score.
|
||
|
||
A diff doesn't really pass/fail the way a QC check does — it's a
|
||
*report*. We score as: 100 if no diffs found, scaling down with
|
||
severity. Grade is informational ("Clean diff" / "Changes detected").
|
||
"""
|
||
high = totals['severity_counts']['high']
|
||
medium = totals['severity_counts']['medium']
|
||
low = totals['severity_counts']['low']
|
||
pages_added = totals['pages_added']
|
||
pages_removed = totals['pages_removed']
|
||
|
||
structural_changes = pages_added + pages_removed
|
||
if (high + medium + low + structural_changes) == 0:
|
||
return 100.0, 'Identical'
|
||
|
||
# 100 - (10 per high) - (3 per medium) - (1 per low) - (5 per structural)
|
||
score = max(0.0, 100.0 - 10 * high - 3 * medium - 1 * low - 5 * structural_changes)
|
||
|
||
if high > 0 or structural_changes > 2:
|
||
grade = 'Major changes'
|
||
elif medium > 0 or structural_changes > 0:
|
||
grade = 'Notable changes'
|
||
else:
|
||
grade = 'Minor changes'
|
||
|
||
return round(score, 2), grade
|
||
|