PDF-accessibility-saas/enterprise_pdf_checker.py

2216 lines
87 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Enterprise PDF Accessibility Checker
Quality-first comprehensive WCAG 2.1 validation
Features:
- Google Cloud Vision API for OCR and image analysis
- Anthropic Claude for alt text validation and content analysis
- Complete color contrast checking
- Readability analysis
- Form field validation
- Heading structure analysis
- Link quality checking
- Comprehensive reporting
"""
import sys
import os
import json
import re
import base64
import hashlib
import time
import subprocess
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field, asdict
from enum import Enum
from datetime import datetime
from io import BytesIO
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
# Load environment variables from .env file (optional)
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
# dotenv not installed, that's okay - will use environment variables
pass
# Setup logging
from logger_config import setup_logger
logger = setup_logger(__name__, "pdf_checker.log")
# Import retry helper for API resilience
from retry_helper import retry_with_backoff, safe_execute, RetryableError
# Import remediation module
try:
from pdf_remediation import VeraPDFValidator, PDFRemediator
except ImportError:
logger.warning("Remediation module not found - auto-fix features disabled")
VeraPDFValidator = None
PDFRemediator = None
# Core PDF libraries
try:
from pypdf import PdfReader, PdfWriter
import pdfplumber
from PIL import Image
import numpy as np
except ImportError:
logger.error("Core libraries not installed")
logger.error("Install: pip install pypdf pdfplumber pillow numpy")
sys.exit(1)
# OCR and analysis
try:
import pytesseract
from pdf2image import convert_from_path
except ImportError:
logger.warning("OCR libraries not available. Install: pip install pytesseract pdf2image")
pytesseract = None
# Readability
try:
from textblob import TextBlob
except ImportError:
logger.warning("TextBlob not available. Install: pip install textblob")
TextBlob = None
# Google Cloud Vision
try:
from google.cloud import vision
from google.cloud import documentai_v1 as documentai
except ImportError:
logger.warning("Google Cloud libraries not available")
logger.info("Install: pip install google-cloud-vision google-cloud-documentai")
vision = None
# Anthropic Claude
try:
import anthropic
except ImportError:
logger.warning("Anthropic library not available")
logger.info("Install: pip install anthropic")
anthropic = None
# Language detection
try:
from langdetect import detect as langdetect_detect, LangDetectException
except ImportError:
logger.warning("langdetect not available — language detection disabled")
langdetect_detect = None
LangDetectException = Exception
# WCAG 2.1 success criterion -> conformance level ('A', 'AA' or 'AAA').
# Keys are dotted criterion numbers written as strings (e.g. '1.4.3').
# AccessibilityIssue.to_dict() looks criteria up here to derive an issue's
# `wcag_level`; a criterion missing from this table resolves to '' there.
WCAG_LEVELS: Dict[str, str] = {
'1.1.1': 'A', '1.2.1': 'A', '1.2.2': 'A', '1.2.3': 'A',
'1.2.4': 'AA', '1.2.5': 'AA',
'1.3.1': 'A', '1.3.2': 'A', '1.3.3': 'A',
'1.3.4': 'AA', '1.3.5': 'AA',
'1.4.1': 'A', '1.4.2': 'A',
'1.4.3': 'AA', '1.4.4': 'AA', '1.4.5': 'AA',
'1.4.10': 'AA', '1.4.11': 'AA', '1.4.12': 'AA', '1.4.13': 'AA',
'2.1.1': 'A', '2.1.2': 'A', '2.1.4': 'A',
'2.2.1': 'A', '2.2.2': 'A',
'2.3.1': 'A',
'2.4.1': 'A', '2.4.2': 'A', '2.4.3': 'A', '2.4.4': 'A',
'2.4.5': 'AA', '2.4.6': 'AA', '2.4.7': 'AA',
'2.5.1': 'A', '2.5.2': 'A', '2.5.3': 'A', '2.5.4': 'A',
'3.1.1': 'A', '3.1.2': 'AA', '3.1.5': 'AAA',
'3.2.1': 'A', '3.2.2': 'A', '3.2.3': 'AA', '3.2.4': 'AA',
'3.3.1': 'A', '3.3.2': 'A', '3.3.3': 'AA', '3.3.4': 'AA',
'4.1.1': 'A', '4.1.2': 'A', '4.1.3': 'AA',
}
class Severity(Enum):
    """Severity classification attached to every reported finding.

    Each member's value is its own upper-case name; that string is what
    ends up in serialized output.
    """

    CRITICAL = "CRITICAL"
    ERROR = "ERROR"
    WARNING = "WARNING"
    INFO = "INFO"
    SUCCESS = "SUCCESS"
@dataclass
class AccessibilityIssue:
    """A single finding produced by an accessibility check."""

    severity: Severity
    category: str
    description: str
    page_number: Optional[int] = None
    recommendation: str = ""
    # One or more WCAG criterion numbers, comma-separated (e.g. "1.3.1, 4.1.2").
    wcag_criterion: str = ""
    details: Dict[str, Any] = field(default_factory=dict)
    # x0, y0, x1, y1 for highlighting
    coordinates: Optional[Dict[str, float]] = None

    def to_dict(self):
        """Return a JSON-serializable dictionary view of this issue.

        The extra ``wcag_level`` entry is the lowest conformance level
        (A before AA before AAA) among the listed criteria that appear in
        ``WCAG_LEVELS``; it is '' when none of them is known.
        """
        ranking = ('A', 'AA', 'AAA')

        known_levels = []
        for raw_criterion in self.wcag_criterion.split(','):
            criterion = raw_criterion.strip()
            if not criterion:
                continue
            level = WCAG_LEVELS.get(criterion, '')
            if level:
                known_levels.append(level)

        if known_levels:
            wcag_level = min(known_levels, key=ranking.index)
        else:
            wcag_level = ''

        serialized = {
            'severity': self.severity.value,
            'category': self.category,
            'description': self.description,
            'page_number': self.page_number,
            'recommendation': self.recommendation,
            'wcag_criterion': self.wcag_criterion,
            'wcag_level': wcag_level,
            'details': self.details,
            'coordinates': self.coordinates,
        }
        return serialized
@dataclass
class CheckResult:
    """Outcome record for one named check run."""

    # Human-readable name of the check.
    check_name: str
    # Whether the check is considered to have passed.
    passed: bool
    issues: List[AccessibilityIssue] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Wall-clock duration in seconds.
    duration: float = 0.0
class CacheManager:
    """File-backed JSON cache for expensive API results.

    Every entry lives in its own file, ``<cache_dir>/<key>.json``.
    """

    def __init__(self, cache_dir: str = ".cache"):
        """Create the cache directory, including missing parent directories."""
        self.cache_dir = Path(cache_dir)
        # parents=True so a nested location such as "var/cache/pdf" works too.
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def get_cache_key(self, data: bytes, prefix: str = "") -> str:
        """Return ``"<prefix>_<sha256 hex digest of data>"``."""
        hash_obj = hashlib.sha256(data)
        return f"{prefix}_{hash_obj.hexdigest()}"

    def get(self, key: str) -> Optional[Dict]:
        """Return the cached value for *key*.

        Returns None when the entry does not exist or cannot be read or
        decoded; a damaged cache file is treated as a cache miss.
        """
        cache_file = self.cache_dir / f"{key}.json"
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return None

    def set(self, key: str, data: Dict):
        """Store *data* under *key*.

        The JSON is written to a uniquely named temporary file and then
        renamed over the final path, so readers (and concurrent writers of
        the same key) never observe a partially written entry.

        Raises:
            TypeError / ValueError: if *data* is not JSON-serializable.
            OSError: if the file cannot be written.
        In both cases no partial cache entry is left behind.
        """
        import uuid  # local import: only needed here

        cache_file = self.cache_dir / f"{key}.json"
        tmp_file = cache_file.with_name(f"{cache_file.name}.{uuid.uuid4().hex}.tmp")
        try:
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(data, f)
            tmp_file.replace(cache_file)
        except BaseException:
            # Clean up the temporary file, then let the caller see the error.
            try:
                tmp_file.unlink()
            except OSError:
                pass
            raise
class ColorContrastChecker:
    """Contrast-ratio helpers built on the WCAG relative-luminance formula."""

    # Minimum contrast ratios per conformance level and text size.
    WCAG_AA_NORMAL = 4.5
    WCAG_AA_LARGE = 3.0
    WCAG_AAA_NORMAL = 7.0
    WCAG_AAA_LARGE = 4.5

    @staticmethod
    def get_luminance(rgb: Tuple[int, int, int]) -> float:
        """Return the relative luminance of an 8-bit RGB colour."""
        red, green, blue = [component / 255.0 for component in rgb]

        def linearize(value):
            # Piecewise transfer function applied to each channel.
            if value <= 0.03928:
                return value / 12.92
            return ((value + 0.055) / 1.055) ** 2.4

        red = linearize(red)
        green = linearize(green)
        blue = linearize(blue)
        return 0.2126 * red + 0.7152 * green + 0.0722 * blue

    @staticmethod
    def calculate_contrast_ratio(color1: Tuple[int, int, int],
                                 color2: Tuple[int, int, int]) -> float:
        """Return the contrast ratio (lighter + 0.05) / (darker + 0.05)."""
        first = ColorContrastChecker.get_luminance(color1)
        second = ColorContrastChecker.get_luminance(color2)
        return (max(first, second) + 0.05) / (min(first, second) + 0.05)

    @staticmethod
    def check_image_contrast(image: Image.Image, sample_size: int = 1000) -> Dict:
        """Estimate contrast problems by sampling pixel pairs from an image.

        Each sample compares a pixel with the one 8px below it; such pairs
        are more likely to straddle a text-stroke / background boundary than
        adjacent pixels. Pairs whose luminance differs by less than 0.08 are
        discarded, which drops uniform areas and keeps real edges.

        Returns a statistics dict, or ``{'error': ...}`` when fewer than 20
        usable pairs were found. Sampling uses a fixed seed, so results are
        repeatable for the same image.
        """
        if image.mode != 'RGB':
            image = image.convert('RGB')
        width, height = image.size
        rng = np.random.default_rng(seed=42)

        edges = []  # pairs that cross a meaningful light/dark boundary
        max_attempts = min(sample_size * 4, width * height // 20)
        for _ in range(max_attempts):
            x = int(rng.integers(0, width))
            y = int(rng.integers(0, max(1, height - 9)))
            try:
                upper = image.getpixel((x, y))
                lower = image.getpixel((x, y + 8))
                lum_upper = ColorContrastChecker.get_luminance(upper)
                lum_lower = ColorContrastChecker.get_luminance(lower)
                if abs(lum_upper - lum_lower) >= 0.08:
                    edges.append({
                        'ratio': ColorContrastChecker.calculate_contrast_ratio(upper, lower),
                        'colors': (upper, lower),
                        'position': (x, y),
                    })
                    if len(edges) >= sample_size:
                        break
                # otherwise: near-uniform area (photo gradient, blank space)
            except (IndexError, TypeError, ValueError):
                continue

        if len(edges) < 20:
            return {'error': 'Insufficient contrast edges to analyse (image-only page)'}

        ratios = [edge['ratio'] for edge in edges]
        sample_count = len(ratios)
        below_normal = sum(1 for r in ratios if r < ColorContrastChecker.WCAG_AA_NORMAL)
        below_large = sum(1 for r in ratios if r < ColorContrastChecker.WCAG_AA_LARGE)
        return {
            'total_samples': sample_count,
            'fail_aa_normal_count': below_normal,
            'fail_aa_large_count': below_large,
            'fail_aa_normal_percent': below_normal / sample_count * 100,
            'fail_aa_large_percent': below_large / sample_count * 100,
            'worst_ratio': min(ratios),
            'best_ratio': max(ratios),
            'avg_ratio': sum(ratios) / sample_count,
        }
class ReadabilityAnalyzer:
    """Readability metrics for extracted document text."""

    @staticmethod
    def count_syllables(word: str) -> int:
        """Estimate the syllable count of *word* (always at least 1).

        Counts runs of consecutive vowels (a, e, i, o, u, y) and drops one
        for a trailing 'e' when more than one run was found.
        """
        letters = word.lower().strip()
        runs = 0
        inside_vowel_run = False
        for ch in letters:
            if ch in 'aeiouy':
                if not inside_vowel_run:
                    runs += 1
                inside_vowel_run = True
            else:
                inside_vowel_run = False
        if runs > 1 and letters.endswith('e'):
            runs -= 1
        return runs if runs > 1 else 1

    @staticmethod
    def analyze(text: str) -> Dict:
        """Compute Flesch Reading Ease, Flesch-Kincaid grade and basic counts.

        Returns ``{'error': ...}`` when the stripped text is shorter than 50
        characters or no sentences/words can be found.
        """
        if not text or len(text.strip()) < 50:
            return {'error': 'Insufficient text for analysis'}

        # Collapse all whitespace runs to single spaces.
        normalized = re.sub(r'\s+', ' ', text.strip())

        sentences = []
        for fragment in re.split(r'[.!?]+', normalized):
            fragment = fragment.strip()
            if fragment:
                sentences.append(fragment)
        words = re.findall(r'\b\w+\b', normalized)
        if not sentences or not words:
            return {'error': 'Could not parse text'}

        sentence_count = len(sentences)
        word_count = len(words)
        syllables_by_word = [ReadabilityAnalyzer.count_syllables(w) for w in words]
        syllable_count = sum(syllables_by_word)

        # Flesch Reading Ease (0-100, higher = easier)
        reading_ease = (
            206.835
            - 1.015 * (word_count / sentence_count)
            - 84.6 * (syllable_count / word_count)
        )
        # Flesch-Kincaid Grade Level
        grade_level = (
            0.39 * (word_count / sentence_count)
            + 11.8 * (syllable_count / word_count)
            - 15.59
        )

        # Sentences over 25 words and words over 3 syllables are flagged.
        long_sentence_count = sum(1 for s in sentences if len(s.split()) > 25)
        complex_word_count = sum(1 for n in syllables_by_word if n > 3)

        return {
            'flesch_reading_ease': round(reading_ease, 2),
            'flesch_kincaid_grade': round(grade_level, 2),
            'total_words': word_count,
            'total_sentences': sentence_count,
            'avg_words_per_sentence': round(word_count / sentence_count, 2),
            'long_sentences_count': long_sentence_count,
            'complex_words_count': complex_word_count,
            'complex_words_percent': round(complex_word_count / word_count * 100, 2),
        }
class EnterprisePDFChecker:
"""Enterprise-grade PDF accessibility checker"""
def __init__(self, pdf_path: str, config: Optional[Dict[str, Any]] = None, quick_mode: bool = False, generate_images: bool = True):
    """Set up checker state and any API clients the configuration allows.

    Args:
        pdf_path: Path of the PDF to check.
        config: Optional settings. Keys read here are
            'google_credentials_path', 'google_api_key' and
            'anthropic_api_key'.
        quick_mode: Stored as ``self.quick_mode``.
        generate_images: Stored as ``self.generate_images``.

    Side effects: creates the cache directory (via CacheManager) and, when a
    Google credentials file exists, sets GOOGLE_APPLICATION_CREDENTIALS in
    the process environment.
    """
    self.pdf_path = Path(pdf_path)
    self.config = config or {}
    self.quick_mode = quick_mode
    self.generate_images = generate_images
    self.issues: List[AccessibilityIssue] = []
    self.check_results: List[CheckResult] = []
    self.pdf_reader = None
    self.pdf_plumber = None
    self.cache = CacheManager()
    self.page_images: Dict[int, str] = {}  # page_num -> image_path
    self.verapdf_results: Optional[Dict] = None
    self.remediation_suggestions: Optional[Dict] = None
    self._detected_lang: str = 'en'  # detected language of the document

    # API clients
    self.vision_client = None
    self.anthropic_client = None
    self.api_timeout = 10.0  # 10 second timeout for API calls

    # Initialize API clients
    config = self.config
    google_creds_path = config.get('google_credentials_path')
    if google_creds_path and os.path.isfile(google_creds_path):
        # Valid credentials file exists
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = google_creds_path
        if vision:
            try:
                self.vision_client = vision.ImageAnnotatorClient()
                logger.info("Google Cloud Vision initialized with credentials file")
            except Exception as e:
                logger.warning(f"Google Vision initialization failed: {str(e)}")
    elif config.get('google_api_key'):
        # Use API key directly
        if vision:
            # Note: Vision API with API key requires different initialization
            # For now, store key for use in requests
            # NOTE(review): self.google_api_key is only defined on this path.
            self.google_api_key = config['google_api_key']
            # Never write any part of the key to the log.
            logger.info("Using Google API key from configuration (value redacted)")
    elif google_creds_path:
        # Path provided but file doesn't exist
        logger.warning(f"Google credentials file not found: {google_creds_path}")
        logger.warning("Skipping Google Cloud Vision (advanced OCR disabled)")

    if config.get('anthropic_api_key') and anthropic:
        try:
            self.anthropic_client = anthropic.Anthropic(api_key=config['anthropic_api_key'])
            logger.info("Anthropic Claude initialized")
        except Exception as e:
            logger.warning(f"Anthropic initialization failed: {str(e)}")

    # Stats
    self.stats = {
        'start_time': datetime.now(),
        'total_checks': 0,
        'api_calls': 0,
        'cached_calls': 0,
        'total_cost_estimate': 0.0
    }
def add_issue(self, severity: Severity, category: str, description: str, **kwargs):
    """Record a finding on ``self.issues``.

    Extra keyword arguments are passed straight through to
    ``AccessibilityIssue`` (e.g. page_number, recommendation,
    wcag_criterion, details, coordinates).
    """
    self.issues.append(
        AccessibilityIssue(
            severity=severity,
            category=category,
            description=description,
            **kwargs
        )
    )
# Per-check wall-clock timeouts (seconds). Heavy checks get more time.
# Keyed by the check's display name — the same string that is passed to
# run_check() as `check_name`.
_CHECK_TIMEOUTS = {
"Image Accessibility": 180,
"OCR Quality": 180,
"Color Contrast": 120,
"PDF/UA Structure (veraPDF)": 120,
"Content Readability": 60,
}
# Timeout (seconds) used by run_check() for any check not listed above.
_DEFAULT_CHECK_TIMEOUT = 90
def run_check(self, check_func, check_name: str) -> CheckResult:
    """Run one check under a wall-clock timeout and record its outcome.

    Args:
        check_func: Zero-argument callable; it reports findings through
            ``self.add_issue``.
        check_name: Display name; also the key into ``_CHECK_TIMEOUTS``.

    Returns:
        The CheckResult, which is also appended to ``self.check_results``.
        ``passed`` is False when the check added a CRITICAL/ERROR issue,
        raised, or timed out.

    The executor is deliberately NOT used as a context manager: leaving a
    ``with`` block calls ``shutdown(wait=True)``, which would block until the
    check finished and so defeat the timeout. A timed-out check cannot be
    killed; its thread keeps running in the background, so any issues it adds
    later will land after this method has returned.
    """
    from concurrent.futures import ThreadPoolExecutor, wait as wait_for_futures

    start_time = time.time()
    result = CheckResult(check_name=check_name, passed=True)
    issues_before = len(self.issues)
    timeout = self._CHECK_TIMEOUTS.get(check_name, self._DEFAULT_CHECK_TIMEOUT)

    executor = ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(check_func)
        # wait() distinguishes "still running" from "the check itself raised
        # TimeoutError", which result(timeout=...) cannot do on Python 3.11+.
        _, still_running = wait_for_futures([future], timeout=timeout)
        if still_running:
            logger.warning(f"{check_name} timed out after {timeout}s — skipping")
            self.add_issue(
                Severity.WARNING,
                check_name,
                f"Check timed out after {timeout}s and was skipped",
                details={'timeout': timeout}
            )
            result.passed = False
        else:
            future.result()  # re-raises whatever the check raised
            # Check passed if no critical/error issues added by THIS check
            new_issues = self.issues[issues_before:]
            result.passed = not any(
                issue.severity in (Severity.CRITICAL, Severity.ERROR)
                for issue in new_issues
            )
    except Exception as e:
        self.add_issue(
            Severity.CRITICAL,
            check_name,
            f"Check failed with error: {str(e)}",
            details={'error': str(e), 'traceback': traceback.format_exc()}
        )
        result.passed = False
    finally:
        # Do not wait for a hung check; release the worker when it finishes.
        executor.shutdown(wait=False)

    result.duration = time.time() - start_time
    self.check_results.append(result)
    self.stats['total_checks'] += 1
    return result
def check_all(self) -> Dict[str, Any]:
    """Open the PDF, run every check in order and return the summary.

    A failure to open or process the file is recorded as a CRITICAL
    "File Access" issue instead of being raised. The pdfplumber handle is
    closed afterwards, and timing statistics are stored in ``self.stats``.
    """
    logger.info("Enterprise PDF Accessibility Check")
    logger.info(f"File: {self.pdf_path.name}")
    logger.info("=" * 60)

    try:
        source = str(self.pdf_path)
        self.pdf_reader = PdfReader(source)
        self.pdf_plumber = pdfplumber.open(source)

        # (display name, bound check method) in execution order
        pipeline = (
            ("Document Structure", self._check_basic_structure),
            ("Metadata", self._check_metadata),
            ("Language Declaration", self._check_language),
            ("Text Extractability", self._check_text_extractability),
            ("OCR Quality", self._check_ocr_quality),
            ("Image Accessibility", self._check_images_comprehensive),
            ("Color Contrast", self._check_color_contrast),
            ("Content Readability", self._check_readability),
            ("Link Quality", self._check_links),
            ("Heading Structure", self._check_headings),
            ("Tab Order", self._check_tab_order),
            ("Role Mapping", self._check_role_mapping),
            ("Form Accessibility", self._check_forms),
            ("Table Structure", self._check_tables),
            ("Reading Order", self._check_reading_order),
            ("Font Accessibility", self._check_fonts),
            ("Security Settings", self._check_security),
            ("Navigation Aids", self._check_bookmarks),
            ("PDF/UA Structure (veraPDF)", self._check_verapdf_validation),
        )
        for check_name, check_func in pipeline:
            logger.info(f"Running: {check_name}...")
            outcome = self.run_check(check_func, check_name)
            verdict = "PASS" if outcome.passed else "FAIL"
            logger.info(f"{verdict} ({outcome.duration:.2f}s)")

        # Analyze remediation options
        self._analyze_remediation_options()
    except Exception as e:
        self.add_issue(
            Severity.CRITICAL,
            "File Access",
            f"Could not process PDF: {str(e)}",
            details={'error': str(e)}
        )
    finally:
        if self.pdf_plumber:
            self.pdf_plumber.close()

    finished_at = datetime.now()
    self.stats['end_time'] = finished_at
    self.stats['duration'] = (finished_at - self.stats['start_time']).total_seconds()
    return self._generate_summary()
# ==================== CORE CHECKS ====================
def _check_basic_structure(self):
    """Report whether the document catalog declares the PDF as tagged.

    Looks at /MarkInfo in the catalog (/Root) and at its /Marked flag.
    """
    catalog = self.pdf_reader.trailer.get("/Root", {})

    # No /MarkInfo at all: nothing declares the document as tagged.
    if "/MarkInfo" not in catalog:
        self.add_issue(
            Severity.CRITICAL,
            "Document Structure",
            "PDF is not tagged - completely inaccessible to screen readers",
            wcag_criterion="1.3.1, 4.1.2",
            recommendation="Tag the PDF using Adobe Acrobat Pro or authoring software"
        )
        return

    is_marked = catalog.get("/MarkInfo", {}).get("/Marked", False)
    if is_marked:
        self.add_issue(
            Severity.SUCCESS,
            "Document Structure",
            "PDF is properly tagged",
            wcag_criterion="1.3.1"
        )
        return

    self.add_issue(
        Severity.CRITICAL,
        "Document Structure",
        "PDF marked as untagged in metadata",
        wcag_criterion="1.3.1",
        recommendation="Enable document tagging"
    )
def _check_metadata(self):
    """Report missing title (ERROR), author (WARNING) and subject (INFO).

    When the document has no metadata at all, a single ERROR is recorded
    and the individual fields are not examined.
    """
    meta = self.pdf_reader.metadata
    if not meta:
        self.add_issue(
            Severity.ERROR,
            "Metadata",
            "No document metadata found",
            wcag_criterion="2.4.2",
            recommendation="Add title, author, and subject metadata"
        )
        return

    def is_blank(value):
        # Missing, empty or whitespace-only.
        return not value or not value.strip()

    # Title
    title = meta.title
    if is_blank(title):
        self.add_issue(
            Severity.ERROR,
            "Metadata",
            "Document title is missing",
            wcag_criterion="2.4.2",
            recommendation="Add a descriptive title"
        )
    else:
        self.add_issue(
            Severity.SUCCESS,
            "Metadata",
            f"Document has title: '{title}'",
            wcag_criterion="2.4.2"
        )

    # Author
    if is_blank(meta.author):
        self.add_issue(
            Severity.WARNING,
            "Metadata",
            "Author information is missing",
            recommendation="Add author metadata"
        )

    # Subject
    if is_blank(meta.subject):
        self.add_issue(
            Severity.INFO,
            "Metadata",
            "Subject/description is missing",
            recommendation="Add a brief description"
        )
def _check_language(self):
    """Check language declaration (WCAG 3.1.1) and detect actual content language.

    Text from up to the first three pages is sampled; when langdetect is
    available and at least 50 characters were found, the detected code is
    stored in ``self._detected_lang`` (falling back to 'en' if detection
    raises). The catalog's /Lang entry is then checked:

    * missing /Lang        -> ERROR with a suggested tag
    * /Lang contradicts a detected non-English language -> WARNING
    * otherwise            -> SUCCESS
    """
    catalog = self.pdf_reader.trailer.get("/Root", {})

    # --- Detect actual language from content ---
    sample_text = ""
    for page in self.pdf_plumber.pages[:3]:
        t = page.extract_text()
        if t:
            sample_text += t + " "
        if len(sample_text) > 500:
            break

    can_detect = bool(langdetect_detect) and len(sample_text.strip()) >= 50
    detection_succeeded = False
    if can_detect:
        try:
            self._detected_lang = langdetect_detect(sample_text)
            detection_succeeded = True
        except LangDetectException:
            self._detected_lang = 'en'

    # langdetect can return region-qualified codes (e.g. 'zh-cn'); compare on
    # the primary subtag so that a declared 'zh-CN' is not flagged as a mismatch.
    detected_prefix = self._detected_lang.lower().split('-')[0].split('_')[0]

    # --- Check declared /Lang ---
    if "/Lang" not in catalog:
        # Map ISO 639-1 codes to BCP-47 tags
        lang_map = {
            'uk': 'uk-UA', 'ru': 'ru-RU', 'de': 'de-DE', 'fr': 'fr-FR',
            'es': 'es-ES', 'pl': 'pl-PL', 'it': 'it-IT', 'pt': 'pt-PT',
            'nl': 'nl-NL', 'cs': 'cs-CZ', 'sk': 'sk-SK', 'ro': 'ro-RO',
            'hu': 'hu-HU', 'bg': 'bg-BG', 'hr': 'hr-HR', 'ar': 'ar-SA',
            'zh': 'zh-CN', 'ja': 'ja-JP', 'ko': 'ko-KR', 'en': 'en-US',
        }
        bcp47 = lang_map.get(self._detected_lang, self._detected_lang)
        if detection_succeeded:
            recommendation = f"Set document language (detected content language: '{bcp47}')"
        else:
            # Do not present the fallback value as a detection result.
            recommendation = (
                f"Set document language (content language could not be detected; e.g. '{bcp47}')"
            )
        self.add_issue(
            Severity.ERROR,
            "Language",
            "Document language not specified",
            wcag_criterion="3.1.1",
            recommendation=recommendation,
            details={'detected_language': self._detected_lang}
        )
        return

    declared_lang = str(catalog["/Lang"]).lower()
    # Compare declared lang prefix with detected lang
    declared_prefix = declared_lang.split('-')[0].split('_')[0]
    mismatch = (
        can_detect
        and detected_prefix != 'en'  # English is common false-positive
        and declared_prefix != detected_prefix
        and detected_prefix not in declared_prefix
    )
    if mismatch:
        self.add_issue(
            Severity.WARNING,
            "Language",
            f"Declared language '{catalog['/Lang']}' may not match content "
            f"(detected: '{self._detected_lang}')",
            wcag_criterion="3.1.1",
            recommendation="Verify the /Lang entry matches the document's actual language",
            details={'declared_language': str(catalog["/Lang"]),
                     'detected_language': self._detected_lang}
        )
    else:
        self.add_issue(
            Severity.SUCCESS,
            "Language",
            f"Document language set to: {catalog['/Lang']}",
            wcag_criterion="3.1.1",
            details={'declared_language': str(catalog["/Lang"]),
                     'detected_language': self._detected_lang}
        )
def _check_text_extractability(self):
    """Flag pages from which (almost) no text can be extracted.

    A page counts as text-less when fewer than 10 characters come back from
    text extraction. All pages text-less -> CRITICAL; some -> WARNING.
    """
    total_pages = len(self.pdf_reader.pages)

    # 1-based numbers of the pages with fewer than 10 extractable characters
    textless_pages = [
        page_number
        for page_number, page in enumerate(self.pdf_plumber.pages, start=1)
        if len(page.extract_text() or "") < 10
    ]
    textless_count = len(textless_pages)

    if textless_count == total_pages:
        self.add_issue(
            Severity.CRITICAL,
            "Text Accessibility",
            "No extractable text found - document appears to be scanned images",
            wcag_criterion="1.1.1",
            recommendation="Run OCR or recreate from source with selectable text",
            details={'pages_affected': textless_pages}
        )
        return

    if textless_count > 0:
        self.add_issue(
            Severity.WARNING,
            "Text Accessibility",
            f"{textless_count} of {total_pages} pages have no extractable text",
            wcag_criterion="1.1.1",
            recommendation="Review pages without text",
            details={'pages_affected': textless_pages}
        )
def _check_ocr_quality(self):
    """Warn about pages whose mean OCR word confidence is below 60%.

    Only the first two pages are rendered (150 DPI) and analysed. The check
    is a no-op when pytesseract is unavailable or in quick mode, and any
    failure while rendering or running OCR is logged and skipped.
    """
    if not pytesseract:
        return
    if self.quick_mode:
        logger.info("Skipping OCR analysis (quick mode)")
        return

    logger.info("Running OCR analysis...")
    try:
        # Reduced DPI from 300 to 150 for faster processing
        images = convert_from_path(str(self.pdf_path), dpi=150, first_page=1, last_page=min(2, len(self.pdf_reader.pages)))
        for i, image in enumerate(images):
            # Get OCR data with confidence
            ocr_data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)

            # 'conf' entries may be ints, floats or numeric strings depending
            # on the pytesseract/Tesseract version; -1 marks boxes that carry
            # no recognised word. Compare numerically so those placeholders
            # are excluded whatever their type.
            confidences = []
            for raw_conf in ocr_data['conf']:
                try:
                    value = float(raw_conf)
                except (TypeError, ValueError):
                    continue
                if value >= 0:
                    confidences.append(value)

            if not confidences:
                continue
            avg_confidence = sum(confidences) / len(confidences)
            if avg_confidence < 60:
                self.add_issue(
                    Severity.WARNING,
                    "OCR Quality",
                    f"Page {i+1}: Low OCR confidence ({avg_confidence:.1f}%)",
                    wcag_criterion="1.1.1",
                    recommendation="Poor scan quality - rescan or manual review needed",
                    page_number=i+1,
                    details={'confidence': avg_confidence}
                )
    except Exception as e:
        logger.warning(f"OCR check skipped: {str(e)}")
def _check_images_comprehensive(self):
    """Comprehensive image accessibility check with AI.

    Steps:
      1. Extract every image from every page together with its bounding box.
      2. Drop images of 2048 bytes or less and keep at most 10.
      3. In quick mode stop here with an INFO note.
      4. Analyse the remaining images in parallel (Claude, plus Google Vision
         when a client is configured), using the cache for Claude results.
      5. Turn the analyses into issues on the calling thread.
    """
    logger.info("Analyzing images with AI...")
    total_images = 0
    analyzed_images = 0

    # Collect all images first
    image_tasks = []
    for page_num, page in enumerate(self.pdf_plumber.pages):
        images = page.images
        total_images += len(images)
        for img_idx, img in enumerate(images):
            try:
                image_data = self._extract_image_from_page(page, img)
                if image_data:
                    # Include coordinates for highlighting
                    coords = {
                        'x0': img['x0'],
                        'y0': img['top'],
                        'x1': img['x1'],
                        'y1': img['bottom']
                    }
                    image_tasks.append((image_data, page_num + 1, img_idx + 1, coords))
            except Exception as e:
                logger.warning(f"Failed to extract image on page {page_num + 1}: {str(e)}")

    if total_images == 0:
        self.add_issue(
            Severity.INFO,
            "Images",
            "No images found in document",
            wcag_criterion="1.1.1"
        )
        return

    logger.info(f"Found {total_images} images to analyze...")

    # Cap analysis: skip very small images (likely decorative/icons)
    image_tasks = [t for t in image_tasks if self._image_data_size(t[0]) > 2048]
    # Limit to 10 images max — more would just waste API calls on brochure backgrounds
    MAX_IMAGES = 10
    if len(image_tasks) > MAX_IMAGES:
        logger.info(f"Capping image analysis at {MAX_IMAGES} (of {len(image_tasks)}) images")
        image_tasks = image_tasks[:MAX_IMAGES]

    # Skip AI analysis in quick mode
    if self.quick_mode:
        logger.info("Skipping AI image analysis (quick mode)")
        self.add_issue(
            Severity.INFO,
            "Images",
            f"Found {total_images} images - run without --quick for AI analysis",
            wcag_criterion="1.1.1"
        )
        return

    # Number of images actually queued; used as the progress denominator so
    # the log does not report e.g. "3/57" after the list was capped.
    queued_images = len(image_tasks)
    per_image_timeout = 30  # seconds allowed for one Claude analysis

    # Process images in parallel with progress updates
    def analyze_single_image(task_data):
        image_data, page_num, img_num, coords = task_data
        result = {'page': page_num, 'img': img_num, 'analyzed': False, 'coords': coords}
        try:
            # Check cache first
            cache_key = self.cache.get_cache_key(image_data, "claude_vision")
            cached_result = self.cache.get(cache_key)
            if cached_result:
                analysis = cached_result
                result['cached'] = True
            else:
                # Analyze with Claude under a timeout. The executor is not
                # used as a context manager: leaving a `with` block waits for
                # the worker, which would make the timeout ineffective.
                img_exec = ThreadPoolExecutor(max_workers=1)
                try:
                    future = img_exec.submit(self._analyze_image_with_claude, image_data)
                    try:
                        analysis = future.result(timeout=per_image_timeout)
                    except Exception as exc:
                        logger.warning(
                            f"Claude analysis failed or timed out for image {img_num} "
                            f"on page {page_num}: {exc!r}"
                        )
                        analysis = None
                finally:
                    img_exec.shutdown(wait=False)
                if analysis and 'error' not in analysis:
                    self.cache.set(cache_key, analysis)
                    result['cached'] = False

            if analysis and 'error' not in analysis:
                result['analysis'] = analysis
                result['analyzed'] = True

            # Also check with Google Vision for additional data
            if self.vision_client:
                vision_analysis = self._analyze_image_with_google(image_data)
                if vision_analysis:
                    result['vision_analysis'] = vision_analysis
        except Exception as e:
            result['error'] = str(e)
        return result

    # Use ThreadPoolExecutor for parallel processing. Quick mode has already
    # returned above, so the worker count is fixed.
    max_workers = 5
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(analyze_single_image, task): task for task in image_tasks}
        for future in as_completed(futures):
            try:
                result = future.result()
                analyzed_images += 1
                cache_status = " (cached)" if result.get('cached') else ""
                logger.info(f"Analyzed image {analyzed_images}/{queued_images} (Page {result['page']}){cache_status}")

                if result.get('analyzed'):
                    self._process_image_analysis(result['analysis'], result['page'], result['img'], result.get('coords'))
                    if result.get('cached'):
                        self.stats['cached_calls'] += 1
                    else:
                        self.stats['api_calls'] += 1
                        self.stats['total_cost_estimate'] += 0.015

                if result.get('vision_analysis'):
                    self._process_google_vision_results(result['vision_analysis'], result['page'], result['img'], result.get('coords'))

                if result.get('error'):
                    logger.warning(f"Error analyzing image on page {result['page']}: {result['error']}")
            except Exception as e:
                logger.warning(f"Image analysis error: {str(e)}")

    logger.info(f"Completed analysis of {analyzed_images}/{queued_images} images")
@retry_with_backoff(max_retries=3, initial_delay=1.0)
def _analyze_image_with_claude(self, image_bytes: bytes) -> Optional[Dict]:
    """Analyze image with Claude Vision.

    Returns:
        None when no Anthropic client is configured; otherwise the parsed
        JSON object from the model's reply, or ``{'error': ...}`` when the
        request fails or the reply contains no parseable JSON object.

    NOTE(review): every exception is converted into an ``{'error': ...}``
    return value below, so the retry decorator presumably never sees a
    failure to retry — confirm against retry_helper.
    """
    if not self.anthropic_client:
        return None

    prompt = """Analyze this image for PDF accessibility (WCAG 2.1):
