2216 lines
87 KiB
Python
2216 lines
87 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Enterprise PDF Accessibility Checker
|
||
Quality-first comprehensive WCAG 2.1 validation
|
||
|
||
Features:
|
||
- Google Cloud Vision API for OCR and image analysis
|
||
- Anthropic Claude for alt text validation and content analysis
|
||
- Complete color contrast checking
|
||
- Readability analysis
|
||
- Form field validation
|
||
- Heading structure analysis
|
||
- Link quality checking
|
||
- Comprehensive reporting
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import json
|
||
import re
|
||
import base64
|
||
import hashlib
|
||
import time
|
||
import subprocess
|
||
from pathlib import Path
|
||
from typing import List, Dict, Any, Optional, Tuple
|
||
from dataclasses import dataclass, field, asdict
|
||
from enum import Enum
|
||
from datetime import datetime
|
||
from io import BytesIO
|
||
import traceback
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
|
||
# Load environment variables from .env file (optional)
|
||
try:
|
||
from dotenv import load_dotenv
|
||
load_dotenv()
|
||
except ImportError:
|
||
# dotenv not installed, that's okay - will use environment variables
|
||
pass
|
||
|
||
# Setup logging
|
||
from logger_config import setup_logger
|
||
logger = setup_logger(__name__, "pdf_checker.log")
|
||
|
||
# Import retry helper for API resilience
|
||
from retry_helper import retry_with_backoff, safe_execute, RetryableError
|
||
|
||
# Import remediation module
|
||
try:
|
||
from pdf_remediation import VeraPDFValidator, PDFRemediator
|
||
except ImportError:
|
||
logger.warning("Remediation module not found - auto-fix features disabled")
|
||
VeraPDFValidator = None
|
||
PDFRemediator = None
|
||
|
||
# Core PDF libraries
|
||
try:
|
||
from pypdf import PdfReader, PdfWriter
|
||
import pdfplumber
|
||
from PIL import Image
|
||
import numpy as np
|
||
except ImportError:
|
||
logger.error("Core libraries not installed")
|
||
logger.error("Install: pip install pypdf pdfplumber pillow numpy")
|
||
sys.exit(1)
|
||
|
||
# OCR and analysis
|
||
try:
|
||
import pytesseract
|
||
from pdf2image import convert_from_path
|
||
except ImportError:
|
||
logger.warning("OCR libraries not available. Install: pip install pytesseract pdf2image")
|
||
pytesseract = None
|
||
|
||
# Readability
|
||
try:
|
||
from textblob import TextBlob
|
||
except ImportError:
|
||
logger.warning("TextBlob not available. Install: pip install textblob")
|
||
TextBlob = None
|
||
|
||
# Google Cloud Vision
|
||
try:
|
||
from google.cloud import vision
|
||
from google.cloud import documentai_v1 as documentai
|
||
except ImportError:
|
||
logger.warning("Google Cloud libraries not available")
|
||
logger.info("Install: pip install google-cloud-vision google-cloud-documentai")
|
||
vision = None
|
||
|
||
# Anthropic Claude
|
||
try:
|
||
import anthropic
|
||
except ImportError:
|
||
logger.warning("Anthropic library not available")
|
||
logger.info("Install: pip install anthropic")
|
||
anthropic = None
|
||
|
||
# Language detection
|
||
try:
|
||
from langdetect import detect as langdetect_detect, LangDetectException
|
||
except ImportError:
|
||
logger.warning("langdetect not available — language detection disabled")
|
||
langdetect_detect = None
|
||
LangDetectException = Exception
|
||
|
||
|
||
# Lookup table: WCAG 2.1 success criterion number -> conformance level
# ('A', 'AA' or 'AAA'). Criteria that are absent from the table are treated
# as "level unknown" by AccessibilityIssue.to_dict().
WCAG_LEVELS: Dict[str, str] = {
    # 1.1 Text alternatives / 1.2 Time-based media
    '1.1.1': 'A',
    '1.2.1': 'A',
    '1.2.2': 'A',
    '1.2.3': 'A',
    '1.2.4': 'AA',
    '1.2.5': 'AA',
    # 1.3 Adaptable
    '1.3.1': 'A',
    '1.3.2': 'A',
    '1.3.3': 'A',
    '1.3.4': 'AA',
    '1.3.5': 'AA',
    # 1.4 Distinguishable
    '1.4.1': 'A',
    '1.4.2': 'A',
    '1.4.3': 'AA',
    '1.4.4': 'AA',
    '1.4.5': 'AA',
    '1.4.10': 'AA',
    '1.4.11': 'AA',
    '1.4.12': 'AA',
    '1.4.13': 'AA',
    # 2.1 Keyboard / 2.2 Timing / 2.3 Seizures
    '2.1.1': 'A',
    '2.1.2': 'A',
    '2.1.4': 'A',
    '2.2.1': 'A',
    '2.2.2': 'A',
    '2.3.1': 'A',
    # 2.4 Navigable
    '2.4.1': 'A',
    '2.4.2': 'A',
    '2.4.3': 'A',
    '2.4.4': 'A',
    '2.4.5': 'AA',
    '2.4.6': 'AA',
    '2.4.7': 'AA',
    # 2.5 Input modalities
    '2.5.1': 'A',
    '2.5.2': 'A',
    '2.5.3': 'A',
    '2.5.4': 'A',
    # 3.1 Readable / 3.2 Predictable / 3.3 Input assistance
    '3.1.1': 'A',
    '3.1.2': 'AA',
    '3.1.5': 'AAA',
    '3.2.1': 'A',
    '3.2.2': 'A',
    '3.2.3': 'AA',
    '3.2.4': 'AA',
    '3.3.1': 'A',
    '3.3.2': 'A',
    '3.3.3': 'AA',
    '3.3.4': 'AA',
    # 4.1 Compatible
    '4.1.1': 'A',
    '4.1.2': 'A',
    '4.1.3': 'AA',
}
|
||
|
||
|
||
class Severity(Enum):
    """Closed set of severity labels attached to every reported finding.

    Each member's value is the upper-case string that is emitted when an
    issue is serialized (see ``AccessibilityIssue.to_dict``).
    """

    # CRITICAL and ERROR are the two levels that make run_check() mark the
    # owning check as failed.
    CRITICAL = "CRITICAL"
    ERROR = "ERROR"

    # Reported, but do not by themselves fail a check.
    WARNING = "WARNING"
    INFO = "INFO"

    # Positive confirmation that a requirement was met.
    SUCCESS = "SUCCESS"
|
||
|
||
|
||
@dataclass
class AccessibilityIssue:
    """One finding produced by a check (a problem, a note or a confirmation)."""

    severity: Severity
    category: str
    description: str
    page_number: Optional[int] = None
    recommendation: str = ""
    # Comma-separated WCAG criterion numbers, e.g. "1.3.1, 4.1.2".
    wcag_criterion: str = ""
    details: Dict[str, Any] = field(default_factory=dict)
    coordinates: Optional[Dict[str, float]] = None  # x0, y0, x1, y1 for highlighting

    def to_dict(self):
        """Return a plain-dict view of the issue for JSON serialization.

        Besides the stored fields the result carries ``wcag_level``: the
        least strict conformance level ('A' before 'AA' before 'AAA') among
        the criteria listed in ``wcag_criterion`` that appear in
        ``WCAG_LEVELS``, or ``''`` when none of them is known.
        """
        level_order = ['A', 'AA', 'AAA']

        known_levels = []
        for token in self.wcag_criterion.split(','):
            criterion = token.strip()
            if not criterion:
                continue
            level = WCAG_LEVELS.get(criterion, '')
            if level:
                known_levels.append(level)

        if known_levels:
            wcag_level = min(known_levels, key=level_order.index)
        else:
            wcag_level = ''

        serialized = {
            'severity': self.severity.value,
            'category': self.category,
            'description': self.description,
            'page_number': self.page_number,
            'recommendation': self.recommendation,
            'wcag_criterion': self.wcag_criterion,
            'wcag_level': wcag_level,
            'details': self.details,
            'coordinates': self.coordinates,
        }
        return serialized
|
||
|
||
|
||
@dataclass
class CheckResult:
    """Outcome record for a single named check."""

    # Human-readable name of the check, e.g. "Color Contrast".
    check_name: str
    # Whether the check is considered passed.
    passed: bool
    # Findings associated with the check (empty list by default).
    issues: List[AccessibilityIssue] = field(default_factory=list)
    # Free-form extra data about the run (empty dict by default).
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Wall-clock run time in seconds.
    duration: float = 0.0
