"""PDF accessibility checks aligned to PDF/UA-1. Two layers, applied in order: 1. veraPDF subprocess — full PDF/UA-1 (ISO 14289-1) validation via the Matterhorn Protocol. This is the same protocol PAC uses, so its verdict is the authoritative one when veraPDF is available on the host. When it runs, its result drives the score and pass flag. 2. Deterministic PyMuPDF criteria (C1-C9) — fast surface checks that run regardless. They give the AXA team a quick visual sanity-pass (tagged? language set? fonts embedded?) and are the sole source of truth when veraPDF is not installed. Deterministic criteria: • C1 Tagged PDF — document has a /StructTreeRoot • C2 Marked — /MarkInfo /Marked is true • C3 Title — metadata /Title set and non-empty • C4 Language — document /Lang specified • C5 No password protection — /Encrypt absent or accessibility-friendly • C6 Fonts embedded — every font flagged as embedded • C7 PDF version — 1.5+ recommended • C8 XMP UA-conformance — XMP metadata declares pdfuaid:part • C9 Image alt text — sampled images have /Alt or /ActualText """ from __future__ import annotations import os import re import shutil import subprocess import xml.etree.ElementTree as ET from typing import Dict, List, Optional import fitz # PyMuPDF # Project-local install path for the production server (see vendor dir # under /opt/ai_qc/vendor/verapdf/). Falls back to PATH lookup or # VERAPDF_BIN env var. 
# Vendored veraPDF launcher on the production host. _resolve_verapdf_binary()
# tries the VERAPDF_BIN environment variable and a PATH lookup before this path.
_VERAPDF_VENDOR_PATH = '/opt/ai_qc/vendor/verapdf/verapdf'
_VERAPDF_TIMEOUT_SECONDS = 180

# C9 sampling bounds: pages scanned for images / xref numbers scanned for alt text.
_ALT_TEXT_PAGE_LIMIT = 30
_ALT_TEXT_XREF_LIMIT = 500

_MARKED_TRUE_RE = re.compile(r'/Marked\s+true')
# Matches an indirect reference such as "/MarkInfo 12 0 R".
_MARKINFO_REF_RE = re.compile(r'/MarkInfo\s+(\d+)\s+\d+\s+R')
# A PDF name ends at whitespace or a delimiter, so the lookahead keeps
# "/Alternate" and "/Alternates" from being counted as "/Alt".
_ALT_KEY_RE = re.compile(r'/(?:Alt|ActualText)(?=[\s()<>\[\]{}/%]|$)')


# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────

def _catalog_object(doc: fitz.Document) -> str:
    """Return the text dump of the catalog object, or '' if it cannot be read."""
    try:
        return doc.xref_object(doc.pdf_catalog())
    except Exception:
        return ''


def _xmp_metadata(doc: fitz.Document) -> str:
    """Return the XMP metadata as a string, or '' if absent or unreadable."""
    try:
        return doc.get_xml_metadata() or ''
    except Exception:
        return ''


def _criterion(code: str, title: str, passed: bool, note: str = '',
               detail: Optional[Dict] = None) -> Dict:
    """Build one criterion result dict; ``detail`` defaults to an empty dict."""
    return {
        'code': code,
        'title': title,
        'passed': passed,
        'note': note,
        'detail': detail or {},
    }


