A 4-page Boots PPack run (7 page-scoped checks) was taking ~15 min because the dispatcher processed pages sequentially within each check — 28 Gemini calls issued strictly one after another. Asset-mode's ThreadPoolExecutor parallelism was bypassed because doc-mode called process_checks_in_batches once per page in a loop. Fix: wrap the per-page dispatch in both Stage 3c (page_sample) and Stage 3d (page_each) with a ThreadPoolExecutor (max_workers=4). Extract the per-page work into a single nested helper used by both stages, which also tags each result with page_type so the existing artwork vs informational aggregation in Stage 3d keeps working. Aggregation logic, scoring, strict-grade override, and report shape are all unchanged. process_checks_in_batches is already reentrant (asset-mode uses it under its own internal ThreadPoolExecutor), so concurrent calls are safe. Progress-tracker writes intentionally tolerate races (they are visual only). Per-page exceptions are caught inside the helper so one bad page doesn't kill the doc — it just records a score-0 result for that page. Expected: 15 min → ~3-4 min on the same 4-page PDF. Needs wall-time confirmation on dev with a real run. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
469 lines
20 KiB
Python
469 lines
20 KiB
Python
"""Document-mode dispatcher.

Scope-aware routing. Each check declares its scope in the profile JSON; the
dispatcher then runs:

  • document    → checks.py registry function over the full ingest result
  • targeted    → same registry function, but with scope_args.pages resolved
                  to specific page numbers (e.g. "last", "first", [1,2])
  • page_sample → existing batch dispatcher on N evenly-spaced page images
  • page_each   → existing batch dispatcher on every page image (Phase-1
                  legacy; unused by AXA profile after refactor)
  • page_pair   → reserved for Phase 3 old-vs-new diff (not yet implemented)

Document-scope checks bypass the LLM pipeline entirely (deterministic, $0).
Page-level checks plug into `process_checks_in_batches()` exactly as before.
"""
|
||
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from datetime import datetime
|
||
from typing import Callable, Dict, List, Optional
|
||
|
||
from . import checks as doc_checks
|
||
|
||
|
||
# Max concurrent page-level LLM calls within a single check, used by Stage 3c
|
||
# (page_sample) and Stage 3d (page_each). Was sequential; that pinned a 4-page
|
||
# × 7-check Boots PPack run at ~15 min. Bump if larger docs / paid-tier rate
|
||
# limits make it safe; keep modest to stay well under Gemini's per-key quota.
|
||
_PAGE_PARALLEL_WORKERS = 4
|
||
|
||
|
||
def _grade(overall_score: float) -> str:
|
||
"""Same Pass/Fail rule as single-asset mode: avg per-check ≥ 6 = Pass."""
|
||
avg_individual = overall_score / 10
|
||
return 'Pass' if avg_individual >= 6 else 'Fail'
|
||
|
||
|
||
def _score_class(score: float) -> str:
|
||
if score >= 8:
|
||
return 'good'
|
||
if score >= 6:
|
||
return 'ok'
|
||
return 'bad'
|
||
|
||
|
||
def _resolve_scope(profile_config, check_name: str) -> tuple:
|
||
"""Return (scope, scope_args) for a check. Falls back to the registry's
|
||
declared scope, then to 'page_each' for legacy compat. Profile-level
|
||
scope/scope_args overrides whatever the registry declares.
|
||
"""
|
||
cfg = profile_config.checks.get(check_name)
|
||
if cfg and cfg.scope:
|
||
return cfg.scope, cfg.scope_args or {}
|
||
|
||
registry_entry = doc_checks.get_check(check_name)
|
||
if registry_entry:
|
||
return registry_entry['scope'], (cfg.scope_args if cfg else None) or {}
|
||
|
||
# Legacy: existing image-based checks default to running on every page
|
||
return 'page_each', {}
|
||
|
||
|
||
def _run_document_scope(check_name: str, ingest_result: Dict, scope_args: Dict) -> Dict:
    """Look up a registered check by name and run it on the ingest result.

    The registry entry's ``'fn'`` is called as ``fn(ingest_result, scope_args)``
    and its return value is passed back unchanged. Two failure cases produce a
    score-0, non-passing result dict instead of raising:

      * the name is not in the registry;
      * the check function (or the ``'fn'`` lookup) raises any ``Exception``.
    """

    def _failed(summary: str, findings: Dict, response: str) -> Dict:
        # Common shape for both failure cases; only the text fields differ.
        return {
            'check_name': check_name,
            'scope': 'document',
            'score': 0.0,
            'pass': False,
            'summary': summary,
            'findings': findings,
            'response': response,
        }

    registered = doc_checks.get_check(check_name)
    if not registered:
        return _failed(f"Unknown document-scope check '{check_name}'.", {}, '')

    try:
        outcome = registered['fn'](ingest_result, scope_args)
    except Exception as exc:
        # Deliberate catch-all: one broken check must not abort the document.
        detail = str(exc)
        return _failed(
            f"Check raised {type(exc).__name__}: {detail}",
            {'error': detail},
            detail,
        )
    return outcome