1. Provide concise alt text (1-2 sentences, max 125 characters)
2. Is this decorative or informational?
3. Does it contain text? If yes, what text?
4. Does it use color as the only means of conveying information?
5. Are there any accessibility concerns?
6. Quality rating (1-10) if this were to be used in a PDF
7. For images of people: describe their role, action, or function — not physical
appearance (race, ethnicity, age, gender, disability) unless directly relevant
to the image's informational purpose. A human reviewer will verify descriptions
of people.
8. If a brand name, logo, or product name is visible, use the specific brand name
in the alt text (e.g., "Scotch tape" not "adhesive tape", "Nike Air Max" not "sneakers").
Respond in JSON format:
{
"alt_text": "...",
"type": "decorative|informational|complex",
"has_text": true|false,
"text_content": "...",
"color_only_info": true|false,
"concerns": ["..."],
"quality_rating": 1-10,
"recommendation": "...",
"contains_people": true|false,
"brands_detected": ["..."]
}"""

    try:
        base64_image = base64.b64encode(image_bytes).decode('utf-8')
        message = self.anthropic_client.messages.create(
            model="claude-sonnet-4-5-20250929",
            max_tokens=1024,
            timeout=self.api_timeout,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                # Declare the format the bytes actually have
                                # rather than always claiming JPEG.
                                "media_type": self._sniff_image_media_type(image_bytes),
                                "data": base64_image,
                            },
                        },
                        {
                            "type": "text",
                            "text": prompt
                        }
                    ],
                }
            ],
        )
        response_text = message.content[0].text
        # Try to parse JSON from response (first '{' through last '}')
        json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
        if json_match:
            return json.loads(json_match.group())
        return {'error': 'Could not parse response'}
    except Exception as e:
        return {'error': str(e)}

@staticmethod
def _sniff_image_media_type(image_bytes: bytes) -> str:
    """Return the media type implied by the image's leading magic bytes.

    Recognises PNG, GIF and WebP; anything else is reported as
    "image/jpeg", which was the previously hard-coded value.
    """
    if image_bytes.startswith(b'\x89PNG\r\n\x1a\n'):
        return "image/png"
    if image_bytes.startswith((b'GIF87a', b'GIF89a')):
        return "image/gif"
    if image_bytes[:4] == b'RIFF' and image_bytes[8:12] == b'WEBP':
        return "image/webp"
    return "image/jpeg"
@retry_with_backoff(max_retries=3, initial_delay=1.0)
def _analyze_image_with_google(self, image_bytes: bytes) -> Optional[Dict]:
    """Analyze image with Google Vision.

    Returns:
        None when no Vision client is configured; otherwise a dict with
        'has_text', 'text_content', 'labels' (at most 5) and 'objects', or
        ``{'error': ...}`` when the request fails or the response carries a
        per-image error.

    Each request increments ``self.stats['api_calls']`` and the cost
    estimate, whether or not the response reports an error.

    NOTE(review): exceptions are converted into a return value here, so the
    retry decorator presumably never sees a failure — confirm against
    retry_helper.
    """
    if not self.vision_client:
        return None
    try:
        image = vision.Image(content=image_bytes)
        # Multiple detection types with timeout
        response = self.vision_client.annotate_image(
            {
                'image': image,
                'features': [
                    {'type_': vision.Feature.Type.TEXT_DETECTION},
                    {'type_': vision.Feature.Type.LABEL_DETECTION},
                    {'type_': vision.Feature.Type.IMAGE_PROPERTIES},
                    {'type_': vision.Feature.Type.OBJECT_LOCALIZATION},
                ],
            },
            timeout=self.api_timeout
        )
        self.stats['api_calls'] += 1
        self.stats['total_cost_estimate'] += 0.0015

        # A failed annotation is reported inside the response, not raised;
        # without this check it would look like "no text, no labels".
        error_message = getattr(getattr(response, 'error', None), 'message', '')
        if error_message:
            return {'error': error_message}

        text_annotations = response.text_annotations
        return {
            'has_text': bool(text_annotations),
            'text_content': text_annotations[0].description if text_annotations else None,
            'labels': [label.description for label in response.label_annotations[:5]],
            'objects': [obj.name for obj in response.localized_object_annotations]
        }
    except Exception as e:
        return {'error': str(e)}
def _process_image_analysis(self, analysis: Dict, page_num: int, img_num: int, coordinates: Optional[Dict] = None):
    """Turn one Claude image analysis into accessibility issues.

    Args:
        analysis: Parsed JSON reply for the image. Fields may be missing or
            null; both are treated as "not provided".
        page_num: 1-based page number of the image.
        img_num: 1-based index of the image on that page.
        coordinates: Optional bounding box attached to every issue raised.
    """
    def as_text(value):
        # Missing/null -> ''; anything else is rendered as a string.
        return '' if value is None else str(value)

    def as_list(value):
        # Missing/null -> []; a lone string is one item, not a char sequence.
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    # Check if text in image
    if analysis.get('has_text'):
        text_content = as_text(analysis.get('text_content'))
        self.add_issue(
            Severity.ERROR,
            "Images - Text in Image",
            f"Page {page_num}, Image {img_num}: Contains text: '{text_content[:50]}'",
            wcag_criterion="1.4.5",
            recommendation="Replace image with actual text or provide text alternative",
            page_number=page_num,
            details=analysis,
            coordinates=coordinates
        )

    # Check alt text quality
    if analysis.get('type') == 'informational':
        alt_text = as_text(analysis.get('alt_text'))
        if len(alt_text) > 125:
            self.add_issue(
                Severity.WARNING,
                "Images - Alt Text",
                f"Page {page_num}, Image {img_num}: Suggested alt text is too long ({len(alt_text)} chars)",
                wcag_criterion="1.1.1",
                recommendation=f"Shorten alt text. Suggested: '{alt_text[:100]}...'",
                page_number=page_num,
                coordinates=coordinates
            )
        else:
            self.add_issue(
                Severity.INFO,
                "Images - Alt Text",
                f"Page {page_num}, Image {img_num}: Suggested alt text: '{alt_text}'",
                wcag_criterion="1.1.1",
                page_number=page_num,
                coordinates=coordinates
            )

    # Check for color-only information
    if analysis.get('color_only_info'):
        self.add_issue(
            Severity.ERROR,
            "Images - Color Only",
            f"Page {page_num}, Image {img_num}: Uses color as only means of conveying information",
            wcag_criterion="1.4.1",
            recommendation="Add patterns, labels, or text descriptions",
            page_number=page_num,
            coordinates=coordinates
        )

    # Flag images containing people for human review
    if analysis.get('contains_people'):
        self.add_issue(
            Severity.INFO,
            "Images - People",
            f"Page {page_num}, Image {img_num}: Image contains people — alt text description "
            "should be verified by a human reviewer to ensure ethical and accurate representation.",
            wcag_criterion="1.1.1",
            recommendation="Review alt text to confirm it describes role/action rather than physical appearance.",
            page_number=page_num,
            coordinates=coordinates
        )

    # Note any detected brand names for reviewer awareness
    brands = [str(b) for b in as_list(analysis.get('brands_detected')) if b]
    if brands:
        self.add_issue(
            Severity.INFO,
            "Images - Brands",
            f"Page {page_num}, Image {img_num}: Brand name(s) detected: {', '.join(brands[:5])}. "
            "Verify the alt text uses the specific brand name.",
            wcag_criterion="1.1.1",
            page_number=page_num,
            coordinates=coordinates
        )

    # Quality concerns — capped at 2 per image, downgraded to INFO
    # (these are advisory notes, not WCAG violations)
    concerns = as_list(analysis.get('concerns'))
    for concern in concerns[:2]:
        self.add_issue(
            Severity.INFO,
            "Images - Quality",
            f"Page {page_num}, Image {img_num}: {concern}",
            wcag_criterion="1.1.1",
            page_number=page_num,
            coordinates=coordinates
        )
def _process_google_vision_results(self, results: Dict, page_num: int, img_num: int, coordinates: Optional[Dict] = None):
"""Process Google Vision results — only report actionable findings."""
pass # Label detections alone are not accessibility issues; Claude already provides alt text
def _check_color_contrast(self):
    """Estimate text contrast problems (WCAG 1.4.3) from rendered page images.

    Only logs and returns when ``self.quick_mode`` is set. Otherwise renders
    at most the first three pages and passes each image to
    ``ColorContrastChecker.check_image_contrast``. The returned
    ``fail_aa_normal_percent`` decides the outcome for each page:
    above 60 -> ERROR, above 30 -> WARNING, anything else is a pass.
    Any exception ends the check and is logged as a warning.
    """
    logger.info("Checking color contrast...")
    if self.quick_mode:
        logger.info("Skipping detailed contrast analysis (quick mode)")
        return

    try:
        # Rendering at 100 DPI (down from 150) keeps this check fast
        rendered_pages = convert_from_path(
            str(self.pdf_path),
            dpi=100,
            first_page=1,
            last_page=min(3, len(self.pdf_reader.pages)),
        )
        for page_no, rendered in enumerate(rendered_pages, start=1):
            contrast_results = ColorContrastChecker.check_image_contrast(rendered)
            if 'error' in contrast_results:
                continue

            fail_pct = contrast_results['fail_aa_normal_percent']
            if fail_pct > 60:
                # Most sampled text edges fail: treat as a genuine problem
                severity = Severity.ERROR
                message = (
                    f"Page {page_no}: {fail_pct:.1f}% of text-edge samples fail WCAG AA (4.5:1) — "
                    f"low contrast text likely present"
                )
                advice = "Use Colour Contrast Analyser to identify and fix low-contrast text"
            elif fail_pct > 30:
                # A sizeable minority fails: ask for a manual check
                severity = Severity.WARNING
                message = (
                    f"Page {page_no}: {fail_pct:.1f}% of text-edge samples fail WCAG AA — "
                    f"verify contrast manually with Colour Contrast Analyser"
                )
                advice = "Check text against its background using the Colour Contrast Analyser tool"
            else:
                continue

            self.add_issue(
                severity,
                "Color Contrast",
                message,
                wcag_criterion="1.4.3",
                recommendation=advice,
                page_number=page_no,
                details=contrast_results
            )
    except Exception as e:
        logger.warning(f"Contrast check skipped: {str(e)}")
def _check_readability(self):
"""Check content readability (language-aware: Flesch only for English)."""
# Extract all text
all_text = ""
for page in self.pdf_plumber.pages:
text = page.extract_text()
if text:
all_text += text + "\n"
if len(all_text) < 100:
return
# Flesch Reading Ease is an English-only formula — skip for other languages
is_english = self._detected_lang in ('en', 'en-us', 'en-gb')
if is_english:
analysis = ReadabilityAnalyzer.analyze(all_text)
if 'error' in analysis:
return
# Check Flesch Reading Ease — readability is advisory, cap at WARNING
if analysis['flesch_reading_ease'] < 60:
self.add_issue(
Severity.WARNING,
"Readability",
f"Content is difficult to read (Flesch score: {analysis['flesch_reading_ease']}/100)",
wcag_criterion="3.1.5",
recommendation="Simplify language to reach 8th-9th grade level (target score: 60+)",
details=analysis
)
# Check grade level
if analysis['flesch_kincaid_grade'] > 10:
self.add_issue(
Severity.WARNING,
"Readability",
f"Content requires grade {analysis['flesch_kincaid_grade']} reading level",
wcag_criterion="3.1.5",
recommendation="Target grade 8-10 for general audiences",
details=analysis
)
# Long-sentence check is language-agnostic
sentences = [s.strip() for s in re.split(r'[.!?]+', all_text) if s.strip()]
long_sentences = [s for s in sentences if len(s.split()) > 25]
if len(long_sentences) > 5:
self.add_issue(
Severity.INFO,
"Readability",
f"{len(long_sentences)} sentences exceed 25 words",
wcag_criterion="3.1.5",
recommendation="Break long sentences for better comprehension",
details={'long_sentences_count': len(long_sentences),
'detected_language': self._detected_lang}
)
def _check_links(self):
"""Check link quality (WCAG 2.4.4) — only checks actual hyperlink label text."""
unclear_patterns = [
# English
r'\bclick here\b', r'\bhere\b', r'\bread more\b',
r'\bmore\b', r'\bthis\b', r'\blink\b',
# Ukrainian
r'\атисніть тут\b', r'\ут\b', r'\окладніше\b',
r'\bбільше\b', r'\bцe\b', r'\bпосилання\b',
# Russian
r'\ажмите здесь\b', r'\bздесь\b', r'\bподробнее\b',
r'\bбольше\b', r'\bэто\b', r'\bссылка\b',
# German
r'\bhier klicken\b', r'\bhier\b', r'\bmehr lesen\b',
r'\bmehr\b', r'\bdies\b', r'\blink\b',
# French
r'\bcliquez ici\b', r'\bici\b', r'\blire la suite\b',
r'\bplus\b', r'\bceci\b', r'\blien\b',
# Spanish
r'\bhaz clic aquí\b', r'\baquí\b', r'\beer más\b',
r'\bmás\b', r'\besto\b', r'\benlace\b',
# Polish
r'\bkliknij tutaj\b', r'\btutaj\b', r'\bczytaj więcej\b',
r'\bwięcej\b', r'\bto\b', r'\blink\b',
]
for i, (page_plumber, page_pypdf) in enumerate(
zip(self.pdf_plumber.pages, self.pdf_reader.pages)
):
annots_raw = page_pypdf.get("/Annots")
if not annots_raw:
continue
page_height = float(page_plumber.height)
page_flagged = False
for annot_ref in annots_raw:
try:
annot = annot_ref.get_object()
except Exception:
continue
# Only process URI hyperlinks
if annot.get("/Subtype") != "/Link":
continue
action = annot.get("/A")
if not action or action.get("/S") != "/URI":
continue
# Get annotation bounding box (PDF coords: bottom-left origin)
rect = annot.get("/Rect")
if not rect or len(rect) < 4:
continue
x0, y0, x1, y1 = (float(rect[0]), float(rect[1]),
float(rect[2]), float(rect[3]))
# Convert to pdfplumber coords (top-left origin)
top = page_height - y1
bottom = page_height - y0
if x0 >= x1 or top >= bottom:
continue
# Extract only the text inside the hyperlink rectangle
try:
link_text = (
page_plumber.within_bbox((x0, top, x1, bottom))
.extract_text() or ""
).strip()
except Exception:
continue
if not link_text:
continue # image-only link — skip
for pattern in unclear_patterns:
if re.search(pattern, link_text, re.IGNORECASE):
self.add_issue(
Severity.WARNING,
"Link Text",
f"Page {i+1}: Unclear link text \"{link_text}\" — should describe the destination",
wcag_criterion="2.4.4",
recommendation="Use descriptive link text that makes sense out of context",
page_number=i+1
)
page_flagged = True
break # one issue per link is enough
if page_flagged:
break # one issue per page
def _check_headings(self):
    """Validate the heading hierarchy recorded in the structure tree (WCAG 1.3.1).

    Reports an ERROR when there is no structure tree, a WARNING when the tree
    contains no H1-H6 tags, an ERROR when the first heading is not H1 and a
    WARNING for every jump of more than one level. When headings exist, a
    final summary issue lists them: INFO if any "Headings" ERROR or WARNING
    is already recorded, SUCCESS otherwise.
    """
    catalog = self.pdf_reader.trailer.get("/Root", {})
    if "/StructTreeRoot" not in catalog:
        self.add_issue(
            Severity.ERROR, "Headings",
            "No structure tree - cannot verify heading hierarchy",
            wcag_criterion="1.3.1",
            recommendation="Tag document with proper heading structure")
        return

    tree_root = catalog["/StructTreeRoot"]
    if hasattr(tree_root, 'get_object'):
        tree_root = tree_root.get_object()

    # RoleMap lets custom tag names (e.g. /Heading1) resolve to standard ones (/H1)
    role_map = {}
    if "/RoleMap" in tree_root:
        raw_map = tree_root["/RoleMap"]
        if hasattr(raw_map, 'get_object'):
            raw_map = raw_map.get_object()
        try:
            for custom_name, standard_name in raw_map.items():
                role_map[str(custom_name)] = str(standard_name)
        except (AttributeError, TypeError):
            pass

    levels = []
    heading_names = ("/H1", "/H2", "/H3", "/H4", "/H5", "/H6")

    def collect(node, depth=0):
        # Depth cap protects against very deep or cyclic trees
        if depth > 100:
            return
        try:
            if hasattr(node, 'get_object'):
                node = node.get_object()
            if not isinstance(node, dict):
                return
            raw_tag = str(node.get("/S", ""))
            standard_tag = role_map.get(raw_tag, raw_tag)
            if standard_tag in heading_names:
                levels.append(int(standard_tag[2]))
            children = node.get("/K", [])
            if not isinstance(children, list):
                # A single non-list kid is visited only when it is truthy
                children = [children] if children else []
            for child in children:
                collect(child, depth + 1)
        except (AttributeError, TypeError, KeyError):
            pass

    try:
        collect(tree_root)
    except Exception as e:
        logger.warning(f"Could not fully parse structure tree: {e}")

    if not levels:
        self.add_issue(
            Severity.WARNING, "Headings",
            "No heading tags (H1-H6) found in structure tree",
            wcag_criterion="1.3.1",
            recommendation="Add heading tags to establish document hierarchy")
        return

    first_level = levels[0]
    if first_level != 1:
        self.add_issue(
            Severity.ERROR, "Headings",
            f"Document does not start with H1 (starts with H{first_level})",
            wcag_criterion="1.3.1",
            recommendation="First heading should be H1")

    # Compare each heading with the one before it
    for previous, current in zip(levels, levels[1:]):
        if current > previous + 1:
            self.add_issue(
                Severity.WARNING, "Headings",
                f"Heading level skipped: H{previous} to H{current}",
                wcag_criterion="1.3.1",
                recommendation="Do not skip heading levels")

    heading_str = ", ".join(f"H{level}" for level in levels[:10])
    if len(levels) > 10:
        heading_str += "..."

    problem_levels = [Severity.ERROR, Severity.WARNING]
    has_issues = any(
        recorded.severity in problem_levels
        for recorded in self.issues if recorded.category == "Headings"
    )
    summary_severity = Severity.INFO if has_issues else Severity.SUCCESS
    self.add_issue(
        summary_severity, "Headings",
        f"Found {len(levels)} headings: {heading_str}",
        wcag_criterion="1.3.1")
def _check_tab_order(self):
    """Check that every page declares a ``/Tabs`` entry (WCAG 2.4.3).

    Reports an ERROR when no page has ``/Tabs``, a WARNING when only some
    pages lack it (the affected page numbers are attached as ``details``),
    and SUCCESS otherwise. The SUCCESS message lists the tab types in sorted
    order so the report text does not vary between runs.
    """
    pages = self.pdf_reader.pages
    pages_without_tabs = [
        page_no for page_no, page in enumerate(pages, start=1)
        if "/Tabs" not in page
    ]

    if not pages_without_tabs:
        # Sorted: set iteration order is not stable across interpreter runs
        tab_types = sorted({str(page.get("/Tabs", "")) for page in pages})
        self.add_issue(
            Severity.SUCCESS, "Tab Order",
            f"Tab order set on all pages (types: {', '.join(tab_types)})",
            wcag_criterion="2.4.3")
    elif len(pages_without_tabs) == len(pages):
        self.add_issue(
            Severity.ERROR, "Tab Order",
            "No pages have tab order defined",
            wcag_criterion="2.4.3",
            recommendation="Set /Tabs to /S (structure order) for all pages")
    else:
        self.add_issue(
            Severity.WARNING, "Tab Order",
            f"{len(pages_without_tabs)} page(s) missing tab order",
            wcag_criterion="2.4.3",
            recommendation="Set /Tabs entry on all pages",
            details={'pages_missing_tabs': pages_without_tabs})
def _check_role_mapping(self):
    """Check that every RoleMap entry ends at a standard structure type (WCAG 1.3.1).

    Returns silently when there is no structure tree. Without a ``/RoleMap``
    an INFO issue notes that only standard tags are in use. Otherwise each
    mapping is followed, through other RoleMap entries if necessary, until it
    reaches a standard structure type, an unmapped name or a cycle. Entries
    that do not end at a standard type produce one WARNING (with the
    offending mappings in ``details``); if all resolve, SUCCESS is reported.
    """
    catalog = self.pdf_reader.trailer.get("/Root", {})
    if "/StructTreeRoot" not in catalog:
        return  # Already flagged by heading/structure checks

    struct_tree = catalog["/StructTreeRoot"]
    if hasattr(struct_tree, 'get_object'):
        struct_tree = struct_tree.get_object()

    if "/RoleMap" not in struct_tree:
        self.add_issue(
            Severity.INFO, "Role Mapping",
            "No custom role mapping (document uses standard tags only)",
            wcag_criterion="1.3.1")
        return

    role_map = struct_tree["/RoleMap"]
    if hasattr(role_map, 'get_object'):
        role_map = role_map.get_object()

    # Standard structure types defined by the PDF 1.7 specification
    standard_roles = {
        # grouping elements
        "/Document", "/Part", "/Art", "/Sect", "/Div", "/BlockQuote",
        "/Caption", "/TOC", "/TOCI", "/Index", "/NonStruct", "/Private",
        # paragraph-like elements
        "/P", "/H", "/H1", "/H2", "/H3", "/H4", "/H5", "/H6",
        # lists
        "/L", "/LI", "/Lbl", "/LBody",
        # tables
        "/Table", "/TR", "/TH", "/TD", "/THead", "/TBody", "/TFoot",
        # inline-level elements
        "/Span", "/Quote", "/Note", "/Reference", "/BibEntry", "/Code",
        "/Link", "/Annot", "/Ruby", "/RB", "/RT", "/RP",
        "/Warichu", "/WT", "/WP",
        # illustrations
        "/Figure", "/Formula", "/Form",
    }

    mapped = {}
    try:
        for key, value in role_map.items():
            if hasattr(value, 'get_object'):
                value = value.get_object()
            mapped[str(key)] = str(value)
    except (AttributeError, TypeError):
        pass

    def resolve(role):
        """Follow chained mappings until a standard type, a dead end or a cycle."""
        visited = set()
        while role not in standard_roles and role in mapped and role not in visited:
            visited.add(role)
            role = mapped[role]
        return role

    unresolved = {
        custom: target for custom, target in mapped.items()
        if resolve(target) not in standard_roles
    }
    if unresolved:
        self.add_issue(
            Severity.WARNING, "Role Mapping",
            f"{len(unresolved)} custom role(s) map to non-standard tags",
            wcag_criterion="1.3.1",
            recommendation="Ensure all custom roles map to standard PDF tags",
            details={'non_standard_mappings': unresolved})
    else:
        self.add_issue(
            Severity.SUCCESS, "Role Mapping",
            f"All {len(mapped)} custom roles correctly mapped",
            wcag_criterion="1.3.1")
def _check_forms(self):
"""Check form field accessibility"""
catalog = self.pdf_reader.trailer.get("/Root", {})
if "/AcroForm" not in catalog:
return
acro_form = catalog["/AcroForm"]
if "/Fields" not in acro_form:
return
fields = acro_form["/Fields"]
field_issues = []
for field in fields:
field = field.get_object()
field_name = field.get("/T", "Unnamed")
has_tooltip = "/TU" in field
if not has_tooltip:
field_issues.append(field_name)
if field_issues:
self.add_issue(
Severity.ERROR,
"Forms",
f"{len(field_issues)} form field(s) missing descriptions/tooltips",
wcag_criterion="3.3.2, 4.1.2",
recommendation="Add tooltip descriptions to all form fields",
details={'fields': field_issues}
)
else:
self.add_issue(
Severity.SUCCESS,
"Forms",
f"All {len(fields)} form fields have descriptions",
wcag_criterion="3.3.2"
)
def _check_tables(self):
"""Check table accessibility using PDF structure tree (tagged tables)."""
catalog = self.pdf_reader.trailer.get("/Root", {})
struct_tree = catalog.get("/StructTreeRoot")
tables_found = 0
tables_ok = 0
if struct_tree:
def walk(node, depth=0):
nonlocal tables_found, tables_ok
if depth > 50:
return
try:
obj = node.get_object() if hasattr(node, 'get_object') else node
if not isinstance(obj, dict):
return
role = obj.get("/S") or obj.get("/Type")
if role and str(role) == "/Table":
tables_found += 1
ok = self._analyze_table(obj, tables_found)
if ok:
tables_ok += 1
return # don't recurse into table internals
kids = obj.get("/K", [])
if not isinstance(kids, list):
kids = [kids]
for kid in kids:
if kid is not None:
walk(kid, depth + 1)
except Exception:
pass
try:
walk(struct_tree)
except Exception as e:
logger.warning(f"Structure tree walk failed: {e}")
if tables_found == 0:
# Fallback: visual detection via pdfplumber (for untagged docs)
visual_tables = 0
for i, page in enumerate(self.pdf_plumber.pages):
try:
tbls = page.find_tables()
visual_tables += len(tbls)
except Exception:
pass
if visual_tables > 0:
self.add_issue(
Severity.WARNING,
"Tables",
f"{visual_tables} visual table(s) detected but not tagged in structure tree",
wcag_criterion="1.3.1",
recommendation="Tag tables with proper Table/TR/TH/TD structure elements"
)
else:
self.add_issue(
Severity.INFO,
"Tables",
"No tables detected in document",
wcag_criterion="1.3.1"
)
elif tables_ok == tables_found:
self.add_issue(
Severity.SUCCESS,
"Tables",
f"{tables_found} table(s) with proper header and scope structure",
wcag_criterion="1.3.1"
)
def _analyze_table(self, table_obj: dict, table_num: int) -> bool:
"""Analyse a single /Table structure element. Returns True if no issues found."""
kids = table_obj.get("/K", [])
if not isinstance(kids, list):
kids = [kids]
stats = {
'rows': 0, 'th_cells': 0, 'td_cells': 0,
'th_with_scope': 0, 'has_caption': False,
}
self._collect_table_stats(kids, stats)
issues_added = False
total_cells = stats['th_cells'] + stats['td_cells']
if stats['rows'] == 0 and total_cells == 0:
self.add_issue(
Severity.WARNING,
"Tables",
f"Table {table_num}: empty — no TR/TH/TD elements found in structure tree",
wcag_criterion="1.3.1",
recommendation="Ensure the table is properly tagged with TR rows and TH/TD cells"
)
return False
if stats['th_cells'] == 0:
self.add_issue(
Severity.ERROR,
"Tables",
f"Table {table_num}: no header cells (TH) — {stats['rows']} row(s), {total_cells} data cell(s). "
f"Screen readers cannot identify column or row headers.",
wcag_criterion="1.3.1",
recommendation="Mark header cells as TH with scope='col' (column headers) or scope='row' (row headers)"
)
issues_added = True
elif stats['th_with_scope'] < stats['th_cells']:
missing = stats['th_cells'] - stats['th_with_scope']
self.add_issue(
Severity.WARNING,
"Tables",
f"Table {table_num}: {missing} of {stats['th_cells']} TH header cell(s) missing scope attribute",
wcag_criterion="1.3.1",
recommendation="Add scope='col' to column headers and scope='row' to row headers"
)
issues_added = True
if not stats['has_caption'] and total_cells > 6:
self.add_issue(
Severity.INFO,
"Tables",
f"Table {table_num}: no Caption element ({stats['rows']} rows, ~{total_cells} cells). "
f"A Caption helps screen readers identify the table — ensure a visible title exists nearby.",
wcag_criterion="1.3.1",
recommendation="Add a Caption as the first child of the Table element if no visible title precedes it"
)
# Not counted as a hard issue — don't set issues_added = True
return not issues_added
def _collect_table_stats(self, kids: list, stats: dict, depth: int = 0):
"""Recursively collect structural stats from a table's children."""
if depth > 15:
return
for kid in kids:
try:
obj = kid.get_object() if hasattr(kid, 'get_object') else kid
if not isinstance(obj, dict):
continue
role = str(obj.get("/S") or obj.get("/Type") or "")
if role == "/TR":
stats['rows'] += 1
elif role == "/TH":
stats['th_cells'] += 1
if self._th_has_scope(obj):
stats['th_with_scope'] += 1
elif role == "/TD":
stats['td_cells'] += 1
elif role == "/Caption":
stats['has_caption'] = True
sub_kids = obj.get("/K", [])
if not isinstance(sub_kids, list):
sub_kids = [sub_kids]
if sub_kids:
self._collect_table_stats(sub_kids, stats, depth + 1)
except Exception:
continue
def _th_has_scope(self, th_obj: dict) -> bool:
"""Return True if a TH element carries a Scope attribute."""
attrs = th_obj.get("/A")
if not attrs:
return False
try:
# /A can be a single attribute dict or a list of dicts
a = attrs.get_object() if hasattr(attrs, 'get_object') else attrs
if isinstance(a, dict):
return "/Scope" in a
if isinstance(a, list):
for item in a:
try:
d = item.get_object() if hasattr(item, 'get_object') else item
if isinstance(d, dict) and "/Scope" in d:
return True
except Exception:
pass
except Exception:
pass
return False
def _check_reading_order(self):
    """Report whether a reading order can be determined (WCAG 1.3.2).

    Only the presence of ``/StructTreeRoot`` in the catalog is tested. With a
    structure tree an INFO issue asks for manual verification; without one an
    ERROR is raised.
    """
    catalog = self.pdf_reader.trailer.get("/Root", {})

    if "/StructTreeRoot" in catalog:
        self.add_issue(
            Severity.INFO,
            "Reading Order",
            "Structure tree present - verify reading order with screen reader",
            wcag_criterion="1.3.2",
            recommendation="Test with NVDA or JAWS to verify logical reading order"
        )
        return

    self.add_issue(
        Severity.ERROR,
        "Reading Order",
        "No structure tree - reading order cannot be determined",
        wcag_criterion="1.3.2",
        recommendation="Tag document to establish proper reading order"
    )
