"""Document-mode dispatcher with scope-aware routing.

Each check declares its scope in the profile JSON (falling back to the scope
declared in the checks registry). The dispatcher then runs:

  • document    → the checks.py registry function over the full ingest result
  • targeted    → the same registry function; its ``scope_args`` (e.g.
                  ``pages``: "last", "first", [1, 2]) tell the check which
                  specific pages to inspect
  • page_sample → the existing batch dispatcher on N evenly-spaced page images
  • page_each   → the existing batch dispatcher on every page image
                  (Phase-1 legacy; unused by the AXA profile after refactor)
  • page_pair   → reserved for the Phase 3 old-vs-new diff (not yet
                  implemented; such checks are reported as "not run", score 0)

Registry-backed (document / targeted) checks bypass the LLM pipeline entirely
(deterministic, $0). Page-level checks plug into
``process_checks_in_batches()`` exactly as before.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

from . import checks as doc_checks

# Max concurrent page-level LLM calls within a single check, used by Stage 3c
# (page_sample) and Stage 3d (page_each). Was sequential; that pinned a 4-page
# × 7-check Boots PPack run at ~15 min. Bump if larger docs / paid-tier rate
# limits make it safe; keep modest to stay well under Gemini's per-key quota.
_PAGE_PARALLEL_WORKERS = 4

# Scopes this dispatcher knows how to execute. Any other scope (currently the
# reserved 'page_pair', or a typo in a profile) is reported as not run.
_RUNNABLE_SCOPES = ('document', 'targeted', 'page_sample', 'page_each')

# Pass threshold for a single check / page score on the 0-10 scale.
_PASS_THRESHOLD = 6

# Max characters of a per-page LLM response copied into doc-level results.
_RESPONSE_EXCERPT_CHARS = 1500


def _grade(overall_score: float) -> str:
    """Return 'Pass' or 'Fail' for a 0-100 overall score.

    Same rule as single-asset mode: the average per-check score
    (overall / 10) must be ≥ 6 to pass.
    """
    avg_individual = overall_score / 10
    return 'Pass' if avg_individual >= _PASS_THRESHOLD else 'Fail'


def _score_class(score: float) -> str:
    """Bucket a check score: 'good' (≥ 8), 'ok' (≥ 6) or 'bad'."""
    if score >= 8:
        return 'good'
    if score >= 6:
        return 'ok'
    return 'bad'


def _resolve_scope(profile_config, check_name: str) -> Tuple[str, Dict]:
    """Return ``(scope, scope_args)`` for a check.

    Precedence:
      1. a ``scope`` set on the check's profile config (with its scope_args) —
         profile-level settings override the registry;
      2. the scope declared in the checks registry, paired with any
         profile-level scope_args;
      3. ``'page_each'`` with no args — legacy image-based checks that are
         neither scoped in the profile nor registered run on every page.
    """
    cfg = profile_config.checks.get(check_name)
    if cfg and cfg.scope:
        return cfg.scope, cfg.scope_args or {}
    registry_entry = doc_checks.get_check(check_name)
    if registry_entry:
        profile_args = cfg.scope_args if cfg else None
        return registry_entry['scope'], profile_args or {}
    # Legacy: existing image-based checks default to running on every page
    return 'page_each', {}


def _run_document_scope(check_name: str, ingest_result: Dict, scope_args: Dict,
                        scope: str = 'document') -> Dict:
    """Invoke a registered (document / targeted) check and return its result.

    An unknown check name, an exception raised by the check function, or a
    non-dict return value is converted into a 0-score failing result whose
    summary explains what went wrong. ``scope`` only labels those error
    results; successful results are returned exactly as the check produced
    them.
    """
    def _failure(summary: str, findings: Dict, response: str) -> Dict:
        return {
            'check_name': check_name,
            'scope': scope,
            'score': 0.0,
            'pass': False,
            'summary': summary,
            'findings': findings,
            'response': response,
        }

    entry = doc_checks.get_check(check_name)
    if not entry:
        return _failure(f"Unknown document-scope check '{check_name}'.", {}, '')
    try:
        result = entry['fn'](ingest_result, scope_args)
    except Exception as e:
        return _failure(f"Check raised {type(e).__name__}: {e}",
                        {'error': str(e)}, str(e))
    if not isinstance(result, dict):
        # Stage 4 and the report read keys off the result; a bad return type
        # would otherwise surface as a crash far from the faulty check.
        return _failure(
            f"Check returned {type(result).__name__}; expected a result dict.",
            {'error': 'invalid return type'},
            '',
        )
    return result


def _evenly_spaced(total: int, n: int) -> List[int]:
    """Return up to n 1-indexed page numbers spread evenly across [1, total].

    Sampling starts at page 1 and steps by ``total / n``; when n ≥ total every
    page is returned. Non-positive inputs yield an empty list.
    """
    if total <= 0 or n <= 0:
        return []
    if n >= total:
        return list(range(1, total + 1))
    step = total / n
    picks = {int(round(i * step)) + 1 for i in range(n)}
    return sorted(picks & set(range(1, total + 1)))


def _excerpt(result: Dict) -> str:
    """Return the leading part of a per-page result's LLM response text."""
    return (result.get('response') or '')[:_RESPONSE_EXCERPT_CHARS]


