ai_qc/backend/document_mode/dispatcher.py
nickviljoen 1c5dd980d4 perf(document-mode): parallelize per-page check dispatch in stages 3c/3d
A 4-page Boots PPack run (7 page-scoped checks) was taking ~15 min
because the dispatcher processed pages sequentially within each
check — 28 Gemini calls in a single file. Asset-mode's
ThreadPoolExecutor parallelism was bypassed because doc-mode called
process_checks_in_batches once per page in a loop.

Wrap the per-page dispatch in both Stage 3c (page_sample) and Stage
3d (page_each) with a ThreadPoolExecutor (max_workers=4). Extract
the per-page work into a single nested helper used by both stages,
which also tags each result with page_type so the existing artwork
vs informational aggregation in Stage 3d keeps working. Aggregation
logic, scoring, strict-grade override, and report shape are all
unchanged.

process_checks_in_batches is already reentrant (asset-mode uses it
under its own internal ThreadPoolExecutor), so concurrent calls are
safe. Progress-tracker writes intentionally tolerate races (visual
only). Per-page exceptions are caught inside the helper so one bad
page doesn't kill the doc — it just records a score-0 result.

Expected: 15 min → ~3-4 min on the same 4-page PDF. Needs wall-time
confirmation on dev with a real run.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 18:14:27 +02:00