|
||
|
||
|
||
class CacheManager:
    """Manages caching of API results to reduce costs.

    Every entry is stored as a JSON document named ``<key>.json`` inside
    ``cache_dir``.
    """

    def __init__(self, cache_dir: str = ".cache"):
        """Create the cache directory if needed.

        Args:
            cache_dir: Directory that holds the cache files. Missing parent
                directories are created as well.
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def get_cache_key(self, data: bytes, prefix: str = "") -> str:
        """Generate a cache key from data.

        Args:
            data: Raw bytes identifying the cached item (e.g. image bytes).
            prefix: Namespace placed in front of the digest.

        Returns:
            ``"<prefix>_<sha256 hex digest of data>"``.
        """
        digest = hashlib.sha256(data).hexdigest()
        return f"{prefix}_{digest}"

    def get(self, key: str) -> Optional[Dict]:
        """Retrieve a cached result.

        Returns:
            The decoded JSON content, or ``None`` when the entry does not
            exist, cannot be read, or does not contain valid JSON.
        """
        cache_file = self.cache_dir / f"{key}.json"
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, ValueError):
            # OSError: missing/unreadable file. ValueError covers both
            # json.JSONDecodeError and undecodable bytes.
            return None

    def set(self, key: str, data: Dict):
        """Store a result in the cache.

        The JSON is written to a temporary file in the target directory and
        then moved into place with ``os.replace``, so a reader never sees a
        half-written entry and a failed write leaves no file behind.

        Raises:
            TypeError: If ``data`` is not JSON-serializable.
            OSError: If the file cannot be written.
        """
        import tempfile  # local import: not part of the module-level imports

        cache_file = self.cache_dir / f"{key}.json"
        fd, tmp_name = tempfile.mkstemp(
            dir=str(cache_file.parent), prefix=".cache-", suffix=".tmp"
        )
        try:
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                json.dump(data, f)
            os.replace(tmp_name, cache_file)
        except BaseException:
            # Do not leave the temporary file behind; the error still
            # propagates to the caller exactly as before.
            try:
                os.unlink(tmp_name)
            except OSError:
                pass
            raise
|
||
|
||
|
||
class ColorContrastChecker:
    """WCAG color contrast validation.

    All members are static; the class is used as a namespace.
    """

    # Minimum contrast ratios for normal-size and large text.
    WCAG_AA_NORMAL = 4.5
    WCAG_AA_LARGE = 3.0
    WCAG_AAA_NORMAL = 7.0
    WCAG_AAA_LARGE = 4.5

    @staticmethod
    def _linearize(channel: float) -> float:
        """Convert one colour channel scaled to [0, 1] to its linear value."""
        if channel <= 0.03928:
            return channel / 12.92
        return ((channel + 0.055) / 1.055) ** 2.4

    @staticmethod
    def _ratio_from_luminance(l1: float, l2: float) -> float:
        """Contrast ratio of two relative luminances (order-independent)."""
        lighter = max(l1, l2)
        darker = min(l1, l2)
        return (lighter + 0.05) / (darker + 0.05)

    @staticmethod
    def get_luminance(rgb: Tuple[int, int, int]) -> float:
        """Calculate relative luminance per WCAG formula.

        Args:
            rgb: Three 0-255 channel values.

        Returns:
            Relative luminance (0.0 for black, 1.0 for white).

        Raises:
            ValueError: If ``rgb`` does not have exactly three components.
        """
        r, g, b = [ColorContrastChecker._linearize(x / 255.0) for x in rgb]
        return 0.2126 * r + 0.7152 * g + 0.0722 * b

    @staticmethod
    def calculate_contrast_ratio(color1: Tuple[int, int, int],
                                 color2: Tuple[int, int, int]) -> float:
        """Calculate the WCAG contrast ratio of two colours (always >= 1.0)."""
        return ColorContrastChecker._ratio_from_luminance(
            ColorContrastChecker.get_luminance(color1),
            ColorContrastChecker.get_luminance(color2),
        )

    @staticmethod
    def check_image_contrast(image: "Image.Image", sample_size: int = 1000) -> Dict:
        """Sample image for contrast issues.

        Compares pixel pairs that are 8px apart vertically — more likely to
        cross a text-stroke / background boundary than adjacent pixels.
        Only considers pairs where luminance actually differs (|Δlum| >= 0.08),
        which filters out uniform photo areas and focuses on real edges.

        Sampling uses a fixed seed, so the result for a given image is
        reproducible.

        Args:
            image: Page or picture to sample; converted to RGB when needed.
            sample_size: Maximum number of edge pairs to collect.

        Returns:
            ``{'error': ...}`` when fewer than 20 edge pairs were found,
            otherwise a dict with the sample count, the number and percentage
            of pairs below the AA normal/large thresholds, and the worst,
            best and average ratio.
        """
        if image.mode != 'RGB':
            image = image.convert('RGB')

        insufficient = {'error': 'Insufficient contrast edges to analyse (image-only page)'}
        offset = 8  # vertical distance between the two pixels of a pair

        width, height = image.size
        if height <= offset:
            # No pixel has a partner `offset` rows below it, so every lookup
            # would fail; skip the sampling loop entirely.
            return insufficient

        rng = np.random.default_rng(seed=42)
        ratios = []  # ratios of pairs that cross a meaningful light/dark boundary

        attempts = min(sample_size * 4, width * height // 20)
        for _ in range(attempts):
            x = int(rng.integers(0, width))
            y = int(rng.integers(0, max(1, height - (offset + 1))))

            try:
                l1 = ColorContrastChecker.get_luminance(image.getpixel((x, y)))
                l2 = ColorContrastChecker.get_luminance(image.getpixel((x, y + offset)))
            except (IndexError, TypeError, ValueError):
                continue

            if abs(l1 - l2) < 0.08:
                continue  # near-uniform area (photo gradient, blank space) — skip

            # Reuse the luminances already computed instead of deriving them
            # a second time from the raw colours.
            ratios.append(ColorContrastChecker._ratio_from_luminance(l1, l2))
            if len(ratios) >= sample_size:
                break

        total = len(ratios)
        if total < 20:
            return insufficient

        fail_aa = sum(1 for r in ratios if r < ColorContrastChecker.WCAG_AA_NORMAL)
        fail_large = sum(1 for r in ratios if r < ColorContrastChecker.WCAG_AA_LARGE)

        return {
            'total_samples': total,
            'fail_aa_normal_count': fail_aa,
            'fail_aa_large_count': fail_large,
            'fail_aa_normal_percent': fail_aa / total * 100,
            'fail_aa_large_percent': fail_large / total * 100,
            'worst_ratio': min(ratios),
            'best_ratio': max(ratios),
            'avg_ratio': sum(ratios) / total,
        }
|
||
|
||
|
||
class ReadabilityAnalyzer:
    """Content readability analysis (Flesch Reading Ease / Flesch-Kincaid)."""

    @staticmethod
    def count_syllables(word: str) -> int:
        """Estimate the number of syllables in a word.

        Heuristic: every run of consecutive vowels (a, e, i, o, u, y) counts
        as one syllable; a trailing 'e' is discounted when the word has more
        than one run. The result is never less than 1.
        """
        word = word.lower().strip()
        vowels = 'aeiouy'
        syllable_count = 0
        previous_was_vowel = False

        for char in word:
            is_vowel = char in vowels
            if is_vowel and not previous_was_vowel:
                syllable_count += 1
            previous_was_vowel = is_vowel

        if word.endswith('e') and syllable_count > 1:
            syllable_count -= 1

        return max(1, syllable_count)

    @staticmethod
    def analyze(text: str) -> Dict:
        """Compute readability metrics for ``text``.

        Sentences are split on runs of ``.``, ``!`` and ``?``; words are
        ``\\w+`` tokens.

        Returns:
            ``{'error': ...}`` when the text is shorter than 50 characters
            (after stripping) or cannot be split into sentences and words.
            Otherwise a dict with the Flesch Reading Ease score, the
            Flesch-Kincaid grade, word and sentence totals, average words per
            sentence, the number of sentences longer than 25 words, and the
            number and percentage of words with more than 3 syllables.
        """
        if not text or len(text.strip()) < 50:
            return {'error': 'Insufficient text for analysis'}

        # Collapse all whitespace runs so line breaks do not affect splitting.
        text = re.sub(r'\s+', ' ', text.strip())

        sentences = [s.strip() for s in re.split(r'[.!?]+', text) if s.strip()]
        words = re.findall(r'\b\w+\b', text)

        if not sentences or not words:
            return {'error': 'Could not parse text'}

        total_sentences = len(sentences)
        total_words = len(words)

        # Count syllables once per word and reuse the counts for both the
        # total and the complex-word tally.
        syllables_per_word = [ReadabilityAnalyzer.count_syllables(w) for w in words]
        total_syllables = sum(syllables_per_word)

        words_per_sentence = total_words / total_sentences
        syllables_per_word_avg = total_syllables / total_words

        # Flesch Reading Ease (0-100, higher = easier)
        flesch_reading_ease = (
            206.835
            - 1.015 * words_per_sentence
            - 84.6 * syllables_per_word_avg
        )

        # Flesch-Kincaid Grade Level
        fk_grade_level = (
            0.39 * words_per_sentence
            + 11.8 * syllables_per_word_avg
            - 15.59
        )

        long_sentences_count = sum(1 for s in sentences if len(s.split()) > 25)
        complex_words_count = sum(1 for n in syllables_per_word if n > 3)

        return {
            'flesch_reading_ease': round(flesch_reading_ease, 2),
            'flesch_kincaid_grade': round(fk_grade_level, 2),
            'total_words': total_words,
            'total_sentences': total_sentences,
            'avg_words_per_sentence': round(words_per_sentence, 2),
            'long_sentences_count': long_sentences_count,
            'complex_words_count': complex_words_count,
            'complex_words_percent': round(complex_words_count / total_words * 100, 2),
        }
|
||
|
||
|
||
class EnterprisePDFChecker:
|
||
"""Enterprise-grade PDF accessibility checker"""
|
||
|
||
def __init__(self, pdf_path: str, config: Optional[Dict[str, Any]] = None, quick_mode: bool = False, generate_images: bool = True):
    """Set up checker state and any API clients that are configured.

    Args:
        pdf_path: Path of the PDF to check.
        config: Optional settings. Keys read here:
            ``google_credentials_path``, ``google_api_key`` and
            ``anthropic_api_key``. ``None`` is treated as an empty dict.
        quick_mode: Stored on the instance; checks consult it to skip
            slow analysis steps.
        generate_images: Stored on the instance.

    Side effects:
        Creates a ``CacheManager`` (default cache directory) and, when a
        valid credentials file is configured, sets the
        ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable.
    """
    self.pdf_path = Path(pdf_path)
    self.config = config or {}
    self.quick_mode = quick_mode
    self.generate_images = generate_images
    self.issues: List[AccessibilityIssue] = []
    self.check_results: List[CheckResult] = []
    self.pdf_reader = None
    self.pdf_plumber = None
    self.cache = CacheManager()
    self.page_images: Dict[int, str] = {}  # page_num -> image_path
    self.verapdf_results: Optional[Dict] = None
    self.remediation_suggestions: Optional[Dict] = None
    self._detected_lang: str = 'en'  # detected language of the document

    # API clients
    self.vision_client = None
    self.anthropic_client = None
    # Always defined, so later code can test it without risking an
    # AttributeError when no key was configured.
    self.google_api_key: Optional[str] = None
    self.api_timeout = 10.0  # 10 second timeout for API calls

    # Initialize API clients
    config = self.config
    google_creds_path = config.get('google_credentials_path')
    if google_creds_path and os.path.isfile(google_creds_path):
        # Valid credentials file exists
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = google_creds_path
        if vision:
            try:
                self.vision_client = vision.ImageAnnotatorClient()
                logger.info("Google Cloud Vision initialized with credentials file")
            except Exception as e:
                logger.warning(f"Google Vision initialization failed: {str(e)}")
    elif config.get('google_api_key'):
        # Use API key directly
        if vision:
            # Note: Vision API with API key requires different initialization
            # For now, store key for use in requests
            self.google_api_key = config['google_api_key']
            # Never write key material to the log file.
            logger.info("Using Google API key from configuration")
    elif google_creds_path:
        # Path provided but file doesn't exist
        logger.warning(f"Google credentials file not found: {google_creds_path}")
        logger.warning("Skipping Google Cloud Vision (advanced OCR disabled)")

    if config.get('anthropic_api_key') and anthropic:
        try:
            self.anthropic_client = anthropic.Anthropic(api_key=config['anthropic_api_key'])
            logger.info("Anthropic Claude initialized")
        except Exception as e:
            logger.warning(f"Anthropic initialization failed: {str(e)}")

    # Stats
    self.stats = {
        'start_time': datetime.now(),
        'total_checks': 0,
        'api_calls': 0,
        'cached_calls': 0,
        'total_cost_estimate': 0.0,
    }
|
||
|
||
def add_issue(self, severity: Severity, category: str, description: str, **kwargs):
    """Record a new finding on ``self.issues``.

    Args:
        severity: Severity level of the finding.
        category: Short grouping label.
        description: Human-readable statement of the finding.
        **kwargs: Forwarded unchanged to ``AccessibilityIssue`` (for
            example ``page_number``, ``recommendation``, ``wcag_criterion``,
            ``details``, ``coordinates``).
    """
    self.issues.append(
        AccessibilityIssue(
            severity=severity,
            category=category,
            description=description,
            **kwargs,
        )
    )
|
||
|
||
# Per-check wall-clock timeouts (seconds). Heavy checks get more time.
_CHECK_TIMEOUTS = {
    "Image Accessibility": 180,
    "OCR Quality": 180,
    "Color Contrast": 120,
    "PDF/UA Structure (veraPDF)": 120,
    "Content Readability": 60,
}
_DEFAULT_CHECK_TIMEOUT = 90

def run_check(self, check_func, check_name: str) -> CheckResult:
    """Run a check with a per-check timeout and record results.

    The check runs on a worker thread. The calling thread waits at most
    the timeout configured for ``check_name`` (``_CHECK_TIMEOUTS``, else
    ``_DEFAULT_CHECK_TIMEOUT``).

    Args:
        check_func: Zero-argument callable that reports findings through
            ``self.add_issue``.
        check_name: Display name; also the key for the timeout lookup.

    Returns:
        The ``CheckResult`` that was appended to ``self.check_results``.
        ``passed`` is False when the check added a CRITICAL/ERROR issue,
        timed out, or raised.
    """
    from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

    start_time = time.time()
    result = CheckResult(check_name=check_name, passed=True)
    issues_before = len(self.issues)
    timeout = self._CHECK_TIMEOUTS.get(check_name, self._DEFAULT_CHECK_TIMEOUT)

    # The executor is deliberately NOT used as a context manager: leaving a
    # `with` block calls shutdown(wait=True), which would block here until a
    # timed-out check finally returns and make the timeout meaningless.
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(check_func)
        future.result(timeout=timeout)

        # Check passed if no critical/error issues added by THIS check
        new_issues = self.issues[issues_before:]
        critical_errors = [i for i in new_issues
                           if i.severity in [Severity.CRITICAL, Severity.ERROR]]
        result.passed = len(critical_errors) == 0
    except FuturesTimeout:
        logger.warning(f"{check_name} timed out after {timeout}s — skipping")
        self.add_issue(
            Severity.WARNING,
            check_name,
            f"Check timed out after {timeout}s and was skipped",
            details={'timeout': timeout}
        )
        result.passed = False
    except Exception as e:
        self.add_issue(
            Severity.CRITICAL,
            check_name,
            f"Check failed with error: {str(e)}",
            details={'error': str(e), 'traceback': traceback.format_exc()}
        )
        result.passed = False
    finally:
        # Return immediately. A timed-out worker cannot be interrupted and
        # keeps running in the background until check_func returns.
        # NOTE(review): such a worker may still touch shared state
        # (self.issues, the open PDF handles) while later checks run.
        executor.shutdown(wait=False)

    result.duration = time.time() - start_time
    self.check_results.append(result)
    self.stats['total_checks'] += 1

    return result
|
||
|
||
def check_all(self) -> Dict[str, Any]:
    """Open the PDF, run every check in a fixed order and summarize.

    Each check goes through ``run_check``. Any exception raised while
    opening the file or driving the checks is recorded as a CRITICAL
    "File Access" issue instead of propagating. The pdfplumber handle is
    closed in all cases.

    Returns:
        Whatever ``self._generate_summary()`` returns.
    """
    logger.info("Enterprise PDF Accessibility Check")
    logger.info(f"File: {self.pdf_path.name}")
    logger.info("=" * 60)

    try:
        source = str(self.pdf_path)
        self.pdf_reader = PdfReader(source)
        self.pdf_plumber = pdfplumber.open(source)

        # (display name, bound check method) in execution order.
        pipeline = (
            ("Document Structure", self._check_basic_structure),
            ("Metadata", self._check_metadata),
            ("Language Declaration", self._check_language),
            ("Text Extractability", self._check_text_extractability),
            ("OCR Quality", self._check_ocr_quality),
            ("Image Accessibility", self._check_images_comprehensive),
            ("Color Contrast", self._check_color_contrast),
            ("Content Readability", self._check_readability),
            ("Link Quality", self._check_links),
            ("Heading Structure", self._check_headings),
            ("Tab Order", self._check_tab_order),
            ("Role Mapping", self._check_role_mapping),
            ("Form Accessibility", self._check_forms),
            ("Table Structure", self._check_tables),
            ("Reading Order", self._check_reading_order),
            ("Font Accessibility", self._check_fonts),
            ("Security Settings", self._check_security),
            ("Navigation Aids", self._check_bookmarks),
            ("PDF/UA Structure (veraPDF)", self._check_verapdf_validation),
        )

        for label, runner in pipeline:
            logger.info(f"Running: {label}...")
            outcome = self.run_check(runner, label)
            verdict = "PASS" if outcome.passed else "FAIL"
            logger.info(f"{verdict} ({outcome.duration:.2f}s)")

        # Analyze remediation options
        self._analyze_remediation_options()

    except Exception as e:
        self.add_issue(
            Severity.CRITICAL,
            "File Access",
            f"Could not process PDF: {str(e)}",
            details={'error': str(e)}
        )
    finally:
        if self.pdf_plumber:
            self.pdf_plumber.close()

    # Duration is measured from the 'start_time' recorded in __init__.
    finished = datetime.now()
    self.stats['end_time'] = finished
    self.stats['duration'] = (finished - self.stats['start_time']).total_seconds()

    return self._generate_summary()
|
||
|
||
# ==================== CORE CHECKS ====================
|
||
|
||
def _check_basic_structure(self):
    """Check PDF structure and tagging.

    Reads ``/MarkInfo`` from the document catalog and reports:
      * CRITICAL when ``/MarkInfo`` is absent,
      * CRITICAL when ``/Marked`` is missing or false,
      * SUCCESS when ``/Marked`` is true.
    """
    catalog = self.pdf_reader.trailer.get("/Root", {})

    if "/MarkInfo" not in catalog:
        self.add_issue(
            Severity.CRITICAL,
            "Document Structure",
            "PDF is not tagged - completely inaccessible to screen readers",
            wcag_criterion="1.3.1, 4.1.2",
            recommendation="Tag the PDF using Adobe Acrobat Pro or authoring software"
        )
        return

    mark_info = catalog.get("/MarkInfo", {})
    marked_entry = mark_info.get("/Marked", False)
    # Read the wrapped boolean explicitly instead of relying on object
    # truthiness: a wrapper object that does not define __bool__ is truthy
    # even when it represents `false`. Plain bools pass through unchanged.
    # NOTE(review): assumes the PDF library exposes the boolean as `.value`
    # — confirm against the installed pypdf version.
    marked = bool(getattr(marked_entry, "value", marked_entry))

    if not marked:
        self.add_issue(
            Severity.CRITICAL,
            "Document Structure",
            "PDF marked as untagged in metadata",
            wcag_criterion="1.3.1",
            recommendation="Enable document tagging"
        )
    else:
        self.add_issue(
            Severity.SUCCESS,
            "Document Structure",
            "PDF is properly tagged",
            wcag_criterion="1.3.1"
        )
|
||
|
||
def _check_metadata(self):
    """Report on the document's title, author and subject metadata.

    No metadata at all is a single ERROR. Otherwise a missing title is an
    ERROR (a present one is a SUCCESS), a missing author a WARNING and a
    missing subject an INFO. Values consisting only of whitespace count
    as missing.
    """
    info = self.pdf_reader.metadata

    if not info:
        self.add_issue(
            Severity.ERROR,
            "Metadata",
            "No document metadata found",
            wcag_criterion="2.4.2",
            recommendation="Add title, author, and subject metadata"
        )
        return

    # Title: the only field that maps to a WCAG criterion.
    title = info.title
    if title and title.strip():
        self.add_issue(
            Severity.SUCCESS,
            "Metadata",
            f"Document has title: '{title}'",
            wcag_criterion="2.4.2"
        )
    else:
        self.add_issue(
            Severity.ERROR,
            "Metadata",
            "Document title is missing",
            wcag_criterion="2.4.2",
            recommendation="Add a descriptive title"
        )

    # Author and subject: only reported when missing.
    optional_fields = (
        ("author", Severity.WARNING, "Author information is missing", "Add author metadata"),
        ("subject", Severity.INFO, "Subject/description is missing", "Add a brief description"),
    )
    for attr_name, severity, description, recommendation in optional_fields:
        value = getattr(info, attr_name)
        if value and value.strip():
            continue
        self.add_issue(
            severity,
            "Metadata",
            description,
            recommendation=recommendation
        )
|
||
|
||
def _check_language(self):
    """Check language declaration (WCAG 3.1.1) and detect actual content language.

    Text from up to the first three pages is passed to the language
    detector (when it is available and at least 50 characters were
    extracted); the detected code is stored in ``self._detected_lang``.

    Reports:
      * ERROR when the catalog has no ``/Lang`` entry,
      * WARNING when ``/Lang`` is present but disagrees with a detected
        non-English content language,
      * SUCCESS otherwise.
    """
    catalog = self.pdf_reader.trailer.get("/Root", {})

    # --- Detect actual language from content ---
    sample_text = ""
    for page in self.pdf_plumber.pages[:3]:
        t = page.extract_text()
        if t:
            sample_text += t + " "
        if len(sample_text) > 500:
            break

    can_detect = bool(langdetect_detect) and len(sample_text.strip()) >= 50
    detected = False  # True only when the detector actually produced a code
    if can_detect:
        try:
            self._detected_lang = langdetect_detect(sample_text)
            detected = True
        except LangDetectException:
            self._detected_lang = 'en'

    # The detector can return region-qualified codes (e.g. 'zh-cn'), so
    # comparisons use the primary subtag only.
    detected_code = (self._detected_lang or 'en').lower()
    detected_prefix = detected_code.split('-')[0].split('_')[0]

    # --- Check declared /Lang ---
    if "/Lang" not in catalog:
        # Map detector codes to BCP-47 tags
        lang_map = {
            'uk': 'uk-UA', 'ru': 'ru-RU', 'de': 'de-DE', 'fr': 'fr-FR',
            'es': 'es-ES', 'pl': 'pl-PL', 'it': 'it-IT', 'pt': 'pt-PT',
            'nl': 'nl-NL', 'cs': 'cs-CZ', 'sk': 'sk-SK', 'ro': 'ro-RO',
            'hu': 'hu-HU', 'bg': 'bg-BG', 'hr': 'hr-HR', 'ar': 'ar-SA',
            'zh': 'zh-CN', 'ja': 'ja-JP', 'ko': 'ko-KR', 'en': 'en-US',
            'zh-cn': 'zh-CN', 'zh-tw': 'zh-TW',
        }
        # Prefer an exact match, then the primary subtag, then the raw code.
        bcp47 = lang_map.get(detected_code) or lang_map.get(detected_prefix, self._detected_lang)
        if detected:
            recommendation = f"Set document language (detected content language: '{bcp47}')"
        else:
            # Do not claim a detection that never happened.
            recommendation = (
                f"Set document language (content language could not be detected; e.g. '{bcp47}')"
            )
        self.add_issue(
            Severity.ERROR,
            "Language",
            "Document language not specified",
            wcag_criterion="3.1.1",
            recommendation=recommendation,
            details={'detected_language': self._detected_lang}
        )
        return

    declared_lang = str(catalog["/Lang"]).lower()
    # Compare declared lang prefix with detected lang prefix
    declared_prefix = declared_lang.split('-')[0].split('_')[0]
    mismatch = (
        can_detect
        and detected_prefix != 'en'  # English is common false-positive
        and declared_prefix != detected_prefix
        # Substring test tolerates three-letter declarations such as 'deu'.
        and detected_prefix not in declared_prefix
    )

    if mismatch:
        self.add_issue(
            Severity.WARNING,
            "Language",
            f"Declared language '{catalog['/Lang']}' may not match content "
            f"(detected: '{self._detected_lang}')",
            wcag_criterion="3.1.1",
            recommendation="Verify the /Lang entry matches the document's actual language",
            details={'declared_language': str(catalog["/Lang"]),
                     'detected_language': self._detected_lang}
        )
    else:
        self.add_issue(
            Severity.SUCCESS,
            "Language",
            f"Document language set to: {catalog['/Lang']}",
            wcag_criterion="3.1.1",
            details={'declared_language': str(catalog["/Lang"]),
                     'detected_language': self._detected_lang}
        )
|
||
|
||
def _check_text_extractability(self):
    """Report pages from which (almost) no text can be extracted.

    A page counts as text-less when fewer than 10 characters are
    extracted. All pages text-less is CRITICAL; some pages is a WARNING;
    none adds no issue.
    """
    total_pages = len(self.pdf_reader.pages)

    # 1-based numbers of the pages that yield fewer than 10 characters.
    textless_pages = [
        number
        for number, page in enumerate(self.pdf_plumber.pages, start=1)
        if len(page.extract_text() or "") < 10
    ]
    textless_count = len(textless_pages)

    if textless_count == total_pages:
        self.add_issue(
            Severity.CRITICAL,
            "Text Accessibility",
            "No extractable text found - document appears to be scanned images",
            wcag_criterion="1.1.1",
            recommendation="Run OCR or recreate from source with selectable text",
            details={'pages_affected': textless_pages}
        )
        return

    if textless_count > 0:
        self.add_issue(
            Severity.WARNING,
            "Text Accessibility",
            f"{textless_count} of {total_pages} pages have no extractable text",
            wcag_criterion="1.1.1",
            recommendation="Review pages without text",
            details={'pages_affected': textless_pages}
        )
|
||
|
||
def _check_ocr_quality(self):
    """Check OCR quality on the first pages of the document.

    Skipped when the OCR library is unavailable or in quick mode. Up to
    the first two pages are rendered at 150 DPI and run through OCR; a
    page whose mean word confidence is below 60 yields a WARNING. Any
    failure of the OCR toolchain is logged and the check is skipped.
    """
    if not pytesseract:
        return

    if self.quick_mode:
        logger.info("Skipping OCR analysis (quick mode)")
        return

    logger.info("Running OCR analysis...")

    try:
        # Reduced DPI from 300 to 150 for faster processing
        images = convert_from_path(str(self.pdf_path), dpi=150, first_page=1, last_page=min(2, len(self.pdf_reader.pages)))

        for i, image in enumerate(images):
            # Get OCR data with confidence
            ocr_data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)

            # Confidence entries may be ints, floats or numeric strings,
            # and -1 marks an entry without a recognized word. Parse
            # defensively and drop every negative value, so that -1
            # placeholders cannot drag the average down.
            confidences = []
            for raw in ocr_data['conf']:
                try:
                    value = float(raw)
                except (TypeError, ValueError):
                    continue
                if value >= 0:
                    confidences.append(value)

            if not confidences:
                continue

            avg_confidence = sum(confidences) / len(confidences)
            if avg_confidence < 60:
                self.add_issue(
                    Severity.WARNING,
                    "OCR Quality",
                    f"Page {i+1}: Low OCR confidence ({avg_confidence:.1f}%)",
                    wcag_criterion="1.1.1",
                    recommendation="Poor scan quality - rescan or manual review needed",
                    page_number=i+1,
                    details={'confidence': avg_confidence}
                )
    except Exception as e:
        # Best-effort check: a missing or broken OCR toolchain must not
        # fail the whole run.
        logger.warning(f"OCR check skipped: {str(e)}")
|
||
|
||
def _check_images_comprehensive(self):
    """Comprehensive image accessibility check with AI.

    Steps:
      1. Extract every image on every page (failures are logged).
      2. Report INFO and stop when the document has no images.
      3. Drop extracted images whose data size is 2048 or less and keep
         at most 10 of the rest.
      4. In quick mode, report INFO and stop before any API call.
      5. Analyze the remaining images on up to 5 worker threads, using
         the cache when possible, and hand the results to
         ``_process_image_analysis`` / ``_process_google_vision_results``.
    """
    logger.info("Analyzing images with AI...")

    total_images = 0
    analyzed_images = 0

    # Collect all images first
    image_tasks = []
    for page_num, page in enumerate(self.pdf_plumber.pages):
        images = page.images
        total_images += len(images)

        for img_idx, img in enumerate(images):
            try:
                image_data = self._extract_image_from_page(page, img)
                if image_data:
                    # Include coordinates for highlighting
                    coords = {
                        'x0': img['x0'],
                        'y0': img['top'],
                        'x1': img['x1'],
                        'y1': img['bottom']
                    }
                    image_tasks.append((image_data, page_num + 1, img_idx + 1, coords))
            except Exception as e:
                logger.warning(f"Failed to extract image on page {page_num + 1}: {str(e)}")

    if total_images == 0:
        self.add_issue(
            Severity.INFO,
            "Images",
            "No images found in document",
            wcag_criterion="1.1.1"
        )
        return

    logger.info(f"Found {total_images} images to analyze...")

    # Cap analysis: skip very small images (likely decorative/icons)
    image_tasks = [t for t in image_tasks if self._image_data_size(t[0]) > 2048]

    # Limit to 10 images max — more would just waste API calls on brochure backgrounds
    MAX_IMAGES = 10
    if len(image_tasks) > MAX_IMAGES:
        logger.info(f"Capping image analysis at {MAX_IMAGES} (of {len(image_tasks)}) images")
        image_tasks = image_tasks[:MAX_IMAGES]

    # Skip AI analysis in quick mode
    if self.quick_mode:
        logger.info("Skipping AI image analysis (quick mode)")
        self.add_issue(
            Severity.INFO,
            "Images",
            f"Found {total_images} images - run without --quick for AI analysis",
            wcag_criterion="1.1.1"
        )
        return

    # Process images in parallel with progress updates
    def analyze_single_image(task_data):
        """Analyze one image; never raises, errors land in result['error']."""
        image_data, page_num, img_num, coords = task_data
        result = {'page': page_num, 'img': img_num, 'analyzed': False, 'coords': coords}

        try:
            # Check cache first
            cache_key = self.cache.get_cache_key(image_data, "claude_vision")
            cached_result = self.cache.get(cache_key)

            if cached_result:
                analysis = cached_result
                result['cached'] = True
            else:
                # Analyze with Claude (timeout via concurrent.futures).
                # The executor is not used as a context manager: leaving
                # a `with` block waits for the worker, which would defeat
                # the 30 second cap below.
                img_exec = ThreadPoolExecutor(max_workers=1)
                try:
                    future = img_exec.submit(self._analyze_image_with_claude, image_data)
                    try:
                        analysis = future.result(timeout=30)
                    except Exception as exc:
                        logger.warning(
                            f"Claude analysis failed for image on page {page_num}: {exc!r}"
                        )
                        analysis = None
                finally:
                    img_exec.shutdown(wait=False)

                if analysis and 'error' not in analysis:
                    # Caching is an optimization only: a failed write must
                    # not discard an analysis that already succeeded.
                    try:
                        self.cache.set(cache_key, analysis)
                    except Exception as exc:
                        logger.warning(f"Could not cache image analysis: {exc}")
                result['cached'] = False

            if analysis and 'error' not in analysis:
                result['analysis'] = analysis
                result['analyzed'] = True

            # Also check with Google Vision for additional data
            if self.vision_client:
                vision_analysis = self._analyze_image_with_google(image_data)
                if vision_analysis and 'error' not in vision_analysis:
                    result['vision_analysis'] = vision_analysis
                elif vision_analysis:
                    # Same rule as for Claude: an error dict is not a result.
                    logger.warning(
                        f"Google Vision analysis failed for image on page {page_num}: "
                        f"{vision_analysis['error']}"
                    )

        except Exception as e:
            result['error'] = str(e)

        return result

    # Use ThreadPoolExecutor for parallel processing. Quick mode has
    # already returned above, so the worker count is fixed.
    max_workers = 5
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(analyze_single_image, task): task for task in image_tasks}

        for future in as_completed(futures):
            try:
                result = future.result()
                analyzed_images += 1
                cache_status = " (cached)" if result.get('cached') else ""
                logger.info(f"Analyzed image {analyzed_images}/{total_images} (Page {result['page']}){cache_status}")

                if result.get('analyzed'):
                    self._process_image_analysis(result['analysis'], result['page'], result['img'], result.get('coords'))
                    if result.get('cached'):
                        self.stats['cached_calls'] += 1
                    else:
                        self.stats['api_calls'] += 1
                        self.stats['total_cost_estimate'] += 0.015

                if result.get('vision_analysis'):
                    self._process_google_vision_results(result['vision_analysis'], result['page'], result['img'], result.get('coords'))

                if result.get('error'):
                    logger.warning(f"Error analyzing image on page {result['page']}: {result['error']}")

            except Exception as e:
                logger.warning(f"Image analysis error: {str(e)}")

    logger.info(f"Completed analysis of {analyzed_images}/{total_images} images")
|
||
|
||
@retry_with_backoff(max_retries=3, initial_delay=1.0)
def _analyze_image_with_claude(self, image_bytes: bytes) -> Optional[Dict]:
    """Analyze image with Claude Vision.

    Args:
        image_bytes: Encoded image file content.

    Returns:
        ``None`` when no Anthropic client is configured; the parsed JSON
        object from the model's reply on success; ``{'error': ...}`` when
        the request fails or the reply contains no parseable JSON object.

    NOTE(review): every exception is converted to an ``{'error': ...}``
    dict inside this method, so the retry decorator can only act if it
    inspects return values — confirm against retry_helper.
    """
    if not self.anthropic_client:
        return None

    prompt = """Analyze this image for PDF accessibility (WCAG 2.1):