def _page_scores(per_page: Dict[int, Dict]) -> Dict[int, float]:
    """Return ``{page_num: score}`` ordered by page; missing/None scores → 0."""
    return {pn: (per_page[pn].get('score') or 0) for pn in sorted(per_page)}


def _mean(scores: List[float]) -> float:
    """Average rounded to 2 dp, or 0.0 for an empty list."""
    return round(sum(scores) / len(scores), 2) if scores else 0.0


def _summarise_page_sample(check_name: str, per_page: Dict[int, Dict]) -> Dict:
    """Collapse sampled per-page results into one doc-level check result.

    The headline score is the plain average over every sampled page.
    """
    page_scores = _page_scores(per_page)
    scores = list(page_scores.values())
    avg = _mean(scores)
    low = min(scores) if scores else 0
    high = max(scores) if scores else 0
    return {
        'check_name': check_name,
        'scope': 'page_sample',
        'score': avg,
        'pass': avg >= _PASS_THRESHOLD,
        'summary': f'{check_name} sampled across {len(scores)} pages: avg {avg}, min {low}, max {high}',
        'findings': {
            'pages_sampled': sorted(page_scores),
            'page_scores': page_scores,
            'failing_pages': sorted(p for p, s in page_scores.items() if s < _PASS_THRESHOLD),
        },
        'response': '\n'.join(
            f"Page {p}: score {s}\n{_excerpt(per_page[p])}"
            for p, s in page_scores.items()
        ),
    }


def _summarise_page_each(check_name: str, per_page: Dict[int, Dict],
                         page_type_map: Dict[int, str],
                         artwork_page_nums: set) -> Dict:
    """Collapse every-page results into one page-type-aware check result.

    Non-artwork pages (cover/checklist/palette/notes) are reported for
    visibility but excluded from the headline average. If there are no
    artwork pages at all, every page counts so the check is not forced to a
    0-score Fail.
    """
    page_scores = _page_scores(per_page)
    artwork_scores = {p: s for p, s in page_scores.items() if p in artwork_page_nums}
    non_artwork_scores = {p: s for p, s in page_scores.items() if p not in artwork_page_nums}
    scoring_pool = artwork_scores if artwork_scores else page_scores
    avg = _mean(list(scoring_pool.values()))
    # Per-page excerpts live in findings so the report renderer can draw a
    # per-page card without access to the doc-level page_level_results dict.
    page_responses = {p: _excerpt(per_page[p]) for p in page_scores}
    return {
        'check_name': check_name,
        'scope': 'page_each',
        'score': avg,
        'pass': avg >= _PASS_THRESHOLD,
        'summary': (
            f'{check_name} ran on {len(page_scores)} pages '
            f'({len(artwork_scores)} artwork, {len(non_artwork_scores)} informational). '
            f'Artwork avg {avg}.'
        ),
        'findings': {
            'page_scores': page_scores,
            'artwork_page_scores': artwork_scores,
            'informational_page_scores': non_artwork_scores,
            'failing_artwork_pages': sorted(p for p, s in artwork_scores.items() if s < _PASS_THRESHOLD),
            'page_types': page_type_map,
            'page_responses': page_responses,
        },
        'response': '\n'.join(
            f"Page {p} [{page_type_map.get(p, 'artwork')}]: {s}\n"
            f"{page_responses[p]}"
            for p, s in page_scores.items()
        ),
    }