def _check_fonts(self):
"""Check font embedding"""
embedded_count = 0
non_embedded_fonts: set = set()
for page in self.pdf_reader.pages:
resources = page.get("/Resources", {})
if "/Font" not in resources:
continue
fonts = resources["/Font"]
for font_key, font_ref in fonts.items():
try:
font_obj = font_ref.get_object()
except Exception:
continue
is_embedded = (
"/FontFile" in font_obj
or "/FontFile2" in font_obj
or "/FontFile3" in font_obj
or "/FontDescriptor" in font_obj and (
"/FontFile" in font_obj["/FontDescriptor"].get_object()
or "/FontFile2" in font_obj["/FontDescriptor"].get_object()
or "/FontFile3" in font_obj["/FontDescriptor"].get_object()
)
)
if is_embedded:
embedded_count += 1
else:
base_font = font_obj.get("/BaseFont", font_key)
non_embedded_fonts.add(str(base_font).lstrip('/'))
if non_embedded_fonts:
self.add_issue(
Severity.WARNING,
"Fonts",
f"{len(non_embedded_fonts)} fonts not embedded",
wcag_criterion="1.4.4",
recommendation="Embed all fonts for consistent rendering",
details={"non_embedded_fonts": sorted(non_embedded_fonts)}
)
def _check_security(self):
"""Check security settings"""
if self.pdf_reader.is_encrypted:
self.add_issue(
Severity.WARNING,
"Security",
"Document is encrypted",
recommendation="Ensure assistive technology can access content"
)
def _check_bookmarks(self):
"""Check navigation bookmarks"""
outlines = self.pdf_reader.outline
total_pages = len(self.pdf_reader.pages)
if not outlines and total_pages > 5:
self.add_issue(
Severity.INFO,
"Navigation",
"No bookmarks found",
wcag_criterion="2.4.5",
recommendation=f"Add bookmarks for {total_pages}-page document to aid navigation"
)
elif outlines:
self.add_issue(
Severity.SUCCESS,
"Navigation",
"Document has navigation bookmarks",
wcag_criterion="2.4.5"
)
def _check_verapdf_validation(self):
    """Run veraPDF PDF/UA validation and turn its result into issues.

    Skipped with a warning when ``VeraPDFValidator`` is unavailable. A result
    containing ``'error'`` is logged and ignored. Otherwise the result is
    stored on ``self.verapdf_results``; a compliant document yields SUCCESS,
    a non-compliant one yields an ERROR plus a WARNING for each of the first
    ten reported errors. Any exception is logged and swallowed.
    """
    if not VeraPDFValidator:
        logger.warning("veraPDF not available - skipping")
        return

    logger.info("Running veraPDF PDF/UA validation...")

    try:
        results = VeraPDFValidator().validate(str(self.pdf_path))

        if 'error' in results:
            logger.warning(f"veraPDF validation error: {results['error']}")
            return

        self.verapdf_results = results

        if not results['compliant']:
            self.add_issue(
                Severity.ERROR,
                "PDF/UA Compliance",
                f"Document fails PDF/UA-1 validation ({results['failed_rules']} rules failed, {results['failed_checks']} checks failed)",
                wcag_criterion="PDF/UA",
                recommendation="Fix structure issues reported by veraPDF"
            )
            # Surface individual clause failures, limited to the first 10
            reported_errors = results.get('errors', [])[:10]
            for failure in reported_errors:
                self.add_issue(
                    Severity.WARNING,
                    "PDF/UA Structure",
                    f"Clause {failure['clause']}: {failure['description'][:150]}",
                    wcag_criterion="PDF/UA",
                    recommendation="Consult veraPDF documentation for this clause"
                )
        else:
            self.add_issue(
                Severity.SUCCESS,
                "PDF/UA Compliance",
                f"Document passes PDF/UA-1 validation ({results['passed_rules']} rules passed)",
                wcag_criterion="PDF/UA",
                recommendation="Document meets PDF/UA structure requirements"
            )

        logger.info(f"veraPDF: {results['passed_rules']} passed, {results['failed_rules']} failed")

    except Exception as e:
        logger.warning(f"veraPDF check error: {str(e)}")