|
||
|
||
|
||
def _evenly_spaced(total: int, n: int) -> List[int]:
|
||
"""Return n 1-indexed page numbers evenly spaced across [1, total]."""
|
||
if total <= 0 or n <= 0:
|
||
return []
|
||
if n >= total:
|
||
return list(range(1, total + 1))
|
||
step = total / n
|
||
return sorted({int(round(i * step)) + 1 for i in range(n)} & set(range(1, total + 1)))
|
||
|
||
|
||
def run_document_analysis(
    *,
    pdf_path: str,
    profile_config,
    profile_id: str,
    profile_weights: Dict[str, float],
    enabled_checks: List[str],
    qc_apps: Dict,
    brand_db,
    analysis_reference_asset: Optional[str],
    media_plan_context: Optional[str],
    ocr_context: Optional[str],
    progress_tracker: Dict,
    session_id: str,
    process_checks_in_batches: Callable,
    ingest_pdf_fn: Callable,
    pages_output_dir: str,
    page_limit: int = 200,
) -> Dict:
    """Run scope-aware document-mode QC on a PDF and build the report dict.

    Pipeline:
      1. Ingest   - ``ingest_pdf_fn`` renders the PDF into per-page records.
      2. Classify - each enabled check is bucketed by scope (``_resolve_scope``).
      3. Dispatch -
           a/b. ``document`` and ``targeted`` checks go through
                ``_run_document_scope``.
           c.   ``page_sample`` checks run on evenly spaced pages.
           d.   ``page_each`` checks run on every page that has an image; the
                headline score averages artwork pages only (falling back to
                all pages when there are no artwork pages).
         Page-level work in 3c/3d is fanned out over a thread pool of
         ``_PAGE_PARALLEL_WORKERS`` workers.
      4. Aggregate - weighted overall score, grade, and the optional
         strict-grade override.

    Args:
        pdf_path: Path handed to ``ingest_pdf_fn``.
        profile_config: Profile object. This function reads ``.name`` and the
            optional ``.strict_grade``; it is also forwarded to
            ``_resolve_scope`` and ``process_checks_in_batches``.
        profile_id: Echoed into the report.
        profile_weights: Per-check weight; checks not listed weigh 1.0.
        enabled_checks: Names of the checks to run.
        qc_apps, brand_db, analysis_reference_asset, media_plan_context,
        ocr_context: Forwarded untouched to ``process_checks_in_batches``.
        progress_tracker: Mapping whose ``progress_tracker[session_id]`` entry
            (which must already exist) is updated in place with progress.
        session_id: Key into ``progress_tracker``.
        process_checks_in_batches: Page-level runner, called with keyword
            arguments once per (check, page); expected to return a mapping of
            check name to result dict.
        ingest_pdf_fn: Called as ``ingest_pdf_fn(pdf_path, pages_output_dir,
            page_limit=..., progress_callback=...)``; must return a dict with
            ``pages``, ``pages_processed``, ``page_count`` and ``truncated``.
        pages_output_dir: Output directory handed to ``ingest_pdf_fn``.
        page_limit: Page cap handed to ``ingest_pdf_fn``.

    Returns:
        The report dict: ``mode``, profile info, page metadata,
        ``check_results``, ``page_level_results``, ``document_summary``,
        ``ingest_metadata`` and ``timestamp``.
    """

    # -- Stage 1: ingest ----------------------------------------------------
    progress_tracker[session_id].update({
        'stage': 'ingesting_pdf',
        'percentage': 2,
        'current_check_display': 'Rendering PDF pages...',
    })

    def _ingest_progress(page_num, total):
        # Rendering occupies the 2%-10% band of the progress bar. Guard the
        # division so a zero total cannot raise inside the ingest callback.
        fraction = (page_num / total) if total else 0
        progress_tracker[session_id].update({
            'percentage': 2 + fraction * 8,
            'current_check_display': f'Rendering page {page_num} of {total}',
        })

    ingest_result = ingest_pdf_fn(
        pdf_path,
        pages_output_dir,
        page_limit=page_limit,
        progress_callback=_ingest_progress,
    )

    pages = ingest_result['pages']
    pages_processed = ingest_result['pages_processed']
    page_count = ingest_result['page_count']
    truncated = ingest_result['truncated']

    if pages_processed == 0:
        # Nothing rendered: return a Fail report with the same top-level keys
        # as the normal report (including an empty page_level_results).
        return {
            'mode': 'document',
            'profile_id': profile_id,
            'profile_name': profile_config.name,
            'page_count': page_count,
            'pages_processed': 0,
            'truncated': truncated,
            'pages': [],
            'check_results': {},
            'page_level_results': {},
            'document_summary': {
                'overall_score': 0,
                'grade': 'Fail',
                'check_summaries': {},
                'error': 'No pages could be rendered from the supplied PDF.',
            },
            'ingest_metadata': {'fonts_inventory': []},
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        }

    # -- Stage 2: classify each enabled check by scope ----------------------
    # NOTE: checks that land in 'page_pair' (or any scope without a stage
    # below) are never dispatched; Stage 4 scores them 0 with scope 'unknown'.
    scope_buckets: Dict[str, List[tuple]] = {
        'document': [], 'targeted': [], 'page_sample': [], 'page_each': [], 'page_pair': [],
    }
    for check_name in enabled_checks:
        scope, scope_args = _resolve_scope(profile_config, check_name)
        scope_buckets.setdefault(scope, []).append((check_name, scope_args))

    progress_tracker[session_id].update({
        'stage': 'analysing',
        'percentage': 12,
        'total_checks': len(enabled_checks),
        'completed_checks': 0,
        'total_pages': pages_processed,
        'current_page': 0,
    })

    check_results: Dict[str, Dict] = {}
    completed = 0

    def _mark_check_done() -> None:
        # Checks occupy the 12%-92% band, split evenly per enabled check.
        nonlocal completed
        completed += 1
        progress_tracker[session_id].update({
            'completed_checks': completed,
            'percentage': 12 + (completed / len(enabled_checks)) * 80,
        })

    # -- Stage 3a: document-scope checks ------------------------------------
    for check_name, scope_args in scope_buckets['document']:
        progress_tracker[session_id].update({
            'current_check_display': f'Running {check_name}...',
        })
        check_results[check_name] = _run_document_scope(check_name, ingest_result, scope_args)
        _mark_check_done()

    # -- Stage 3b: targeted checks (specific pages) -------------------------
    for check_name, scope_args in scope_buckets['targeted']:
        progress_tracker[session_id].update({
            'current_check_display': f'Running {check_name} on targeted pages...',
        })
        check_results[check_name] = _run_document_scope(check_name, ingest_result, scope_args)
        _mark_check_done()

    # -- Page-type map (shared by 3c and 3d) --------------------------------
    # Pages without a 'page_type' count as 'artwork', so profiles that do not
    # tag pages have every page contribute to scores.
    page_type_map = {p['page_num']: p.get('page_type', 'artwork') for p in pages}
    artwork_page_nums = {pn for pn, pt in page_type_map.items() if pt == 'artwork'}

    # -- Per-page dispatch helpers (used by 3c and 3d) ----------------------
    def _run_check_on_page(check_name: str, page: Dict):
        """Run one check on one page; return ``(page_num, result_dict)``.

        Any exception is converted into a score-0 result so a single bad page
        cannot abort the document. The result is tagged with ``page_type``.
        """
        page_num = page['page_num']
        try:
            page_check_results = process_checks_in_batches(
                enabled_checks=[check_name],
                qc_apps=qc_apps,
                profile_config=profile_config,
                profile_weights=profile_weights,
                file_path=page['image_path'],
                analysis_reference_asset=analysis_reference_asset,
                brand_db=brand_db,
                progress_tracker=progress_tracker,
                session_id=session_id,
                batch_size=15,
                media_plan_context=media_plan_context,
                ocr_context=ocr_context,
            )
            # `or {}` also covers a None entry, which would otherwise blow
            # up at the page_type tagging below, outside this try block.
            result_for_page = page_check_results.get(check_name) or {}
        except Exception as e:
            result_for_page = {
                'check_name': check_name,
                'score': 0.0,
                'pass': False,
                'response': f'Check raised {type(e).__name__}: {e}',
                'findings': {'error': str(e)},
            }
        result_for_page['page_type'] = page_type_map.get(page_num, 'artwork')
        return page_num, result_for_page

    def _run_check_across_pages(check_name: str, eligible: List[Dict]) -> Dict[int, Dict]:
        """Run one check on many pages concurrently; return results in page order.

        Futures are consumed as they complete (for live progress), but the
        returned dict is re-keyed in the order of ``eligible`` so the report
        does not depend on which LLM call happened to finish first.
        """
        finished: Dict[int, Dict] = {}
        total = len(eligible)
        with ThreadPoolExecutor(max_workers=_PAGE_PARALLEL_WORKERS) as pool:
            futures = [pool.submit(_run_check_on_page, check_name, p) for p in eligible]
            for fut in as_completed(futures):
                pn, result = fut.result()
                finished[pn] = result
                progress_tracker[session_id].update({
                    'current_check_display': f'{check_name}: {len(finished)} of {total} pages',
                })
        return {p['page_num']: finished[p['page_num']] for p in eligible}

    # -- Stage 3c: page_sample (sampled pages) ------------------------------
    page_level_results: Dict[str, Dict[int, Dict]] = {}  # check -> page_num -> result
    for check_name, scope_args in scope_buckets['page_sample']:
        n = (scope_args or {}).get('sample_size', 8)
        page_nums = _evenly_spaced(pages_processed, n)
        eligible = [pages[pn - 1] for pn in page_nums if pages[pn - 1].get('image_path')]

        progress_tracker[session_id].update({
            'current_check_display': f'{check_name} (sampling {len(eligible)} pages)...',
        })

        per_page = _run_check_across_pages(check_name, eligible)
        page_level_results[check_name] = per_page

        # Aggregate the sampled results into the doc-level entry.
        page_scores = {p: (r.get('score') or 0) for p, r in per_page.items()}
        scores = list(page_scores.values())
        avg = round(sum(scores) / len(scores), 2) if scores else 0.0
        check_results[check_name] = {
            'check_name': check_name,
            'scope': 'page_sample',
            'score': avg,
            'pass': avg >= 6,
            'summary': f'{check_name} sampled across {len(scores)} pages: avg {avg}, min {min(scores) if scores else 0}, max {max(scores) if scores else 0}',
            'findings': {
                'pages_sampled': sorted(page_scores.keys()),
                'page_scores': page_scores,
                'failing_pages': sorted([p for p, s in page_scores.items() if s < 6]),
            },
            'response': '\n'.join(
                f"Page {p}: score {s}\n{(per_page[p].get('response') or '')[:1500]}"
                for p, s in page_scores.items()
            ),
        }
        _mark_check_done()

    # -- Stage 3d: page_each (every page with an image) ---------------------
    # Page-type-aware: non-artwork pages are reported for visibility but are
    # excluded from the per-check average that drives the score and grade.
    eligible_pages = [p for p in pages if p.get('image_path')]
    for check_name, _scope_args in scope_buckets['page_each']:
        progress_tracker[session_id].update({
            'current_check_display': f'{check_name} across {len(eligible_pages)} pages...',
            'current_page': 0,
        })

        per_page = page_level_results.setdefault(check_name, {})
        per_page.update(_run_check_across_pages(check_name, eligible_pages))

        page_scores = {p: (r.get('score') or 0) for p, r in per_page.items()}
        artwork_scores = {p: s for p, s in page_scores.items() if p in artwork_page_nums}
        non_artwork_scores = {p: s for p, s in page_scores.items() if p not in artwork_page_nums}

        # Headline score = average of artwork-page scores. With no artwork
        # pages at all, fall back to every page rather than a 0-score Fail.
        scoring_pool = artwork_scores if artwork_scores else page_scores
        scores = list(scoring_pool.values())
        avg = round(sum(scores) / len(scores), 2) if scores else 0.0

        # Per-page response excerpts live in findings so the report renderer
        # can show a per-page card without access to page_level_results.
        page_responses = {
            p: ((per_page[p].get('response') or '')[:1500])
            for p in page_scores
        }

        check_results[check_name] = {
            'check_name': check_name,
            'scope': 'page_each',
            'score': avg,
            'pass': avg >= 6,
            'summary': (
                f'{check_name} ran on {len(page_scores)} pages '
                f'({len(artwork_scores)} artwork, {len(non_artwork_scores)} informational). '
                f'Artwork avg {avg}.'
            ),
            'findings': {
                'page_scores': page_scores,
                'artwork_page_scores': artwork_scores,
                'informational_page_scores': non_artwork_scores,
                'failing_artwork_pages': sorted([p for p, s in artwork_scores.items() if s < 6]),
                'page_types': page_type_map,
                'page_responses': page_responses,
            },
            'response': '\n'.join(
                f"Page {p} [{page_type_map.get(p, 'artwork')}]: {s}\n"
                f"{page_responses[p]}"
                for p, s in sorted(page_scores.items())
            ),
        }
        _mark_check_done()

    # -- Stage 4: aggregate document score ----------------------------------
    progress_tracker[session_id].update({
        'stage': 'aggregating',
        'percentage': 96,
        'current_check_display': 'Aggregating findings...',
    })

    total_weighted = 0.0
    total_weight = 0.0
    check_summaries = {}
    for check_name in enabled_checks:
        weight = profile_weights.get(check_name, 1.0)
        result = check_results.get(check_name) or {'score': 0.0}
        # A check that returned no score (missing key or None) counts as 0
        # instead of crashing the whole aggregation.
        raw_score = result.get('score')
        score = raw_score if raw_score is not None else 0.0
        total_weighted += score * weight
        total_weight += weight
        check_summaries[check_name] = {
            'score': score,
            'pass': result.get('pass', False),
            'scope': result.get('scope', 'unknown'),
            'summary': result.get('summary', ''),
            'weight': weight,
            'findings': result.get('findings', {}),
        }

    # With total weight >= 10 the weighted sum is used directly (capped at
    # 100); otherwise the weighted mean is rescaled by 10.
    if total_weight >= 10.0:
        overall_score = round(min(total_weighted, 100), 2)
    elif total_weight > 0:
        overall_score = round(min((total_weighted / total_weight) * 10, 100), 2)
    else:
        overall_score = 0.0

    # -- Strict-grade override (artwork pages only) -------------------------
    # With strict_grade on, any page-level check scoring < 6 on any artwork
    # page forces a Fail. Non-artwork pages never contribute.
    strict_grade = bool(getattr(profile_config, 'strict_grade', False))
    strict_violations = []
    if strict_grade:
        for check_name, per_page in page_level_results.items():
            for page_num, page_result in per_page.items():
                if page_result.get('page_type', 'artwork') != 'artwork':
                    continue
                page_score = page_result.get('score') or 0
                if page_score < 6:
                    strict_violations.append({
                        'check': check_name,
                        'page': page_num,
                        'score': page_score,
                    })

    if strict_grade and strict_violations:
        grade = 'Fail'
    else:
        grade = _grade(overall_score)

    fonts_inventory = sorted({
        font for page in pages for font in (page.get('fonts_used') or [])
    })

    return {
        'mode': 'document',
        'profile_id': profile_id,
        'profile_name': profile_config.name,
        'page_count': page_count,
        'pages_processed': pages_processed,
        'truncated': truncated,
        'pages': [
            {
                'page_num': p['page_num'],
                'page_type': p.get('page_type', 'artwork'),
                'fonts_used': p.get('fonts_used', []),
                'image_path': p.get('image_path'),
            }
            for p in pages
        ],
        'check_results': check_results,
        'page_level_results': page_level_results,
        'document_summary': {
            'overall_score': overall_score,
            'grade': grade,
            'check_summaries': check_summaries,
            'total_weight': total_weight,
            'strict_grade': strict_grade,
            'strict_violations': strict_violations,
        },
        'ingest_metadata': {
            'fonts_inventory': fonts_inventory,
        },
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    }
|