1. Provide concise alt text (1-2 sentences, max 125 characters)
2. Is this decorative or informational?
3. Does it contain text? If yes, what text?
4. Does it use color as the only means of conveying information?
5. Are there any accessibility concerns?
6. Quality rating (1-10) if this were to be used in a PDF
7. For images of people: describe their role, action, or function — not physical
appearance (race, ethnicity, age, gender, disability) unless directly relevant
to the image's informational purpose. A human reviewer will verify descriptions
of people.
8. If a brand name, logo, or product name is visible, use the specific brand name
in the alt text (e.g., "Scotch tape" not "adhesive tape", "Nike Air Max" not "sneakers").

Respond in JSON format:
{
"alt_text": "...",
"type": "decorative|informational|complex",
"has_text": true|false,
"text_content": "...",
"color_only_info": true|false,
"concerns": ["..."],
"quality_rating": 1-10,
"recommendation": "...",
"contains_people": true|false,
"brands_detected": ["..."]
}"""

    try:
        # Declare the media type that matches the payload's file signature
        # instead of always claiming JPEG. Unrecognized data keeps the
        # previous default.
        media_type = "image/jpeg"
        if image_bytes.startswith(b"\x89PNG\r\n\x1a\n"):
            media_type = "image/png"
        elif image_bytes[:6] in (b"GIF87a", b"GIF89a"):
            media_type = "image/gif"
        elif image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
            media_type = "image/webp"

        base64_image = base64.b64encode(image_bytes).decode('utf-8')

        message = self.anthropic_client.messages.create(
            model="claude-sonnet-4-5-20250929",
            max_tokens=1024,
            timeout=self.api_timeout,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": base64_image,
                            },
                        },
                        {
                            "type": "text",
                            "text": prompt,
                        },
                    ],
                }
            ],
        )

        response_text = message.content[0].text
        # The reply may wrap the JSON in prose; take the outermost {...}.
        json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
        if json_match:
            return json.loads(json_match.group())

        return {'error': 'Could not parse response'}

    except Exception as e:
        return {'error': str(e)}
|
||
|
||
@retry_with_backoff(max_retries=3, initial_delay=1.0)
def _analyze_image_with_google(self, image_bytes: bytes) -> Optional[Dict]:
    """Run Google Vision text, label, image-property and object detection on one image.

    Args:
        image_bytes: Encoded image content handed to ``vision.Image``.

    Returns:
        ``None`` when no Vision client is configured; otherwise a dict with
        ``has_text``, ``text_content``, ``labels`` (first five) and ``objects``,
        or ``{'error': <message>}`` if anything in the call raises.
    """
    if not self.vision_client:
        return None

    # NOTE(review): every exception is converted into an error dict below, so the
    # retry decorator presumably never observes a failure — confirm this is intended.
    try:
        request = {
            'image': vision.Image(content=image_bytes),
            'features': [
                {'type_': feature_type}
                for feature_type in (
                    vision.Feature.Type.TEXT_DETECTION,
                    vision.Feature.Type.LABEL_DETECTION,
                    vision.Feature.Type.IMAGE_PROPERTIES,
                    vision.Feature.Type.OBJECT_LOCALIZATION,
                )
            ],
        }
        response = self.vision_client.annotate_image(request, timeout=self.api_timeout)

        # Usage book-keeping for the run statistics.
        self.stats['api_calls'] += 1
        self.stats['total_cost_estimate'] += 0.0015

        text_hits = response.text_annotations
        return {
            'has_text': bool(text_hits),
            # The first annotation's description is used as the image's text.
            'text_content': text_hits[0].description if text_hits else None,
            'labels': [hit.description for hit in response.label_annotations[:5]],
            'objects': [found.name for found in response.localized_object_annotations],
        }
    except Exception as exc:
        return {'error': str(exc)}
|
||
|
||
def _process_image_analysis(self, analysis: Dict, page_num: int, img_num: int, coordinates: Optional[Dict] = None):
|
||
"""Process Claude's image analysis results"""
|
||
|
||
# Check if text in image
|
||
if analysis.get('has_text'):
|
||
self.add_issue(
|
||
Severity.ERROR,
|
||
"Images - Text in Image",
|
||
f"Page {page_num}, Image {img_num}: Contains text: '{analysis.get('text_content', '')[:50]}'",
|
||
wcag_criterion="1.4.5",
|
||
recommendation="Replace image with actual text or provide text alternative",
|
||
page_number=page_num,
|
||
details=analysis,
|
||
coordinates=coordinates
|
||
)
|
||
|
||
# Check alt text quality
|
||
if analysis.get('type') == 'informational':
|
||
alt_text = analysis.get('alt_text', '')
|
||
if len(alt_text) > 125:
|
||
self.add_issue(
|
||
Severity.WARNING,
|
||
"Images - Alt Text",
|
||
f"Page {page_num}, Image {img_num}: Suggested alt text is too long ({len(alt_text)} chars)",
|
||
wcag_criterion="1.1.1",
|
||
recommendation=f"Shorten alt text. Suggested: '{alt_text[:100]}...'",
|
||
page_number=page_num,
|
||
coordinates=coordinates
|
||
)
|
||
else:
|
||
self.add_issue(
|
||
Severity.INFO,
|
||
"Images - Alt Text",
|
||
f"Page {page_num}, Image {img_num}: Suggested alt text: '{alt_text}'",
|
||
wcag_criterion="1.1.1",
|
||
page_number=page_num,
|
||
coordinates=coordinates
|
||
)
|
||
|
||
# Check for color-only information
|
||
if analysis.get('color_only_info'):
|
||
self.add_issue(
|
||
Severity.ERROR,
|
||
"Images - Color Only",
|
||
f"Page {page_num}, Image {img_num}: Uses color as only means of conveying information",
|
||
wcag_criterion="1.4.1",
|
||
recommendation="Add patterns, labels, or text descriptions",
|
||
page_number=page_num,
|
||
coordinates=coordinates
|
||
)
|
||
|
||
# Flag images containing people for human review
|
||
if analysis.get('contains_people'):
|
||
self.add_issue(
|
||
Severity.INFO,
|
||
"Images - People",
|
||
f"Page {page_num}, Image {img_num}: Image contains people — alt text description "
|
||
"should be verified by a human reviewer to ensure ethical and accurate representation.",
|
||
wcag_criterion="1.1.1",
|
||
recommendation="Review alt text to confirm it describes role/action rather than physical appearance.",
|
||
page_number=page_num,
|
||
coordinates=coordinates
|
||
)
|
||
|
||
# Note any detected brand names for reviewer awareness
|
||
brands = [b for b in analysis.get('brands_detected', []) if b]
|
||
if brands:
|
||
self.add_issue(
|
||
Severity.INFO,
|
||
"Images - Brands",
|
||
f"Page {page_num}, Image {img_num}: Brand name(s) detected: {', '.join(brands[:5])}. "
|
||
"Verify the alt text uses the specific brand name.",
|
||
wcag_criterion="1.1.1",
|
||
page_number=page_num,
|
||
coordinates=coordinates
|
||
)
|
||
|
||
# Quality concerns — capped at 2 per image, downgraded to INFO
|
||
# (these are advisory notes, not WCAG violations)
|
||
concerns = analysis.get('concerns', [])
|
||
for concern in concerns[:2]:
|
||
self.add_issue(
|
||
Severity.INFO,
|
||
"Images - Quality",
|
||
f"Page {page_num}, Image {img_num}: {concern}",
|
||
wcag_criterion="1.1.1",
|
||
page_number=page_num,
|
||
coordinates=coordinates
|
||
)
|
||
|
||
def _process_google_vision_results(self, results: Dict, page_num: int, img_num: int, coordinates: Optional[Dict] = None):
|
||
"""Process Google Vision results — only report actionable findings."""
|
||
pass # Label detections alone are not accessibility issues; Claude already provides alt text
|
||
|
||
def _check_color_contrast(self):
    """Estimate text contrast problems from rendered page images (WCAG 1.4.3).

    Skipped entirely in quick mode. Otherwise the first three pages (at most)
    are rendered at 100 DPI and passed to ``ColorContrastChecker``; the share
    of samples failing AA decides between ERROR (> 60 %), WARNING (> 30 %)
    and no issue. Any exception only logs a warning.
    """
    logger.info("Checking color contrast...")

    if self.quick_mode:
        logger.info("Skipping detailed contrast analysis (quick mode)")
        return

    try:
        # 100 DPI (down from 150) keeps rendering fast.
        final_page = min(3, len(self.pdf_reader.pages))
        rendered_pages = convert_from_path(str(self.pdf_path), dpi=100, first_page=1, last_page=final_page)

        for page_number, rendered in enumerate(rendered_pages, start=1):
            contrast_results = ColorContrastChecker.check_image_contrast(rendered)
            if 'error' in contrast_results:
                continue

            # The sampler only keeps edges that cross a light/dark boundary, so
            # a high failure share among them indicates a genuine problem.
            fail_pct = contrast_results['fail_aa_normal_percent']
            if fail_pct > 60:
                severity = Severity.ERROR
                message = (
                    f"Page {page_number}: {fail_pct:.1f}% of text-edge samples fail WCAG AA (4.5:1) — "
                    f"low contrast text likely present"
                )
                advice = "Use Colour Contrast Analyser to identify and fix low-contrast text"
            elif fail_pct > 30:
                severity = Severity.WARNING
                message = (
                    f"Page {page_number}: {fail_pct:.1f}% of text-edge samples fail WCAG AA — "
                    f"verify contrast manually with Colour Contrast Analyser"
                )
                advice = "Check text against its background using the Colour Contrast Analyser tool"
            else:
                continue  # below 30 % counts as a pass

            self.add_issue(
                severity,
                "Color Contrast",
                message,
                wcag_criterion="1.4.3",
                recommendation=advice,
                page_number=page_number,
                details=contrast_results
            )

    except Exception as e:
        logger.warning(f"Contrast check skipped: {str(e)}")
