#!/usr/bin/env python3
"""
Enterprise PDF Accessibility Checker
Quality-first comprehensive WCAG 2.1 validation

Features:
- Google Cloud Vision API for OCR and image analysis
- Anthropic Claude for alt text validation and content analysis
- Complete color contrast checking
- Readability analysis
- Form field validation
- Heading structure analysis
- Link quality checking
- Comprehensive reporting
"""

import sys
import os
import json
import re
import base64
import hashlib
import time
import subprocess
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field, asdict
from enum import Enum
from datetime import datetime
from io import BytesIO
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed

# Load environment variables from .env file (optional)
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    # dotenv not installed, that's okay - will use environment variables
    pass

# Setup logging
from logger_config import setup_logger
logger = setup_logger(__name__, "pdf_checker.log")

# Import retry helper for API resilience
from retry_helper import retry_with_backoff, safe_execute, RetryableError

# Import remediation module
try:
    from pdf_remediation import VeraPDFValidator, PDFRemediator
except ImportError:
    logger.warning("Remediation module not found - auto-fix features disabled")
    VeraPDFValidator = None
    PDFRemediator = None

# Core PDF libraries
try:
    from pypdf import PdfReader, PdfWriter
    import pdfplumber
    from PIL import Image
    import numpy as np
except ImportError:
    logger.error("Core libraries not installed")
    logger.error("Install: pip install pypdf pdfplumber pillow numpy")
    sys.exit(1)

# OCR and analysis
try:
    import pytesseract
    from pdf2image import convert_from_path
except ImportError:
    logger.warning("OCR libraries not available. Install: pip install pytesseract pdf2image")
    pytesseract = None
    # Keep the name defined so callers fail with a catchable TypeError
    # instead of a NameError when pdf2image is missing.
    convert_from_path = None

# Readability
try:
    from textblob import TextBlob
except ImportError:
    logger.warning("TextBlob not available. Install: pip install textblob")
    TextBlob = None

# Google Cloud Vision
try:
    from google.cloud import vision
    from google.cloud import documentai_v1 as documentai
except ImportError:
    logger.warning("Google Cloud libraries not available")
    logger.info("Install: pip install google-cloud-vision google-cloud-documentai")
    vision = None
    documentai = None

# Anthropic Claude
try:
    import anthropic
except ImportError:
    logger.warning("Anthropic library not available")
    logger.info("Install: pip install anthropic")
    anthropic = None

# Language detection
try:
    from langdetect import detect as langdetect_detect, LangDetectException
except ImportError:
    logger.warning("langdetect not available — language detection disabled")
    langdetect_detect = None
    LangDetectException = Exception

# WCAG 2.1 criterion → conformance level
WCAG_LEVELS: Dict[str, str] = {
    '1.1.1': 'A', '1.2.1': 'A', '1.2.2': 'A', '1.2.3': 'A', '1.2.4': 'AA',
    '1.2.5': 'AA', '1.3.1': 'A', '1.3.2': 'A', '1.3.3': 'A', '1.3.4': 'AA',
    '1.3.5': 'AA', '1.4.1': 'A', '1.4.2': 'A', '1.4.3': 'AA', '1.4.4': 'AA',
    '1.4.5': 'AA', '1.4.10': 'AA', '1.4.11': 'AA', '1.4.12': 'AA', '1.4.13': 'AA',
    '2.1.1': 'A', '2.1.2': 'A', '2.1.4': 'A', '2.2.1': 'A', '2.2.2': 'A',
    '2.3.1': 'A', '2.4.1': 'A', '2.4.2': 'A', '2.4.3': 'A', '2.4.4': 'A',
    '2.4.5': 'AA', '2.4.6': 'AA', '2.4.7': 'AA', '2.5.1': 'A', '2.5.2': 'A',
    '2.5.3': 'A', '2.5.4': 'A', '3.1.1': 'A', '3.1.2': 'AA', '3.1.5': 'AAA',
    '3.2.1': 'A', '3.2.2': 'A', '3.2.3': 'AA', '3.2.4': 'AA', '3.3.1': 'A',
    '3.3.2': 'A', '3.3.3': 'AA', '3.3.4': 'AA', '4.1.1': 'A', '4.1.2': 'A',
    '4.1.3': 'AA',
}


class Severity(Enum):
    """Issue severity levels"""
    CRITICAL = "CRITICAL"
    ERROR = "ERROR"
    WARNING = "WARNING"
    INFO = "INFO"
    SUCCESS = "SUCCESS"


@dataclass
class AccessibilityIssue:
    """Represents an accessibility issue"""
    severity: Severity
    category: str
    description: str
    page_number: Optional[int] = None
    recommendation: str = ""
    wcag_criterion: str = ""
    details: Dict[str, Any] = field(default_factory=dict)
    coordinates: Optional[Dict[str, float]] = None  # x0, y0, x1, y1 for highlighting

    def to_dict(self):
        """Convert to dictionary for JSON serialization.

        ``wcag_criterion`` may list several comma-separated criteria; the
        reported ``wcag_level`` is the lowest conformance level (A before AA
        before AAA) among those found in ``WCAG_LEVELS``, or '' if none match.
        """
        levels = [WCAG_LEVELS.get(c.strip(), '')
                  for c in self.wcag_criterion.split(',') if c.strip()]
        levels = [level for level in levels if level]
        level_order = ['A', 'AA', 'AAA']
        wcag_level = min(levels, key=level_order.index) if levels else ''
        return {
            'severity': self.severity.value,
            'category': self.category,
            'description': self.description,
            'page_number': self.page_number,
            'recommendation': self.recommendation,
            'wcag_criterion': self.wcag_criterion,
            'wcag_level': wcag_level,
            'details': self.details,
            'coordinates': self.coordinates
        }


@dataclass
class CheckResult:
    """Results from a specific check"""
    check_name: str
    passed: bool
    issues: List[AccessibilityIssue] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    duration: float = 0.0