def _not_run_result(check_name: str, scope: str) -> Dict:
    """Placeholder result for a check whose scope the dispatcher cannot run."""
    return {
        'check_name': check_name,
        'scope': scope,
        'score': 0.0,
        'pass': False,
        'summary': f"Scope '{scope}' is not implemented; check was not run.",
        'findings': {},
        'response': '',
    }


def _strict_violations(page_level_results: Dict[str, Dict[int, Dict]]) -> List[Dict]:
    """List every (check, artwork page) pair that scored below the threshold.

    Pages tagged with any page_type other than 'artwork' are ignored.
    """
    violations = []
    for check_name, per_page in page_level_results.items():
        for page_num, page_result in per_page.items():
            if page_result.get('page_type', 'artwork') != 'artwork':
                continue
            page_score = page_result.get('score') or 0
            if page_score < _PASS_THRESHOLD:
                violations.append({
                    'check': check_name,
                    'page': page_num,
                    'score': page_score,
                })
    return violations


def run_document_analysis(
    *,
    pdf_path: str,
    profile_config,
    profile_id: str,
    profile_weights: Dict[str, float],
    enabled_checks: List[str],
    qc_apps: Dict,
    brand_db,
    analysis_reference_asset: Optional[str],
    media_plan_context: Optional[str],
    ocr_context: Optional[str],
    progress_tracker: Dict,
    session_id: str,
    process_checks_in_batches: Callable,
    ingest_pdf_fn: Callable,
    pages_output_dir: str,
    page_limit: int = 200,
) -> Dict:
    """Run scope-aware document-mode QC over a PDF.

    Stages:
      1. ingest the PDF into per-page images/metadata via ``ingest_pdf_fn``;
      2. bucket each enabled check by scope (see module docstring);
      3. run document/targeted checks through the registry, then page_sample
         and page_each checks through ``process_checks_in_batches`` (pages in
         parallel); checks with an unrunnable scope get a 0-score
         "not run" result;
      4. weight-aggregate per-check scores into a 0-100 overall score and a
         grade, applying the strict-grade override when the profile sets
         ``strict_grade``.

    Progress is published into ``progress_tracker[session_id]`` throughout.

    Returns:
        A dict with ``mode``, page metadata, ``check_results``,
        ``page_level_results``, ``document_summary`` and ``ingest_metadata``.
        If no pages could be rendered, an early Fail result with the same
        top-level keys (plus ``document_summary['error']``) is returned.
    """
    def _progress(**fields) -> None:
        progress_tracker[session_id].update(fields)

    strict_grade = bool(getattr(profile_config, 'strict_grade', False))

    # ── Stage 1: ingest ──────────────────────────────────────────────────
    _progress(stage='ingesting_pdf', percentage=2,
              current_check_display='Rendering PDF pages...')

    def _ingest_progress(page_num, total):
        # Ingestion owns the 2%-10% band of the progress bar.
        fraction = page_num / total if total else 0
        _progress(percentage=2 + fraction * 8,
                  current_check_display=f'Rendering page {page_num} of {total}')

    ingest_result = ingest_pdf_fn(
        pdf_path,
        pages_output_dir,
        page_limit=page_limit,
        progress_callback=_ingest_progress,
    )
    pages = ingest_result['pages']
    pages_processed = ingest_result['pages_processed']
    page_count = ingest_result['page_count']
    truncated = ingest_result['truncated']

    if pages_processed == 0:
        return {
            'mode': 'document',
            'profile_id': profile_id,
            'profile_name': profile_config.name,
            'page_count': page_count,
            'pages_processed': 0,
            'truncated': truncated,
            'pages': [],
            'check_results': {},
            # Same keys as a normal run so consumers need no special case.
            'page_level_results': {},
            'document_summary': {
                'overall_score': 0,
                'grade': 'Fail',
                'check_summaries': {},
                'total_weight': 0.0,
                'strict_grade': strict_grade,
                'strict_violations': [],
                'error': 'No pages could be rendered from the supplied PDF.',
            },
            'ingest_metadata': {'fonts_inventory': []},
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        }

    # ── Stage 2: classify each enabled check by scope ─────────────────────
    scope_buckets: Dict[str, List[Tuple[str, Dict]]] = {
        'document': [],
        'targeted': [],
        'page_sample': [],
        'page_each': [],
        'page_pair': [],
    }
    for check_name in enabled_checks:
        scope, scope_args = _resolve_scope(profile_config, check_name)
        scope_buckets.setdefault(scope, []).append((check_name, scope_args))

    _progress(stage='analysing', percentage=12,
              total_checks=len(enabled_checks), completed_checks=0,
              total_pages=pages_processed, current_page=0)

    check_results: Dict[str, Dict] = {}
    completed = 0

    def _mark_check_done() -> None:
        # Stage 3 owns the 12%-92% band, split evenly across checks. Only
        # called after a check ran, so enabled_checks is non-empty here.
        nonlocal completed
        completed += 1
        _progress(completed_checks=completed,
                  percentage=12 + (completed / len(enabled_checks)) * 80)

    # ── Stage 3a: document-scope checks (deterministic, fast) ──────────────
    for check_name, scope_args in scope_buckets['document']:
        _progress(current_check_display=f'Running {check_name}...')
        check_results[check_name] = _run_document_scope(check_name, ingest_result, scope_args)
        _mark_check_done()

    # ── Stage 3b: targeted checks (specific pages) ────────────────────────
    for check_name, scope_args in scope_buckets['targeted']:
        _progress(current_check_display=f'Running {check_name} on targeted pages...')
        check_results[check_name] = _run_document_scope(
            check_name, ingest_result, scope_args, scope='targeted')
        _mark_check_done()

    # ── Page-type map (shared by 3c page_sample and 3d page_each) ──────────
    # For profiles that don't tag pages (e.g. AXA), every page is 'artwork',
    # so the page-type-aware aggregation in Stage 3d falls through cleanly.
    page_type_map = {p['page_num']: p.get('page_type', 'artwork') for p in pages}
    artwork_page_nums = {pn for pn, pt in page_type_map.items() if pt == 'artwork'}
    pages_by_num = {p['page_num']: p for p in pages}

    # ── Per-page dispatch helpers (used by 3c and 3d in parallel) ──────────
    # process_checks_in_batches is already reentrant — asset-mode runs it
    # under its own ThreadPoolExecutor — so it's safe to call concurrently
    # from a pool. progress_tracker writes are GIL-safe and racy by design
    # (visual only). Per-page results are collected on the calling thread
    # after future.result(), so there are no races on those dicts.
    def _run_check_on_page(check_name: str, page: Dict) -> Tuple[int, Dict]:
        page_num = page['page_num']
        try:
            page_check_results = process_checks_in_batches(
                enabled_checks=[check_name],
                qc_apps=qc_apps,
                profile_config=profile_config,
                profile_weights=profile_weights,
                file_path=page['image_path'],
                analysis_reference_asset=analysis_reference_asset,
                brand_db=brand_db,
                progress_tracker=progress_tracker,
                session_id=session_id,
                batch_size=15,
                media_plan_context=media_plan_context,
                ocr_context=ocr_context,
            )
            # Copy so tagging page_type below never mutates the dispatcher's
            # own result object, and tolerate a missing/None entry.
            result_for_page = dict(page_check_results.get(check_name) or {})
        except Exception as e:
            result_for_page = {
                'check_name': check_name,
                'score': 0.0,
                'pass': False,
                'response': f'Check raised {type(e).__name__}: {e}',
                'findings': {'error': str(e)},
            }
        result_for_page['page_type'] = page_type_map.get(page_num, 'artwork')
        return page_num, result_for_page

    def _run_pages_in_parallel(check_name: str, page_list: List[Dict]) -> Dict[int, Dict]:
        """Run one check over page_list concurrently; return results by page."""
        results: Dict[int, Dict] = {}
        with ThreadPoolExecutor(max_workers=_PAGE_PARALLEL_WORKERS) as pool:
            futures = [pool.submit(_run_check_on_page, check_name, p) for p in page_list]
            for pages_done, fut in enumerate(as_completed(futures), start=1):
                page_num, result = fut.result()
                results[page_num] = result
                _progress(current_check_display=f'{check_name}: {pages_done} of {len(page_list)} pages')
        # as_completed yields in finish order; sort so output is deterministic.
        return dict(sorted(results.items()))

    page_level_results: Dict[str, Dict[int, Dict]] = {}  # check → page_num → result

    # ── Stage 3c: page-sample (LLM, sampled pages) ────────────────────────
    for check_name, scope_args in scope_buckets['page_sample']:
        sample_size = (scope_args or {}).get('sample_size', 8)
        sampled = [pages_by_num.get(pn) for pn in _evenly_spaced(pages_processed, sample_size)]
        eligible = [p for p in sampled if p and p.get('image_path')]
        _progress(current_check_display=f'{check_name} (sampling {len(eligible)} pages)...')
        page_level_results[check_name] = _run_pages_in_parallel(check_name, eligible)
        check_results[check_name] = _summarise_page_sample(check_name, page_level_results[check_name])
        _mark_check_done()

    # ── Stage 3d: page_each — run check on every page in the document ──────
    # Page-type-aware: results from non-artwork pages (cover/checklist/palette/
    # notes) are surfaced for visibility but excluded from the per-check
    # average that drives the headline score & grade. This implements the
    # strict-grade exemption requested for Boots Production Packs without
    # changing AXA-style profiles (which don't tag pages → all pages count).
    eligible_pages = [p for p in pages if p.get('image_path')]
    for check_name, _scope_args in scope_buckets['page_each']:
        _progress(current_check_display=f'{check_name} across {len(eligible_pages)} pages...',
                  current_page=0)
        page_level_results[check_name] = _run_pages_in_parallel(check_name, eligible_pages)
        check_results[check_name] = _summarise_page_each(
            check_name, page_level_results[check_name], page_type_map, artwork_page_nums)
        _mark_check_done()

    # ── Stage 3e: scopes with no runner (e.g. reserved page_pair) ──────────
    # Report them explicitly instead of letting them vanish from the results
    # while still scoring 0 in the aggregate below.
    for scope, bucket in scope_buckets.items():
        if scope in _RUNNABLE_SCOPES:
            continue
        for check_name, _scope_args in bucket:
            check_results[check_name] = _not_run_result(check_name, scope)
            _mark_check_done()

    # ── Stage 4: aggregate document score ─────────────────────────────────
    _progress(stage='aggregating', percentage=96,
              current_check_display='Aggregating findings...')

    total_weighted = 0.0
    total_weight = 0.0
    check_summaries = {}
    for check_name in enabled_checks:
        weight = profile_weights.get(check_name, 1.0)
        result = check_results.get(check_name) or {}
        # A missing/None score counts as 0 instead of crashing aggregation.
        score = result.get('score') or 0.0
        total_weighted += score * weight
        total_weight += weight
        check_summaries[check_name] = {
            'score': score,
            'pass': result.get('pass', False),
            'scope': result.get('scope', 'unknown'),
            'summary': result.get('summary', ''),
            'weight': weight,
            'findings': result.get('findings', {}),
        }

    # Two weighting conventions: when the weights total ≥ 10 the weighted sum
    # is taken directly as the 0-100 score (e.g. ten checks weighted 1 each,
    # scored 0-10); otherwise the 0-10 weighted mean is scaled ×10.
    # NOTE(review): weights totalling more than 10 can push the raw sum past
    # 100, where it is clamped — confirm this matches single-asset mode.
    if total_weight >= 10.0:
        overall_score = round(min(total_weighted, 100), 2)
    elif total_weight > 0:
        overall_score = round(min((total_weighted / total_weight) * 10, 100), 2)
    else:
        overall_score = 0.0

    # ── Strict-grade override (artwork pages only) ─────────────────────────
    # Profiles with strict_grade=True (e.g. Boots PPack) Fail if ANY check
    # scored <6 on ANY artwork page. Cover/checklist/palette/notes pages do
    # not contribute. AXA-style profiles leave strict_grade off and behave
    # as before.
    strict_violations = _strict_violations(page_level_results) if strict_grade else []
    if strict_grade and strict_violations:
        grade = 'Fail'
    else:
        grade = _grade(overall_score)

    fonts_inventory = sorted({
        font
        for page in pages
        for font in (page.get('fonts_used') or [])
    })

    return {
        'mode': 'document',
        'profile_id': profile_id,
        'profile_name': profile_config.name,
        'page_count': page_count,
        'pages_processed': pages_processed,
        'truncated': truncated,
        'pages': [
            {
                'page_num': p['page_num'],
                'page_type': p.get('page_type', 'artwork'),
                'fonts_used': p.get('fonts_used', []),
                'image_path': p.get('image_path'),
            }
            for p in pages
        ],
        'check_results': check_results,
        'page_level_results': page_level_results,
        'document_summary': {
            'overall_score': overall_score,
            'grade': grade,
            'check_summaries': check_summaries,
            'total_weight': total_weight,
            'strict_grade': strict_grade,
            'strict_violations': strict_violations,
        },
        'ingest_metadata': {
            'fonts_inventory': fonts_inventory,
        },
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    }