|
||
|
||
def _check_readability(self):
|
||
"""Check content readability (language-aware: Flesch only for English)."""
|
||
# Extract all text
|
||
all_text = ""
|
||
for page in self.pdf_plumber.pages:
|
||
text = page.extract_text()
|
||
if text:
|
||
all_text += text + "\n"
|
||
|
||
if len(all_text) < 100:
|
||
return
|
||
|
||
# Flesch Reading Ease is an English-only formula — skip for other languages
|
||
is_english = self._detected_lang in ('en', 'en-us', 'en-gb')
|
||
|
||
if is_english:
|
||
analysis = ReadabilityAnalyzer.analyze(all_text)
|
||
|
||
if 'error' in analysis:
|
||
return
|
||
|
||
# Check Flesch Reading Ease — readability is advisory, cap at WARNING
|
||
if analysis['flesch_reading_ease'] < 60:
|
||
self.add_issue(
|
||
Severity.WARNING,
|
||
"Readability",
|
||
f"Content is difficult to read (Flesch score: {analysis['flesch_reading_ease']}/100)",
|
||
wcag_criterion="3.1.5",
|
||
recommendation="Simplify language to reach 8th-9th grade level (target score: 60+)",
|
||
details=analysis
|
||
)
|
||
|
||
# Check grade level
|
||
if analysis['flesch_kincaid_grade'] > 10:
|
||
self.add_issue(
|
||
Severity.WARNING,
|
||
"Readability",
|
||
f"Content requires grade {analysis['flesch_kincaid_grade']} reading level",
|
||
wcag_criterion="3.1.5",
|
||
recommendation="Target grade 8-10 for general audiences",
|
||
details=analysis
|
||
)
|
||
|
||
# Long-sentence check is language-agnostic
|
||
sentences = [s.strip() for s in re.split(r'[.!?]+', all_text) if s.strip()]
|
||
long_sentences = [s for s in sentences if len(s.split()) > 25]
|
||
if len(long_sentences) > 5:
|
||
self.add_issue(
|
||
Severity.INFO,
|
||
"Readability",
|
||
f"{len(long_sentences)} sentences exceed 25 words",
|
||
wcag_criterion="3.1.5",
|
||
recommendation="Break long sentences for better comprehension",
|
||
details={'long_sentences_count': len(long_sentences),
|
||
'detected_language': self._detected_lang}
|
||
)
|
||
|
||
def _check_links(self):
|
||
"""Check link quality (WCAG 2.4.4) — only checks actual hyperlink label text."""
|
||
unclear_patterns = [
|
||
# English
|
||
r'\bclick here\b', r'\bhere\b', r'\bread more\b',
|
||
r'\bmore\b', r'\bthis\b', r'\blink\b',
|
||
# Ukrainian
|
||
r'\bнатисніть тут\b', r'\bтут\b', r'\bдокладніше\b',
|
||
r'\bбільше\b', r'\bцe\b', r'\bпосилання\b',
|
||
# Russian
|
||
r'\bнажмите здесь\b', r'\bздесь\b', r'\bподробнее\b',
|
||
r'\bбольше\b', r'\bэто\b', r'\bссылка\b',
|
||
# German
|
||
r'\bhier klicken\b', r'\bhier\b', r'\bmehr lesen\b',
|
||
r'\bmehr\b', r'\bdies\b', r'\blink\b',
|
||
# French
|
||
r'\bcliquez ici\b', r'\bici\b', r'\blire la suite\b',
|
||
r'\bplus\b', r'\bceci\b', r'\blien\b',
|
||
# Spanish
|
||
r'\bhaz clic aquí\b', r'\baquí\b', r'\beer más\b',
|
||
r'\bmás\b', r'\besto\b', r'\benlace\b',
|
||
# Polish
|
||
r'\bkliknij tutaj\b', r'\btutaj\b', r'\bczytaj więcej\b',
|
||
r'\bwięcej\b', r'\bto\b', r'\blink\b',
|
||
]
|
||
|
||
for i, (page_plumber, page_pypdf) in enumerate(
|
||
zip(self.pdf_plumber.pages, self.pdf_reader.pages)
|
||
):
|
||
annots_raw = page_pypdf.get("/Annots")
|
||
if not annots_raw:
|
||
continue
|
||
|
||
page_height = float(page_plumber.height)
|
||
page_flagged = False
|
||
|
||
for annot_ref in annots_raw:
|
||
try:
|
||
annot = annot_ref.get_object()
|
||
except Exception:
|
||
continue
|
||
|
||
# Only process URI hyperlinks
|
||
if annot.get("/Subtype") != "/Link":
|
||
continue
|
||
action = annot.get("/A")
|
||
if not action or action.get("/S") != "/URI":
|
||
continue
|
||
|
||
# Get annotation bounding box (PDF coords: bottom-left origin)
|
||
rect = annot.get("/Rect")
|
||
if not rect or len(rect) < 4:
|
||
continue
|
||
x0, y0, x1, y1 = (float(rect[0]), float(rect[1]),
|
||
float(rect[2]), float(rect[3]))
|
||
|
||
# Convert to pdfplumber coords (top-left origin)
|
||
top = page_height - y1
|
||
bottom = page_height - y0
|
||
if x0 >= x1 or top >= bottom:
|
||
continue
|
||
|
||
# Extract only the text inside the hyperlink rectangle
|
||
try:
|
||
link_text = (
|
||
page_plumber.within_bbox((x0, top, x1, bottom))
|
||
.extract_text() or ""
|
||
).strip()
|
||
except Exception:
|
||
continue
|
||
|
||
if not link_text:
|
||
continue # image-only link — skip
|
||
|
||
for pattern in unclear_patterns:
|
||
if re.search(pattern, link_text, re.IGNORECASE):
|
||
self.add_issue(
|
||
Severity.WARNING,
|
||
"Link Text",
|
||
f"Page {i+1}: Unclear link text \"{link_text}\" — should describe the destination",
|
||
wcag_criterion="2.4.4",
|
||
recommendation="Use descriptive link text that makes sense out of context",
|
||
page_number=i+1
|
||
)
|
||
page_flagged = True
|
||
break # one issue per link is enough
|
||
|
||
if page_flagged:
|
||
break # one issue per page
|
||
|
||
def _check_headings(self):
    """Check heading structure and hierarchy (WCAG 1.3.1).

    Walks the structure tree collecting H1-H6 levels in tree order (custom
    tags are translated through ``/RoleMap`` first) and reports:
      * ERROR   — no structure tree, or the first heading is not H1
      * WARNING — no headings at all, or a skipped level (e.g. H2 -> H4)
      * INFO / SUCCESS — summary of the headings found

    ``/K`` may hold a single child, an array, or an indirect reference to
    either; all three forms are followed.
    """
    catalog = self.pdf_reader.trailer.get("/Root", {})

    if "/StructTreeRoot" not in catalog:
        self.add_issue(
            Severity.ERROR, "Headings",
            "No structure tree - cannot verify heading hierarchy",
            wcag_criterion="1.3.1",
            recommendation="Tag document with proper heading structure")
        return

    struct_tree = catalog["/StructTreeRoot"]
    if hasattr(struct_tree, 'get_object'):
        struct_tree = struct_tree.get_object()

    # Load RoleMap so custom tag names (e.g. /Heading1) resolve to standard ones (/H1)
    role_map = {}
    if "/RoleMap" in struct_tree:
        rm = struct_tree["/RoleMap"]
        if hasattr(rm, 'get_object'):
            rm = rm.get_object()
        try:
            for key, value in rm.items():
                role_map[str(key)] = str(value)
        except (AttributeError, TypeError):
            pass

    headings = []
    HEADING_TAGS = {"/H1", "/H2", "/H3", "/H4", "/H5", "/H6"}
    # Each structure level can cost two recursion steps (the element and its
    # /K array), so this allows roughly 100 levels of nesting.
    MAX_DEPTH = 200

    def walk_tree(element, depth=0):
        if depth > MAX_DEPTH:
            return
        try:
            if hasattr(element, 'get_object'):
                element = element.get_object()
            if isinstance(element, list):
                # A /K array — possibly reached through an indirect reference.
                for child in element:
                    walk_tree(child, depth + 1)
                return
            if not isinstance(element, dict):
                return  # marked-content id or other leaf
            tag = str(element.get("/S", ""))
            mapped_tag = role_map.get(tag, tag)
            if mapped_tag in HEADING_TAGS:
                headings.append(int(mapped_tag[2]))
            kids = element.get("/K")
            if kids is not None:
                walk_tree(kids, depth + 1)
        except (AttributeError, TypeError, KeyError):
            pass

    try:
        walk_tree(struct_tree)
    except Exception as e:
        logger.warning(f"Could not fully parse structure tree: {e}")

    if not headings:
        self.add_issue(
            Severity.WARNING, "Headings",
            "No heading tags (H1-H6) found in structure tree",
            wcag_criterion="1.3.1",
            recommendation="Add heading tags to establish document hierarchy")
        return

    if headings[0] != 1:
        self.add_issue(
            Severity.ERROR, "Headings",
            f"Document does not start with H1 (starts with H{headings[0]})",
            wcag_criterion="1.3.1",
            recommendation="First heading should be H1")

    # Going deeper by more than one level at a time skips a heading level.
    for previous, current in zip(headings, headings[1:]):
        if current > previous + 1:
            self.add_issue(
                Severity.WARNING, "Headings",
                f"Heading level skipped: H{previous} to H{current}",
                wcag_criterion="1.3.1",
                recommendation="Do not skip heading levels")

    heading_str = ", ".join(f"H{h}" for h in headings[:10])
    if len(headings) > 10:
        heading_str += "..."
    has_issues = any(
        issue.severity in [Severity.ERROR, Severity.WARNING]
        for issue in self.issues if issue.category == "Headings"
    )
    self.add_issue(
        Severity.INFO if has_issues else Severity.SUCCESS, "Headings",
        f"Found {len(headings)} headings: {heading_str}",
        wcag_criterion="1.3.1")