469 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Document-mode dispatcher.
Scope-aware routing. Each check declares its scope in the profile JSON; the
dispatcher then runs:
• document → checks.py registry function over the full ingest result
• targeted → same registry function, but with scope_args.pages resolved
to specific page numbers (e.g. "last", "first", [1,2])
• page_sample → existing batch dispatcher on N evenly-spaced page images
• page_each → existing batch dispatcher on every page image (Phase-1
legacy; unused by AXA profile after refactor; non-artwork pages are
reported but excluded from the check's headline average)
• page_pair → reserved for Phase 3 old-vs-new diff (not yet implemented;
such checks currently produce no result and count as 0 in the document score)
Document-scope checks bypass the LLM pipeline entirely (deterministic, $0).
Page-level checks plug into `process_checks_in_batches()` exactly as before
(one call per page), with the pages of each check dispatched concurrently.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Callable, Dict, List, Optional
from . import checks as doc_checks
# Max concurrent page-level LLM calls within a single check, used by Stage 3c
# (page_sample) and Stage 3d (page_each). Checks themselves still run one
# after another; only the pages of the current check are dispatched
# concurrently. Was fully sequential; that pinned a 4-page × 7-check Boots
# PPack run at ~15 min. Bump if larger docs / paid-tier rate limits make it
# safe; keep modest to stay well under Gemini's per-key quota.
_PAGE_PARALLEL_WORKERS = 4
def _grade(overall_score: float) -> str:
"""Same Pass/Fail rule as single-asset mode: avg per-check ≥ 6 = Pass."""
avg_individual = overall_score / 10
return 'Pass' if avg_individual >= 6 else 'Fail'
def _score_class(score: float) -> str:
if score >= 8:
return 'good'
if score >= 6:
return 'ok'
return 'bad'
def _resolve_scope(profile_config, check_name: str) -> tuple:
"""Return (scope, scope_args) for a check. Falls back to the registry's
declared scope, then to 'page_each' for legacy compat. Profile-level
scope/scope_args overrides whatever the registry declares.
"""
cfg = profile_config.checks.get(check_name)
if cfg and cfg.scope:
return cfg.scope, cfg.scope_args or {}
registry_entry = doc_checks.get_check(check_name)
if registry_entry:
return registry_entry['scope'], (cfg.scope_args if cfg else None) or {}
# Legacy: existing image-based checks default to running on every page
return 'page_each', {}
def _run_document_scope(check_name: str, ingest_result: Dict, scope_args: Dict) -> Dict:
    """Run the registry function for a document/targeted-scope check.

    The registry entry's ``fn`` is called as ``fn(ingest_result, scope_args)``
    and its return value is passed straight through. If the check name is
    not registered, or the call (including looking up ``fn``) raises any
    ``Exception``, a score-0, non-passing result dict with
    ``scope='document'`` is returned instead, so one broken check never
    aborts the document run.
    """
    def _failure(summary: str, findings: Dict, response: str) -> Dict:
        # Shared shape for both failure modes below.
        return {
            'check_name': check_name,
            'scope': 'document',
            'score': 0.0,
            'pass': False,
            'summary': summary,
            'findings': findings,
            'response': response,
        }

    entry = doc_checks.get_check(check_name)
    if not entry:
        return _failure(f"Unknown document-scope check '{check_name}'.", {}, '')
    try:
        return entry['fn'](ingest_result, scope_args)
    except Exception as exc:
        return _failure(
            f"Check raised {type(exc).__name__}: {exc}",
            {'error': str(exc)},
            str(exc),
        )
def _evenly_spaced(total: int, n: int) -> List[int]:
"""Return n 1-indexed page numbers evenly spaced across [1, total]."""
if total <= 0 or n <= 0:
return []
if n >= total:
return list(range(1, total + 1))
step = total / n
return sorted({int(round(i * step)) + 1 for i in range(n)} & set(range(1, total + 1)))
def _summarize_page_sample(check_name: str, per_page: Dict[int, Dict]) -> Dict:
    """Build the document-level result for a ``page_sample`` check.

    Args:
        check_name: Name of the check being summarised.
        per_page: page_num -> per-page result for the sampled pages. Its
            iteration order becomes the order of ``findings['page_scores']``
            and of the ``response`` text, so pass it sorted by page number.

    Returns:
        A check-result dict whose ``score`` is the mean sampled page score
        (rounded to 2 dp; 0.0 when nothing was sampled) and whose ``pass`` is
        ``score >= 6``. A missing/None page score counts as 0.
    """
    page_scores = {p: (r.get('score') or 0) for p, r in per_page.items()}
    scores = list(page_scores.values())
    avg = round(sum(scores) / len(scores), 2) if scores else 0.0
    lowest = min(scores) if scores else 0
    highest = max(scores) if scores else 0
    return {
        'check_name': check_name,
        'scope': 'page_sample',
        'score': avg,
        'pass': avg >= 6,
        'summary': (
            f'{check_name} sampled across {len(scores)} pages: '
            f'avg {avg}, min {lowest}, max {highest}'
        ),
        'findings': {
            'pages_sampled': sorted(page_scores),
            'page_scores': page_scores,
            'failing_pages': sorted(p for p, s in page_scores.items() if s < 6),
        },
        'response': '\n'.join(
            f"Page {p}: score {s}\n{(per_page[p].get('response') or '')[:1500]}"
            for p, s in page_scores.items()
        ),
    }


def _summarize_page_each(
    check_name: str,
    per_page: Dict[int, Dict],
    page_type_map: Dict[int, str],
    artwork_page_nums: set,
) -> Dict:
    """Build the document-level result for a ``page_each`` check.

    Page-type-aware: results from non-artwork pages (cover/checklist/palette/
    notes) are surfaced for visibility but excluded from the average that
    drives the headline score and grade. If no page is artwork, all pages
    are averaged, so the check does not collapse to a 0-score Fail.

    Args:
        check_name: Name of the check being summarised.
        per_page: page_num -> per-page result, ideally sorted by page number
            (its order is the order of ``findings['page_scores']``).
        page_type_map: page_num -> page type ('artwork' when untagged).
        artwork_page_nums: page numbers whose type is 'artwork'.
    """
    page_scores = {p: (r.get('score') or 0) for p, r in per_page.items()}
    artwork_scores = {p: s for p, s in page_scores.items() if p in artwork_page_nums}
    non_artwork_scores = {p: s for p, s in page_scores.items() if p not in artwork_page_nums}
    scoring_pool = artwork_scores if artwork_scores else page_scores
    scores = list(scoring_pool.values())
    avg = round(sum(scores) / len(scores), 2) if scores else 0.0
    # Per-page response excerpts live in findings so the report renderer can
    # show a per-page card without access to page_level_results.
    page_responses = {
        p: ((per_page[p].get('response') or '')[:1500])
        for p in page_scores
    }
    return {
        'check_name': check_name,
        'scope': 'page_each',
        'score': avg,
        'pass': avg >= 6,
        'summary': (
            f'{check_name} ran on {len(page_scores)} pages '
            f'({len(artwork_scores)} artwork, {len(non_artwork_scores)} informational). '
            f'Artwork avg {avg}.'
        ),
        'findings': {
            'page_scores': page_scores,
            'artwork_page_scores': artwork_scores,
            'informational_page_scores': non_artwork_scores,
            'failing_artwork_pages': sorted([p for p, s in artwork_scores.items() if s < 6]),
            'page_types': page_type_map,
            'page_responses': page_responses,
        },
        'response': '\n'.join(
            f"Page {p} [{page_type_map.get(p, 'artwork')}]: {s}\n"
            f"{page_responses[p]}"
            for p, s in sorted(page_scores.items())
        ),
    }


def _aggregate_document_score(
    enabled_checks: List[str],
    check_results: Dict[str, Dict],
    profile_weights: Dict[str, float],
) -> tuple:
    """Combine per-check results into the document score.

    Returns ``(overall_score, total_weight, check_summaries)``. Weights
    default to 1.0. A check with no result (e.g. an unimplemented scope such
    as page_pair) or with a missing/None score counts as 0 at its weight.
    """
    total_weighted = 0.0
    total_weight = 0.0
    check_summaries: Dict[str, Dict] = {}
    for check_name in enabled_checks:
        weight = profile_weights.get(check_name, 1.0)
        result = check_results.get(check_name) or {'score': 0.0}
        # .get(...) or 0: a document-scope check fn that omits/None's its
        # score must not crash aggregation after all the LLM work is done.
        total_weighted += (result.get('score') or 0) * weight
        total_weight += weight
        check_summaries[check_name] = {
            'score': result.get('score', 0),
            'pass': result.get('pass', False),
            'scope': result.get('scope', 'unknown'),
            'summary': result.get('summary', ''),
            'weight': weight,
            'findings': result.get('findings', {}),
        }
    # When weights total at least 10 the raw weighted sum is used directly;
    # otherwise the weighted mean is scaled by 10. Both are capped at 100.
    if total_weight >= 10.0:
        overall_score = round(min(total_weighted, 100), 2)
    elif total_weight > 0:
        overall_score = round(min((total_weighted / total_weight) * 10, 100), 2)
    else:
        overall_score = 0.0
    return overall_score, total_weight, check_summaries


def _collect_strict_violations(page_level_results: Dict[str, Dict[int, Dict]]) -> List[Dict]:
    """List every (check, artwork page) whose per-page score is below 6.

    Pages tagged with a non-'artwork' page_type are skipped. A missing/None
    score counts as 0 (and therefore as a violation).
    """
    violations: List[Dict] = []
    for check_name, per_page in page_level_results.items():
        for page_num, page_result in per_page.items():
            if page_result.get('page_type', 'artwork') != 'artwork':
                continue
            page_score = page_result.get('score') or 0
            if page_score < 6:
                violations.append({
                    'check': check_name,
                    'page': page_num,
                    'score': page_score,
                })
    return violations


def run_document_analysis(
    *,
    pdf_path: str,
    profile_config,
    profile_id: str,
    profile_weights: Dict[str, float],
    enabled_checks: List[str],
    qc_apps: Dict,
    brand_db,
    analysis_reference_asset: Optional[str],
    media_plan_context: Optional[str],
    ocr_context: Optional[str],
    progress_tracker: Dict,
    session_id: str,
    process_checks_in_batches: Callable,
    ingest_pdf_fn: Callable,
    pages_output_dir: str,
    page_limit: int = 200,
) -> Dict:
    """Run scope-aware document-mode QC. See module docstring for routing.

    Stages:
      1. Ingest the PDF via ``ingest_pdf_fn`` into per-page images/metadata.
      2. Bucket each enabled check by its resolved scope.
      3. Run document/targeted checks through the registry, then
         page_sample/page_each checks through ``process_checks_in_batches``
         one page per call, with up to ``_PAGE_PARALLEL_WORKERS`` pages of
         the current check in flight.
      4. Aggregate a weighted document score and grade, applying the
         strict-grade override when the profile sets ``strict_grade``.

    ``progress_tracker[session_id]`` is updated throughout for the UI.
    Returns the document-mode report dict. Per-page results are always
    ordered by page number, regardless of which worker finished first.
    """

    def _progress(**fields) -> None:
        # Visual-only progress; page workers (via process_checks_in_batches,
        # which also receives progress_tracker) may write here concurrently.
        progress_tracker[session_id].update(fields)

    # ── Stage 1: ingest ──────────────────────────────────────────────────
    _progress(
        stage='ingesting_pdf',
        percentage=2,
        current_check_display='Rendering PDF pages...',
    )

    def _ingest_progress(page_num, total):
        _progress(
            percentage=2 + (page_num / total) * 8,
            current_check_display=f'Rendering page {page_num} of {total}',
        )

    ingest_result = ingest_pdf_fn(
        pdf_path,
        pages_output_dir,
        page_limit=page_limit,
        progress_callback=_ingest_progress,
    )
    pages = ingest_result['pages']
    pages_processed = ingest_result['pages_processed']
    page_count = ingest_result['page_count']
    truncated = ingest_result['truncated']
    if pages_processed == 0:
        return {
            'mode': 'document',
            'profile_id': profile_id,
            'profile_name': profile_config.name,
            'page_count': page_count,
            'pages_processed': 0,
            'truncated': truncated,
            'pages': [],
            'check_results': {},
            'document_summary': {
                'overall_score': 0,
                'grade': 'Fail',
                'check_summaries': {},
                'error': 'No pages could be rendered from the supplied PDF.',
            },
            'ingest_metadata': {'fonts_inventory': []},
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        }

    # ── Stage 2: classify each enabled check by scope ─────────────────────
    # Unknown scopes get their own bucket via setdefault but are never run;
    # like page_pair, they end up with no result and score 0 in Stage 4.
    scope_buckets: Dict[str, List[tuple]] = {
        'document': [], 'targeted': [], 'page_sample': [], 'page_each': [], 'page_pair': [],
    }
    for check_name in enabled_checks:
        scope, scope_args = _resolve_scope(profile_config, check_name)
        scope_buckets.setdefault(scope, []).append((check_name, scope_args))
    _progress(
        stage='analysing',
        percentage=12,
        total_checks=len(enabled_checks),
        completed_checks=0,
        total_pages=pages_processed,
        current_page=0,
    )

    check_results: Dict[str, Dict] = {}
    completed = 0

    def _mark_check_done() -> None:
        # Checks progress from 12% to 92% as they complete.
        nonlocal completed
        completed += 1
        _progress(
            completed_checks=completed,
            percentage=12 + (completed / len(enabled_checks)) * 80,
        )

    # ── Stage 3a/3b: document-scope then targeted checks (deterministic) ──
    # Targeted checks call the same registry function; their scope_args
    # (e.g. ``pages``) are passed through unchanged.
    for scope, display_suffix in (('document', '...'), ('targeted', ' on targeted pages...')):
        for check_name, scope_args in scope_buckets[scope]:
            _progress(current_check_display=f'Running {check_name}{display_suffix}')
            check_results[check_name] = _run_document_scope(check_name, ingest_result, scope_args)
            _mark_check_done()

    # ── Page-type map (shared by 3c page_sample and 3d page_each) ──────────
    # For profiles that don't tag pages (e.g. AXA), every page is 'artwork',
    # so the page-type-aware aggregation falls through cleanly.
    page_type_map = {p['page_num']: p.get('page_type', 'artwork') for p in pages}
    artwork_page_nums = {pn for pn, pt in page_type_map.items() if pt == 'artwork'}

    def _run_check_on_page(check_name: str, page: Dict) -> tuple:
        """Run one page-level check on one page image.

        Returns ``(page_num, result)``. Any exception from the check is
        turned into a score-0 result (deliberately broad: one bad page must
        not kill the document). ``result`` is a fresh dict tagged with the
        page's ``page_type`` for artwork-vs-informational aggregation.
        """
        page_num = page['page_num']
        try:
            page_check_results = process_checks_in_batches(
                enabled_checks=[check_name],
                qc_apps=qc_apps,
                profile_config=profile_config,
                profile_weights=profile_weights,
                file_path=page['image_path'],
                analysis_reference_asset=analysis_reference_asset,
                brand_db=brand_db,
                progress_tracker=progress_tracker,
                session_id=session_id,
                batch_size=15,
                media_plan_context=media_plan_context,
                ocr_context=ocr_context,
            )
            # Copy: never mutate a dict owned by process_checks_in_batches.
            # `or {}`: a None entry would otherwise crash the page_type tag
            # below (outside this try) and abort the whole document.
            result_for_page = dict(page_check_results.get(check_name) or {})
        except Exception as e:
            result_for_page = {
                'check_name': check_name,
                'score': 0.0,
                'pass': False,
                'response': f'Check raised {type(e).__name__}: {e}',
                'findings': {'error': str(e)},
            }
        result_for_page['page_type'] = page_type_map.get(page_num, 'artwork')
        return page_num, result_for_page

    def _dispatch_check_over_pages(check_name: str, pages_to_run: List[Dict]) -> Dict[int, Dict]:
        """Run ``check_name`` on every page in ``pages_to_run`` concurrently.

        Returns page_num -> result, ordered by page number. as_completed()
        yields in completion order, so results are re-keyed before returning
        to keep findings, response text and strict_violations deterministic.
        """
        if not pages_to_run:
            return {}
        total = len(pages_to_run)
        collected: Dict[int, Dict] = {}
        # process_checks_in_batches is reentrant (asset-mode runs it under its
        # own ThreadPoolExecutor). collected is only written on this thread.
        with ThreadPoolExecutor(max_workers=_PAGE_PARALLEL_WORKERS) as pool:
            futures = [pool.submit(_run_check_on_page, check_name, p) for p in pages_to_run]
            for pages_done, fut in enumerate(as_completed(futures), start=1):
                page_num, result = fut.result()
                collected[page_num] = result
                _progress(current_check_display=f'{check_name}: {pages_done} of {total} pages')
        return dict(sorted(collected.items()))

    # ── Stage 3c: page-sample (LLM, sampled pages) ────────────────────────
    page_level_results: Dict[str, Dict[int, Dict]] = {}  # check → page_num → result
    for check_name, scope_args in scope_buckets['page_sample']:
        n = (scope_args or {}).get('sample_size', 8)
        # NOTE(review): indexes pages positionally, assuming
        # pages[i]['page_num'] == i + 1 — confirm against ingest_pdf_fn.
        eligible = [
            pages[pn - 1]
            for pn in _evenly_spaced(pages_processed, n)
            if pages[pn - 1].get('image_path')
        ]
        _progress(current_check_display=f'{check_name} (sampling {len(eligible)} pages)...')
        page_level_results[check_name] = _dispatch_check_over_pages(check_name, eligible)
        check_results[check_name] = _summarize_page_sample(check_name, page_level_results[check_name])
        _mark_check_done()

    # ── Stage 3d: page_each — run check on every rendered page ────────────
    # Non-artwork pages are excluded from the headline average (strict-grade
    # exemption for Boots Production Packs); untagged profiles count all pages.
    eligible_pages = [p for p in pages if p.get('image_path')]
    for check_name, _scope_args in scope_buckets['page_each']:
        _progress(
            current_check_display=f'{check_name} across {len(eligible_pages)} pages...',
            current_page=0,
        )
        page_level_results[check_name] = _dispatch_check_over_pages(check_name, eligible_pages)
        check_results[check_name] = _summarize_page_each(
            check_name, page_level_results[check_name], page_type_map, artwork_page_nums,
        )
        _mark_check_done()

    # ── Stage 4: aggregate document score ─────────────────────────────────
    _progress(
        stage='aggregating',
        percentage=96,
        current_check_display='Aggregating findings...',
    )
    overall_score, total_weight, check_summaries = _aggregate_document_score(
        enabled_checks, check_results, profile_weights,
    )

    # ── Strict-grade override (artwork pages only) ─────────────────────────
    # Profiles with strict_grade=True (e.g. Boots PPack) Fail if ANY check
    # scored <6 on ANY artwork page. AXA-style profiles leave strict_grade
    # off and behave as before.
    strict_grade = bool(getattr(profile_config, 'strict_grade', False))
    strict_violations = _collect_strict_violations(page_level_results) if strict_grade else []
    grade = 'Fail' if strict_violations else _grade(overall_score)

    fonts_inventory = sorted({
        font for page in pages for font in (page.get('fonts_used') or [])
    })
    return {
        'mode': 'document',
        'profile_id': profile_id,
        'profile_name': profile_config.name,
        'page_count': page_count,
        'pages_processed': pages_processed,
        'truncated': truncated,
        'pages': [
            {
                'page_num': p['page_num'],
                'page_type': p.get('page_type', 'artwork'),
                'fonts_used': p.get('fonts_used', []),
                'image_path': p.get('image_path'),
            }
            for p in pages
        ],
        'check_results': check_results,
        'page_level_results': page_level_results,
        'document_summary': {
            'overall_score': overall_score,
            'grade': grade,
            'check_summaries': check_summaries,
            'total_weight': total_weight,
            'strict_grade': strict_grade,
            'strict_violations': strict_violations,
        },
        'ingest_metadata': {
            'fonts_inventory': fonts_inventory,
        },
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    }