"""Old-vs-new PDF diff engine for AXA Policy Document Diff. Two-stage pipeline: 1. Page alignment — fuzzy text matching (difflib.SequenceMatcher) between old.pages and new.pages. Each old page either pairs with a new page (above similarity threshold) or is marked as removed; new pages with no old counterpart are marked as added. 2. Vision LLM page-pair diff — for each aligned pair, send both rendered PNGs to Gemini with a structured prompt asking what changed. Output is parsed JSON: added/removed/moved/style-changes plus a severity tag. Cost shape: ~1 LLM call per aligned page-pair. For an 80-page policy that's ~$0.40-0.80 with Gemini 2.5 Pro. Pairs run in parallel via ThreadPoolExecutor (max 8 concurrent — conservative, room to tune). """ from __future__ import annotations import concurrent.futures import json import re from difflib import SequenceMatcher from typing import Dict, List, Optional, Tuple from PIL import Image from document_mode.formatting_diff import compute_formatting_diff # Similarity threshold for considering two pages "the same page modified" # vs "an inserted/removed page". Tuned for policy docs where page-level text # fingerprint is highly distinctive (section headers + body para). 0.4 is # permissive enough to handle small-paragraph rewrites; below that we # treat the page as inserted/removed rather than modified. SIMILARITY_THRESHOLD = 0.4 # Window — how many positions ahead/behind to scan for the best new-page # match for each old page. Avoids O(N²) blowup on long docs while still # accommodating moderate page-shift caused by inserts/removes. ALIGNMENT_WINDOW = 8 # Max images sent to vision LLM in parallel PARALLEL_PAIRS = 8 def _fingerprint(raw_text: str, length: int = 1000) -> str: """Normalise text for similarity comparison. Lowercased, whitespace collapsed, truncated to first N chars (page header + first paragraph is usually distinctive enough).""" if not raw_text: return '' norm = re.sub(r'\s+', ' ', raw_text.lower()).strip() return norm[:length] def _text_similarity(a: str, b: str) -> float: """0.0–1.0 similarity ratio between two normalised page texts.""" if not a or not b: return 0.0 return SequenceMatcher(None, a, b).ratio() def align_pages(old_pages: List[Dict], new_pages: List[Dict]) -> List[Dict]: """Greedy page alignment with windowed lookahead. Returns a list of alignment entries describing how old/new pages map onto each other: [ {'old_page': 1, 'new_page': 1, 'similarity': 0.99, 'status': 'matched'}, {'old_page': null, 'new_page': 5, 'status': 'added'}, {'old_page': 47, 'new_page': null, 'status': 'removed'}, ... ] Algorithm: • Walk old pages 1..N. For each, scan new pages within [last_matched_new + 1, last_matched_new + 1 + WINDOW] for the best text-similarity match. • If best match ≥ threshold: pair them and advance both cursors. • If best match < threshold: mark old page as 'removed', advance only the old cursor. • Any new pages skipped over by the cursor are marked as 'added'. """ old_fps = [_fingerprint(p.get('raw_text') or '') for p in old_pages] new_fps = [_fingerprint(p.get('raw_text') or '') for p in new_pages] alignment: List[Dict] = [] new_cursor = 0 # Next unmatched new page index new_consumed = set() for old_idx, old_fp in enumerate(old_fps): best_score = -1.0 best_new_idx = -1 # Search forward from new_cursor up to WINDOW pages ahead scan_end = min(len(new_fps), new_cursor + ALIGNMENT_WINDOW + 1) for j in range(new_cursor, scan_end): if j in new_consumed: continue score = _text_similarity(old_fp, new_fps[j]) if score > best_score: best_score = score best_new_idx = j if best_score >= SIMILARITY_THRESHOLD: # Any new pages between new_cursor and best_new_idx are inserts for k in range(new_cursor, best_new_idx): if k not in new_consumed: alignment.append({ 'old_page': None, 'new_page': new_pages[k]['page_num'], 'similarity': None, 'status': 'added', }) alignment.append({ 'old_page': old_pages[old_idx]['page_num'], 'new_page': new_pages[best_new_idx]['page_num'], 'similarity': round(best_score, 3), 'status': 'matched', }) new_consumed.add(best_new_idx) new_cursor = best_new_idx + 1 else: # No good match found — old page was removed alignment.append({ 'old_page': old_pages[old_idx]['page_num'], 'new_page': None, 'similarity': round(best_score, 3) if best_score >= 0 else None, 'status': 'removed', }) # Any remaining unconsumed new pages are inserts at the end for j in range(new_cursor, len(new_fps)): if j not in new_consumed: alignment.append({ 'old_page': None, 'new_page': new_pages[j]['page_num'], 'similarity': None, 'status': 'added', }) return alignment # ───────────────────────────────────────────────────────────────────────────── # Vision LLM page-pair diff # ───────────────────────────────────────────────────────────────────────────── _DIFF_PROMPT = """You are a quality-control reviewer comparing two versions of the same insurance policy document page. The first image is the OLD version. The second image is the NEW version. Your job: identify every meaningful difference between the two pages. Be specific — quote the actual text where you can. Respond with ONLY a JSON object in this exact schema (no markdown fences, no commentary): { "differences_found": true|false, "added": ["specific text or element added in NEW that wasn't in OLD"], "removed": ["specific text or element removed in NEW that was in OLD"], "modified": ["specific text or element changed in wording, formatting, bolding, color, or font"], "moved": ["element repositioned (e.g. blue box moved from top-right to bottom-left)"], "style_changes": ["color, font, size, bold/italic, layout shift not covered above"], "severity": "high|medium|low|none", "summary": "one-sentence overview of what changed on this page" } Rules: - "high" severity = content changes that affect cover/exclusions/customer-facing terms (added paragraphs, removed sections, definition changes) - "medium" = formatting changes that affect readability or compliance (un-bolded defined terms, moved key elements) - "low" = cosmetic only (slight color tweak, kerning adjustment) - "none" = pages are visually identical - If unsure whether something is a difference, INCLUDE it — better false positive than missed defect - Empty arrays are OK if a category has no findings - Always return all fields, even if empty""" def _parse_diff_response(text: str) -> Dict: """Extract the JSON object from the LLM response. Handles cases where the model wraps it in ```json fences despite instructions, and falls back to a structured-error result if parsing fails.""" if not text: return _empty_diff('Empty response') # Strip code fences if present cleaned = text.strip() if cleaned.startswith('```'): cleaned = re.sub(r'^```[a-z]*\n?', '', cleaned) cleaned = re.sub(r'\n?```\s*$', '', cleaned) cleaned = cleaned.strip() # Find the first { and matching last } first = cleaned.find('{') last = cleaned.rfind('}') if first == -1 or last == -1 or last <= first: return _empty_diff(f'No JSON object in response: {text[:200]}') try: data = json.loads(cleaned[first:last + 1]) except json.JSONDecodeError as e: return _empty_diff(f'JSON parse failed: {e}; raw: {text[:200]}') return { 'differences_found': bool(data.get('differences_found', False)), 'added': data.get('added') or [], 'removed': data.get('removed') or [], 'modified': data.get('modified') or [], 'moved': data.get('moved') or [], 'style_changes': data.get('style_changes') or [], 'severity': data.get('severity') or 'none', 'summary': data.get('summary') or '', } def _empty_diff(error: Optional[str] = None) -> Dict: return { 'differences_found': False, 'added': [], 'removed': [], 'modified': [], 'moved': [], 'style_changes': [], 'severity': 'none', 'summary': '', 'error': error, } def _diff_one_pair( old_page: Dict, new_page: Dict, call_gemini_vision_fn, model_version: Optional[str] = None, ) -> Dict: """Run vision LLM on a single page-pair. Returns diff dict + token usage. Wraps call_gemini_vision so the dispatcher doesn't have to know the LLM-call signature. """ try: old_img = Image.open(old_page['image_path']).convert('RGB') new_img = Image.open(new_page['image_path']).convert('RGB') except Exception as e: return { 'old_page': old_page['page_num'], 'new_page': new_page['page_num'], 'diff': _empty_diff(f'Image load failed: {e}'), 'token_usage': {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0}, } try: response_text, token_usage = call_gemini_vision_fn( _DIFF_PROMPT, old_img, new_img, model_version=model_version ) except Exception as e: return { 'old_page': old_page['page_num'], 'new_page': new_page['page_num'], 'diff': _empty_diff(f'LLM call failed: {e}'), 'token_usage': {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0}, } diff = _parse_diff_response(response_text) return { 'old_page': old_page['page_num'], 'new_page': new_page['page_num'], 'diff': diff, 'token_usage': token_usage, } def run_page_pair_diff( *, old_ingest: Dict, new_ingest: Dict, call_gemini_vision_fn, progress_callback=None, model_version: Optional[str] = None, parallel_pairs: int = PARALLEL_PAIRS, ) -> Dict: """Top-level entrypoint. Aligns pages, then diffs each matched pair via vision LLM in parallel. Returns: { 'alignment': [...], 'pair_diffs': {pair_key: diff_result, ...}, 'totals': {pages_added, pages_removed, pages_matched, ...}, 'token_usage': {prompt_tokens, completion_tokens, total_tokens}, } """ old_pages = old_ingest.get('pages') or [] new_pages = new_ingest.get('pages') or [] alignment = align_pages(old_pages, new_pages) # Index pages by page_num for fast lookup in the diff loop old_by_num = {p['page_num']: p for p in old_pages} new_by_num = {p['page_num']: p for p in new_pages} # Build diff tasks for matched pairs only matched_entries = [e for e in alignment if e['status'] == 'matched'] total_pairs = len(matched_entries) pair_diffs: Dict[str, Dict] = {} aggregate_tokens = {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0} completed = 0 def _run(entry): old_p = old_by_num.get(entry['old_page']) new_p = new_by_num.get(entry['new_page']) if not old_p or not new_p or not old_p.get('image_path') or not new_p.get('image_path'): return entry, None result = _diff_one_pair(old_p, new_p, call_gemini_vision_fn, model_version) # Deterministic formatting diff — runs alongside the LLM diff. # Guard so a single bad span on one page doesn't abort the whole run. try: fmt = compute_formatting_diff( old_p.get('spans') or [], new_p.get('spans') or [], old_p['page_num'], new_p['page_num'], ) except Exception as fmt_err: print(f" [formatting_diff] page {old_p['page_num']}->{new_p['page_num']} failed: {fmt_err}") fmt = {'formatting_changes': [], 'finding_count': 0} diff = result.setdefault('diff', {}) diff['formatting_changes'] = fmt['formatting_changes'] if fmt['finding_count'] > 0: # If the LLM saw the page as identical but the deterministic # layer found typographic flips, we still need the report to # render the pair as "has changes". diff['differences_found'] = True # Each aggregated finding contributes one medium severity entry. # Bump the pair's overall severity to at least 'medium' so the # pair-card pill reflects the finding count. if diff.get('severity') in (None, 'none'): diff['severity'] = 'medium' return entry, result with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_pairs) as pool: futures = [pool.submit(_run, e) for e in matched_entries] for future in concurrent.futures.as_completed(futures): entry, result = future.result() completed += 1 if result is not None: key = f"{entry['old_page']}->{entry['new_page']}" pair_diffs[key] = result tu = result.get('token_usage') or {} for k in aggregate_tokens: aggregate_tokens[k] += tu.get(k, 0) if progress_callback: try: progress_callback(completed, total_pairs) except Exception: pass # Tally up the deltas pages_added = sum(1 for e in alignment if e['status'] == 'added') pages_removed = sum(1 for e in alignment if e['status'] == 'removed') pages_matched = sum(1 for e in alignment if e['status'] == 'matched') pages_modified = sum( 1 for d in pair_diffs.values() if d['diff'].get('differences_found') and d['diff'].get('severity') != 'none' ) pages_unchanged = pages_matched - pages_modified severity_counts = {'high': 0, 'medium': 0, 'low': 0, 'none': 0} for d in pair_diffs.values(): sev = d['diff'].get('severity') or 'none' if sev in severity_counts: severity_counts[sev] += 1 # Each formatting-change finding counts as an additional medium entry, # so a page with N findings contributes N+1 mediums (the +1 from the # base severity already counted above, N more from the findings). fmt_findings = d['diff'].get('formatting_changes') or [] if fmt_findings: # The base severity was already bumped to >= medium in _run when # findings exist; here we add the additional findings minus the # one already accounted for. extra = max(0, len(fmt_findings) - 1) severity_counts['medium'] += extra return { 'alignment': alignment, 'pair_diffs': pair_diffs, 'totals': { 'old_page_count': len(old_pages), 'new_page_count': len(new_pages), 'pages_matched': pages_matched, 'pages_added': pages_added, 'pages_removed': pages_removed, 'pages_modified': pages_modified, 'pages_unchanged': pages_unchanged, 'severity_counts': severity_counts, }, 'token_usage': aggregate_tokens, } # ───────────────────────────────────────────────────────────────────────────── # Top-level orchestrator (called from /api/document/start_diff) # ───────────────────────────────────────────────────────────────────────────── def run_document_diff_analysis( *, old_pdf_path: str, new_pdf_path: str, old_filename: str, new_filename: str, profile_config, profile_id: str, progress_tracker: Dict, session_id: str, ingest_pdf_fn, call_gemini_vision_fn, pages_output_dir_old: str, pages_output_dir_new: str, page_limit: int = 200, parallel_pairs: int = PARALLEL_PAIRS, ) -> Dict: """Full diff pipeline: ingest both PDFs → align → page-pair vision diff.""" from datetime import datetime progress_tracker[session_id].update({ 'stage': 'ingesting_old', 'percentage': 2, 'current_check_display': f'Rendering old version ({old_filename})...', }) def _old_progress(p, t): progress_tracker[session_id].update({ 'percentage': 2 + (p / t) * 8, 'current_check_display': f'Old version: page {p}/{t}', }) old_ingest = ingest_pdf_fn( old_pdf_path, pages_output_dir_old, page_limit=page_limit, progress_callback=_old_progress, ) progress_tracker[session_id].update({ 'stage': 'ingesting_new', 'percentage': 10, 'current_check_display': f'Rendering new version ({new_filename})...', }) def _new_progress(p, t): progress_tracker[session_id].update({ 'percentage': 10 + (p / t) * 8, 'current_check_display': f'New version: page {p}/{t}', }) new_ingest = ingest_pdf_fn( new_pdf_path, pages_output_dir_new, page_limit=page_limit, progress_callback=_new_progress, ) progress_tracker[session_id].update({ 'stage': 'aligning_pages', 'percentage': 18, 'current_check_display': 'Aligning pages between versions...', }) def _diff_progress(completed, total): if total <= 0: return progress_tracker[session_id].update({ 'stage': 'page_pair_diff', 'percentage': 20 + (completed / total) * 75, 'completed_checks': completed, 'total_checks': total, 'current_check_display': f'Diffing page pair {completed}/{total}', }) diff_result = run_page_pair_diff( old_ingest=old_ingest, new_ingest=new_ingest, call_gemini_vision_fn=call_gemini_vision_fn, progress_callback=_diff_progress, parallel_pairs=parallel_pairs, ) progress_tracker[session_id].update({ 'stage': 'aggregating', 'percentage': 96, 'current_check_display': 'Compiling diff report...', }) overall_score, grade = _diff_score(diff_result['totals']) return { 'mode': 'document_diff', 'profile_id': profile_id, 'profile_name': profile_config.name, 'old_pdf': { 'filename': old_filename, 'page_count': old_ingest['page_count'], 'pages_processed': old_ingest['pages_processed'], 'truncated': old_ingest['truncated'], }, 'new_pdf': { 'filename': new_filename, 'page_count': new_ingest['page_count'], 'pages_processed': new_ingest['pages_processed'], 'truncated': new_ingest['truncated'], }, 'alignment': diff_result['alignment'], 'pair_diffs': diff_result['pair_diffs'], 'totals': diff_result['totals'], 'token_usage': diff_result['token_usage'], 'document_summary': { 'overall_score': overall_score, 'grade': grade, }, 'old_pages_meta': [ {'page_num': p['page_num'], 'fonts_used': p.get('fonts_used', []), 'image_path': p.get('image_path')} for p in old_ingest.get('pages', []) ], 'new_pages_meta': [ {'page_num': p['page_num'], 'fonts_used': p.get('fonts_used', []), 'image_path': p.get('image_path')} for p in new_ingest.get('pages', []) ], 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), } def _diff_score(totals: Dict) -> Tuple[float, str]: """Convert diff totals into an informational score. A diff doesn't really pass/fail the way a QC check does — it's a *report*. We score as: 100 if no diffs found, scaling down with severity. Grade is informational ("Clean diff" / "Changes detected"). """ high = totals['severity_counts']['high'] medium = totals['severity_counts']['medium'] low = totals['severity_counts']['low'] pages_added = totals['pages_added'] pages_removed = totals['pages_removed'] structural_changes = pages_added + pages_removed if (high + medium + low + structural_changes) == 0: return 100.0, 'Identical' # 100 - (10 per high) - (3 per medium) - (1 per low) - (5 per structural) score = max(0.0, 100.0 - 10 * high - 3 * medium - 1 * low - 5 * structural_changes) if high > 0 or structural_changes > 2: grade = 'Major changes' elif medium > 0 or structural_changes > 0: grade = 'Notable changes' else: grade = 'Minor changes' return round(score, 2), grade