|
||
|
||
def _check_tab_order(self):
    """Report whether pages carry a ``/Tabs`` entry (WCAG 2.4.3).

    ERROR when no page has one, WARNING when only some are missing it, and
    SUCCESS (listing the distinct values found) when every page has one.
    """
    missing_pages = [
        number
        for number, page in enumerate(self.pdf_reader.pages, start=1)
        if "/Tabs" not in page
    ]

    if not missing_pages:
        tab_types = set()
        for page in self.pdf_reader.pages:
            tab_types.add(str(page.get("/Tabs", "")))
        self.add_issue(
            Severity.SUCCESS, "Tab Order",
            f"Tab order set on all pages (types: {', '.join(tab_types)})",
            wcag_criterion="2.4.3")
        return

    if len(missing_pages) == len(self.pdf_reader.pages):
        self.add_issue(
            Severity.ERROR, "Tab Order",
            "No pages have tab order defined",
            wcag_criterion="2.4.3",
            recommendation="Set /Tabs to /S (structure order) for all pages")
        return

    self.add_issue(
        Severity.WARNING, "Tab Order",
        f"{len(missing_pages)} page(s) missing tab order",
        wcag_criterion="2.4.3",
        recommendation="Set /Tabs entry on all pages")
|
||
|
||
def _check_role_mapping(self):
    """Check that custom structure types map to standard PDF tags (WCAG 1.3.1).

    Does nothing when there is no structure tree (other checks report that).
    Without a ``/RoleMap`` an INFO issue is recorded. Otherwise every mapping
    is followed — transitively, since a custom type may map to another custom
    type — until it reaches a standard structure type; mappings that never do
    (unknown target or circular chain) produce a WARNING.
    """
    catalog = self.pdf_reader.trailer.get("/Root", {})

    if "/StructTreeRoot" not in catalog:
        return  # Already flagged by heading/structure checks

    struct_tree = catalog["/StructTreeRoot"]
    if hasattr(struct_tree, 'get_object'):
        struct_tree = struct_tree.get_object()

    if "/RoleMap" not in struct_tree:
        self.add_issue(
            Severity.INFO, "Role Mapping",
            "No custom role mapping (document uses standard tags only)",
            wcag_criterion="1.3.1")
        return

    role_map = struct_tree["/RoleMap"]
    if hasattr(role_map, 'get_object'):
        role_map = role_map.get_object()

    # Standard structure types of PDF 1.7 (ISO 32000-1, 14.8.4).
    standard_roles = frozenset({
        # grouping elements
        "/Document", "/Part", "/Art", "/Sect", "/Div", "/BlockQuote",
        "/Caption", "/TOC", "/TOCI", "/Index", "/NonStruct", "/Private",
        # block-level elements
        "/P", "/H", "/H1", "/H2", "/H3", "/H4", "/H5", "/H6",
        "/L", "/LI", "/Lbl", "/LBody",
        "/Table", "/TR", "/TH", "/TD", "/THead", "/TBody", "/TFoot",
        # inline-level elements
        "/Span", "/Quote", "/Note", "/Reference", "/BibEntry", "/Code",
        "/Link", "/Annot", "/Ruby", "/RB", "/RT", "/RP",
        "/Warichu", "/WT", "/WP",
        # illustration elements
        "/Figure", "/Formula", "/Form",
    })

    mapped = {}
    try:
        for key, value in role_map.items():
            mapped[str(key)] = str(value)
    except (AttributeError, TypeError):
        pass

    def resolves_to_standard(role: str) -> bool:
        """Follow the mapping chain from *role*; False on a dead end or a cycle."""
        visited = set()
        while role not in standard_roles:
            if role in visited or role not in mapped:
                return False
            visited.add(role)
            role = mapped[role]
        return True

    unmapped = {k: v for k, v in mapped.items() if not resolves_to_standard(v)}
    if unmapped:
        self.add_issue(
            Severity.WARNING, "Role Mapping",
            f"{len(unmapped)} custom role(s) map to non-standard tags",
            wcag_criterion="1.3.1",
            recommendation="Ensure all custom roles map to standard PDF tags",
            details={'unmapped_roles': unmapped})
    else:
        self.add_issue(
            Severity.SUCCESS, "Role Mapping",
            f"All {len(mapped)} custom roles correctly mapped",
            wcag_criterion="1.3.1")