def _analyze_remediation_options(self):
    """Ask ``PDFRemediator`` which problems could be fixed automatically.

    Does nothing when the remediation module is unavailable. The suggestions
    are stored on ``self.remediation_suggestions`` and the number of entries
    flagged ``auto_fixable`` is logged. Any exception is logged and swallowed.
    """
    if not PDFRemediator:
        return

    logger.info("Analyzing auto-remediation options...")

    try:
        suggestions = PDFRemediator(str(self.pdf_path)).analyze_and_suggest_fixes()
        self.remediation_suggestions = suggestions

        # Count entries marked auto-fixable across all categories
        total_fixable = sum(
            1
            for fixes in suggestions.values()
            for fix in fixes
            if fix.get('auto_fixable')
        )

        if total_fixable:
            logger.info(f"{total_fixable} issues can be auto-fixed")
        else:
            logger.info("No auto-fixable issues found")

    except Exception as e:
        logger.warning(f"Remediation analysis error: {str(e)}")
# ==================== HELPER METHODS ====================
def _extract_image_from_page(self, page, img_info) -> Optional[bytes]:
"""Extract image bytes from PDF page"""
try:
# Get image coordinates
x0, y0, x1, y1 = img_info['x0'], img_info['top'], img_info['x1'], img_info['bottom']
# Crop page to image area
cropped = page.crop((x0, y0, x1, y1))
# Convert to PIL Image
pil_image = cropped.to_image(resolution=150).original
# Convert to bytes
buffer = BytesIO()
pil_image.save(buffer, format='JPEG', quality=85)
return buffer.getvalue()
except Exception as e:
return None
def _image_data_size(self, image_data: bytes) -> int:
"""Return byte size of image data — used to filter out tiny decorative images."""
return len(image_data) if image_data else 0
def _generate_page_images(self, output_dir: Path, dpi: int = 150):
"""Generate PNG images for each page for visual display"""
if not self.generate_images:
return
logger.info("Generating page images for visual display...")
try:
from pdf2image import convert_from_path
except ImportError:
logger.warning("pdf2image not available - skipping page image generation")
return
try:
output_dir.mkdir(parents=True, exist_ok=True)
# Convert pages to images
# Store DPI for coordinate scaling
self.page_image_dpi = dpi
images = convert_from_path(
str(self.pdf_path),
dpi=dpi,
fmt='png'
)
for page_num, image in enumerate(images, start=1):
# Save as PNG
image_filename = f"page_{page_num}.png"
image_path = output_dir / image_filename
image.save(image_path, 'PNG')
self.page_images[page_num] = image_filename
logger.info(f"Page {page_num}/{len(images)}")
logger.info(f"Generated {len(images)} page images at {dpi} DPI")
except Exception as e:
logger.warning(f"Could not generate page images: {str(e)}")
# ==================== REPORTING ====================
def _build_matterhorn_summary(self) -> dict:
"""Build Matterhorn Protocol PDF/UA-1 checkpoint summary."""
# Map check names to Matterhorn checkpoint IDs
CHECK_TO_MATTERHORN = {
"Document Structure": ["01", "02", "09"],
"Metadata": ["06", "07"],
"Language Declaration": ["11"],
"Text Extractability": ["01", "08"],
"OCR Quality": ["08"],
"Image Accessibility": ["13"],
"Color Contrast": ["04"],
"Content Readability": [],
"Link Quality": ["27", "28"],
"Heading Structure": ["14"],
"Tab Order": ["28"],
"Role Mapping": ["02"],
"Form Accessibility": ["24", "28"],
"Table Structure": ["15"],
"Reading Order": ["09"],
"Font Accessibility": ["31"],
"Security Settings": ["26"],
"Navigation Aids": ["27"],
"PDF/UA Structure (veraPDF)": [], # Covers all M conditions
}
# Checkpoint definitions: id, name, how (M=machine/H=human)
CHECKPOINTS = [
("01", "Real content tagged", "M"),
("02", "Role mapping", "M"),
("03", "Flickering content", "H"),
("04", "Color and contrast", "H"),
("05", "Sound content", "H"),
("06", "Metadata title", "M"),
("07", "Metadata language", "M"),
("08", "Text content", "M"),
("09", "Reading order", "M"),
("10", "Tab order", "M"),
("11", "Natural language", "M"),
("12", "Character encoding", "M"),
("13", "Graphics / alt text", "H"),
("14", "Headings", "M"),
("15", "Tables", "M"),
("16", "Lists", "M"),
("17", "Mathematical expressions", "H"),
("18", "Page headers / footers", "H"),
("19", "Notes / references", "H"),
("20", "Optional content", "M"),
("21", "Embedded files", "M"),
("22", "Article threads", "H"),
("23", "Digital signatures", "H"),
("24", "Non-interactive forms", "H"),
("25", "XFA forms", "M"),
("26", "Security", "M"),
("27", "Navigation", "M"),
("28", "Annotations", "M"),
("29", "Actions", "M"),
("30", "XObjects", "M"),
("31", "Fonts", "M"),
]
# Build a map: checkpoint_id -> pass/fail/not_tested from our check results
cp_status: dict = {} # id -> "PASS" | "FAIL" | "NOT_TESTED"
check_name_to_result = {cr.check_name: cr.passed for cr in self.check_results}
# Determine which checkpoints are covered and whether they passed
for check_name, cp_ids in CHECK_TO_MATTERHORN.items():
result_passed = check_name_to_result.get(check_name)
if result_passed is None:
continue
for cp_id in cp_ids:
if cp_id not in cp_status:
cp_status[cp_id] = "PASS" if result_passed else "FAIL"
elif not result_passed:
# Any failure overrides a pass
cp_status[cp_id] = "FAIL"
# Handle PDF/UA veraPDF: if it passed, mark all M checkpoints as PASS unless already FAIL
verapdf_passed = check_name_to_result.get("PDF/UA Structure (veraPDF)")
if verapdf_passed:
for cp_id, _, how in CHECKPOINTS:
if how == "M" and cp_id not in cp_status:
cp_status[cp_id] = "PASS"
checkpoints_out = []
any_fail = False
for cp_id, cp_name, cp_how in CHECKPOINTS:
status = cp_status.get(cp_id, "NOT_TESTED")
if status == "FAIL":
any_fail = True
checkpoints_out.append({
"id": cp_id,
"name": cp_name,
"how": cp_how,
"status": status,
})
return {
"standard": "PDF/UA-1",
"overall_passed": not any_fail,
"checkpoints": checkpoints_out,
}
def _generate_summary(self) -> Dict[str, Any]:
    """Assemble the complete, JSON-serialisable report dictionary.

    The score is the percentage of passed checks minus a soft penalty
    (5 per critical issue, 2 per error, capped at 20), floored at 0.
    WCAG compliance is derived from the criteria attached to CRITICAL and
    ERROR issues, and up to eight de-duplicated next steps are listed in
    severity order. Issues without a WCAG criterion are tolerated.

    Returns:
        The summary dictionary serialised by ``generate_json_report``.
    """
    # Tally issues per severity in one pass
    severity_keys = {
        Severity.CRITICAL: 'critical',
        Severity.ERROR: 'error',
        Severity.WARNING: 'warning',
        Severity.INFO: 'info',
        Severity.SUCCESS: 'success',
    }
    severity_counts = dict.fromkeys(severity_keys.values(), 0)
    for issue in self.issues:
        bucket = severity_keys.get(issue.severity)
        if bucket is not None:
            severity_counts[bucket] += 1

    # Calculate score based on check-pass ratio
    passed_checks = sum(1 for cr in self.check_results if cr.passed)
    total_checks = len(self.check_results)
    base_score = round(100 * passed_checks / total_checks) if total_checks else 0
    # Soft penalty for critical/error issues (capped at 20)
    penalty = min(20, severity_counts['critical'] * 5 + severity_counts['error'] * 2)
    score = max(0, base_score - penalty)

    # datetime values are not JSON-serialisable; store them as ISO strings
    stats_serializable = {
        key: value.isoformat() if isinstance(value, datetime) else value
        for key, value in self.stats.items()
    }

    # Count auto-fixable issues
    auto_fixable_count = 0
    if self.remediation_suggestions:
        auto_fixable_count = sum(
            1
            for fixes in self.remediation_suggestions.values()
            for fix in fixes
            if fix.get('auto_fixable')
        )

    # WCAG compliance summary
    failing_criteria: set = set()
    for issue in self.issues:
        if issue.severity not in (Severity.CRITICAL, Severity.ERROR):
            continue
        # An issue may carry no criterion at all; treat that as "none"
        for criterion in (issue.wcag_criterion or '').split(','):
            criterion = criterion.strip()
            if criterion and criterion != 'PDF/UA':
                failing_criteria.add(criterion)

    level_a_fails = sorted(c for c in failing_criteria if WCAG_LEVELS.get(c) == 'A')
    # Level AA conformance also requires every level A criterion
    level_aa_fails = sorted(c for c in failing_criteria if WCAG_LEVELS.get(c) in ('A', 'AA'))
    wcag_compliance = {
        'level_a': len(level_a_fails) == 0,
        'level_aa': len(level_aa_fails) == 0,
        'level_a_failures': level_a_fails,
        'level_aa_failures': level_aa_fails,
    }

    # Prioritised next steps: most severe first, at most 8, no repeated actions
    max_steps = 8
    next_steps = []
    seen_recs: set = set()
    for priority, sev in enumerate(
        (Severity.CRITICAL, Severity.ERROR, Severity.WARNING), start=1
    ):
        for issue in self.issues:
            if len(next_steps) >= max_steps:
                break
            if issue.severity != sev:
                continue
            action = issue.recommendation or issue.description
            if action in seen_recs:
                continue
            seen_recs.add(action)
            first_criterion = (issue.wcag_criterion or '').split(',')[0].strip()
            next_steps.append({
                'priority': priority,
                'category': issue.category,
                'action': action,
                'wcag': issue.wcag_criterion,
                'wcag_level': WCAG_LEVELS.get(first_criterion, ''),
            })

    return {
        'filename': self.pdf_path.name,
        'total_pages': len(self.pdf_reader.pages),
        'accessibility_score': score,
        'score_breakdown': {
            'checks_passed': passed_checks,
            'checks_total': total_checks,
            'base_score': base_score,
            'penalty': penalty,
            'final_score': score,
            'per_check': [
                {'name': cr.check_name, 'passed': cr.passed}
                for cr in self.check_results
            ]
        },
        'matterhorn_summary': self._build_matterhorn_summary(),
        'severity_counts': severity_counts,
        'total_issues': len(self.issues),
        'auto_fixable_count': auto_fixable_count,
        'stats': stats_serializable,
        'page_images': self.page_images,  # Map of page_num -> image_filename
        'page_image_dpi': getattr(self, 'page_image_dpi', 150),  # DPI for coordinate scaling
        'verapdf_validation': self.verapdf_results,
        'remediation_suggestions': self.remediation_suggestions,
        'checks_performed': [
            {
                'name': cr.check_name,
                'passed': cr.passed,
                'duration': cr.duration
            }
            for cr in self.check_results
        ],
        'issues': [issue.to_dict() for issue in self.issues],
        'wcag_compliance': wcag_compliance,
        'next_steps': next_steps,
    }