def _to_int(value) -> int:
    """Convert an XML attribute value to int; missing or malformed values give 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError):
        return 0


def _font_is_embedded(ext: str, font_type: str) -> bool:
    """Decide whether a font entry from ``get_page_fonts`` is embedded.

    PyMuPDF reports the extension 'n/a' when no font file can be extracted, so
    a non-empty extension alone does not prove embedding. Type 3 fonts also
    report 'n/a' but carry their glyph procedures inside the PDF itself, so
    they are treated as embedded.
    """
    if font_type == 'Type3':
        return True
    normalized = str(ext or '').strip().lower()
    return normalized not in ('', 'n/a')


# ─────────────────────────────────────────────────────────────────────────────
# Criterion implementations
# ─────────────────────────────────────────────────────────────────────────────

def _check_tagged(doc: fitz.Document) -> Dict:
    """C1: pass when the catalog dump mentions /StructTreeRoot."""
    has_struct = '/StructTreeRoot' in _catalog_object(doc)
    if has_struct:
        note = 'StructTreeRoot found in catalog.'
    else:
        note = ('PDF has no structure tree — screen readers will fall back to raw text. '
                'PDF/UA fail.')
    return _criterion('C1', 'Tagged PDF (StructTreeRoot present)', has_struct, note)


def _check_marked(doc: fitz.Document) -> Dict:
    """C2: pass when /MarkInfo exists and /Marked is true.

    /MarkInfo may be stored inline in the catalog or as an indirect reference;
    in the latter case the referenced object is read as well, otherwise a
    correctly marked file would be reported as unmarked.
    """
    title = 'Marked content (/MarkInfo /Marked true)'
    catalog = _catalog_object(doc)
    if '/MarkInfo' not in catalog:
        return _criterion('C2', title, False, '/MarkInfo dictionary missing.')

    searchable = catalog
    ref = _MARKINFO_REF_RE.search(catalog)
    if ref:
        try:
            searchable = catalog + '\n' + doc.xref_object(int(ref.group(1)))
        except Exception:
            # Unreadable target: fall back to what the catalog itself shows.
            searchable = catalog

    if _MARKED_TRUE_RE.search(searchable):
        return _criterion('C2', title, True, '/MarkInfo /Marked = true.')
    return _criterion('C2', title, False, '/MarkInfo present but /Marked is not true.')


def _check_title(doc: fitz.Document) -> Dict:
    """C3: pass when the metadata title is non-empty after stripping."""
    title = ((doc.metadata or {}).get('title') or '').strip()
    if title:
        return _criterion('C3', 'Document title metadata', True, f'Title: "{title[:80]}"')
    return _criterion('C3', 'Document title metadata', False,
                      'Title metadata missing or empty.')


def _check_language(doc: fitz.Document) -> Dict:
    """C4: pass when a document language is found.

    ``doc.language`` is tried first; if it is empty the catalog dump is searched
    for a literal or hex-string /Lang entry.
    """
    lang = (doc.language or '').strip()
    if not lang:
        # Sometimes language is in the catalog but not exposed via doc.language
        catalog = _catalog_object(doc)
        m = (re.search(r'/Lang\s*\(([^)]+)\)', catalog)
             or re.search(r'/Lang\s*<([^>]+)>', catalog))
        if m:
            # Strip so a whitespace-only entry does not count as a language.
            lang = m.group(1).strip()
    if lang:
        return _criterion('C4', 'Document language (/Lang)', True, f'Language: {lang}')
    return _criterion('C4', 'Document language (/Lang)', False,
                      '/Lang missing — assistive tech cannot pick a voice/locale.')


def _check_no_blocking_encryption(doc: fitz.Document) -> Dict:
    """C5: fail only when the document is encrypted and still needs a password."""
    if doc.is_encrypted and doc.needs_pass:
        return _criterion('C5', 'No password protection blocking AT', False,
                          'Document is password-protected — assistive tech cannot read.')
    return _criterion('C5', 'No password protection blocking AT', True,
                      'No password block; assistive tech can read.')


def _check_font_embedding(doc: fitz.Document) -> Dict:
    """C6: walk every page, list every font, flag any that is not embedded.

    Fonts are de-duplicated by base font name; the first occurrence decides
    whether that name counts as embedded.
    """
    seen: Dict[str, bool] = {}
    for page_no in range(doc.page_count):
        for font in doc.get_page_fonts(page_no):
            # PyMuPDF tuple: (xref, ext, type, basefont, name, encoding[, referencer])
            ext, font_type, basefont = font[1], font[2], font[3]
            if basefont not in seen:
                seen[basefont] = _font_is_embedded(ext, font_type)

    total = len(seen)
    if total == 0:
        return _criterion('C6', 'Fonts embedded', True, 'No fonts present.')

    # Dict order is first-seen order, so the list is stable across runs.
    not_embedded = [name for name, ok in seen.items() if not ok]
    embedded_count = total - len(not_embedded)
    if not_embedded:
        return _criterion('C6', 'Fonts embedded', False,
                          f'{len(not_embedded)} of {total} fonts are not embedded.',
                          {'not_embedded': not_embedded, 'total_fonts': total,
                           'embedded_count': embedded_count})
    return _criterion('C6', 'Fonts embedded', True, f'All {total} fonts embedded.',
                      {'total_fonts': total, 'embedded_count': embedded_count})


def _check_pdf_version(doc: fitz.Document) -> Dict:
    """C7: pass when the metadata 'format' string reports PDF 1.5 or later."""
    fmt = ((doc.metadata or {}).get('format') or '').strip()
    m = re.search(r'PDF\s+(\d+)\.(\d+)', fmt)
    if not m:
        return _criterion('C7', 'PDF version', False, 'Could not determine PDF version.')

    version = f'{m.group(1)}.{m.group(2)}'
    # Compare as integer pairs: a float comparison would order "1.10" below "1.5".
    if (int(m.group(1)), int(m.group(2))) >= (1, 5):
        return _criterion('C7', 'PDF version', True,
                          f'PDF {version} — supports modern tagging features.')
    return _criterion('C7', 'PDF version', False,
                      f'PDF {version} is older than 1.5 — may not support full '
                      f'accessibility tagging.')


def _check_xmp_ua_conformance(doc: fitz.Document) -> Dict:
    """C8: pass when the XMP metadata declares pdfuaid:part starting with 1."""
    title = 'XMP UA conformance declaration'
    xmp = _xmp_metadata(doc)
    if not xmp:
        return _criterion('C8', title, False, 'No XMP metadata stream found.')
    # Accepts both element form (<pdfuaid:part>1) and attribute form (pdfuaid:part="1").
    if re.search(r'pdfuaid:part\s*[>=]\s*[\'"]?1', xmp):
        return _criterion('C8', title, True, 'XMP declares PDF/UA-1 conformance.')
    if 'pdfuaid' in xmp:
        return _criterion('C8', title, False,
                          'XMP mentions pdfuaid namespace but does not declare PDF/UA-1.')
    return _criterion('C8', title, False, 'No PDF/UA conformance flag in XMP metadata.')


def _check_alt_text_sampling(doc: fitz.Document) -> Dict:
    """C9: coarse signal for missing alt text when images are present.

    Heuristic: count images on the first ``_ALT_TEXT_PAGE_LIMIT`` pages, then
    scan the object dumps of xref numbers below ``_ALT_TEXT_XREF_LIMIT`` for an
    /Alt or /ActualText key. This is not a structure-tree walk; it only detects
    the case "images exist but no alt-text key appears in the sampled objects".
    """
    title = 'Alt text on images (sampling)'
    image_count = 0
    pages_with_images = 0
    for page_no in range(min(doc.page_count, _ALT_TEXT_PAGE_LIMIT)):
        imgs = doc.get_page_images(page_no)
        if imgs:
            pages_with_images += 1
            image_count += len(imgs)

    if image_count == 0:
        return _criterion(
            'C9', title, True,
            f'No raster images detected in first {_ALT_TEXT_PAGE_LIMIT} pages — '
            f'no alt-text needed.')

    alt_hits = 0
    for xref in range(1, min(doc.xref_length(), _ALT_TEXT_XREF_LIMIT)):
        try:
            obj = doc.xref_object(xref)
        except Exception:
            continue  # unreadable object: skip it, keep sampling
        # Token-aware match; a plain substring test would also hit "/Alternate".
        if _ALT_KEY_RE.search(obj):
            alt_hits += 1

    if alt_hits == 0:
        return _criterion(
            'C9', title, False,
            f'{image_count} images detected but no /Alt or /ActualText found in sampled '
            f'structure objects.',
            {'image_count': image_count, 'pages_with_images': pages_with_images})
    return _criterion(
        'C9', title, True,
        f'{image_count} images detected; {alt_hits} alt-text entries found in sampled objects.',
        {'image_count': image_count, 'pages_with_images': pages_with_images,
         'alt_hits': alt_hits})


# (code, title, check) in report order. Code and title are only used to build a
# failed criterion when the check itself raises.
_CRITERION_CHECKS = (
    ('C1', 'Tagged PDF (StructTreeRoot present)', _check_tagged),
    ('C2', 'Marked content (/MarkInfo /Marked true)', _check_marked),
    ('C3', 'Document title metadata', _check_title),
    ('C4', 'Document language (/Lang)', _check_language),
    ('C5', 'No password protection blocking AT', _check_no_blocking_encryption),
    ('C6', 'Fonts embedded', _check_font_embedding),
    ('C7', 'PDF version', _check_pdf_version),
    ('C8', 'XMP UA conformance declaration', _check_xmp_ua_conformance),
    ('C9', 'Alt text on images (sampling)', _check_alt_text_sampling),
)


def _run_criteria(doc: fitz.Document) -> List[Dict]:
    """Run C1-C9 in order, turning a raising check into a failed criterion.

    Without this guard a single exception (for example from a page-level call
    that the document refuses to serve) would discard the results of every
    other criterion.
    """
    results: List[Dict] = []
    for code, title, check in _CRITERION_CHECKS:
        try:
            results.append(check(doc))
        except Exception as exc:
            results.append(_criterion(code, title, False,
                                      f'Check could not be completed: {exc}',
                                      {'error': str(exc)}))
    return results


# ─────────────────────────────────────────────────────────────────────────────
# Top-level entry point
# ─────────────────────────────────────────────────────────────────────────────

def axa_pdf_accessibility(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict:
    """Run PDF/UA-1 accessibility validation on the ingested PDF.

    Args:
        ingest_result: dict that must provide the file location under 'pdf_path'.
        scope_args: accepted for interface compatibility; not read here.

    Returns:
        A result dict with 'check_name', 'scope', 'score', 'pass', 'summary',
        'findings' and 'response'. When veraPDF ran without error its verdict
        sets score and pass flag; otherwise they come from the deterministic
        criteria (score = passed share scaled to 10, pass = no failures).
        A missing path or an unopenable file yields score 0.0 and pass False.
    """
    pdf_path = ingest_result.get('pdf_path')
    if not pdf_path:
        return {
            'check_name': 'axa_pdf_accessibility',
            'scope': 'document',
            'score': 0.0,
            'pass': False,
            'summary': 'Cannot run — pdf_path missing from ingest_result.',
            'findings': {'error': 'pdf_path_missing'},
            'response': '',
        }

    try:
        doc = fitz.open(pdf_path)
    except Exception as e:
        return {
            'check_name': 'axa_pdf_accessibility',
            'scope': 'document',
            'score': 0.0,
            'pass': False,
            'summary': f'Failed to open PDF: {e}',
            'findings': {'error': str(e)},
            'response': '',
        }

    try:
        criteria = _run_criteria(doc)
    finally:
        doc.close()

    crit_passed = [c for c in criteria if c['passed']]
    crit_failed = [c for c in criteria if not c['passed']]
    crit_total = len(criteria)

    verapdf = _run_verapdf(pdf_path)
    verapdf_ok = bool(verapdf and verapdf.get('available') and not verapdf.get('error'))

    if verapdf_ok:
        score, pass_flag, summary = _score_from_verapdf(verapdf)
    else:
        score = round((len(crit_passed) / crit_total) * 10, 2) if crit_total else 0.0
        pass_flag = not crit_failed
        vera_error = verapdf.get('error') if verapdf else None
        if vera_error:
            # veraPDF is installed but the run failed: report that instead of
            # telling the reader to install it.
            if pass_flag:
                summary = (f'All {crit_total} fast accessibility criteria passed '
                           f'(veraPDF did not complete: {vera_error}).')
            else:
                summary = (f'{len(crit_failed)} of {crit_total} fast accessibility criteria '
                           f'failed (veraPDF did not complete: {vera_error}).')
        elif pass_flag:
            summary = (f'All {crit_total} fast accessibility criteria passed '
                       f'(veraPDF unavailable — install for full PDF/UA-1 validation).')
        else:
            summary = (f'{len(crit_failed)} of {crit_total} fast accessibility criteria '
                       f'failed (veraPDF unavailable).')

    response = _build_response_text(summary, criteria, verapdf if verapdf_ok else None)

    return {
        'check_name': 'axa_pdf_accessibility',
        'scope': 'document',
        'score': score,
        'pass': pass_flag,
        'summary': summary,
        'findings': {
            'criteria': criteria,
            'criteria_total': crit_total,
            'criteria_passed': len(crit_passed),
            'criteria_failed': len(crit_failed),
            'verapdf_run': verapdf_ok,
            'verapdf': verapdf if verapdf else None,
        },
        'response': response,
    }


def _score_from_verapdf(verapdf: Dict) -> tuple:
    """Map a veraPDF result dict to ``(score, pass_flag, summary)``.

    A compliant file scores 10.0 and passes. A non-compliant file never passes;
    its score depends on the number of failed rules: 5.0 for at most one,
    3.0 for exactly two, 0.0 for more.
    """
    if verapdf.get('compliant'):
        n_rules = verapdf.get('passed_rules', 0)
        return 10.0, True, f'PDF/UA-1 compliant per veraPDF ({n_rules} rules passed).'

    n_failed = verapdf.get('failed_rules', 0)
    n_failed_checks = verapdf.get('failed_checks', 0)
    if n_failed <= 1:
        score = 5.0
    elif n_failed == 2:
        score = 3.0
    else:
        score = 0.0
    summary = (
        f'PDF/UA-1 non-compliant per veraPDF: {n_failed} rule(s) failed '
        f'across {n_failed_checks} individual check(s).'
    )
    return score, False, summary


def _build_response_text(summary: str, criteria: List[Dict],
                         verapdf: Optional[Dict]) -> str:
    """Render the plain-text response block.

    The summary comes first, then (only when ``verapdf`` is given) the veraPDF
    verdict with one entry per failed rule and at most one sample error each,
    then one line per deterministic criterion.
    """
    lines = [summary, '']

    if verapdf:
        lines.append('── veraPDF PDF/UA-1 ──')
        verdict = 'COMPLIANT' if verapdf.get('compliant') else 'NOT COMPLIANT'
        lines.append(f' Verdict: {verdict}')
        lines.append(
            f' Rules: {verapdf.get("passed_rules", 0)} passed / '
            f'{verapdf.get("failed_rules", 0)} failed'
        )
        lines.append(
            f' Checks: {verapdf.get("passed_checks", 0)} passed / '
            f'{verapdf.get("failed_checks", 0)} failed'
        )
        for r in verapdf.get('failed_rule_details', []):
            tag_str = ', '.join(r.get('tags') or []) or '—'
            lines.append('')
            lines.append(
                f' ✗ Clause {r["clause"]}-{r["test_number"]} '
                f'(×{r["failed_checks"]}, {tag_str})'
            )
            lines.append(f' {r["description"]}')
            for s in r.get('sample_errors', [])[:1]:
                lines.append(f' e.g. {s}')
        lines.append('')

    lines.append('── Fast deterministic criteria ──')
    for c in criteria:
        marker = '✓' if c['passed'] else '✗'
        lines.append(f" {marker} {c['code']} — {c['title']}: {c['note']}")

    return '\n'.join(lines)


# ─────────────────────────────────────────────────────────────────────────────
# veraPDF integration
# ─────────────────────────────────────────────────────────────────────────────

def _resolve_verapdf_binary() -> Optional[str]:
    """Locate the veraPDF executable.

    Order: VERAPDF_BIN env > PATH > project-local vendor install. Returns None
    when none of them yields an executable file.
    """
    env_path = os.environ.get('VERAPDF_BIN')
    if env_path and os.path.isfile(env_path) and os.access(env_path, os.X_OK):
        return env_path

    path_lookup = shutil.which('verapdf')
    if path_lookup:
        return path_lookup

    if os.path.isfile(_VERAPDF_VENDOR_PATH) and os.access(_VERAPDF_VENDOR_PATH, os.X_OK):
        return _VERAPDF_VENDOR_PATH
    return None


def _parse_verapdf_report(xml_text: str, binary: str) -> Dict:
    """Turn veraPDF XML output into the structured result dict.

    Returns a dict with 'error' set when the XML cannot be parsed or holds no
    validationReport element. Counters that are missing or non-numeric are
    reported as 0 rather than raising.
    """
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError as e:
        return {
            'available': True,
            'binary': binary,
            'error': f'Could not parse veraPDF XML: {e}',
        }

    vr = root.find('.//validationReport')
    if vr is None:
        return {
            'available': True,
            'binary': binary,
            'error': 'No validationReport in veraPDF output',
        }

    details = vr.find('details')
    rules: List[Dict] = []
    if details is not None:
        for rule in details.findall('rule'):
            tags = [t for t in (rule.get('tags') or '').split(',') if t]
            rules.append({
                'specification': rule.get('specification'),
                'clause': rule.get('clause'),
                'test_number': rule.get('testNumber'),
                'tags': tags,
                'failed_checks': _to_int(rule.get('failedChecks')),
                'description': (rule.findtext('description') or '').strip(),
                'sample_errors': [
                    (c.findtext('errorMessage') or '').strip()
                    for c in rule.findall('check')[:2]
                ],
            })

    counters = details.attrib if details is not None else {}
    return {
        'available': True,
        'binary': binary,
        'compliant': vr.get('isCompliant') == 'true',
        'profile': vr.get('profileName', 'PDF/UA-1'),
        'statement': vr.get('statement', ''),
        'passed_rules': _to_int(counters.get('passedRules')),
        'failed_rules': _to_int(counters.get('failedRules')),
        'passed_checks': _to_int(counters.get('passedChecks')),
        'failed_checks': _to_int(counters.get('failedChecks')),
        'failed_rule_details': rules,
    }


def _run_verapdf(pdf_path: str) -> Optional[Dict]:
    """Run veraPDF PDF/UA-1 validation on ``pdf_path``.

    Returns None when no veraPDF executable is found. Otherwise returns a dict
    with 'available': True; 'error' is populated when the subprocess timed out,
    could not be started, printed nothing, or produced output that could not be
    interpreted.
    """
    binary = _resolve_verapdf_binary()
    if not binary:
        return None

    try:
        result = subprocess.run(
            [binary, '-f', 'ua1', '--format', 'xml',
             '--maxfailuresdisplayed', '3', pdf_path],
            capture_output=True,
            text=True,
            timeout=_VERAPDF_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return {'available': True, 'binary': binary,
                'error': f'veraPDF timed out after {_VERAPDF_TIMEOUT_SECONDS}s'}
    except Exception as e:
        return {'available': True, 'binary': binary,
                'error': f'veraPDF subprocess failed: {e}'}

    if not result.stdout:
        return {
            'available': True,
            'binary': binary,
            'error': 'veraPDF produced no output',
            'stderr': (result.stderr or '')[:500],
        }

    return _parse_verapdf_report(result.stdout, binary)