|
||
|
||
def _check_forms(self):
|
||
"""Check form field accessibility"""
|
||
catalog = self.pdf_reader.trailer.get("/Root", {})
|
||
|
||
if "/AcroForm" not in catalog:
|
||
return
|
||
|
||
acro_form = catalog["/AcroForm"]
|
||
if "/Fields" not in acro_form:
|
||
return
|
||
|
||
fields = acro_form["/Fields"]
|
||
field_issues = []
|
||
|
||
for field in fields:
|
||
field = field.get_object()
|
||
field_name = field.get("/T", "Unnamed")
|
||
has_tooltip = "/TU" in field
|
||
|
||
if not has_tooltip:
|
||
field_issues.append(field_name)
|
||
|
||
if field_issues:
|
||
self.add_issue(
|
||
Severity.ERROR,
|
||
"Forms",
|
||
f"{len(field_issues)} form field(s) missing descriptions/tooltips",
|
||
wcag_criterion="3.3.2, 4.1.2",
|
||
recommendation="Add tooltip descriptions to all form fields",
|
||
details={'fields': field_issues}
|
||
)
|
||
else:
|
||
self.add_issue(
|
||
Severity.SUCCESS,
|
||
"Forms",
|
||
f"All {len(fields)} form fields have descriptions",
|
||
wcag_criterion="3.3.2"
|
||
)
|
||
|
||
def _check_tables(self):
    """Check table accessibility using the PDF structure tree (WCAG 1.3.1).

    Every ``/Table`` structure element is handed to ``_analyze_table``, which
    records its own issues. Afterwards:
      * no tagged tables — fall back to pdfplumber's visual table detection
        and WARN about untagged tables (or record INFO if none are seen);
      * all tagged tables clean — record SUCCESS.

    ``/K`` may hold a single child, an array, or an indirect reference to
    either; all three forms are followed.
    """
    catalog = self.pdf_reader.trailer.get("/Root", {})
    struct_tree = catalog.get("/StructTreeRoot")

    tables_found = 0
    tables_ok = 0
    # Each structure level can cost two recursion steps (the element and its
    # /K array), so this allows roughly 50 levels of nesting.
    max_depth = 100

    if struct_tree:
        def walk(node, depth=0):
            nonlocal tables_found, tables_ok
            if depth > max_depth:
                return
            try:
                obj = node.get_object() if hasattr(node, 'get_object') else node
                if isinstance(obj, list):
                    # A /K array — possibly reached through an indirect reference.
                    for child in obj:
                        if child is not None:
                            walk(child, depth + 1)
                    return
                if not isinstance(obj, dict):
                    return
                role = obj.get("/S") or obj.get("/Type")
                if role and str(role) == "/Table":
                    tables_found += 1
                    if self._analyze_table(obj, tables_found):
                        tables_ok += 1
                    return  # don't recurse into table internals
                kids = obj.get("/K")
                if kids is not None:
                    walk(kids, depth + 1)
            except Exception as e:
                # Best effort: one broken subtree must not stop the scan.
                logger.debug(f"Skipping unreadable structure element: {e}")

        try:
            walk(struct_tree)
        except Exception as e:
            logger.warning(f"Structure tree walk failed: {e}")

    if tables_found == 0:
        # Fallback: visual detection via pdfplumber (for untagged docs)
        visual_tables = 0
        for page in self.pdf_plumber.pages:
            try:
                visual_tables += len(page.find_tables())
            except Exception:
                pass  # table detection is heuristic; ignore pages it chokes on

        if visual_tables > 0:
            self.add_issue(
                Severity.WARNING,
                "Tables",
                f"{visual_tables} visual table(s) detected but not tagged in structure tree",
                wcag_criterion="1.3.1",
                recommendation="Tag tables with proper Table/TR/TH/TD structure elements"
            )
        else:
            self.add_issue(
                Severity.INFO,
                "Tables",
                "No tables detected in document",
                wcag_criterion="1.3.1"
            )
    elif tables_ok == tables_found:
        self.add_issue(
            Severity.SUCCESS,
            "Tables",
            f"{tables_found} table(s) with proper header and scope structure",
            wcag_criterion="1.3.1"
        )
|
||
|
||
def _analyze_table(self, table_obj: dict, table_num: int) -> bool:
    """Inspect one ``/Table`` structure element and record any issues.

    Args:
        table_obj: The table's structure-element dictionary.
        table_num: 1-based table counter used in messages.

    Returns:
        True when no WARNING/ERROR was recorded for the table. A missing
        caption only yields an INFO note and does not affect the result.
    """
    children = table_obj.get("/K", [])
    if not isinstance(children, list):
        children = [children]

    stats = dict(rows=0, th_cells=0, td_cells=0, th_with_scope=0, has_caption=False)
    self._collect_table_stats(children, stats)

    row_count = stats['rows']
    header_cells = stats['th_cells']
    total_cells = header_cells + stats['td_cells']

    # A table with neither rows nor cells: nothing further can be checked.
    if row_count == 0 and total_cells == 0:
        self.add_issue(
            Severity.WARNING,
            "Tables",
            f"Table {table_num}: empty — no TR/TH/TD elements found in structure tree",
            wcag_criterion="1.3.1",
            recommendation="Ensure the table is properly tagged with TR rows and TH/TD cells"
        )
        return False

    structure_ok = True

    if header_cells == 0:
        self.add_issue(
            Severity.ERROR,
            "Tables",
            f"Table {table_num}: no header cells (TH) — {row_count} row(s), {total_cells} data cell(s). "
            f"Screen readers cannot identify column or row headers.",
            wcag_criterion="1.3.1",
            recommendation="Mark header cells as TH with scope='col' (column headers) or scope='row' (row headers)"
        )
        structure_ok = False
    elif stats['th_with_scope'] < header_cells:
        missing = header_cells - stats['th_with_scope']
        self.add_issue(
            Severity.WARNING,
            "Tables",
            f"Table {table_num}: {missing} of {header_cells} TH header cell(s) missing scope attribute",
            wcag_criterion="1.3.1",
            recommendation="Add scope='col' to column headers and scope='row' to row headers"
        )
        structure_ok = False

    # Advisory only: larger tables without a Caption get a note, but this is
    # deliberately not counted as a failure.
    if total_cells > 6 and not stats['has_caption']:
        self.add_issue(
            Severity.INFO,
            "Tables",
            f"Table {table_num}: no Caption element ({row_count} rows, ~{total_cells} cells). "
            f"A Caption helps screen readers identify the table — ensure a visible title exists nearby.",
            wcag_criterion="1.3.1",
            recommendation="Add a Caption as the first child of the Table element if no visible title precedes it"
        )

    return structure_ok