class CacheManager:
    """Manages caching of API results to reduce costs"""

    def __init__(self, cache_dir: str = ".cache"):
        self.cache_dir = Path(cache_dir)
        # parents=True so a nested cache_dir (e.g. "var/cache/pdf") works too
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def get_cache_key(self, data: bytes, prefix: str = "") -> str:
        """Generate cache key from data (prefix + SHA-256 hex digest)"""
        hash_obj = hashlib.sha256(data)
        return f"{prefix}_{hash_obj.hexdigest()}"

    def get(self, key: str) -> Optional[Dict]:
        """Retrieve cached result; returns None when missing or unreadable"""
        cache_file = self.cache_dir / f"{key}.json"
        if cache_file.exists():
            try:
                with open(cache_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (json.JSONDecodeError, IOError, OSError):
                return None
        return None

    def set(self, key: str, data: Dict):
        """Store result in cache as JSON"""
        cache_file = self.cache_dir / f"{key}.json"
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(data, f)


class ColorContrastChecker:
    """WCAG color contrast validation"""

    WCAG_AA_NORMAL = 4.5
    WCAG_AA_LARGE = 3.0
    WCAG_AAA_NORMAL = 7.0
    WCAG_AAA_LARGE = 4.5

    @staticmethod
    def get_luminance(rgb: Tuple[int, int, int]) -> float:
        """Calculate relative luminance per WCAG formula (8-bit sRGB input)"""
        r, g, b = [x / 255.0 for x in rgb]
        r = r / 12.92 if r <= 0.03928 else ((r + 0.055) / 1.055) ** 2.4
        g = g / 12.92 if g <= 0.03928 else ((g + 0.055) / 1.055) ** 2.4
        b = b / 12.92 if b <= 0.03928 else ((b + 0.055) / 1.055) ** 2.4
        return 0.2126 * r + 0.7152 * g + 0.0722 * b

    @staticmethod
    def calculate_contrast_ratio(color1: Tuple[int, int, int],
                                 color2: Tuple[int, int, int]) -> float:
        """Calculate WCAG contrast ratio (always >= 1.0, order-independent)"""
        l1 = ColorContrastChecker.get_luminance(color1)
        l2 = ColorContrastChecker.get_luminance(color2)
        lighter = max(l1, l2)
        darker = min(l1, l2)
        return (lighter + 0.05) / (darker + 0.05)

    @staticmethod
    def check_image_contrast(image: Image.Image, sample_size: int = 1000) -> Dict:
        """Sample image for contrast issues.

        Compares pixel pairs that are 8px apart vertically — more likely to cross
        a text-stroke / background boundary than adjacent pixels.
        Only considers pairs where luminance actually differs (|Δlum| > 0.08),
        which filters out uniform photo areas and focuses on real edges.

        Returns a dict of summary statistics, or ``{'error': ...}`` when fewer
        than 20 usable pairs were found.
        """
        if image.mode != 'RGB':
            image = image.convert('RGB')

        width, height = image.size
        rng = np.random.default_rng(seed=42)  # fixed seed: reproducible sampling

        significant = []  # pairs that cross a meaningful light/dark boundary
        attempts = min(sample_size * 4, width * height // 20)

        for _ in range(attempts):
            x = int(rng.integers(0, width))
            y = int(rng.integers(0, max(1, height - 9)))
            try:
                c1 = image.getpixel((x, y))
                c2 = image.getpixel((x, y + 8))
                l1 = ColorContrastChecker.get_luminance(c1)
                l2 = ColorContrastChecker.get_luminance(c2)
                if abs(l1 - l2) < 0.08:
                    continue  # near-uniform area (photo gradient, blank space) — skip
                ratio = ColorContrastChecker.calculate_contrast_ratio(c1, c2)
                significant.append({'ratio': ratio, 'colors': (c1, c2), 'position': (x, y)})
                if len(significant) >= sample_size:
                    break
            except (IndexError, TypeError, ValueError):
                # e.g. images shorter than 9px, where y + 8 is out of range
                continue

        if len(significant) < 20:
            return {'error': 'Insufficient contrast edges to analyse (image-only page)'}

        fail_aa = [s for s in significant if s['ratio'] < ColorContrastChecker.WCAG_AA_NORMAL]
        fail_large = [s for s in significant if s['ratio'] < ColorContrastChecker.WCAG_AA_LARGE]

        return {
            'total_samples': len(significant),
            'fail_aa_normal_count': len(fail_aa),
            'fail_aa_large_count': len(fail_large),
            'fail_aa_normal_percent': len(fail_aa) / len(significant) * 100,
            'fail_aa_large_percent': len(fail_large) / len(significant) * 100,
            'worst_ratio': min(s['ratio'] for s in significant),
            'best_ratio': max(s['ratio'] for s in significant),
            'avg_ratio': sum(s['ratio'] for s in significant) / len(significant),
        }


class ReadabilityAnalyzer:
    """Content readability analysis"""

    @staticmethod
    def count_syllables(word: str) -> int:
        """Count syllables in a word (vowel-group heuristic, minimum 1)"""
        word = word.lower().strip()
        vowels = 'aeiouy'
        syllable_count = 0
        previous_was_vowel = False

        for char in word:
            is_vowel = char in vowels
            if is_vowel and not previous_was_vowel:
                syllable_count += 1
            previous_was_vowel = is_vowel

        # Treat a trailing 'e' as silent
        if word.endswith('e') and syllable_count > 1:
            syllable_count -= 1

        return max(1, syllable_count)

    @staticmethod
    def analyze(text: str) -> Dict:
        """Comprehensive readability analysis.

        Returns Flesch Reading Ease, Flesch-Kincaid grade and basic counts, or
        ``{'error': ...}`` when the text is too short or cannot be parsed.
        """
        if not text or len(text.strip()) < 50:
            return {'error': 'Insufficient text for analysis'}

        # Clean text
        text = re.sub(r'\s+', ' ', text.strip())

        # Basic metrics
        sentences = re.split(r'[.!?]+', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        words = re.findall(r'\b\w+\b', text)

        if not sentences or not words:
            return {'error': 'Could not parse text'}

        total_sentences = len(sentences)
        total_words = len(words)
        # Count syllables once per word and reuse for both totals and the
        # complex-word tally.
        syllables_per_word = [ReadabilityAnalyzer.count_syllables(w) for w in words]
        total_syllables = sum(syllables_per_word)

        # Flesch Reading Ease (0-100, higher = easier)
        flesch_reading_ease = (
            206.835
            - 1.015 * (total_words / total_sentences)
            - 84.6 * (total_syllables / total_words)
        )

        # Flesch-Kincaid Grade Level
        fk_grade_level = (
            0.39 * (total_words / total_sentences)
            + 11.8 * (total_syllables / total_words)
            - 15.59
        )

        # Find issues
        long_sentences = [s for s in sentences if len(s.split()) > 25]
        complex_words_count = sum(1 for n in syllables_per_word if n > 3)

        return {
            'flesch_reading_ease': round(flesch_reading_ease, 2),
            'flesch_kincaid_grade': round(fk_grade_level, 2),
            'total_words': total_words,
            'total_sentences': total_sentences,
            'avg_words_per_sentence': round(total_words / total_sentences, 2),
            'long_sentences_count': len(long_sentences),
            'complex_words_count': complex_words_count,
            'complex_words_percent': round(complex_words_count / total_words * 100, 2)
        }


class EnterprisePDFChecker:
    """Enterprise-grade PDF accessibility checker"""

    def __init__(self, pdf_path: str, config: Optional[Dict[str, Any]] = None,
                 quick_mode: bool = False, generate_images: bool = True):
        """Set up state and optional API clients.

        Args:
            pdf_path: Path of the PDF to check.
            config: Optional settings; keys read here are
                ``google_credentials_path``, ``google_api_key`` and
                ``anthropic_api_key``.
            quick_mode: Skip the slow OCR / AI / contrast analyses.
            generate_images: Stored on the instance for later use.
        """
        self.pdf_path = Path(pdf_path)
        self.config = config or {}
        self.quick_mode = quick_mode
        self.generate_images = generate_images
        self.issues: List[AccessibilityIssue] = []
        self.check_results: List[CheckResult] = []
        self.pdf_reader = None
        self.pdf_plumber = None
        self.cache = CacheManager()
        self.page_images: Dict[int, str] = {}  # page_num -> image_path
        self.verapdf_results: Optional[Dict] = None
        self.remediation_suggestions: Optional[Dict] = None
        self._detected_lang: str = 'en'  # detected language of the document

        # API clients
        self.vision_client = None
        self.anthropic_client = None
        # Always defined so later reads never raise AttributeError
        self.google_api_key: Optional[str] = None
        self.api_timeout = 10.0  # 10 second timeout for API calls

        # Initialize API clients
        config = self.config
        google_creds_path = config.get('google_credentials_path')

        if google_creds_path and os.path.isfile(google_creds_path):
            # Valid credentials file exists
            os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = google_creds_path
            if vision:
                try:
                    self.vision_client = vision.ImageAnnotatorClient()
                    logger.info("Google Cloud Vision initialized with credentials file")
                except Exception as e:
                    logger.warning(f"Google Vision initialization failed: {str(e)}")
        elif config.get('google_api_key'):
            # Use API key directly
            if vision:
                # Note: Vision API with API key requires different initialization
                # For now, store key for use in requests
                self.google_api_key = config['google_api_key']
                # Never write key material to the log file
                logger.info("Using Google API key (value redacted)")
        elif google_creds_path:
            # Path provided but file doesn't exist
            logger.warning(f"Google credentials file not found: {google_creds_path}")
            logger.warning("Skipping Google Cloud Vision (advanced OCR disabled)")

        if config.get('anthropic_api_key') and anthropic:
            try:
                self.anthropic_client = anthropic.Anthropic(api_key=config['anthropic_api_key'])
                logger.info("Anthropic Claude initialized")
            except Exception as e:
                logger.warning(f"Anthropic initialization failed: {str(e)}")

        # Stats
        self.stats = {
            'start_time': datetime.now(),
            'total_checks': 0,
            'api_calls': 0,
            'cached_calls': 0,
            'total_cost_estimate': 0.0
        }

    def add_issue(self, severity: Severity, category: str, description: str, **kwargs):
        """Add an accessibility issue.

        Extra keyword arguments are forwarded to ``AccessibilityIssue``
        (e.g. ``page_number``, ``recommendation``, ``wcag_criterion``,
        ``details``, ``coordinates``).
        """
        issue = AccessibilityIssue(
            severity=severity,
            category=category,
            description=description,
            **kwargs
        )
        self.issues.append(issue)

    # Per-check wall-clock timeouts (seconds).
Heavy checks get more time. _CHECK_TIMEOUTS = { "Image Accessibility": 180, "OCR Quality": 180, "Color Contrast": 120, "PDF/UA Structure (veraPDF)": 120, "Content Readability": 60, } _DEFAULT_CHECK_TIMEOUT = 90 def run_check(self, check_func, check_name: str) -> CheckResult: """Run a check with a per-check timeout and record results.""" from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout start_time = time.time() result = CheckResult(check_name=check_name, passed=True) issues_before = len(self.issues) timeout = self._CHECK_TIMEOUTS.get(check_name, self._DEFAULT_CHECK_TIMEOUT) try: with ThreadPoolExecutor(max_workers=1) as ex: future = ex.submit(check_func) future.result(timeout=timeout) # Check passed if no critical/error issues added by THIS check new_issues = self.issues[issues_before:] critical_errors = [i for i in new_issues if i.severity in [Severity.CRITICAL, Severity.ERROR]] result.passed = len(critical_errors) == 0 except FuturesTimeout: logger.warning(f"{check_name} timed out after {timeout}s — skipping") self.add_issue( Severity.WARNING, check_name, f"Check timed out after {timeout}s and was skipped", details={'timeout': timeout} ) result.passed = False except Exception as e: self.add_issue( Severity.CRITICAL, check_name, f"Check failed with error: {str(e)}", details={'error': str(e), 'traceback': traceback.format_exc()} ) result.passed = False result.duration = time.time() - start_time self.check_results.append(result) self.stats['total_checks'] += 1 return result def check_all(self) -> Dict[str, Any]: """Run all accessibility checks""" logger.info("Enterprise PDF Accessibility Check") logger.info(f"File: {self.pdf_path.name}") logger.info("=" * 60) try: self.pdf_reader = PdfReader(str(self.pdf_path)) self.pdf_plumber = pdfplumber.open(str(self.pdf_path)) # Run all checks checks = [ (self._check_basic_structure, "Document Structure"), (self._check_metadata, "Metadata"), (self._check_language, "Language Declaration"), 
(self._check_text_extractability, "Text Extractability"), (self._check_ocr_quality, "OCR Quality"), (self._check_images_comprehensive, "Image Accessibility"), (self._check_color_contrast, "Color Contrast"), (self._check_readability, "Content Readability"), (self._check_links, "Link Quality"), (self._check_headings, "Heading Structure"), (self._check_tab_order, "Tab Order"), (self._check_role_mapping, "Role Mapping"), (self._check_forms, "Form Accessibility"), (self._check_tables, "Table Structure"), (self._check_reading_order, "Reading Order"), (self._check_fonts, "Font Accessibility"), (self._check_security, "Security Settings"), (self._check_bookmarks, "Navigation Aids"), (self._check_verapdf_validation, "PDF/UA Structure (veraPDF)"), ] for check_func, check_name in checks: logger.info(f"Running: {check_name}...") result = self.run_check(check_func, check_name) status = "PASS" if result.passed else "FAIL" logger.info(f"{status} ({result.duration:.2f}s)") # Analyze remediation options self._analyze_remediation_options() except Exception as e: self.add_issue( Severity.CRITICAL, "File Access", f"Could not process PDF: {str(e)}", details={'error': str(e)} ) finally: if self.pdf_plumber: self.pdf_plumber.close() self.stats['end_time'] = datetime.now() self.stats['duration'] = (self.stats['end_time'] - self.stats['start_time']).total_seconds() return self._generate_summary() # ==================== CORE CHECKS ==================== def _check_basic_structure(self): """Check PDF structure and tagging""" catalog = self.pdf_reader.trailer.get("/Root", {}) if "/MarkInfo" not in catalog: self.add_issue( Severity.CRITICAL, "Document Structure", "PDF is not tagged - completely inaccessible to screen readers", wcag_criterion="1.3.1, 4.1.2", recommendation="Tag the PDF using Adobe Acrobat Pro or authoring software" ) return mark_info = catalog.get("/MarkInfo", {}) marked = mark_info.get("/Marked", False) if not marked: self.add_issue( Severity.CRITICAL, "Document Structure", "PDF 
marked as untagged in metadata", wcag_criterion="1.3.1", recommendation="Enable document tagging" ) else: self.add_issue( Severity.SUCCESS, "Document Structure", "PDF is properly tagged", wcag_criterion="1.3.1" ) def _check_metadata(self): """Check document metadata""" meta = self.pdf_reader.metadata if not meta: self.add_issue( Severity.ERROR, "Metadata", "No document metadata found", wcag_criterion="2.4.2", recommendation="Add title, author, and subject metadata" ) return # Check title if not meta.title or not meta.title.strip(): self.add_issue( Severity.ERROR, "Metadata", "Document title is missing", wcag_criterion="2.4.2", recommendation="Add a descriptive title" ) else: self.add_issue( Severity.SUCCESS, "Metadata", f"Document has title: '{meta.title}'", wcag_criterion="2.4.2" ) # Check author if not meta.author or not meta.author.strip(): self.add_issue( Severity.WARNING, "Metadata", "Author information is missing", recommendation="Add author metadata" ) # Check subject if not meta.subject or not meta.subject.strip(): self.add_issue( Severity.INFO, "Metadata", "Subject/description is missing", recommendation="Add a brief description" ) def _check_language(self): """Check language declaration (WCAG 3.1.1) and detect actual content language.""" catalog = self.pdf_reader.trailer.get("/Root", {}) # --- Detect actual language from content --- sample_text = "" for page in self.pdf_plumber.pages[:3]: t = page.extract_text() if t: sample_text += t + " " if len(sample_text) > 500: break if langdetect_detect and len(sample_text.strip()) >= 50: try: self._detected_lang = langdetect_detect(sample_text) except LangDetectException: self._detected_lang = 'en' # --- Check declared /Lang --- if "/Lang" not in catalog: suggestion = self._detected_lang if self._detected_lang else 'en-US' # Map ISO 639-1 codes to BCP-47 tags lang_map = { 'uk': 'uk-UA', 'ru': 'ru-RU', 'de': 'de-DE', 'fr': 'fr-FR', 'es': 'es-ES', 'pl': 'pl-PL', 'it': 'it-IT', 'pt': 'pt-PT', 'nl': 'nl-NL', 'cs': 
'cs-CZ', 'sk': 'sk-SK', 'ro': 'ro-RO', 'hu': 'hu-HU', 'bg': 'bg-BG', 'hr': 'hr-HR', 'ar': 'ar-SA', 'zh': 'zh-CN', 'ja': 'ja-JP', 'ko': 'ko-KR', 'en': 'en-US', } bcp47 = lang_map.get(self._detected_lang, self._detected_lang) self.add_issue( Severity.ERROR, "Language", "Document language not specified", wcag_criterion="3.1.1", recommendation=f"Set document language (detected content language: '{bcp47}')", details={'detected_language': self._detected_lang} ) else: declared_lang = str(catalog["/Lang"]).lower() # Compare declared lang prefix with detected lang declared_prefix = declared_lang.split('-')[0].split('_')[0] if (langdetect_detect and len(sample_text.strip()) >= 50 and self._detected_lang != 'en' # English is common false-positive and declared_prefix != self._detected_lang and self._detected_lang not in declared_prefix): self.add_issue( Severity.WARNING, "Language", f"Declared language '{catalog['/Lang']}' may not match content " f"(detected: '{self._detected_lang}')", wcag_criterion="3.1.1", recommendation="Verify the /Lang entry matches the document's actual language", details={'declared_language': str(catalog["/Lang"]), 'detected_language': self._detected_lang} ) else: self.add_issue( Severity.SUCCESS, "Language", f"Document language set to: {catalog['/Lang']}", wcag_criterion="3.1.1", details={'declared_language': str(catalog["/Lang"]), 'detected_language': self._detected_lang} ) def _check_text_extractability(self): """Check if text can be extracted""" total_pages = len(self.pdf_reader.pages) pages_without_text = 0 page_details = [] for i, page in enumerate(self.pdf_plumber.pages): text = page.extract_text() char_count = len(text) if text else 0 if char_count < 10: pages_without_text += 1 page_details.append(i + 1) if pages_without_text == total_pages: self.add_issue( Severity.CRITICAL, "Text Accessibility", "No extractable text found - document appears to be scanned images", wcag_criterion="1.1.1", recommendation="Run OCR or recreate from source with 
selectable text", details={'pages_affected': page_details} ) elif pages_without_text > 0: self.add_issue( Severity.WARNING, "Text Accessibility", f"{pages_without_text} of {total_pages} pages have no extractable text", wcag_criterion="1.1.1", recommendation="Review pages without text", details={'pages_affected': page_details} ) def _check_ocr_quality(self): """Check OCR quality if document appears scanned""" if not pytesseract: return if self.quick_mode: logger.info("Skipping OCR analysis (quick mode)") return logger.info("Running OCR analysis...") try: # Reduced DPI from 300 to 150 for faster processing images = convert_from_path(str(self.pdf_path), dpi=150, first_page=1, last_page=min(2, len(self.pdf_reader.pages))) for i, image in enumerate(images): # Get OCR data with confidence ocr_data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT) confidences = [int(c) for c in ocr_data['conf'] if c != '-1'] if confidences: avg_confidence = sum(confidences) / len(confidences) if avg_confidence < 60: self.add_issue( Severity.WARNING, "OCR Quality", f"Page {i+1}: Low OCR confidence ({avg_confidence:.1f}%)", wcag_criterion="1.1.1", recommendation="Poor scan quality - rescan or manual review needed", page_number=i+1, details={'confidence': avg_confidence} ) except Exception as e: logger.warning(f"OCR check skipped: {str(e)}") def _check_images_comprehensive(self): """Comprehensive image accessibility check with AI""" logger.info("Analyzing images with AI...") total_images = 0 analyzed_images = 0 # Collect all images first image_tasks = [] for page_num, page in enumerate(self.pdf_plumber.pages): images = page.images total_images += len(images) for img_idx, img in enumerate(images): try: image_data = self._extract_image_from_page(page, img) if image_data: # Include coordinates for highlighting coords = { 'x0': img['x0'], 'y0': img['top'], 'x1': img['x1'], 'y1': img['bottom'] } image_tasks.append((image_data, page_num + 1, img_idx + 1, coords)) except 
Exception as e: logger.warning(f"Failed to extract image on page {page_num + 1}: {str(e)}") if total_images == 0: self.add_issue( Severity.INFO, "Images", "No images found in document", wcag_criterion="1.1.1" ) return logger.info(f"Found {total_images} images to analyze...") # Cap analysis: skip very small images (likely decorative/icons) image_tasks = [t for t in image_tasks if self._image_data_size(t[0]) > 2048] # Limit to 10 images max — more would just waste API calls on brochure backgrounds MAX_IMAGES = 10 if len(image_tasks) > MAX_IMAGES: logger.info(f"Capping image analysis at {MAX_IMAGES} (of {len(image_tasks)}) images") image_tasks = image_tasks[:MAX_IMAGES] # Skip AI analysis in quick mode if self.quick_mode: logger.info("Skipping AI image analysis (quick mode)") self.add_issue( Severity.INFO, "Images", f"Found {total_images} images - run without --quick for AI analysis", wcag_criterion="1.1.1" ) return # Process images in parallel with progress updates def analyze_single_image(task_data): image_data, page_num, img_num, coords = task_data result = {'page': page_num, 'img': img_num, 'analyzed': False, 'coords': coords} try: # Check cache first cache_key = self.cache.get_cache_key(image_data, "claude_vision") cached_result = self.cache.get(cache_key) if cached_result: analysis = cached_result result['cached'] = True else: # Analyze with Claude (timeout via concurrent.futures) with ThreadPoolExecutor(max_workers=1) as img_exec: future = img_exec.submit(self._analyze_image_with_claude, image_data) try: analysis = future.result(timeout=30) except Exception: analysis = None if analysis and 'error' not in analysis: self.cache.set(cache_key, analysis) result['cached'] = False if analysis and 'error' not in analysis: result['analysis'] = analysis result['analyzed'] = True # Also check with Google Vision for additional data if self.vision_client: vision_analysis = self._analyze_image_with_google(image_data) if vision_analysis: result['vision_analysis'] = 
vision_analysis except Exception as e: result['error'] = str(e) return result # Use ThreadPoolExecutor for parallel processing max_workers = 5 if not self.quick_mode else 1 with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(analyze_single_image, task): task for task in image_tasks} for future in as_completed(futures): try: result = future.result() analyzed_images += 1 cache_status = " (cached)" if result.get('cached') else "" logger.info(f"Analyzed image {analyzed_images}/{total_images} (Page {result['page']}){cache_status}") if result.get('analyzed'): self._process_image_analysis(result['analysis'], result['page'], result['img'], result.get('coords')) if result.get('cached'): self.stats['cached_calls'] += 1 else: self.stats['api_calls'] += 1 self.stats['total_cost_estimate'] += 0.015 if result.get('vision_analysis'): self._process_google_vision_results(result['vision_analysis'], result['page'], result['img'], result.get('coords')) if result.get('error'): logger.warning(f"Error analyzing image on page {result['page']}: {result['error']}") except Exception as e: logger.warning(f"Image analysis error: {str(e)}") logger.info(f"Completed analysis of {analyzed_images}/{total_images} images") @retry_with_backoff(max_retries=3, initial_delay=1.0) def _analyze_image_with_claude(self, image_bytes: bytes) -> Optional[Dict]: """Analyze image with Claude Vision (with automatic retry on failure)""" if not self.anthropic_client: return None try: base64_image = base64.b64encode(image_bytes).decode('utf-8') message = self.anthropic_client.messages.create( model="claude-sonnet-4-5-20250929", max_tokens=1024, timeout=self.api_timeout, messages=[ { "role": "user", "content": [ { "type": "image", "source": { "type": "base64", "media_type": "image/jpeg", "data": base64_image, }, }, { "type": "text", "text": """Analyze this image for PDF accessibility (WCAG 2.1): 1. Provide concise alt text (1-2 sentences, max 125 characters) 2. 
Is this decorative or informational? 3. Does it contain text? If yes, what text? 4. Does it use color as the only means of conveying information? 5. Are there any accessibility concerns? 6. Quality rating (1-10) if this were to be used in a PDF 7. For images of people: describe their role, action, or function — not physical appearance (race, ethnicity, age, gender, disability) unless directly relevant to the image's informational purpose. A human reviewer will verify descriptions of people. 8. If a brand name, logo, or product name is visible, use the specific brand name in the alt text (e.g., "Scotch tape" not "adhesive tape", "Nike Air Max" not "sneakers"). Respond in JSON format: { "alt_text": "...", "type": "decorative|informational|complex", "has_text": true|false, "text_content": "...", "color_only_info": true|false, "concerns": ["..."], "quality_rating": 1-10, "recommendation": "...", "contains_people": true|false, "brands_detected": ["..."] }""" } ], } ], ) response_text = message.content[0].text # Try to parse JSON from response json_match = re.search(r'\{.*\}', response_text, re.DOTALL) if json_match: return json.loads(json_match.group()) return {'error': 'Could not parse response'} except Exception as e: return {'error': str(e)} @retry_with_backoff(max_retries=3, initial_delay=1.0) def _analyze_image_with_google(self, image_bytes: bytes) -> Optional[Dict]: """Analyze image with Google Vision (with automatic retry on failure)""" if not self.vision_client: return None try: image = vision.Image(content=image_bytes) # Multiple detection types with timeout response = self.vision_client.annotate_image( { 'image': image, 'features': [ {'type_': vision.Feature.Type.TEXT_DETECTION}, {'type_': vision.Feature.Type.LABEL_DETECTION}, {'type_': vision.Feature.Type.IMAGE_PROPERTIES}, {'type_': vision.Feature.Type.OBJECT_LOCALIZATION}, ], }, timeout=self.api_timeout ) self.stats['api_calls'] += 1 self.stats['total_cost_estimate'] += 0.0015 return { 'has_text': 
bool(response.text_annotations), 'text_content': response.text_annotations[0].description if response.text_annotations else None, 'labels': [label.description for label in response.label_annotations[:5]], 'objects': [obj.name for obj in response.localized_object_annotations] } except Exception as e: return {'error': str(e)} def _process_image_analysis(self, analysis: Dict, page_num: int, img_num: int, coordinates: Optional[Dict] = None): """Process Claude's image analysis results""" # Check if text in image if analysis.get('has_text'): self.add_issue( Severity.ERROR, "Images - Text in Image", f"Page {page_num}, Image {img_num}: Contains text: '{analysis.get('text_content', '')[:50]}'", wcag_criterion="1.4.5", recommendation="Replace image with actual text or provide text alternative", page_number=page_num, details=analysis, coordinates=coordinates ) # Check alt text quality if analysis.get('type') == 'informational': alt_text = analysis.get('alt_text', '') if len(alt_text) > 125: self.add_issue( Severity.WARNING, "Images - Alt Text", f"Page {page_num}, Image {img_num}: Suggested alt text is too long ({len(alt_text)} chars)", wcag_criterion="1.1.1", recommendation=f"Shorten alt text. 
Suggested: '{alt_text[:100]}...'", page_number=page_num, coordinates=coordinates ) else: self.add_issue( Severity.INFO, "Images - Alt Text", f"Page {page_num}, Image {img_num}: Suggested alt text: '{alt_text}'", wcag_criterion="1.1.1", page_number=page_num, coordinates=coordinates ) # Check for color-only information if analysis.get('color_only_info'): self.add_issue( Severity.ERROR, "Images - Color Only", f"Page {page_num}, Image {img_num}: Uses color as only means of conveying information", wcag_criterion="1.4.1", recommendation="Add patterns, labels, or text descriptions", page_number=page_num, coordinates=coordinates ) # Flag images containing people for human review if analysis.get('contains_people'): self.add_issue( Severity.INFO, "Images - People", f"Page {page_num}, Image {img_num}: Image contains people — alt text description " "should be verified by a human reviewer to ensure ethical and accurate representation.", wcag_criterion="1.1.1", recommendation="Review alt text to confirm it describes role/action rather than physical appearance.", page_number=page_num, coordinates=coordinates ) # Note any detected brand names for reviewer awareness brands = [b for b in analysis.get('brands_detected', []) if b] if brands: self.add_issue( Severity.INFO, "Images - Brands", f"Page {page_num}, Image {img_num}: Brand name(s) detected: {', '.join(brands[:5])}. 
" "Verify the alt text uses the specific brand name.", wcag_criterion="1.1.1", page_number=page_num, coordinates=coordinates ) # Quality concerns — capped at 2 per image, downgraded to INFO # (these are advisory notes, not WCAG violations) concerns = analysis.get('concerns', []) for concern in concerns[:2]: self.add_issue( Severity.INFO, "Images - Quality", f"Page {page_num}, Image {img_num}: {concern}", wcag_criterion="1.1.1", page_number=page_num, coordinates=coordinates ) def _process_google_vision_results(self, results: Dict, page_num: int, img_num: int, coordinates: Optional[Dict] = None): """Process Google Vision results — only report actionable findings.""" pass # Label detections alone are not accessibility issues; Claude already provides alt text def _check_color_contrast(self): """Check color contrast using image analysis""" logger.info("Checking color contrast...") if self.quick_mode: logger.info("Skipping detailed contrast analysis (quick mode)") return try: # Reduced DPI from 150 to 100 for faster processing images = convert_from_path(str(self.pdf_path), dpi=100, first_page=1, last_page=min(3, len(self.pdf_reader.pages))) for i, image in enumerate(images): contrast_results = ColorContrastChecker.check_image_contrast(image) if 'error' in contrast_results: continue # Only flag edges that actually cross a light/dark boundary (filtered in sampler). # >60% of those edges failing = genuine contrast problem. # 30-60% = worth a warning. Below 30% = pass. 
fail_pct = contrast_results['fail_aa_normal_percent'] if fail_pct > 60: self.add_issue( Severity.ERROR, "Color Contrast", f"Page {i+1}: {fail_pct:.1f}% of text-edge samples fail WCAG AA (4.5:1) — " f"low contrast text likely present", wcag_criterion="1.4.3", recommendation="Use Colour Contrast Analyser to identify and fix low-contrast text", page_number=i+1, details=contrast_results ) elif fail_pct > 30: self.add_issue( Severity.WARNING, "Color Contrast", f"Page {i+1}: {fail_pct:.1f}% of text-edge samples fail WCAG AA — " f"verify contrast manually with Colour Contrast Analyser", wcag_criterion="1.4.3", recommendation="Check text against its background using the Colour Contrast Analyser tool", page_number=i+1, details=contrast_results ) except Exception as e: logger.warning(f"Contrast check skipped: {str(e)}") def _check_readability(self): """Check content readability (language-aware: Flesch only for English).""" # Extract all text all_text = "" for page in self.pdf_plumber.pages: text = page.extract_text() if text: all_text += text + "\n" if len(all_text) < 100: return # Flesch Reading Ease is an English-only formula — skip for other languages is_english = self._detected_lang in ('en', 'en-us', 'en-gb') if is_english: analysis = ReadabilityAnalyzer.analyze(all_text) if 'error' in analysis: return # Check Flesch Reading Ease — readability is advisory, cap at WARNING if analysis['flesch_reading_ease'] < 60: self.add_issue( Severity.WARNING, "Readability", f"Content is difficult to read (Flesch score: {analysis['flesch_reading_ease']}/100)", wcag_criterion="3.1.5", recommendation="Simplify language to reach 8th-9th grade level (target score: 60+)", details=analysis ) # Check grade level if analysis['flesch_kincaid_grade'] > 10: self.add_issue( Severity.WARNING, "Readability", f"Content requires grade {analysis['flesch_kincaid_grade']} reading level", wcag_criterion="3.1.5", recommendation="Target grade 8-10 for general audiences", details=analysis ) # Long-sentence 
check is language-agnostic sentences = [s.strip() for s in re.split(r'[.!?]+', all_text) if s.strip()] long_sentences = [s for s in sentences if len(s.split()) > 25] if len(long_sentences) > 5: self.add_issue( Severity.INFO, "Readability", f"{len(long_sentences)} sentences exceed 25 words", wcag_criterion="3.1.5", recommendation="Break long sentences for better comprehension", details={'long_sentences_count': len(long_sentences), 'detected_language': self._detected_lang} ) def _check_links(self): """Check link quality (WCAG 2.4.4) — only checks actual hyperlink label text.""" unclear_patterns = [ # English r'\bclick here\b', r'\bhere\b', r'\bread more\b', r'\bmore\b', r'\bthis\b', r'\blink\b', # Ukrainian r'\bнатисніть тут\b', r'\bтут\b', r'\bдокладніше\b', r'\bбільше\b', r'\bцe\b', r'\bпосилання\b', # Russian r'\bнажмите здесь\b', r'\bздесь\b', r'\bподробнее\b', r'\bбольше\b', r'\bэто\b', r'\bссылка\b', # German r'\bhier klicken\b', r'\bhier\b', r'\bmehr lesen\b', r'\bmehr\b', r'\bdies\b', r'\blink\b', # French r'\bcliquez ici\b', r'\bici\b', r'\blire la suite\b', r'\bplus\b', r'\bceci\b', r'\blien\b', # Spanish r'\bhaz clic aquí\b', r'\baquí\b', r'\beer más\b', r'\bmás\b', r'\besto\b', r'\benlace\b', # Polish r'\bkliknij tutaj\b', r'\btutaj\b', r'\bczytaj więcej\b', r'\bwięcej\b', r'\bto\b', r'\blink\b', ] for i, (page_plumber, page_pypdf) in enumerate( zip(self.pdf_plumber.pages, self.pdf_reader.pages) ): annots_raw = page_pypdf.get("/Annots") if not annots_raw: continue page_height = float(page_plumber.height) page_flagged = False for annot_ref in annots_raw: try: annot = annot_ref.get_object() except Exception: continue # Only process URI hyperlinks if annot.get("/Subtype") != "/Link": continue action = annot.get("/A") if not action or action.get("/S") != "/URI": continue # Get annotation bounding box (PDF coords: bottom-left origin) rect = annot.get("/Rect") if not rect or len(rect) < 4: continue x0, y0, x1, y1 = (float(rect[0]), float(rect[1]), 
float(rect[2]), float(rect[3])) # Convert to pdfplumber coords (top-left origin) top = page_height - y1 bottom = page_height - y0 if x0 >= x1 or top >= bottom: continue # Extract only the text inside the hyperlink rectangle try: link_text = ( page_plumber.within_bbox((x0, top, x1, bottom)) .extract_text() or "" ).strip() except Exception: continue if not link_text: continue # image-only link — skip for pattern in unclear_patterns: if re.search(pattern, link_text, re.IGNORECASE): self.add_issue( Severity.WARNING, "Link Text", f"Page {i+1}: Unclear link text \"{link_text}\" — should describe the destination", wcag_criterion="2.4.4", recommendation="Use descriptive link text that makes sense out of context", page_number=i+1 ) page_flagged = True break # one issue per link is enough if page_flagged: break # one issue per page def _check_headings(self): """Check heading structure and hierarchy""" catalog = self.pdf_reader.trailer.get("/Root", {}) if "/StructTreeRoot" not in catalog: self.add_issue( Severity.ERROR, "Headings", "No structure tree - cannot verify heading hierarchy", wcag_criterion="1.3.1", recommendation="Tag document with proper heading structure") return struct_tree = catalog["/StructTreeRoot"] if hasattr(struct_tree, 'get_object'): struct_tree = struct_tree.get_object() # Load RoleMap so custom tag names (e.g. 
/Heading1) resolve to standard ones (/H1) role_map = {} if "/RoleMap" in struct_tree: rm = struct_tree["/RoleMap"] if hasattr(rm, 'get_object'): rm = rm.get_object() try: for key, value in rm.items(): role_map[str(key)] = str(value) except (AttributeError, TypeError): pass headings = [] HEADING_TAGS = {"/H1", "/H2", "/H3", "/H4", "/H5", "/H6"} def walk_tree(element, depth=0): if depth > 100: return try: if hasattr(element, 'get_object'): element = element.get_object() if isinstance(element, dict): tag = str(element.get("/S", "")) mapped_tag = role_map.get(tag, tag) if mapped_tag in HEADING_TAGS: headings.append(int(mapped_tag[2])) kids = element.get("/K", []) if isinstance(kids, list): for kid in kids: walk_tree(kid, depth + 1) elif kids: walk_tree(kids, depth + 1) except (AttributeError, TypeError, KeyError): pass try: walk_tree(struct_tree) except Exception as e: logger.warning(f"Could not fully parse structure tree: {e}") if not headings: self.add_issue( Severity.WARNING, "Headings", "No heading tags (H1-H6) found in structure tree", wcag_criterion="1.3.1", recommendation="Add heading tags to establish document hierarchy") return if headings[0] != 1: self.add_issue( Severity.ERROR, "Headings", f"Document does not start with H1 (starts with H{headings[0]})", wcag_criterion="1.3.1", recommendation="First heading should be H1") for i in range(1, len(headings)): if headings[i] > headings[i - 1] + 1: self.add_issue( Severity.WARNING, "Headings", f"Heading level skipped: H{headings[i - 1]} to H{headings[i]}", wcag_criterion="1.3.1", recommendation="Do not skip heading levels") heading_str = ", ".join(f"H{h}" for h in headings[:10]) if len(headings) > 10: heading_str += "..." 
has_issues = any( i.severity in [Severity.ERROR, Severity.WARNING] for i in self.issues if i.category == "Headings" ) self.add_issue( Severity.INFO if has_issues else Severity.SUCCESS, "Headings", f"Found {len(headings)} headings: {heading_str}", wcag_criterion="1.3.1") def _check_tab_order(self): """Check tab order is set for pages""" pages_without_tabs = [] for i, page in enumerate(self.pdf_reader.pages): if "/Tabs" not in page: pages_without_tabs.append(i + 1) if pages_without_tabs: if len(pages_without_tabs) == len(self.pdf_reader.pages): self.add_issue( Severity.ERROR, "Tab Order", "No pages have tab order defined", wcag_criterion="2.4.3", recommendation="Set /Tabs to /S (structure order) for all pages") else: self.add_issue( Severity.WARNING, "Tab Order", f"{len(pages_without_tabs)} page(s) missing tab order", wcag_criterion="2.4.3", recommendation="Set /Tabs entry on all pages") else: tab_types = set() for page in self.pdf_reader.pages: tab_types.add(str(page.get("/Tabs", ""))) self.add_issue( Severity.SUCCESS, "Tab Order", f"Tab order set on all pages (types: {', '.join(tab_types)})", wcag_criterion="2.4.3") def _check_role_mapping(self): """Check role mapping for custom tags""" catalog = self.pdf_reader.trailer.get("/Root", {}) if "/StructTreeRoot" not in catalog: return # Already flagged by heading/structure checks struct_tree = catalog["/StructTreeRoot"] if hasattr(struct_tree, 'get_object'): struct_tree = struct_tree.get_object() if "/RoleMap" in struct_tree: role_map = struct_tree["/RoleMap"] if hasattr(role_map, 'get_object'): role_map = role_map.get_object() standard_roles = { "/P", "/H1", "/H2", "/H3", "/H4", "/H5", "/H6", "/Table", "/TR", "/TD", "/TH", "/L", "/LI", "/Lbl", "/LBody", "/Span", "/Link", "/Figure", "/Form", "/Sect", "/Art", "/Div", "/BlockQuote", "/TOC", "/TOCI" } mapped = {} try: for key, value in role_map.items(): mapped[key] = str(value) except (AttributeError, TypeError): pass unmapped = {k: v for k, v in mapped.items() if v not in 
standard_roles} if unmapped: self.add_issue( Severity.WARNING, "Role Mapping", f"{len(unmapped)} custom role(s) map to non-standard tags", wcag_criterion="1.3.1", recommendation="Ensure all custom roles map to standard PDF tags") else: self.add_issue( Severity.SUCCESS, "Role Mapping", f"All {len(mapped)} custom roles correctly mapped", wcag_criterion="1.3.1") else: self.add_issue( Severity.INFO, "Role Mapping", "No custom role mapping (document uses standard tags only)", wcag_criterion="1.3.1") def _check_forms(self): """Check form field accessibility""" catalog = self.pdf_reader.trailer.get("/Root", {}) if "/AcroForm" not in catalog: return acro_form = catalog["/AcroForm"] if "/Fields" not in acro_form: return fields = acro_form["/Fields"] field_issues = [] for field in fields: field = field.get_object() field_name = field.get("/T", "Unnamed") has_tooltip = "/TU" in field if not has_tooltip: field_issues.append(field_name) if field_issues: self.add_issue( Severity.ERROR, "Forms", f"{len(field_issues)} form field(s) missing descriptions/tooltips", wcag_criterion="3.3.2, 4.1.2", recommendation="Add tooltip descriptions to all form fields", details={'fields': field_issues} ) else: self.add_issue( Severity.SUCCESS, "Forms", f"All {len(fields)} form fields have descriptions", wcag_criterion="3.3.2" ) def _check_tables(self): """Check table accessibility using PDF structure tree (tagged tables).""" catalog = self.pdf_reader.trailer.get("/Root", {}) struct_tree = catalog.get("/StructTreeRoot") tables_found = 0 tables_ok = 0 if struct_tree: def walk(node, depth=0): nonlocal tables_found, tables_ok if depth > 50: return try: obj = node.get_object() if hasattr(node, 'get_object') else node if not isinstance(obj, dict): return role = obj.get("/S") or obj.get("/Type") if role and str(role) == "/Table": tables_found += 1 ok = self._analyze_table(obj, tables_found) if ok: tables_ok += 1 return # don't recurse into table internals kids = obj.get("/K", []) if not isinstance(kids, 
list): kids = [kids] for kid in kids: if kid is not None: walk(kid, depth + 1) except Exception: pass try: walk(struct_tree) except Exception as e: logger.warning(f"Structure tree walk failed: {e}") if tables_found == 0: # Fallback: visual detection via pdfplumber (for untagged docs) visual_tables = 0 for i, page in enumerate(self.pdf_plumber.pages): try: tbls = page.find_tables() visual_tables += len(tbls) except Exception: pass if visual_tables > 0: self.add_issue( Severity.WARNING, "Tables", f"{visual_tables} visual table(s) detected but not tagged in structure tree", wcag_criterion="1.3.1", recommendation="Tag tables with proper Table/TR/TH/TD structure elements" ) else: self.add_issue( Severity.INFO, "Tables", "No tables detected in document", wcag_criterion="1.3.1" ) elif tables_ok == tables_found: self.add_issue( Severity.SUCCESS, "Tables", f"{tables_found} table(s) with proper header and scope structure", wcag_criterion="1.3.1" ) def _analyze_table(self, table_obj: dict, table_num: int) -> bool: """Analyse a single /Table structure element. Returns True if no issues found.""" kids = table_obj.get("/K", []) if not isinstance(kids, list): kids = [kids] stats = { 'rows': 0, 'th_cells': 0, 'td_cells': 0, 'th_with_scope': 0, 'has_caption': False, } self._collect_table_stats(kids, stats) issues_added = False total_cells = stats['th_cells'] + stats['td_cells'] if stats['rows'] == 0 and total_cells == 0: self.add_issue( Severity.WARNING, "Tables", f"Table {table_num}: empty — no TR/TH/TD elements found in structure tree", wcag_criterion="1.3.1", recommendation="Ensure the table is properly tagged with TR rows and TH/TD cells" ) return False if stats['th_cells'] == 0: self.add_issue( Severity.ERROR, "Tables", f"Table {table_num}: no header cells (TH) — {stats['rows']} row(s), {total_cells} data cell(s). 
" f"Screen readers cannot identify column or row headers.", wcag_criterion="1.3.1", recommendation="Mark header cells as TH with scope='col' (column headers) or scope='row' (row headers)" ) issues_added = True elif stats['th_with_scope'] < stats['th_cells']: missing = stats['th_cells'] - stats['th_with_scope'] self.add_issue( Severity.WARNING, "Tables", f"Table {table_num}: {missing} of {stats['th_cells']} TH header cell(s) missing scope attribute", wcag_criterion="1.3.1", recommendation="Add scope='col' to column headers and scope='row' to row headers" ) issues_added = True if not stats['has_caption'] and total_cells > 6: self.add_issue( Severity.INFO, "Tables", f"Table {table_num}: no Caption element ({stats['rows']} rows, ~{total_cells} cells). " f"A Caption helps screen readers identify the table — ensure a visible title exists nearby.", wcag_criterion="1.3.1", recommendation="Add a Caption as the first child of the Table element if no visible title precedes it" ) # Not counted as a hard issue — don't set issues_added = True return not issues_added def _collect_table_stats(self, kids: list, stats: dict, depth: int = 0): """Recursively collect structural stats from a table's children.""" if depth > 15: return for kid in kids: try: obj = kid.get_object() if hasattr(kid, 'get_object') else kid if not isinstance(obj, dict): continue role = str(obj.get("/S") or obj.get("/Type") or "") if role == "/TR": stats['rows'] += 1 elif role == "/TH": stats['th_cells'] += 1 if self._th_has_scope(obj): stats['th_with_scope'] += 1 elif role == "/TD": stats['td_cells'] += 1 elif role == "/Caption": stats['has_caption'] = True sub_kids = obj.get("/K", []) if not isinstance(sub_kids, list): sub_kids = [sub_kids] if sub_kids: self._collect_table_stats(sub_kids, stats, depth + 1) except Exception: continue def _th_has_scope(self, th_obj: dict) -> bool: """Return True if a TH element carries a Scope attribute.""" attrs = th_obj.get("/A") if not attrs: return False try: # /A can be a 
single attribute dict or a list of dicts a = attrs.get_object() if hasattr(attrs, 'get_object') else attrs if isinstance(a, dict): return "/Scope" in a if isinstance(a, list): for item in a: try: d = item.get_object() if hasattr(item, 'get_object') else item if isinstance(d, dict) and "/Scope" in d: return True except Exception: pass except Exception: pass return False def _check_reading_order(self): """Check reading order""" catalog = self.pdf_reader.trailer.get("/Root", {}) if "/StructTreeRoot" not in catalog: self.add_issue( Severity.ERROR, "Reading Order", "No structure tree - reading order cannot be determined", wcag_criterion="1.3.2", recommendation="Tag document to establish proper reading order" ) else: self.add_issue( Severity.INFO, "Reading Order", "Structure tree present - verify reading order with screen reader", wcag_criterion="1.3.2", recommendation="Test with NVDA or JAWS to verify logical reading order" ) def _check_fonts(self): """Check font embedding""" embedded_count = 0 non_embedded_fonts: set = set() for page in self.pdf_reader.pages: resources = page.get("/Resources", {}) if "/Font" not in resources: continue fonts = resources["/Font"] for font_key, font_ref in fonts.items(): try: font_obj = font_ref.get_object() except Exception: continue is_embedded = ( "/FontFile" in font_obj or "/FontFile2" in font_obj or "/FontFile3" in font_obj or "/FontDescriptor" in font_obj and ( "/FontFile" in font_obj["/FontDescriptor"].get_object() or "/FontFile2" in font_obj["/FontDescriptor"].get_object() or "/FontFile3" in font_obj["/FontDescriptor"].get_object() ) ) if is_embedded: embedded_count += 1 else: base_font = font_obj.get("/BaseFont", font_key) non_embedded_fonts.add(str(base_font).lstrip('/')) if non_embedded_fonts: self.add_issue( Severity.WARNING, "Fonts", f"{len(non_embedded_fonts)} fonts not embedded", wcag_criterion="1.4.4", recommendation="Embed all fonts for consistent rendering", details={"non_embedded_fonts": sorted(non_embedded_fonts)} ) 
def _check_security(self): """Check security settings""" if self.pdf_reader.is_encrypted: self.add_issue( Severity.WARNING, "Security", "Document is encrypted", recommendation="Ensure assistive technology can access content" ) def _check_bookmarks(self): """Check navigation bookmarks""" outlines = self.pdf_reader.outline total_pages = len(self.pdf_reader.pages) if not outlines and total_pages > 5: self.add_issue( Severity.INFO, "Navigation", "No bookmarks found", wcag_criterion="2.4.5", recommendation=f"Add bookmarks for {total_pages}-page document to aid navigation" ) elif outlines: self.add_issue( Severity.SUCCESS, "Navigation", "Document has navigation bookmarks", wcag_criterion="2.4.5" ) def _check_verapdf_validation(self): """Run veraPDF PDF/UA validation""" if not VeraPDFValidator: logger.warning("veraPDF not available - skipping") return logger.info("Running veraPDF PDF/UA validation...") try: validator = VeraPDFValidator() results = validator.validate(str(self.pdf_path)) if 'error' in results: logger.warning(f"veraPDF validation error: {results['error']}") return self.verapdf_results = results # Report compliance status if results['compliant']: self.add_issue( Severity.SUCCESS, "PDF/UA Compliance", f"Document passes PDF/UA-1 validation ({results['passed_rules']} rules passed)", wcag_criterion="PDF/UA", recommendation="Document meets PDF/UA structure requirements" ) else: self.add_issue( Severity.ERROR, "PDF/UA Compliance", f"Document fails PDF/UA-1 validation ({results['failed_rules']} rules failed, {results['failed_checks']} checks failed)", wcag_criterion="PDF/UA", recommendation="Fix structure issues reported by veraPDF" ) # Add specific errors as issues for error in results.get('errors', [])[:10]: # Limit to first 10 self.add_issue( Severity.WARNING, "PDF/UA Structure", f"Clause {error['clause']}: {error['description'][:150]}", wcag_criterion="PDF/UA", recommendation="Consult veraPDF documentation for this clause" ) logger.info(f"veraPDF: 
{results['passed_rules']} passed, {results['failed_rules']} failed") except Exception as e: logger.warning(f"veraPDF check error: {str(e)}") def _analyze_remediation_options(self): """Analyze what can be auto-fixed""" if not PDFRemediator: return logger.info("Analyzing auto-remediation options...") try: remediator = PDFRemediator(str(self.pdf_path)) suggestions = remediator.analyze_and_suggest_fixes() self.remediation_suggestions = suggestions # Count fixable issues total_fixable = sum( len([f for f in fixes if f.get('auto_fixable')]) for fixes in suggestions.values() ) if total_fixable > 0: logger.info(f"{total_fixable} issues can be auto-fixed") else: logger.info("No auto-fixable issues found") except Exception as e: logger.warning(f"Remediation analysis error: {str(e)}") # ==================== HELPER METHODS ==================== def _extract_image_from_page(self, page, img_info) -> Optional[bytes]: """Extract image bytes from PDF page""" try: # Get image coordinates x0, y0, x1, y1 = img_info['x0'], img_info['top'], img_info['x1'], img_info['bottom'] # Crop page to image area cropped = page.crop((x0, y0, x1, y1)) # Convert to PIL Image pil_image = cropped.to_image(resolution=150).original # Convert to bytes buffer = BytesIO() pil_image.save(buffer, format='JPEG', quality=85) return buffer.getvalue() except Exception as e: return None def _image_data_size(self, image_data: bytes) -> int: """Return byte size of image data — used to filter out tiny decorative images.""" return len(image_data) if image_data else 0 def _generate_page_images(self, output_dir: Path, dpi: int = 150): """Generate PNG images for each page for visual display""" if not self.generate_images: return logger.info("Generating page images for visual display...") try: from pdf2image import convert_from_path except ImportError: logger.warning("pdf2image not available - skipping page image generation") return try: output_dir.mkdir(parents=True, exist_ok=True) # Convert pages to images # Store DPI for 
coordinate scaling self.page_image_dpi = dpi images = convert_from_path( str(self.pdf_path), dpi=dpi, fmt='png' ) for page_num, image in enumerate(images, start=1): # Save as PNG image_filename = f"page_{page_num}.png" image_path = output_dir / image_filename image.save(image_path, 'PNG') self.page_images[page_num] = image_filename logger.info(f"Page {page_num}/{len(images)}") logger.info(f"Generated {len(images)} page images at {dpi} DPI") except Exception as e: logger.warning(f"Could not generate page images: {str(e)}") # ==================== REPORTING ==================== def _build_matterhorn_summary(self) -> dict: """Build Matterhorn Protocol PDF/UA-1 checkpoint summary.""" # Map check names to Matterhorn checkpoint IDs CHECK_TO_MATTERHORN = { "Document Structure": ["01", "02", "09"], "Metadata": ["06", "07"], "Language Declaration": ["11"], "Text Extractability": ["01", "08"], "OCR Quality": ["08"], "Image Accessibility": ["13"], "Color Contrast": ["04"], "Content Readability": [], "Link Quality": ["27", "28"], "Heading Structure": ["14"], "Tab Order": ["28"], "Role Mapping": ["02"], "Form Accessibility": ["24", "28"], "Table Structure": ["15"], "Reading Order": ["09"], "Font Accessibility": ["31"], "Security Settings": ["26"], "Navigation Aids": ["27"], "PDF/UA Structure (veraPDF)": [], # Covers all M conditions } # Checkpoint definitions: id, name, how (M=machine/H=human) CHECKPOINTS = [ ("01", "Real content tagged", "M"), ("02", "Role mapping", "M"), ("03", "Flickering content", "H"), ("04", "Color and contrast", "H"), ("05", "Sound content", "H"), ("06", "Metadata – title", "M"), ("07", "Metadata – language", "M"), ("08", "Text content", "M"), ("09", "Reading order", "M"), ("10", "Tab order", "M"), ("11", "Natural language", "M"), ("12", "Character encoding", "M"), ("13", "Graphics / alt text", "H"), ("14", "Headings", "M"), ("15", "Tables", "M"), ("16", "Lists", "M"), ("17", "Mathematical expressions", "H"), ("18", "Page headers / footers", "H"), ("19", 
"Notes / references", "H"), ("20", "Optional content", "M"), ("21", "Embedded files", "M"), ("22", "Article threads", "H"), ("23", "Digital signatures", "H"), ("24", "Non-interactive forms", "H"), ("25", "XFA forms", "M"), ("26", "Security", "M"), ("27", "Navigation", "M"), ("28", "Annotations", "M"), ("29", "Actions", "M"), ("30", "XObjects", "M"), ("31", "Fonts", "M"), ] # Build a map: checkpoint_id -> pass/fail/not_tested from our check results cp_status: dict = {} # id -> "PASS" | "FAIL" | "NOT_TESTED" check_name_to_result = {cr.check_name: cr.passed for cr in self.check_results} # Determine which checkpoints are covered and whether they passed for check_name, cp_ids in CHECK_TO_MATTERHORN.items(): result_passed = check_name_to_result.get(check_name) if result_passed is None: continue for cp_id in cp_ids: if cp_id not in cp_status: cp_status[cp_id] = "PASS" if result_passed else "FAIL" elif not result_passed: # Any failure overrides a pass cp_status[cp_id] = "FAIL" # Handle PDF/UA veraPDF: if it passed, mark all M checkpoints as PASS unless already FAIL verapdf_passed = check_name_to_result.get("PDF/UA Structure (veraPDF)") if verapdf_passed: for cp_id, _, how in CHECKPOINTS: if how == "M" and cp_id not in cp_status: cp_status[cp_id] = "PASS" checkpoints_out = [] any_fail = False for cp_id, cp_name, cp_how in CHECKPOINTS: status = cp_status.get(cp_id, "NOT_TESTED") if status == "FAIL": any_fail = True checkpoints_out.append({ "id": cp_id, "name": cp_name, "how": cp_how, "status": status, }) return { "standard": "PDF/UA-1", "overall_passed": not any_fail, "checkpoints": checkpoints_out, } def _generate_summary(self) -> Dict[str, Any]: """Generate comprehensive summary""" severity_counts = { 'critical': len([i for i in self.issues if i.severity == Severity.CRITICAL]), 'error': len([i for i in self.issues if i.severity == Severity.ERROR]), 'warning': len([i for i in self.issues if i.severity == Severity.WARNING]), 'info': len([i for i in self.issues if i.severity 
== Severity.INFO]), 'success': len([i for i in self.issues if i.severity == Severity.SUCCESS]) } # Calculate score based on check-pass ratio passed_checks = len([cr for cr in self.check_results if cr.passed]) total_checks = len(self.check_results) base_score = round(100 * passed_checks / total_checks) if total_checks else 0 # Soft penalty for critical/error issues (capped at 20) penalty = min(20, severity_counts['critical'] * 5 + severity_counts['error'] * 2) score = max(0, base_score - penalty) # Convert datetime objects to strings for JSON serialization stats_serializable = {} for key, value in self.stats.items(): if isinstance(value, datetime): stats_serializable[key] = value.isoformat() else: stats_serializable[key] = value # Count auto-fixable issues auto_fixable_count = 0 if self.remediation_suggestions: auto_fixable_count = sum( len([f for f in fixes if f.get('auto_fixable')]) for fixes in self.remediation_suggestions.values() ) # WCAG compliance summary failing_criteria: set = set() for issue in self.issues: if issue.severity in (Severity.CRITICAL, Severity.ERROR): for c in issue.wcag_criterion.split(','): c = c.strip() if c and c != 'PDF/UA': failing_criteria.add(c) level_a_fails = sorted([c for c in failing_criteria if WCAG_LEVELS.get(c) == 'A']) level_aa_fails = sorted([c for c in failing_criteria if WCAG_LEVELS.get(c) in ('A', 'AA')]) wcag_compliance = { 'level_a': len(level_a_fails) == 0, 'level_aa': len(level_aa_fails) == 0, 'level_a_failures': level_a_fails, 'level_aa_failures': level_aa_fails, } # Prioritised next steps next_steps = [] seen_recs: set = set() for sev in (Severity.CRITICAL, Severity.ERROR, Severity.WARNING): for issue in self.issues: if issue.severity != sev: continue action = issue.recommendation or issue.description if action in seen_recs: continue seen_recs.add(action) next_steps.append({ 'priority': 1 if sev == Severity.CRITICAL else 2 if sev == Severity.ERROR else 3, 'category': issue.category, 'action': action, 'wcag': 
issue.wcag_criterion, 'wcag_level': WCAG_LEVELS.get(issue.wcag_criterion.split(',')[0].strip(), ''), }) if len(next_steps) >= 8: break if len(next_steps) >= 8: break return { 'filename': self.pdf_path.name, 'total_pages': len(self.pdf_reader.pages), 'accessibility_score': score, 'score_breakdown': { 'checks_passed': passed_checks, 'checks_total': total_checks, 'base_score': base_score, 'penalty': penalty, 'final_score': score, 'per_check': [ {'name': cr.check_name, 'passed': cr.passed} for cr in self.check_results ] }, 'matterhorn_summary': self._build_matterhorn_summary(), 'severity_counts': severity_counts, 'total_issues': len(self.issues), 'auto_fixable_count': auto_fixable_count, 'stats': stats_serializable, 'page_images': self.page_images, # Map of page_num -> image_filename 'page_image_dpi': getattr(self, 'page_image_dpi', 150), # DPI for coordinate scaling 'verapdf_validation': self.verapdf_results, 'remediation_suggestions': self.remediation_suggestions, 'checks_performed': [ { 'name': cr.check_name, 'passed': cr.passed, 'duration': cr.duration } for cr in self.check_results ], 'issues': [issue.to_dict() for issue in self.issues], 'wcag_compliance': wcag_compliance, 'next_steps': next_steps, } def generate_json_report(self) -> str: """Generate JSON report""" summary = self._generate_summary() return json.dumps(summary, indent=2) def run_full_check(self) -> Dict[str, Any]: """Alias for check_all - maintains backward compatibility""" return self.check_all() def to_dict(self) -> Dict[str, Any]: """Convert results to dictionary""" return self._generate_summary() def main(): """Main entry point""" import argparse parser = argparse.ArgumentParser( description="Enterprise PDF Accessibility Checker", epilog="Environment variables can be set in a .env file (see .env.example)" ) parser.add_argument("pdf_file", help="PDF file to check") parser.add_argument("--google-credentials", help="Path to Google Cloud credentials JSON (or set GOOGLE_APPLICATION_CREDENTIALS in 
.env)") parser.add_argument("--google-key", help="Google API key string (or set GOOGLE_API_KEY in .env)") parser.add_argument("--anthropic-key", help="Anthropic API key (or set ANTHROPIC_API_KEY in .env)") parser.add_argument("--output", "-o", help="Output JSON file") parser.add_argument("--quick", action="store_true", help="Quick mode - skip expensive checks (OCR, AI image analysis, color contrast)") args = parser.parse_args() # Load from .env file as defaults, CLI args override config = { 'google_credentials_path': args.google_credentials or os.getenv('GOOGLE_APPLICATION_CREDENTIALS'), 'google_api_key': args.google_key or os.getenv('GOOGLE_API_KEY'), 'anthropic_api_key': args.anthropic_key or os.getenv('ANTHROPIC_API_KEY') } # Show what we're using if args.quick: print("⚡ Quick mode enabled - skipping expensive checks\n") checker = EnterprisePDFChecker(args.pdf_file, config, quick_mode=args.quick) summary = checker.check_all() # Generate page images if output specified if args.output: output_path = Path(args.output) images_dir = output_path.parent / f"{output_path.stem}_images" checker._generate_page_images(images_dir) report = checker.generate_json_report() if args.output: with open(args.output, 'w') as f: f.write(report) print(f"\n📄 Report saved: {args.output}") if checker.page_images: print(f"📸 Page images saved to: {images_dir}") else: print("\n" + "="*60) print("SUMMARY") print("="*60) print(f"Score: {summary['accessibility_score']}/100") print(f"Critical: {summary['severity_counts']['critical']}") print(f"Errors: {summary['severity_counts']['error']}") print(f"Warnings: {summary['severity_counts']['warning']}") print(f"API Calls: {summary['stats']['api_calls']}") print(f"Cost: ${summary['stats']['total_cost_estimate']:.2f}") if __name__ == "__main__": main()