def generate_json_report(self) -> str:
    """Serialise the summary from ``_generate_summary`` as 2-space indented JSON."""
    return json.dumps(self._generate_summary(), indent=2)
def run_full_check(self) -> Dict[str, Any]:
    """Backward-compatible alias that forwards to ``check_all``."""
    result = self.check_all()
    return result
def to_dict(self) -> Dict[str, Any]:
    """Expose the current results as the summary dictionary."""
    summary = self._generate_summary()
    return summary
def main():
    """Command-line entry point: check one PDF and write or print the result.

    API credentials come from CLI flags, falling back to environment
    variables. With ``--output`` the page images and the JSON report are
    written to disk; without it a short summary is printed.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Enterprise PDF Accessibility Checker",
        epilog="Environment variables can be set in a .env file (see .env.example)"
    )
    parser.add_argument("pdf_file", help="PDF file to check")
    parser.add_argument("--google-credentials", help="Path to Google Cloud credentials JSON (or set GOOGLE_APPLICATION_CREDENTIALS in .env)")
    parser.add_argument("--google-key", help="Google API key string (or set GOOGLE_API_KEY in .env)")
    parser.add_argument("--anthropic-key", help="Anthropic API key (or set ANTHROPIC_API_KEY in .env)")
    parser.add_argument("--output", "-o", help="Output JSON file")
    parser.add_argument("--quick", action="store_true", help="Quick mode - skip expensive checks (OCR, AI image analysis, color contrast)")
    args = parser.parse_args()

    # CLI flags take precedence; the environment supplies the defaults
    config = {}
    config['google_credentials_path'] = args.google_credentials or os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
    config['google_api_key'] = args.google_key or os.getenv('GOOGLE_API_KEY')
    config['anthropic_api_key'] = args.anthropic_key or os.getenv('ANTHROPIC_API_KEY')

    if args.quick:
        print("⚡ Quick mode enabled - skipping expensive checks\n")

    checker = EnterprisePDFChecker(args.pdf_file, config, quick_mode=args.quick)
    summary = checker.check_all()

    # Page images are produced before the report so the report can list them
    if args.output:
        output_path = Path(args.output)
        images_dir = output_path.parent / f"{output_path.stem}_images"
        checker._generate_page_images(images_dir)

    report = checker.generate_json_report()

    if not args.output:
        counts = summary['severity_counts']
        stats = summary['stats']
        print("\n" + "="*60)
        print("SUMMARY")
        print("="*60)
        print(f"Score: {summary['accessibility_score']}/100")
        print(f"Critical: {counts['critical']}")
        print(f"Errors: {counts['error']}")
        print(f"Warnings: {counts['warning']}")
        print(f"API Calls: {stats['api_calls']}")
        print(f"Cost: ${stats['total_cost_estimate']:.2f}")
        return

    with open(args.output, 'w') as report_file:
        report_file.write(report)
    print(f"\n📄 Report saved: {args.output}")
    if checker.page_images:
        print(f"📸 Page images saved to: {images_dir}")
# Run the command-line interface only when this file is executed as a script
if __name__ == "__main__":
    main()