|
||
|
||
def _collect_table_stats(self, kids: list, stats: dict, depth: int = 0):
|
||
"""Recursively collect structural stats from a table's children."""
|
||
if depth > 15:
|
||
return
|
||
for kid in kids:
|
||
try:
|
||
obj = kid.get_object() if hasattr(kid, 'get_object') else kid
|
||
if not isinstance(obj, dict):
|
||
continue
|
||
role = str(obj.get("/S") or obj.get("/Type") or "")
|
||
|
||
if role == "/TR":
|
||
stats['rows'] += 1
|
||
elif role == "/TH":
|
||
stats['th_cells'] += 1
|
||
if self._th_has_scope(obj):
|
||
stats['th_with_scope'] += 1
|
||
elif role == "/TD":
|
||
stats['td_cells'] += 1
|
||
elif role == "/Caption":
|
||
stats['has_caption'] = True
|
||
|
||
sub_kids = obj.get("/K", [])
|
||
if not isinstance(sub_kids, list):
|
||
sub_kids = [sub_kids]
|
||
if sub_kids:
|
||
self._collect_table_stats(sub_kids, stats, depth + 1)
|
||
except Exception:
|
||
continue
|
||
|
||
def _th_has_scope(self, th_obj: dict) -> bool:
|
||
"""Return True if a TH element carries a Scope attribute."""
|
||
attrs = th_obj.get("/A")
|
||
if not attrs:
|
||
return False
|
||
try:
|
||
# /A can be a single attribute dict or a list of dicts
|
||
a = attrs.get_object() if hasattr(attrs, 'get_object') else attrs
|
||
if isinstance(a, dict):
|
||
return "/Scope" in a
|
||
if isinstance(a, list):
|
||
for item in a:
|
||
try:
|
||
d = item.get_object() if hasattr(item, 'get_object') else item
|
||
if isinstance(d, dict) and "/Scope" in d:
|
||
return True
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
return False
|
||
|
||
def _check_reading_order(self):
    """Record a reading-order finding (WCAG 1.3.2) based on structure-tree presence.

    Without a structure tree an ERROR is recorded; with one, only an INFO
    reminder to verify the order with a screen reader.
    """
    catalog = self.pdf_reader.trailer.get("/Root", {})

    if "/StructTreeRoot" in catalog:
        severity = Severity.INFO
        message = "Structure tree present - verify reading order with screen reader"
        advice = "Test with NVDA or JAWS to verify logical reading order"
    else:
        severity = Severity.ERROR
        message = "No structure tree - reading order cannot be determined"
        advice = "Tag document to establish proper reading order"

    self.add_issue(
        severity,
        "Reading Order",
        message,
        wcag_criterion="1.3.2",
        recommendation=advice
    )
|
||
|
||
def _check_fonts(self):
|
||
"""Check font embedding"""
|
||
embedded_count = 0
|
||
non_embedded_fonts: set = set()
|
||
|
||
for page in self.pdf_reader.pages:
|
||
resources = page.get("/Resources", {})
|
||
if "/Font" not in resources:
|
||
continue
|
||
fonts = resources["/Font"]
|
||
for font_key, font_ref in fonts.items():
|
||
try:
|
||
font_obj = font_ref.get_object()
|
||
except Exception:
|
||
continue
|
||
is_embedded = (
|
||
"/FontFile" in font_obj
|
||
or "/FontFile2" in font_obj
|
||
or "/FontFile3" in font_obj
|
||
or "/FontDescriptor" in font_obj and (
|
||
"/FontFile" in font_obj["/FontDescriptor"].get_object()
|
||
or "/FontFile2" in font_obj["/FontDescriptor"].get_object()
|
||
or "/FontFile3" in font_obj["/FontDescriptor"].get_object()
|
||
)
|
||
)
|
||
if is_embedded:
|
||
embedded_count += 1
|
||
else:
|
||
base_font = font_obj.get("/BaseFont", font_key)
|
||
non_embedded_fonts.add(str(base_font).lstrip('/'))
|
||
|
||
if non_embedded_fonts:
|
||
self.add_issue(
|
||
Severity.WARNING,
|
||
"Fonts",
|
||
f"{len(non_embedded_fonts)} fonts not embedded",
|
||
wcag_criterion="1.4.4",
|
||
recommendation="Embed all fonts for consistent rendering",
|
||
details={"non_embedded_fonts": sorted(non_embedded_fonts)}
|
||
)
|
||
|
||
def _check_security(self):
|
||
"""Check security settings"""
|
||
if self.pdf_reader.is_encrypted:
|
||
self.add_issue(
|
||
Severity.WARNING,
|
||
"Security",
|
||
"Document is encrypted",
|
||
recommendation="Ensure assistive technology can access content"
|
||
)
|
||
|
||
def _check_bookmarks(self):
|
||
"""Check navigation bookmarks"""
|
||
outlines = self.pdf_reader.outline
|
||
total_pages = len(self.pdf_reader.pages)
|
||
|
||
if not outlines and total_pages > 5:
|
||
self.add_issue(
|
||
Severity.INFO,
|
||
"Navigation",
|
||
"No bookmarks found",
|
||
wcag_criterion="2.4.5",
|
||
recommendation=f"Add bookmarks for {total_pages}-page document to aid navigation"
|
||
)
|
||
elif outlines:
|
||
self.add_issue(
|
||
Severity.SUCCESS,
|
||
"Navigation",
|
||
"Document has navigation bookmarks",
|
||
wcag_criterion="2.4.5"
|
||
)
|
||
|
||
def _check_verapdf_validation(self):
    """Run veraPDF PDF/UA validation and translate the outcome into issues.

    Skipped with a log message when the validator class is unavailable or
    when veraPDF reports an error. A compliant document yields one SUCCESS
    issue; a failing one yields an ERROR summary plus a WARNING for each of
    the first ten reported errors. The raw results are kept on
    ``self.verapdf_results``.
    """
    if not VeraPDFValidator:
        logger.warning("veraPDF not available - skipping")
        return

    logger.info("Running veraPDF PDF/UA validation...")

    try:
        results = VeraPDFValidator().validate(str(self.pdf_path))

        if 'error' in results:
            logger.warning(f"veraPDF validation error: {results['error']}")
            return

        self.verapdf_results = results

        if not results['compliant']:
            self.add_issue(
                Severity.ERROR,
                "PDF/UA Compliance",
                f"Document fails PDF/UA-1 validation ({results['failed_rules']} rules failed, {results['failed_checks']} checks failed)",
                wcag_criterion="PDF/UA",
                recommendation="Fix structure issues reported by veraPDF"
            )

            # Individual rule failures, limited to the first 10 to keep the report readable.
            reported_errors = results.get('errors', [])[:10]
            for error in reported_errors:
                self.add_issue(
                    Severity.WARNING,
                    "PDF/UA Structure",
                    f"Clause {error['clause']}: {error['description'][:150]}",
                    wcag_criterion="PDF/UA",
                    recommendation="Consult veraPDF documentation for this clause"
                )
        else:
            self.add_issue(
                Severity.SUCCESS,
                "PDF/UA Compliance",
                f"Document passes PDF/UA-1 validation ({results['passed_rules']} rules passed)",
                wcag_criterion="PDF/UA",
                recommendation="Document meets PDF/UA structure requirements"
            )

        logger.info(f"veraPDF: {results['passed_rules']} passed, {results['failed_rules']} failed")

    except Exception as e:
        logger.warning(f"veraPDF check error: {str(e)}")
|
||
|
||
def _analyze_remediation_options(self):
    """Ask the remediation module which findings could be fixed automatically.

    Does nothing when the remediation class is unavailable. The suggestions
    are stored on ``self.remediation_suggestions`` and the number of entries
    flagged ``auto_fixable`` is logged. Failures only log a warning.
    """
    if not PDFRemediator:
        return

    logger.info("Analyzing auto-remediation options...")

    try:
        suggestions = PDFRemediator(str(self.pdf_path)).analyze_and_suggest_fixes()
        self.remediation_suggestions = suggestions

        # Count every suggested fix, across all groups, that is marked auto-fixable.
        total_fixable = sum(
            1
            for fixes in suggestions.values()
            for fix in fixes
            if fix.get('auto_fixable')
        )

        if total_fixable > 0:
            logger.info(f"{total_fixable} issues can be auto-fixed")
        else:
            logger.info("No auto-fixable issues found")

    except Exception as e:
        logger.warning(f"Remediation analysis error: {str(e)}")
