ai_qc/backend/document_mode/accessibility_checks.py
nickviljoen 2aeff24136 Wire veraPDF into axa_pdf_accessibility for PAC-equivalent PDF/UA-1 validation
AXA's accessibility QC team uses axes4 PAC (PDF/UA-1 / Matterhorn Protocol)
as their compliance gate, but our existing 9-criterion deterministic check
runs surface-level only and would pass documents PAC fails. Wired up the
existing _run_verapdf() stub so veraPDF — the open-source Matterhorn
implementation — runs as a subprocess and drives the score when available.

Verified locally: veraPDF on EAA_v1.pdf reports the exact same Content (86)
and Metadata (1) failure counts as PAC's report on the same document family,
confirming protocol parity.

Falls back cleanly to the deterministic layer when veraPDF isn't installed,
so deploys are safe before the binary lands on dev/prod servers.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 10:36:03 +02:00

502 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""PDF accessibility checks aligned to PDF/UA-1.
Two layers, applied in order:
1. veraPDF subprocess — full PDF/UA-1 (ISO 14289-1) validation via the
Matterhorn Protocol. This is the same protocol PAC uses, so its
verdict is the authoritative one when veraPDF is available on the
host. When it runs, its result drives the score and pass flag.
2. Deterministic PyMuPDF criteria (C1-C9) — fast surface checks that
run regardless. They give the AXA team a quick visual sanity-pass
(tagged? language set? fonts embedded?) and are the sole source of
truth when veraPDF is not installed.
Deterministic criteria:
• C1 Tagged PDF — document has a /StructTreeRoot
• C2 Marked — /MarkInfo /Marked is true
• C3 Title — metadata /Title set and non-empty
• C4 Language — document /Lang specified
• C5 No password protection — /Encrypt absent or accessibility-friendly
• C6 Fonts embedded — every font flagged as embedded
• C7 PDF version — 1.5+ recommended
• C8 XMP UA-conformance — XMP metadata declares pdfuaid:part
• C9 Image alt text — sampled images have /Alt or /ActualText
"""
from __future__ import annotations
import os
import re
import shutil
import subprocess
import xml.etree.ElementTree as ET
from typing import Dict, List, Optional
import fitz # PyMuPDF
# Project-local veraPDF install path for the production server (vendor
# dir under /opt/ai_qc/vendor/verapdf/). _resolve_verapdf_binary() tries
# it LAST: the VERAPDF_BIN env var and a PATH lookup both take precedence.
_VERAPDF_VENDOR_PATH = '/opt/ai_qc/vendor/verapdf/verapdf'
# Upper bound, in seconds, used as the subprocess timeout for one veraPDF run.
_VERAPDF_TIMEOUT_SECONDS = 180
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _catalog_object(doc: fitz.Document) -> str:
    """Dump the PDF catalog dictionary as text.

    PyMuPDF renders the object as its PDF source, which callers then
    search with substring / regex matching. Any failure while locating
    or reading the catalog yields an empty string instead of raising.
    """
    try:
        catalog_xref = doc.pdf_catalog()
        dump = doc.xref_object(catalog_xref)
    except Exception:
        dump = ''
    return dump
def _xmp_metadata(doc: fitz.Document) -> str:
    """Fetch the document's XMP metadata as text.

    Returns '' when the document has no XMP stream (PyMuPDF hands back
    a falsy value) or when reading it raises.
    """
    try:
        xmp = doc.get_xml_metadata()
    except Exception:
        return ''
    return xmp if xmp else ''
def _criterion(code: str, title: str, passed: bool, note: str = '', detail: Optional[Dict] = None) -> Dict:
    """Assemble the result record for a single criterion.

    The record always carries the keys code / title / passed / note /
    detail; a missing (or empty) detail is normalised to a fresh {}.
    """
    record: Dict = dict(code=code, title=title, passed=passed, note=note)
    record['detail'] = detail if detail else {}
    return record
# ─────────────────────────────────────────────────────────────────────────────
# Criterion implementations
# ─────────────────────────────────────────────────────────────────────────────
def _check_tagged(doc: fitz.Document) -> Dict:
    """C1 — pass when the catalog dump mentions /StructTreeRoot."""
    if '/StructTreeRoot' in _catalog_object(doc):
        passed = True
        note = 'StructTreeRoot found in catalog.'
    else:
        passed = False
        note = 'PDF has no structure tree — screen readers will fall back to raw text. PDF/UA fail.'
    return _criterion('C1', 'Tagged PDF (StructTreeRoot present)', passed, note)
def _check_marked(doc: fitz.Document) -> Dict:
    """C2 — pass when the catalog's /MarkInfo dictionary has /Marked true.

    /MarkInfo may be stored inline in the catalog or as an indirect
    reference ("/MarkInfo 12 0 R"). In the indirect case the catalog
    dump does not contain the /Marked entry at all, so the referenced
    object is fetched and searched instead.
    """
    title = 'Marked content (/MarkInfo /Marked true)'
    catalog = _catalog_object(doc)
    if '/MarkInfo' not in catalog:
        return _criterion('C2', title, False, '/MarkInfo dictionary missing.')

    searchable = catalog
    indirect = re.search(r'/MarkInfo\s+(\d+)\s+\d+\s+R\b', catalog)
    if indirect:
        try:
            searchable = doc.xref_object(int(indirect.group(1)))
        except Exception:
            # Unreadable target object: treat as "not marked" rather than crash.
            searchable = ''

    if re.search(r'/Marked\s+true', searchable):
        return _criterion('C2', title, True, '/MarkInfo /Marked = true.')
    return _criterion('C2', title, False, '/MarkInfo present but /Marked is not true.')
def _check_title(doc: fitz.Document) -> Dict:
    """C3 — pass when the metadata title is non-blank.

    The note echoes at most the first 80 characters of the title.
    """
    label = 'Document title metadata'
    title = ((doc.metadata or {}).get('title') or '').strip()
    if not title:
        return _criterion('C3', label, False, 'Title metadata missing or empty.')
    return _criterion('C3', label, True, f'Title: "{title[:80]}"')
def _check_language(doc: fitz.Document) -> Dict:
    """C4 — pass when a document language can be determined.

    doc.language is consulted first. When it is empty, the catalog dump
    is searched for a /Lang entry written either as a literal string
    "(en-US)" or as a hex string "<656E2D5553>". Hex strings are decoded
    so the note shows the language tag rather than raw hex digits, and
    the recovered value is stripped so a blank string does not count as
    a language.
    """
    title = 'Document language (/Lang)'
    lang = (doc.language or '').strip()
    if not lang:
        # Sometimes language is in the catalog but not exposed via doc.language
        catalog = _catalog_object(doc)
        literal = re.search(r'/Lang\s*\(([^)]+)\)', catalog)
        if literal:
            lang = literal.group(1).strip()
        else:
            hex_form = re.search(r'/Lang\s*<([^>]+)>', catalog)
            if hex_form:
                raw = hex_form.group(1)
                try:
                    data = bytes.fromhex(raw)
                    if data.startswith(b'\xfe\xff'):
                        # UTF-16BE text string introduced by a byte-order mark.
                        lang = data[2:].decode('utf-16-be')
                    else:
                        lang = data.decode('latin-1')
                except ValueError:
                    # Not valid hex / not decodable: keep the raw capture.
                    lang = raw
                lang = lang.strip()
    if lang:
        return _criterion('C4', title, True, f'Language: {lang}')
    return _criterion('C4', title, False,
                      '/Lang missing — assistive tech cannot pick a voice/locale.')
def _check_no_blocking_encryption(doc: fitz.Document) -> Dict:
    """C5 — fail only when the document is encrypted AND needs a password."""
    locked = doc.is_encrypted and doc.needs_pass
    if locked:
        note = 'Document is password-protected — assistive tech cannot read.'
    else:
        note = 'No password block; assistive tech can read.'
    return _criterion('C5', 'No password protection blocking AT', not locked, note)
def _check_font_embedding(doc: fitz.Document) -> Dict:
    """C6 — walk every page, list every font, flag any not embedded.

    PyMuPDF's get_page_fonts() yields tuples of
    (xref, ext, type, basefont, name, encoding[, referencer]). `ext` is
    the font-file extension for embedded fonts and the placeholder
    "n/a" when there is no embedded font file (e.g. Base-14 fonts), so
    truthiness of `ext` alone cannot detect a missing embed.

    Fonts are grouped by basefont name; a name counts as embedded only
    if every occurrence of it is embedded.
    """
    embedded_by_font: Dict[str, bool] = {}
    for page_no in range(doc.page_count):
        for font in doc.get_page_fonts(page_no):
            ext, basefont = font[1], font[3]
            is_embedded = bool(ext) and ext != 'n/a'
            embedded_by_font[basefont] = embedded_by_font.get(basefont, True) and is_embedded

    total = len(embedded_by_font)
    if total == 0:
        return _criterion('C6', 'Fonts embedded', True, 'No fonts present.')

    # dicts preserve insertion order, so names stay in first-seen order.
    not_embedded: List[str] = [name for name, ok in embedded_by_font.items() if not ok]
    embedded_count = total - len(not_embedded)
    if not_embedded:
        return _criterion('C6', 'Fonts embedded', False,
                          f'{len(not_embedded)} of {total} fonts are not embedded.',
                          {'not_embedded': not_embedded, 'total_fonts': total,
                           'embedded_count': embedded_count})
    return _criterion('C6', 'Fonts embedded', True,
                      f'All {total} fonts embedded.',
                      {'total_fonts': total, 'embedded_count': embedded_count})
def _check_pdf_version(doc: fitz.Document) -> Dict:
    """C7 — pass when the metadata 'format' string reports PDF 1.5 or later.

    The version is pulled out of a "PDF <major>.<minor>" pattern; a
    missing or unparseable version fails the criterion.
    """
    label = 'PDF version'
    fmt = ((doc.metadata or {}).get('format') or '').strip()
    found = re.search(r'PDF\s+(\d+\.\d+)', fmt)
    if found is None:
        return _criterion('C7', label, False, 'Could not determine PDF version.')
    version = found.group(1)
    try:
        as_number = float(version)
    except ValueError:
        return _criterion('C7', label, False, f'Could not parse version: {fmt}')
    # PDF 1.5+ supports compressed cross-reference streams + most accessibility features
    if as_number < 1.5:
        return _criterion('C7', label, False,
                          f'PDF {version} is older than 1.5 — may not support full accessibility tagging.')
    return _criterion('C7', label, True, f'PDF {version} — supports modern tagging features.')
def _check_xmp_ua_conformance(doc: fitz.Document) -> Dict:
    """C8 — pass when the XMP packet declares pdfuaid:part = 1.

    The three failure notes distinguish: no XMP at all, XMP that
    mentions the pdfuaid namespace without the part-1 declaration, and
    XMP with no pdfuaid reference whatsoever.
    """
    xmp = _xmp_metadata(doc)
    declares_ua1 = False
    if not xmp:
        note = 'No XMP metadata stream found.'
    elif re.search(r'pdfuaid:part\s*[>=]\s*[\'"]?1', xmp):
        # Matches both the element form and the attribute form of the declaration.
        declares_ua1 = True
        note = 'XMP declares PDF/UA-1 conformance.'
    elif 'pdfuaid' in xmp:
        note = 'XMP mentions pdfuaid namespace but does not declare PDF/UA-1.'
    else:
        note = 'No PDF/UA conformance flag in XMP metadata.'
    return _criterion('C8', 'XMP UA conformance declaration', declares_ua1, note)
def _check_alt_text_sampling(doc: fitz.Document) -> Dict:
    """C9 — sample-check for alt text when images are present.

    Heuristic: count images on the first 30 pages, then scan the first
    xref objects (numbers 1..499) for an /Alt or /ActualText key. Not a
    full structure-tree walk, but a useful early signal — a doc with
    images and zero such entries is almost certainly missing alt text.

    The key is matched as a complete PDF name. A plain substring test
    would also hit unrelated names that merely start with "/Alt", such
    as the /Alternate entry of ICC-based colour spaces or /Alternates
    on image dictionaries, and report a false pass.
    """
    title = 'Alt text on images (sampling)'
    max_pages = 30
    max_xref = 500

    image_count = 0
    pages_with_images = 0
    for page_no in range(min(doc.page_count, max_pages)):
        images = doc.get_page_images(page_no)
        if images:
            pages_with_images += 1
            image_count += len(images)
    if image_count == 0:
        return _criterion('C9', title, True,
                          'No raster images detected in first 30 pages — no alt-text needed.')

    # The lookahead requires the name to end here: the next character must
    # be whitespace, a PDF delimiter, or the end of the dump.
    alt_key = re.compile(r'/(?:Alt|ActualText)(?![^\s()<>\[\]{}/%])')
    alt_hits = 0
    for xref in range(1, min(doc.xref_length(), max_xref)):
        try:
            obj = doc.xref_object(xref)
        except Exception:
            continue
        if alt_key.search(obj):
            alt_hits += 1

    if alt_hits == 0:
        return _criterion('C9', title, False,
                          f'{image_count} images detected but no /Alt or /ActualText found in sampled '
                          f'structure objects.',
                          {'image_count': image_count, 'pages_with_images': pages_with_images})
    return _criterion('C9', title, True,
                      f'{image_count} images detected; {alt_hits} alt-text entries found in sampled objects.',
                      {'image_count': image_count, 'pages_with_images': pages_with_images,
                       'alt_hits': alt_hits})
# ─────────────────────────────────────────────────────────────────────────────
# Top-level entry point
# ─────────────────────────────────────────────────────────────────────────────
def axa_pdf_accessibility(ingest_result: Dict, scope_args: Optional[Dict] = None) -> Dict:
    """Run PDF/UA-1 accessibility validation on the ingested PDF.

    When veraPDF ran successfully, its PDF/UA-1 verdict drives the score
    and pass flag. The deterministic PyMuPDF criteria run in either case
    as a quick sanity layer and are the sole score source otherwise.

    Each criterion is executed in isolation: if one raises (for example
    PyMuPDF refusing page-level access on a password-protected file),
    it is recorded as a failed criterion with the error in its note and
    the remaining criteria still run, instead of the whole check
    crashing.

    Args:
        ingest_result: must contain 'pdf_path', the PDF to validate.
        scope_args: accepted for interface compatibility; not used here.

    Returns:
        A result dict with keys check_name, scope, score, pass, summary,
        findings and response. When pdf_path is missing or the PDF
        cannot be opened, score is 0.0, pass is False and findings
        carries an 'error' entry.
    """
    def _failure(summary: str, error: str) -> Dict:
        return {
            'check_name': 'axa_pdf_accessibility',
            'scope': 'document',
            'score': 0.0,
            'pass': False,
            'summary': summary,
            'findings': {'error': error},
            'response': '',
        }

    pdf_path = ingest_result.get('pdf_path')
    if not pdf_path:
        return _failure('Cannot run — pdf_path missing from ingest_result.', 'pdf_path_missing')
    try:
        doc = fitz.open(pdf_path)
    except Exception as e:
        return _failure(f'Failed to open PDF: {e}', str(e))

    # (code, title, check): code and title are only used to label a
    # criterion whose check function raised.
    checks = (
        ('C1', 'Tagged PDF (StructTreeRoot present)', _check_tagged),
        ('C2', 'Marked content (/MarkInfo /Marked true)', _check_marked),
        ('C3', 'Document title metadata', _check_title),
        ('C4', 'Document language (/Lang)', _check_language),
        ('C5', 'No password protection blocking AT', _check_no_blocking_encryption),
        ('C6', 'Fonts embedded', _check_font_embedding),
        ('C7', 'PDF version', _check_pdf_version),
        ('C8', 'XMP UA conformance declaration', _check_xmp_ua_conformance),
        ('C9', 'Alt text on images (sampling)', _check_alt_text_sampling),
    )
    criteria: List[Dict] = []
    try:
        for code, title, check in checks:
            try:
                criteria.append(check(doc))
            except Exception as e:
                criteria.append(_criterion(code, title, False,
                                           f'Check could not run: {e}',
                                           {'error': str(e)}))
    finally:
        doc.close()

    crit_failed = [c for c in criteria if not c['passed']]
    crit_total = len(criteria)
    crit_passed_count = crit_total - len(crit_failed)

    verapdf = _run_verapdf(pdf_path)
    verapdf_ok = bool(verapdf and verapdf.get('available') and not verapdf.get('error'))
    if verapdf_ok:
        score, pass_flag, summary = _score_from_verapdf(verapdf)
    else:
        score = round((crit_passed_count / crit_total) * 10, 2) if crit_total else 0.0
        pass_flag = not crit_failed
        # Tell "not installed" apart from "installed but the run failed",
        # so nobody is told to install a binary that is already there.
        verapdf_error = (verapdf or {}).get('error')
        if verapdf_error:
            reason_pass = reason_fail = f'veraPDF run failed: {verapdf_error}'
        else:
            reason_pass = 'veraPDF unavailable — install for full PDF/UA-1 validation'
            reason_fail = 'veraPDF unavailable'
        if pass_flag:
            summary = f'All {crit_total} fast accessibility criteria passed ({reason_pass}).'
        else:
            summary = f'{len(crit_failed)} of {crit_total} fast accessibility criteria failed ({reason_fail}).'

    response = _build_response_text(summary, criteria, verapdf if verapdf_ok else None)
    return {
        'check_name': 'axa_pdf_accessibility',
        'scope': 'document',
        'score': score,
        'pass': pass_flag,
        'summary': summary,
        'findings': {
            'criteria': criteria,
            'criteria_total': crit_total,
            'criteria_passed': crit_passed_count,
            'criteria_failed': len(crit_failed),
            'verapdf_run': verapdf_ok,
            'verapdf': verapdf or None,
        },
        'response': response,
    }
def _score_from_verapdf(verapdf: Dict) -> tuple:
    """Translate a veraPDF result dict into (score, pass_flag, summary).

    A compliant document scores 10.0 and passes. A non-compliant one
    never passes; its score steps down with the number of failed rules
    (at most one -> 5.0, exactly two -> 3.0, anything else -> 0.0) so
    partially-compliant documents still yield a usable trend number.
    """
    if not verapdf.get('compliant'):
        failed_rules = verapdf.get('failed_rules', 0)
        failed_checks = verapdf.get('failed_checks', 0)
        score = 5.0 if failed_rules <= 1 else (3.0 if failed_rules == 2 else 0.0)
        message = (
            f'PDF/UA-1 non-compliant per veraPDF: {failed_rules} rule(s) failed '
            f'across {failed_checks} individual check(s).'
        )
        return score, False, message

    passed_rules = verapdf.get('passed_rules', 0)
    return 10.0, True, f'PDF/UA-1 compliant per veraPDF ({passed_rules} rules passed).'
def _build_response_text(summary: str, criteria: List[Dict], verapdf: Optional[Dict]) -> str:
    """Plain-text response shown in the QC report's response block.

    Layout: the summary line, an optional veraPDF section (verdict,
    rule/check counts, then one entry per failed rule with its first
    non-empty sample error), and finally one line per deterministic
    criterion.

    Criterion lines start with a pass (✓) or fail (✗) marker so the two
    outcomes are distinguishable, and the criterion code is separated
    from its title by a space. A rule without tags is rendered as
    "(×N)" rather than with a dangling comma.

    Args:
        summary: headline sentence placed on the first line.
        criteria: criterion records with 'passed', 'code', 'title', 'note'.
        verapdf: parsed veraPDF result, or None/empty to omit that section.
    """
    lines = [summary, '']
    if verapdf:
        lines.append('── veraPDF PDF/UA-1 ──')
        verdict = 'COMPLIANT' if verapdf.get('compliant') else 'NOT COMPLIANT'
        lines.append(f' Verdict: {verdict}')
        lines.append(
            f' Rules: {verapdf.get("passed_rules", 0)} passed / '
            f'{verapdf.get("failed_rules", 0)} failed'
        )
        lines.append(
            f' Checks: {verapdf.get("passed_checks", 0)} passed / '
            f'{verapdf.get("failed_checks", 0)} failed'
        )
        for rule in verapdf.get('failed_rule_details', []):
            tags = ', '.join(rule.get('tags') or [])
            qualifier = f'×{rule["failed_checks"]}'
            if tags:
                qualifier = f'{qualifier}, {tags}'
            lines.append('')
            lines.append(f' ✗ Clause {rule["clause"]}-{rule["test_number"]} ({qualifier})')
            lines.append(f' {rule["description"]}')
            # Show one example only, skipping blank messages.
            example = next((s for s in rule.get('sample_errors', []) if s), None)
            if example is not None:
                lines.append(f' e.g. {example}')
        lines.append('')
    lines.append('── Fast deterministic criteria ──')
    for c in criteria:
        marker = '✓' if c['passed'] else '✗'
        lines.append(f" {marker} {c['code']} {c['title']}: {c['note']}")
    return '\n'.join(lines)
# ─────────────────────────────────────────────────────────────────────────────
# veraPDF integration
# ─────────────────────────────────────────────────────────────────────────────
def _resolve_verapdf_binary() -> Optional[str]:
    """Find the veraPDF executable, or return None when it is absent.

    Candidates are tried in this order: the VERAPDF_BIN environment
    variable (must point at an executable file), a PATH lookup for
    'verapdf', then the project-local vendor install. A None result
    makes the check fall back to deterministic-only mode.
    """
    def _is_executable_file(path: str) -> bool:
        return os.path.isfile(path) and os.access(path, os.X_OK)

    override = os.environ.get('VERAPDF_BIN')
    if override and _is_executable_file(override):
        return override

    on_path = shutil.which('verapdf')
    if on_path:
        return on_path

    return _VERAPDF_VENDOR_PATH if _is_executable_file(_VERAPDF_VENDOR_PATH) else None
def _run_verapdf(pdf_path: str) -> Optional[Dict]:
    """Run veraPDF PDF/UA-1 validation on pdf_path and parse its XML report.

    Returns:
        None when no veraPDF binary can be resolved.
        A dict with 'available', 'binary' and 'error' (plus a 'stderr'
        excerpt where one was captured) when the subprocess timed out,
        failed to start, produced no output, or produced output that
        could not be parsed into a validation report.
        Otherwise a dict with the verdict ('compliant'), the profile and
        statement, passed/failed rule and check counters, and
        'failed_rule_details' — one entry per reported failing rule.

    Rule elements explicitly marked status="passed" are skipped so the
    failure list stays accurate if the report includes passed rules.
    Counter attributes that are missing or non-numeric are read as 0.
    """
    binary = _resolve_verapdf_binary()
    if not binary:
        return None

    def _error(message: str, **extra) -> Dict:
        return {'available': True, 'binary': binary, 'error': message, **extra}

    def _to_int(value) -> int:
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            return 0

    command = [binary, '-f', 'ua1', '--format', 'xml', '--maxfailuresdisplayed', '3', pdf_path]
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=_VERAPDF_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return _error(f'veraPDF timed out after {_VERAPDF_TIMEOUT_SECONDS}s')
    except Exception as e:
        return _error(f'veraPDF subprocess failed: {e}')

    stderr_excerpt = (result.stderr or '')[:500]
    if not result.stdout:
        return _error('veraPDF produced no output', stderr=stderr_excerpt)
    try:
        root = ET.fromstring(result.stdout)
    except ET.ParseError as e:
        return _error(f'Could not parse veraPDF XML: {e}', stderr=stderr_excerpt)
    vr = root.find('.//validationReport')
    if vr is None:
        return _error('No validationReport in veraPDF output', stderr=stderr_excerpt)

    details = vr.find('details')
    rule_nodes = details.findall('rule') if details is not None else []
    rules: List[Dict] = []
    for rule in rule_nodes:
        if rule.get('status') == 'passed':
            continue
        tags = [t.strip() for t in (rule.get('tags') or '').split(',') if t.strip()]
        # Messages of the first two checks, minus any blank ones.
        samples = [
            (check.findtext('errorMessage') or '').strip()
            for check in rule.findall('check')[:2]
        ]
        rules.append({
            'specification': rule.get('specification'),
            'clause': rule.get('clause'),
            'test_number': rule.get('testNumber'),
            'tags': tags,
            'failed_checks': _to_int(rule.get('failedChecks')),
            'description': (rule.findtext('description') or '').strip(),
            'sample_errors': [s for s in samples if s],
        })

    def _detail_int(name: str) -> int:
        return _to_int(details.get(name)) if details is not None else 0

    return {
        'available': True,
        'binary': binary,
        'compliant': vr.get('isCompliant') == 'true',
        'profile': vr.get('profileName', 'PDF/UA-1'),
        'statement': vr.get('statement', ''),
        'passed_rules': _detail_int('passedRules'),
        'failed_rules': _detail_int('failedRules'),
        'passed_checks': _detail_int('passedChecks'),
        'failed_checks': _detail_int('failedChecks'),
        'failed_rule_details': rules,
    }