|
||
|
||
# ==================== HELPER METHODS ====================
|
||
|
||
def _extract_image_from_page(self, page, img_info) -> Optional[bytes]:
|
||
"""Extract image bytes from PDF page"""
|
||
try:
|
||
# Get image coordinates
|
||
x0, y0, x1, y1 = img_info['x0'], img_info['top'], img_info['x1'], img_info['bottom']
|
||
|
||
# Crop page to image area
|
||
cropped = page.crop((x0, y0, x1, y1))
|
||
|
||
# Convert to PIL Image
|
||
pil_image = cropped.to_image(resolution=150).original
|
||
|
||
# Convert to bytes
|
||
buffer = BytesIO()
|
||
pil_image.save(buffer, format='JPEG', quality=85)
|
||
return buffer.getvalue()
|
||
|
||
except Exception as e:
|
||
return None
|
||
|
||
def _image_data_size(self, image_data: bytes) -> int:
|
||
"""Return byte size of image data — used to filter out tiny decorative images."""
|
||
return len(image_data) if image_data else 0
|
||
|
||
def _generate_page_images(self, output_dir: Path, dpi: int = 150):
|
||
"""Generate PNG images for each page for visual display"""
|
||
if not self.generate_images:
|
||
return
|
||
|
||
logger.info("Generating page images for visual display...")
|
||
|
||
try:
|
||
from pdf2image import convert_from_path
|
||
except ImportError:
|
||
logger.warning("pdf2image not available - skipping page image generation")
|
||
return
|
||
|
||
try:
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# Convert pages to images
|
||
# Store DPI for coordinate scaling
|
||
self.page_image_dpi = dpi
|
||
images = convert_from_path(
|
||
str(self.pdf_path),
|
||
dpi=dpi,
|
||
fmt='png'
|
||
)
|
||
|
||
for page_num, image in enumerate(images, start=1):
|
||
# Save as PNG
|
||
image_filename = f"page_{page_num}.png"
|
||
image_path = output_dir / image_filename
|
||
image.save(image_path, 'PNG')
|
||
self.page_images[page_num] = image_filename
|
||
logger.info(f"Page {page_num}/{len(images)}")
|
||
|
||
logger.info(f"Generated {len(images)} page images at {dpi} DPI")
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Could not generate page images: {str(e)}")
|
||
|
||
# ==================== REPORTING ====================
|
||
|
||
def _build_matterhorn_summary(self) -> dict:
|
||
"""Build Matterhorn Protocol PDF/UA-1 checkpoint summary."""
|
||
# Map check names to Matterhorn checkpoint IDs
|
||
CHECK_TO_MATTERHORN = {
|
||
"Document Structure": ["01", "02", "09"],
|
||
"Metadata": ["06", "07"],
|
||
"Language Declaration": ["11"],
|
||
"Text Extractability": ["01", "08"],
|
||
"OCR Quality": ["08"],
|
||
"Image Accessibility": ["13"],
|
||
"Color Contrast": ["04"],
|
||
"Content Readability": [],
|
||
"Link Quality": ["27", "28"],
|
||
"Heading Structure": ["14"],
|
||
"Tab Order": ["28"],
|
||
"Role Mapping": ["02"],
|
||
"Form Accessibility": ["24", "28"],
|
||
"Table Structure": ["15"],
|
||
"Reading Order": ["09"],
|
||
"Font Accessibility": ["31"],
|
||
"Security Settings": ["26"],
|
||
"Navigation Aids": ["27"],
|
||
"PDF/UA Structure (veraPDF)": [], # Covers all M conditions
|
||
}
|
||
|
||
# Checkpoint definitions: id, name, how (M=machine/H=human)
|
||
CHECKPOINTS = [
|
||
("01", "Real content tagged", "M"),
|
||
("02", "Role mapping", "M"),
|
||
("03", "Flickering content", "H"),
|
||
("04", "Color and contrast", "H"),
|
||
("05", "Sound content", "H"),
|
||
("06", "Metadata – title", "M"),
|
||
("07", "Metadata – language", "M"),
|
||
("08", "Text content", "M"),
|
||
("09", "Reading order", "M"),
|
||
("10", "Tab order", "M"),
|
||
("11", "Natural language", "M"),
|
||
("12", "Character encoding", "M"),
|
||
("13", "Graphics / alt text", "H"),
|
||
("14", "Headings", "M"),
|
||
("15", "Tables", "M"),
|
||
("16", "Lists", "M"),
|
||
("17", "Mathematical expressions", "H"),
|
||
("18", "Page headers / footers", "H"),
|
||
("19", "Notes / references", "H"),
|
||
("20", "Optional content", "M"),
|
||
("21", "Embedded files", "M"),
|
||
("22", "Article threads", "H"),
|
||
("23", "Digital signatures", "H"),
|
||
("24", "Non-interactive forms", "H"),
|
||
("25", "XFA forms", "M"),
|
||
("26", "Security", "M"),
|
||
("27", "Navigation", "M"),
|
||
("28", "Annotations", "M"),
|
||
("29", "Actions", "M"),
|
||
("30", "XObjects", "M"),
|
||
("31", "Fonts", "M"),
|
||
]
|
||
|
||
# Build a map: checkpoint_id -> pass/fail/not_tested from our check results
|
||
cp_status: dict = {} # id -> "PASS" | "FAIL" | "NOT_TESTED"
|
||
|
||
check_name_to_result = {cr.check_name: cr.passed for cr in self.check_results}
|
||
|
||
# Determine which checkpoints are covered and whether they passed
|
||
for check_name, cp_ids in CHECK_TO_MATTERHORN.items():
|
||
result_passed = check_name_to_result.get(check_name)
|
||
if result_passed is None:
|
||
continue
|
||
for cp_id in cp_ids:
|
||
if cp_id not in cp_status:
|
||
cp_status[cp_id] = "PASS" if result_passed else "FAIL"
|
||
elif not result_passed:
|
||
# Any failure overrides a pass
|
||
cp_status[cp_id] = "FAIL"
|
||
|
||
# Handle PDF/UA veraPDF: if it passed, mark all M checkpoints as PASS unless already FAIL
|
||
verapdf_passed = check_name_to_result.get("PDF/UA Structure (veraPDF)")
|
||
if verapdf_passed:
|
||
for cp_id, _, how in CHECKPOINTS:
|
||
if how == "M" and cp_id not in cp_status:
|
||
cp_status[cp_id] = "PASS"
|
||
|
||
checkpoints_out = []
|
||
any_fail = False
|
||
for cp_id, cp_name, cp_how in CHECKPOINTS:
|
||
status = cp_status.get(cp_id, "NOT_TESTED")
|
||
if status == "FAIL":
|
||
any_fail = True
|
||
checkpoints_out.append({
|
||
"id": cp_id,
|
||
"name": cp_name,
|
||
"how": cp_how,
|
||
"status": status,
|
||
})
|
||
|
||
return {
|
||
"standard": "PDF/UA-1",
|
||
"overall_passed": not any_fail,
|
||
"checkpoints": checkpoints_out,
|
||
}
|
||
|
||
def _generate_summary(self) -> Dict[str, Any]:
    """Assemble the complete result summary as a plain dictionary.

    Combines issue counts per severity, the accessibility score (check pass
    ratio minus a capped penalty), the Matterhorn summary, WCAG level A/AA
    compliance derived from critical/error issues, up to eight de-duplicated
    next steps ordered by severity, plus the raw checks, issues, stats and
    remediation data held on the instance.
    """
    def count_with(level) -> int:
        # Number of recorded issues carrying exactly this severity
        return sum(1 for found in self.issues if found.severity == level)

    severity_counts = {
        'critical': count_with(Severity.CRITICAL),
        'error': count_with(Severity.ERROR),
        'warning': count_with(Severity.WARNING),
        'info': count_with(Severity.INFO),
        'success': count_with(Severity.SUCCESS)
    }

    # Score: percentage of passed checks ...
    passed_checks = sum(1 for cr in self.check_results if cr.passed)
    total_checks = len(self.check_results)
    if total_checks:
        base_score = round(100 * passed_checks / total_checks)
    else:
        base_score = 0

    # ... minus a soft penalty for critical/error issues (capped at 20)
    raw_penalty = severity_counts['critical'] * 5 + severity_counts['error'] * 2
    penalty = min(20, raw_penalty)
    score = max(0, base_score - penalty)

    # datetime values are not JSON serializable; store them as ISO strings
    stats_serializable = {
        key: (value.isoformat() if isinstance(value, datetime) else value)
        for key, value in self.stats.items()
    }

    # Count suggestions flagged as auto-fixable across all groups
    auto_fixable_count = 0
    if self.remediation_suggestions:
        for fixes in self.remediation_suggestions.values():
            auto_fixable_count += sum(1 for fix in fixes if fix.get('auto_fixable'))

    # WCAG compliance: criteria named by any critical/error issue count as failing
    blocking = (Severity.CRITICAL, Severity.ERROR)
    failing_criteria: set = set()
    for issue in self.issues:
        if issue.severity not in blocking:
            continue
        for raw_criterion in issue.wcag_criterion.split(','):
            criterion = raw_criterion.strip()
            if criterion and criterion != 'PDF/UA':
                failing_criteria.add(criterion)

    level_a_fails = sorted(c for c in failing_criteria if WCAG_LEVELS.get(c) == 'A')
    # The AA list includes level A failures as well as level AA ones
    level_aa_fails = sorted(c for c in failing_criteria if WCAG_LEVELS.get(c) in ('A', 'AA'))

    wcag_compliance = {
        'level_a': not level_a_fails,
        'level_aa': not level_aa_fails,
        'level_a_failures': level_a_fails,
        'level_aa_failures': level_aa_fails,
    }

    # Prioritised next steps: most severe first, one entry per distinct action
    next_steps = []
    seen_actions: set = set()
    severity_priorities = (
        (Severity.CRITICAL, 1),
        (Severity.ERROR, 2),
        (Severity.WARNING, 3),
    )
    for level, priority in severity_priorities:
        if len(next_steps) >= 8:
            break
        for issue in self.issues:
            if issue.severity != level:
                continue
            action = issue.recommendation or issue.description
            if action in seen_actions:
                continue
            seen_actions.add(action)
            primary_criterion = issue.wcag_criterion.split(',')[0].strip()
            next_steps.append({
                'priority': priority,
                'category': issue.category,
                'action': action,
                'wcag': issue.wcag_criterion,
                'wcag_level': WCAG_LEVELS.get(primary_criterion, ''),
            })
            if len(next_steps) >= 8:
                break

    return {
        'filename': self.pdf_path.name,
        'total_pages': len(self.pdf_reader.pages),
        'accessibility_score': score,
        'score_breakdown': {
            'checks_passed': passed_checks,
            'checks_total': total_checks,
            'base_score': base_score,
            'penalty': penalty,
            'final_score': score,
            'per_check': [
                {'name': cr.check_name, 'passed': cr.passed}
                for cr in self.check_results
            ]
        },
        'matterhorn_summary': self._build_matterhorn_summary(),
        'severity_counts': severity_counts,
        'total_issues': len(self.issues),
        'auto_fixable_count': auto_fixable_count,
        'stats': stats_serializable,
        'page_images': self.page_images,  # Map of page_num -> image_filename
        'page_image_dpi': getattr(self, 'page_image_dpi', 150),  # DPI for coordinate scaling
        'verapdf_validation': self.verapdf_results,
        'remediation_suggestions': self.remediation_suggestions,
        'checks_performed': [
            {
                'name': cr.check_name,
                'passed': cr.passed,
                'duration': cr.duration
            }
            for cr in self.check_results
        ],
        'issues': [issue.to_dict() for issue in self.issues],
        'wcag_compliance': wcag_compliance,
        'next_steps': next_steps,
    }
|
||
|
||
def generate_json_report(self) -> str:
    """Serialize the full summary to a JSON string indented by two spaces."""
    return json.dumps(self._generate_summary(), indent=2)
|
||
|
||
def run_full_check(self) -> Dict[str, Any]:
    """Backward-compatible alias that delegates to ``check_all``."""
    results = self.check_all()
    return results
|
||
|
||
def to_dict(self) -> Dict[str, Any]:
    """Return the results as a dictionary (the output of ``_generate_summary``)."""
    summary = self._generate_summary()
    return summary
|
||
|
||
|
||
def main():
    """Command-line entry point.

    Parses arguments, runs every check on the given PDF and then either
    writes the JSON report (plus page images) next to ``--output`` or, when
    no output file is requested, prints a short summary to stdout.

    API credentials come from the CLI flags, falling back to the
    ``GOOGLE_APPLICATION_CREDENTIALS``, ``GOOGLE_API_KEY`` and
    ``ANTHROPIC_API_KEY`` environment variables.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Enterprise PDF Accessibility Checker",
        epilog="Environment variables can be set in a .env file (see .env.example)"
    )
    parser.add_argument("pdf_file", help="PDF file to check")
    parser.add_argument("--google-credentials", help="Path to Google Cloud credentials JSON (or set GOOGLE_APPLICATION_CREDENTIALS in .env)")
    parser.add_argument("--google-key", help="Google API key string (or set GOOGLE_API_KEY in .env)")
    parser.add_argument("--anthropic-key", help="Anthropic API key (or set ANTHROPIC_API_KEY in .env)")
    parser.add_argument("--output", "-o", help="Output JSON file")
    parser.add_argument("--quick", action="store_true", help="Quick mode - skip expensive checks (OCR, AI image analysis, color contrast)")

    args = parser.parse_args()

    # Environment values act as defaults; explicit CLI arguments override them
    config = {
        'google_credentials_path': args.google_credentials or os.getenv('GOOGLE_APPLICATION_CREDENTIALS'),
        'google_api_key': args.google_key or os.getenv('GOOGLE_API_KEY'),
        'anthropic_api_key': args.anthropic_key or os.getenv('ANTHROPIC_API_KEY')
    }

    # Show what we're using
    if args.quick:
        print("⚡ Quick mode enabled - skipping expensive checks\n")

    checker = EnterprisePDFChecker(args.pdf_file, config, quick_mode=args.quick)
    summary = checker.check_all()

    if not args.output:
        # No report file requested: print the headline numbers only. The JSON
        # report is not built here because nothing would consume it.
        print("\n" + "="*60)
        print("SUMMARY")
        print("="*60)
        print(f"Score: {summary['accessibility_score']}/100")
        print(f"Critical: {summary['severity_counts']['critical']}")
        print(f"Errors: {summary['severity_counts']['error']}")
        print(f"Warnings: {summary['severity_counts']['warning']}")
        print(f"API Calls: {summary['stats']['api_calls']}")
        print(f"Cost: ${summary['stats']['total_cost_estimate']:.2f}")
        return

    output_path = Path(args.output)
    images_dir = output_path.parent / f"{output_path.stem}_images"

    # Page images must exist before the report is built: the summary embeds
    # the page_images map that image generation fills in.
    checker._generate_page_images(images_dir)

    # Explicit encoding keeps the written file independent of the platform default
    output_path.write_text(checker.generate_json_report(), encoding="utf-8")

    print(f"\n📄 Report saved: {args.output}")
    if checker.page_images:
        print(f"📸 Page images saved to: {images_dir}")
|
||
|
||
|
||
# Run the CLI only when executed as a script, not when imported as a module
if __name__ == "__main__":
    main()
|