pdf-accessibility/enterprise_pdf_checker.py
DJP 2952731bd6 Fix coordinate scaling for visual markers
Issue: Marker boxes were misaligned with actual PDF content
Cause: Coordinate system mismatch between PDF (72 DPI) and rendered images (150 DPI)
Fix: Apply proper DPI scaling factor to coordinates

Changes:
- Calculate scale factor: DPI / 72 (e.g., 150/72 = 2.083)
- Scale all x/y coordinates before drawing
- Store page_image_dpi in JSON for frontend
- Add debug console logs to verify scaling

Formula:
- pixel_coordinate = pdf_coordinate × (image_dpi / 72)
- Example: 100 points @ 150 DPI = 208 pixels

Now markers should align perfectly with PDF content!

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-20 16:31:02 -04:00

1384 lines
51 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Enterprise PDF Accessibility Checker
Quality-first comprehensive WCAG 2.1 validation
Features:
- Google Cloud Vision API for OCR and image analysis
- Anthropic Claude for alt text validation and content analysis
- Complete color contrast checking
- Readability analysis
- Form field validation
- Heading structure analysis
- Link quality checking
- Comprehensive reporting
"""
import sys
import os
import json
import re
import base64
import hashlib
import time
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field, asdict
from enum import Enum
from datetime import datetime
from io import BytesIO
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
# Load environment variables from .env file (optional)
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
# dotenv not installed, that's okay - will use environment variables
pass
# Core PDF libraries
try:
from pypdf import PdfReader, PdfWriter
import pdfplumber
from PIL import Image
import numpy as np
except ImportError:
print("Error: Core libraries not installed.")
print("Install: pip install pypdf pdfplumber pillow numpy --break-system-packages")
sys.exit(1)
# OCR and analysis
try:
import pytesseract
from pdf2image import convert_from_path
except ImportError:
print("Warning: OCR libraries not available. Install: pip install pytesseract pdf2image")
pytesseract = None
# Readability
try:
from textblob import TextBlob
except ImportError:
print("Warning: TextBlob not available. Install: pip install textblob")
TextBlob = None
# Google Cloud Vision
try:
from google.cloud import vision
from google.cloud import documentai_v1 as documentai
except ImportError:
print("Warning: Google Cloud libraries not available.")
print("Install: pip install google-cloud-vision google-cloud-documentai")
vision = None
# Anthropic Claude
try:
import anthropic
except ImportError:
print("Warning: Anthropic library not available.")
print("Install: pip install anthropic")
anthropic = None
class Severity(Enum):
    """Severity levels attached to every reported accessibility finding."""
    # Each member's value is the string written out by
    # AccessibilityIssue.to_dict() ('severity': self.severity.value).
    CRITICAL = "CRITICAL"
    ERROR = "ERROR"
    WARNING = "WARNING"
    INFO = "INFO"
    # SUCCESS is used by the checks to record a requirement that was met
    # (e.g. "PDF is properly tagged"); it lives in the same list as problems.
    SUCCESS = "SUCCESS"
@dataclass
class AccessibilityIssue:
    """One finding produced by an accessibility check."""
    severity: Severity
    category: str
    description: str
    page_number: Optional[int] = None
    recommendation: str = ""
    wcag_criterion: str = ""
    details: Dict[str, Any] = field(default_factory=dict)
    # Bounding box with keys x0, y0, x1, y1, used for highlighting
    coordinates: Optional[Dict[str, float]] = None

    def to_dict(self):
        """Return the issue as a plain dict, with severity as its string value."""
        passthrough = (
            'category',
            'description',
            'page_number',
            'recommendation',
            'wcag_criterion',
            'details',
            'coordinates',
        )
        payload = {'severity': self.severity.value}
        for attr_name in passthrough:
            payload[attr_name] = getattr(self, attr_name)
        return payload
@dataclass
class CheckResult:
    """Outcome record for one named check, as created by run_check()."""
    # Human-readable name of the check (e.g. "Document Structure").
    check_name: str
    # Pass/fail flag; run_check() sets it to False when the check raised or
    # when CRITICAL/ERROR issues are present.
    passed: bool
    # Issues associated with this check. NOTE(review): nothing in the visible
    # code fills this list; findings are stored on the checker itself.
    issues: List[AccessibilityIssue] = field(default_factory=list)
    # Free-form extra data about the check.
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Wall-clock run time in seconds (time.time() difference in run_check()).
    duration: float = 0.0
class CacheManager:
    """On-disk JSON cache used to avoid repeating API calls.

    Every entry is stored as ``<cache_dir>/<key>.json``.
    """

    def __init__(self, cache_dir: str = ".cache"):
        """Create the cache directory, including missing parents, if needed."""
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def get_cache_key(self, data: bytes, prefix: str = "") -> str:
        """Return ``"<prefix>_<sha256 hex digest of data>"``."""
        return f"{prefix}_{hashlib.sha256(data).hexdigest()}"

    def get(self, key: str) -> Optional[Dict]:
        """Return the cached value for *key*, or ``None`` on a miss.

        A missing, unreadable or corrupt cache file counts as a miss.
        """
        cache_file = self.cache_dir / f"{key}.json"
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, ValueError):
            # OSError: file absent or unreadable.
            # ValueError: invalid JSON (JSONDecodeError) or undecodable bytes.
            return None

    def set(self, key: str, data: Dict):
        """Store *data* under *key*.

        Raises:
            TypeError / ValueError: if *data* is not JSON-serialisable.
            OSError: if the file cannot be written.
        """
        # Serialise first so a failure cannot leave a truncated file behind.
        payload = json.dumps(data)
        cache_file = self.cache_dir / f"{key}.json"
        with open(cache_file, 'w', encoding='utf-8') as f:
            f.write(payload)
class ColorContrastChecker:
    """WCAG colour-contrast helpers plus a sampling heuristic for page images."""
    # Minimum contrast ratios compared against by check_image_contrast().
    WCAG_AA_NORMAL = 4.5
    WCAG_AA_LARGE = 3.0
    WCAG_AAA_NORMAL = 7.0
    WCAG_AAA_LARGE = 4.5

    @staticmethod
    def get_luminance(rgb: Tuple[int, int, int]) -> float:
        """Return the WCAG relative luminance of an 8-bit (r, g, b) colour."""
        def linearize(channel):
            c = channel / 255.0
            return c / 12.92 if c <= 0.03928 else ((c + 0.055) / 1.055) ** 2.4

        r, g, b = (linearize(value) for value in rgb)
        return 0.2126 * r + 0.7152 * g + 0.0722 * b

    @staticmethod
    def calculate_contrast_ratio(color1: Tuple[int, int, int],
                                 color2: Tuple[int, int, int]) -> float:
        """Return the WCAG contrast ratio (1.0 to 21.0) between two colours."""
        l1 = ColorContrastChecker.get_luminance(color1)
        l2 = ColorContrastChecker.get_luminance(color2)
        lighter = max(l1, l2)
        darker = min(l1, l2)
        return (lighter + 0.05) / (darker + 0.05)

    @staticmethod
    def check_image_contrast(image: Image.Image, sample_size: int = 500) -> Dict:
        """Estimate contrast problems from random horizontally adjacent pixel pairs.

        Pairs whose two pixels have the same colour are ignored: they show no
        colour boundary, so a 1:1 ratio there says nothing about contrast.
        Counting them would make a blank page fail 100% of its samples.

        Returns:
            A dict of summary statistics, or ``{'error': ...}`` when no pair
            with differing colours could be sampled.
        """
        if image.mode != 'RGB':
            image = image.convert('RGB')
        width, height = image.size
        samples = []
        for _ in range(min(sample_size, width * height // 100)):
            # x stops one short of the last column so x + 1 is a valid neighbour;
            # y may be any row, including the last one.
            x = np.random.randint(0, max(1, width - 1))
            y = np.random.randint(0, max(1, height))
            try:
                color1 = image.getpixel((x, y))
                color2 = image.getpixel((min(x + 1, width - 1), y))
                if color1 == color2:
                    continue
                ratio = ColorContrastChecker.calculate_contrast_ratio(color1, color2)
            except (IndexError, TypeError, ValueError):
                # Unreadable pixel or unexpected pixel format: skip this sample.
                continue
            samples.append({
                'ratio': ratio,
                'colors': (color1, color2),
                'position': (x, y)
            })
        if not samples:
            return {'error': 'Could not sample colors'}
        ratios = [s['ratio'] for s in samples]
        fail_aa_normal = [r for r in ratios if r < ColorContrastChecker.WCAG_AA_NORMAL]
        fail_aa_large = [r for r in ratios if r < ColorContrastChecker.WCAG_AA_LARGE]
        return {
            'total_samples': len(samples),
            'fail_aa_normal_count': len(fail_aa_normal),
            'fail_aa_large_count': len(fail_aa_large),
            'fail_aa_normal_percent': len(fail_aa_normal) / len(samples) * 100,
            'fail_aa_large_percent': len(fail_aa_large) / len(samples) * 100,
            'worst_ratio': min(ratios),
            'best_ratio': max(ratios),
            'avg_ratio': sum(ratios) / len(ratios)
        }
class ReadabilityAnalyzer:
    """Heuristic readability metrics (Flesch Reading Ease, Flesch-Kincaid grade)."""

    @staticmethod
    def count_syllables(word: str) -> int:
        """Estimate syllables as the number of vowel runs (never less than 1).

        A trailing 'e' removes one syllable when more than one run was found.
        """
        normalized = word.lower().strip()
        vowel_runs = len(re.findall(r'[aeiouy]+', normalized))
        if vowel_runs > 1 and normalized.endswith('e'):
            vowel_runs -= 1
        return vowel_runs if vowel_runs >= 1 else 1

    @staticmethod
    def analyze(text: str) -> Dict:
        """Compute readability statistics for *text*.

        Returns a dict with an 'error' key when the text is shorter than
        50 characters (after stripping) or cannot be split into sentences
        and words; otherwise a dict of rounded metrics.
        """
        if not text or len(text.strip()) < 50:
            return {'error': 'Insufficient text for analysis'}

        # Collapse all whitespace runs to single spaces
        normalized = re.sub(r'\s+', ' ', text.strip())

        sentences = []
        for fragment in re.split(r'[.!?]+', normalized):
            fragment = fragment.strip()
            if fragment:
                sentences.append(fragment)
        words = re.findall(r'\b\w+\b', normalized)
        if not (sentences and words):
            return {'error': 'Could not parse text'}

        total_sentences = len(sentences)
        total_words = len(words)
        syllables_per_word = [ReadabilityAnalyzer.count_syllables(w) for w in words]
        total_syllables = sum(syllables_per_word)

        # Flesch Reading Ease: 0-100, higher means easier
        flesch_reading_ease = (
            206.835
            - 1.015 * (total_words / total_sentences)
            - 84.6 * (total_syllables / total_words)
        )
        # Flesch-Kincaid Grade Level
        fk_grade_level = (
            0.39 * (total_words / total_sentences)
            + 11.8 * (total_syllables / total_words)
            - 15.59
        )

        long_sentence_total = sum(1 for s in sentences if len(s.split()) > 25)
        complex_word_total = sum(1 for n in syllables_per_word if n > 3)

        return {
            'flesch_reading_ease': round(flesch_reading_ease, 2),
            'flesch_kincaid_grade': round(fk_grade_level, 2),
            'total_words': total_words,
            'total_sentences': total_sentences,
            'avg_words_per_sentence': round(total_words / total_sentences, 2),
            'long_sentences_count': long_sentence_total,
            'complex_words_count': complex_word_total,
            'complex_words_percent': round(complex_word_total / total_words * 100, 2)
        }
class EnterprisePDFChecker:
"""Enterprise-grade PDF accessibility checker"""
def __init__(self, pdf_path: str, config: Dict[str, Any], quick_mode: bool = False, generate_images: bool = True):
    """Set up checker state and whichever API clients the config allows.

    Args:
        pdf_path: Path of the PDF to analyse.
        config: Settings dict; the keys read here are
            'google_credentials_path', 'google_api_key' and
            'anthropic_api_key'.
        quick_mode: Stored flag; several checks skip slow analysis when set.
        generate_images: Stored flag, not used inside this method.
    """
    self.pdf_path = Path(pdf_path)
    self.config = config
    self.quick_mode = quick_mode
    self.generate_images = generate_images
    self.issues: List[AccessibilityIssue] = []
    self.check_results: List[CheckResult] = []
    self.pdf_reader = None
    self.pdf_plumber = None
    self.cache = CacheManager()
    self.page_images: Dict[int, str] = {}  # page_num -> image_path
    # API clients
    self.vision_client = None
    self.anthropic_client = None
    # Always defined, so other code can test it without an AttributeError.
    self.google_api_key = None
    self.api_timeout = 10.0  # 10 second timeout for API calls

    # Initialize API clients
    google_creds_path = config.get('google_credentials_path')
    if google_creds_path and os.path.isfile(google_creds_path):
        # Valid credentials file exists
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = google_creds_path
        if vision:
            try:
                self.vision_client = vision.ImageAnnotatorClient()
                print(f" ✅ Google Cloud Vision initialized with credentials file")
            except Exception as e:
                print(f" ⚠️ Google Vision initialization failed: {str(e)}")
    elif config.get('google_api_key'):
        # Use API key directly
        if vision:
            # Note: Vision API with API key requires different initialization
            # For now, store key for use in requests
            self.google_api_key = config['google_api_key']
            # Never echo any part of the secret to stdout or logs.
            print(" Using Google API key from configuration (value hidden)")
    elif google_creds_path:
        # Path provided but file doesn't exist
        print(f" ⚠️ Google credentials file not found: {google_creds_path}")
        print(f" ⚠️ Skipping Google Cloud Vision (advanced OCR disabled)")

    if config.get('anthropic_api_key') and anthropic:
        try:
            self.anthropic_client = anthropic.Anthropic(api_key=config['anthropic_api_key'])
            print(f" ✅ Anthropic Claude initialized")
        except Exception as e:
            print(f" ⚠️ Anthropic initialization failed: {str(e)}")

    # Stats
    self.stats = {
        'start_time': datetime.now(),
        'total_checks': 0,
        'api_calls': 0,
        'cached_calls': 0,
        'total_cost_estimate': 0.0
    }
def add_issue(self, severity: Severity, category: str, description: str, **kwargs):
    """Append a new finding to ``self.issues``.

    Extra keyword arguments (page_number, recommendation, wcag_criterion,
    details, coordinates) are forwarded to AccessibilityIssue.
    """
    self.issues.append(
        AccessibilityIssue(severity, category, description, **kwargs)
    )
def run_check(self, check_func, check_name: str) -> CheckResult:
    """Run one check and record its outcome.

    The check passes when it finishes without raising and adds no
    CRITICAL or ERROR issue of its own. Issues left by earlier checks do
    not affect the verdict.

    Args:
        check_func: Zero-argument callable that performs the check.
        check_name: Name stored on the result and used as the issue
            category if the check raises.

    Returns:
        The CheckResult, which is also appended to ``self.check_results``.
    """
    start_time = time.time()
    result = CheckResult(check_name=check_name, passed=True)
    # Anything at or beyond this index was added by this check.
    first_new_issue = len(self.issues)
    try:
        check_func()
    except Exception as e:
        self.add_issue(
            Severity.CRITICAL,
            check_name,
            f"Check failed with error: {str(e)}",
            details={'error': str(e), 'traceback': traceback.format_exc()}
        )
        result.passed = False
    else:
        blocking = (Severity.CRITICAL, Severity.ERROR)
        result.passed = not any(
            issue.severity in blocking
            for issue in self.issues[first_new_issue:]
        )
    result.duration = time.time() - start_time
    self.check_results.append(result)
    self.stats['total_checks'] += 1
    return result
def check_all(self) -> Dict[str, Any]:
    """Open the PDF, run every check in order and return the summary.

    Each check's pass/fail state is printed with a visible icon. Failure
    to open or process the file is recorded as a CRITICAL "File Access"
    issue instead of being raised.

    Returns:
        Whatever ``self._generate_summary()`` returns.
    """
    print(f"🔍 Enterprise PDF Accessibility Check")
    print(f"📄 File: {self.pdf_path.name}")
    print(f"{'='*60}\n")
    try:
        self.pdf_reader = PdfReader(str(self.pdf_path))
        self.pdf_plumber = pdfplumber.open(str(self.pdf_path))
        # Run all checks
        checks = [
            (self._check_basic_structure, "Document Structure"),
            (self._check_metadata, "Metadata"),
            (self._check_language, "Language Declaration"),
            (self._check_text_extractability, "Text Extractability"),
            (self._check_ocr_quality, "OCR Quality"),
            (self._check_images_comprehensive, "Image Accessibility"),
            (self._check_color_contrast, "Color Contrast"),
            (self._check_readability, "Content Readability"),
            (self._check_links, "Link Quality"),
            (self._check_headings, "Heading Structure"),
            (self._check_forms, "Form Accessibility"),
            (self._check_tables, "Table Structure"),
            (self._check_reading_order, "Reading Order"),
            (self._check_fonts, "Font Accessibility"),
            (self._check_security, "Security Settings"),
            (self._check_bookmarks, "Navigation Aids"),
        ]
        for check_func, check_name in checks:
            print(f"⏳ Running: {check_name}...", end=' ')
            result = self.run_check(check_func, check_name)
            # Distinct icons so pass and fail can be told apart in the log.
            status = "✅" if result.passed else "❌"
            print(f"{status} ({result.duration:.2f}s)")
    except Exception as e:
        self.add_issue(
            Severity.CRITICAL,
            "File Access",
            f"Could not process PDF: {str(e)}",
            details={'error': str(e)}
        )
    finally:
        # Release the pdfplumber handle even when a step above failed.
        if self.pdf_plumber:
            self.pdf_plumber.close()
    self.stats['end_time'] = datetime.now()
    self.stats['duration'] = (self.stats['end_time'] - self.stats['start_time']).total_seconds()
    return self._generate_summary()
# ==================== CORE CHECKS ====================
def _check_basic_structure(self):
    """Report whether the document catalog declares the PDF as tagged.

    Adds a CRITICAL issue when /MarkInfo is absent or /Marked is not true,
    and a SUCCESS entry otherwise.
    """
    def resolve(obj):
        # dict.get() may hand back an unresolved indirect reference.
        return obj.get_object() if hasattr(obj, "get_object") else obj

    catalog = resolve(self.pdf_reader.trailer.get("/Root", {}))
    if "/MarkInfo" not in catalog:
        self.add_issue(
            Severity.CRITICAL,
            "Document Structure",
            "PDF is not tagged - completely inaccessible to screen readers",
            wcag_criterion="1.3.1, 4.1.2",
            recommendation="Tag the PDF using Adobe Acrobat Pro or authoring software"
        )
        return

    mark_info = resolve(catalog.get("/MarkInfo", {}))
    marked = resolve(mark_info.get("/Marked", False))
    # PDF booleans arrive wrapped in a pypdf object; read the wrapped value
    # so a literal "/Marked false" cannot be mistaken for a truthy object.
    is_marked = bool(getattr(marked, "value", marked))
    if not is_marked:
        self.add_issue(
            Severity.CRITICAL,
            "Document Structure",
            "PDF marked as untagged in metadata",
            wcag_criterion="1.3.1",
            recommendation="Enable document tagging"
        )
    else:
        self.add_issue(
            Severity.SUCCESS,
            "Document Structure",
            "PDF is properly tagged",
            wcag_criterion="1.3.1"
        )
def _check_metadata(self):
    """Report on the document's title, author and subject metadata."""
    info = self.pdf_reader.metadata
    if not info:
        self.add_issue(
            Severity.ERROR,
            "Metadata",
            "No document metadata found",
            wcag_criterion="2.4.2",
            recommendation="Add title, author, and subject metadata"
        )
        return

    def is_blank(value):
        # Missing, empty or whitespace-only
        return not value or not value.strip()

    # Title: present -> SUCCESS, absent -> ERROR
    title = info.title
    if is_blank(title):
        self.add_issue(
            Severity.ERROR,
            "Metadata",
            "Document title is missing",
            wcag_criterion="2.4.2",
            recommendation="Add a descriptive title"
        )
    else:
        self.add_issue(
            Severity.SUCCESS,
            "Metadata",
            f"Document has title: '{title}'",
            wcag_criterion="2.4.2"
        )

    # Author: only reported when missing
    if is_blank(info.author):
        self.add_issue(
            Severity.WARNING,
            "Metadata",
            "Author information is missing",
            recommendation="Add author metadata"
        )

    # Subject: only reported when missing
    if is_blank(info.subject):
        self.add_issue(
            Severity.INFO,
            "Metadata",
            "Subject/description is missing",
            recommendation="Add a brief description"
        )
def _check_language(self):
    """Report whether the document catalog carries a /Lang entry."""
    catalog = self.pdf_reader.trailer.get("/Root", {})
    if "/Lang" in catalog:
        declared = catalog["/Lang"]
        self.add_issue(
            Severity.SUCCESS,
            "Language",
            f"Document language set to: {declared}",
            wcag_criterion="3.1.1"
        )
        return
    self.add_issue(
        Severity.ERROR,
        "Language",
        "Document language not specified",
        wcag_criterion="3.1.1",
        recommendation="Set document language (e.g., 'en-US')"
    )
def _check_text_extractability(self):
    """Flag pages from which fewer than 10 characters can be extracted."""
    total_pages = len(self.pdf_reader.pages)
    # 1-based numbers of the pages with (almost) no extractable text
    textless_pages = [
        page_no
        for page_no, page in enumerate(self.pdf_plumber.pages, start=1)
        if len(page.extract_text() or "") < 10
    ]
    textless_count = len(textless_pages)

    if textless_count == total_pages:
        self.add_issue(
            Severity.CRITICAL,
            "Text Accessibility",
            "No extractable text found - document appears to be scanned images",
            wcag_criterion="1.1.1",
            recommendation="Run OCR or recreate from source with selectable text",
            details={'pages_affected': textless_pages}
        )
        return
    if textless_count > 0:
        self.add_issue(
            Severity.WARNING,
            "Text Accessibility",
            f"{textless_count} of {total_pages} pages have no extractable text",
            wcag_criterion="1.1.1",
            recommendation="Review pages without text",
            details={'pages_affected': textless_pages}
        )
def _check_ocr_quality(self):
    """Run Tesseract on the first pages (at most two) and flag low confidence.

    Skipped when pytesseract is unavailable or in quick mode. Any failure
    of the rendering/OCR tool chain is printed and the check is skipped.
    """
    if not pytesseract:
        return
    if self.quick_mode:
        print(" ⏩ Skipping OCR analysis (quick mode)")
        return
    print(" 🔍 Running OCR analysis...")
    try:
        # Reduced DPI from 300 to 150 for faster processing
        images = convert_from_path(str(self.pdf_path), dpi=150, first_page=1, last_page=min(2, len(self.pdf_reader.pages)))
        for i, image in enumerate(images):
            # Get OCR data with confidence
            ocr_data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
            confidences = []
            for raw in ocr_data['conf']:
                # Confidence values may be ints, floats or numeric strings;
                # negative values mark entries that carry no recognised word.
                try:
                    value = float(raw)
                except (TypeError, ValueError):
                    continue
                if value >= 0:
                    confidences.append(value)
            if not confidences:
                continue
            avg_confidence = sum(confidences) / len(confidences)
            if avg_confidence < 60:
                self.add_issue(
                    Severity.WARNING,
                    "OCR Quality",
                    f"Page {i+1}: Low OCR confidence ({avg_confidence:.1f}%)",
                    wcag_criterion="1.1.1",
                    recommendation="Poor scan quality - rescan or manual review needed",
                    page_number=i+1,
                    details={'confidence': avg_confidence}
                )
    except Exception as e:
        print(f" ⚠️ OCR check skipped: {str(e)}")
def _check_images_comprehensive(self):
    """Extract every image, analyse it with the AI services and report findings.

    Steps: collect image crops per page; stop early when there are no
    images or in quick mode; otherwise analyse the crops on a small
    thread pool (Claude first, Google Vision only after a successful
    Claude analysis) and turn the results into issues on the calling
    thread. Cache/API counters are updated only for images that were
    actually analysed.
    """
    print(" 🖼️ Analyzing images with AI...")
    total_images = 0
    analyzed_images = 0
    # Collect all images first
    image_tasks = []
    for page_num, page in enumerate(self.pdf_plumber.pages):
        images = page.images
        total_images += len(images)
        for img_idx, img in enumerate(images):
            try:
                image_data = self._extract_image_from_page(page, img)
                if image_data:
                    # Include coordinates for highlighting
                    coords = {
                        'x0': img['x0'],
                        'y0': img['top'],
                        'x1': img['x1'],
                        'y1': img['bottom']
                    }
                    image_tasks.append((image_data, page_num + 1, img_idx + 1, coords))
            except Exception as e:
                print(f" ⚠️ Failed to extract image on page {page_num + 1}: {str(e)}")

    if total_images == 0:
        self.add_issue(
            Severity.INFO,
            "Images",
            "No images found in document",
            wcag_criterion="1.1.1"
        )
        return
    print(f" 📊 Found {total_images} images to analyze...")

    # Skip AI analysis in quick mode
    if self.quick_mode:
        print(" ⏩ Skipping AI image analysis (quick mode)")
        self.add_issue(
            Severity.INFO,
            "Images",
            f"Found {total_images} images - run without --quick for AI analysis",
            wcag_criterion="1.1.1"
        )
        return

    def analyze_single_image(task_data):
        """Worker: analyse one image and return a plain result dict."""
        image_data, page_num, img_num, coords = task_data
        result = {'page': page_num, 'img': img_num, 'analyzed': False, 'coords': coords}
        try:
            # Check cache first
            cache_key = self.cache.get_cache_key(image_data, "claude_vision")
            cached_result = self.cache.get(cache_key)
            if cached_result:
                analysis = cached_result
                result['cached'] = True
            else:
                # Analyze with Claude
                analysis = self._analyze_image_with_claude(image_data)
                result['cached'] = False
                if analysis and 'error' not in analysis:
                    self.cache.set(cache_key, analysis)
            if analysis and 'error' not in analysis:
                result['analysis'] = analysis
                result['analyzed'] = True
                # Also check with Google Vision for additional data
                if self.vision_client:
                    vision_analysis = self._analyze_image_with_google(image_data)
                    if vision_analysis:
                        result['vision_analysis'] = vision_analysis
            elif analysis:
                # Surface the failure reason instead of dropping it.
                result['error'] = analysis['error']
        except Exception as e:
            result['error'] = str(e)
        return result

    # Quick mode returned above, so the pool always uses three workers.
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(analyze_single_image, task): task for task in image_tasks}
        for future in as_completed(futures):
            try:
                result = future.result()
                analyzed_images += 1
                cache_status = " (cached)" if result.get('cached') else ""
                print(f" 📷 Analyzed image {analyzed_images}/{total_images} (Page {result['page']}){cache_status}")
                if result.get('analyzed'):
                    self._process_image_analysis(result['analysis'], result['page'], result['img'], result.get('coords'))
                    # Count a cache hit or an API call only for a real analysis.
                    if result.get('cached'):
                        self.stats['cached_calls'] += 1
                    else:
                        self.stats['api_calls'] += 1
                        self.stats['total_cost_estimate'] += 0.015
                if result.get('vision_analysis'):
                    self._process_google_vision_results(result['vision_analysis'], result['page'], result['img'], result.get('coords'))
                if result.get('error'):
                    print(f" ⚠️ Error analyzing image on page {result['page']}: {result['error']}")
            except Exception as e:
                print(f" ⚠️ Image analysis error: {str(e)}")
    print(f" ✅ Completed analysis of {analyzed_images}/{total_images} images")
def _analyze_image_with_claude(self, image_bytes: bytes) -> Optional[Dict]:
"""Analyze image with Claude Vision"""
if not self.anthropic_client:
return None
try:
base64_image = base64.b64encode(image_bytes).decode('utf-8')
message = self.anthropic_client.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
timeout=self.api_timeout,
messages=[
{
"role": "user",
"content": [
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/jpeg",
"data": base64_image,
},
},
{
"type": "text",
"text": """Analyze this image for PDF accessibility (WCAG 2.1):
1. Provide concise alt text (1-2 sentences, max 125 characters)
2. Is this decorative or informational?
3. Does it contain text? If yes, what text?
4. Does it use color as the only means of conveying information?
5. Are there any accessibility concerns?
6. Quality rating (1-10) if this were to be used in a PDF
Respond in JSON format:
{
"alt_text": "...",
"type": "decorative|informational|complex",
"has_text": true|false,
"text_content": "...",
"color_only_info": true|false,
"concerns": ["..."],
"quality_rating": 1-10,
"recommendation": "..."
}"""
}
],
}
],
)
response_text = message.content[0].text
# Try to parse JSON from response
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
if json_match:
return json.loads(json_match.group())
return {'error': 'Could not parse response'}
except Exception as e:
return {'error': str(e)}
def _analyze_image_with_google(self, image_bytes: bytes) -> Optional[Dict]:
"""Analyze image with Google Vision"""
if not self.vision_client:
return None
try:
image = vision.Image(content=image_bytes)
# Multiple detection types with timeout
response = self.vision_client.annotate_image(
{
'image': image,
'features': [
{'type_': vision.Feature.Type.TEXT_DETECTION},
{'type_': vision.Feature.Type.LABEL_DETECTION},
{'type_': vision.Feature.Type.IMAGE_PROPERTIES},
{'type_': vision.Feature.Type.OBJECT_LOCALIZATION},
],
},
timeout=self.api_timeout
)
self.stats['api_calls'] += 1
self.stats['total_cost_estimate'] += 0.0015
return {
'has_text': bool(response.text_annotations),
'text_content': response.text_annotations[0].description if response.text_annotations else None,
'labels': [label.description for label in response.label_annotations[:5]],
'objects': [obj.name for obj in response.localized_object_annotations]
}
except Exception as e:
return {'error': str(e)}
def _process_image_analysis(self, analysis: Dict, page_num: int, img_num: int, coordinates: Optional[Dict] = None):
"""Process Claude's image analysis results"""
# Check if text in image
if analysis.get('has_text'):
self.add_issue(
Severity.ERROR,
"Images - Text in Image",
f"Page {page_num}, Image {img_num}: Contains text: '{analysis.get('text_content', '')[:50]}'",
wcag_criterion="1.4.5",
recommendation="Replace image with actual text or provide text alternative",
page_number=page_num,
details=analysis,
coordinates=coordinates
)
# Check alt text quality
if analysis.get('type') == 'informational':
alt_text = analysis.get('alt_text', '')
if len(alt_text) > 125:
self.add_issue(
Severity.WARNING,
"Images - Alt Text",
f"Page {page_num}, Image {img_num}: Suggested alt text is too long ({len(alt_text)} chars)",
wcag_criterion="1.1.1",
recommendation=f"Shorten alt text. Suggested: '{alt_text[:100]}...'",
page_number=page_num,
coordinates=coordinates
)
else:
self.add_issue(
Severity.INFO,
"Images - Alt Text",
f"Page {page_num}, Image {img_num}: Suggested alt text: '{alt_text}'",
wcag_criterion="1.1.1",
page_number=page_num,
coordinates=coordinates
)
# Check for color-only information
if analysis.get('color_only_info'):
self.add_issue(
Severity.ERROR,
"Images - Color Only",
f"Page {page_num}, Image {img_num}: Uses color as only means of conveying information",
wcag_criterion="1.4.1",
recommendation="Add patterns, labels, or text descriptions",
page_number=page_num,
coordinates=coordinates
)
# Check concerns
concerns = analysis.get('concerns', [])
if concerns:
for concern in concerns:
self.add_issue(
Severity.WARNING,
"Images - Quality",
f"Page {page_num}, Image {img_num}: {concern}",
wcag_criterion="1.1.1",
page_number=page_num,
coordinates=coordinates
)
def _process_google_vision_results(self, results: Dict, page_num: int, img_num: int, coordinates: Optional[Dict] = None):
"""Process Google Vision results"""
if results.get('has_text') and not results.get('error'):
# Cross-reference with Claude's analysis
self.add_issue(
Severity.INFO,
"Images - Analysis",
f"Page {page_num}, Image {img_num}: Google Vision detected: {', '.join(results.get('labels', [])[:3])}",
page_number=page_num,
details=results,
coordinates=coordinates
)
def _check_color_contrast(self):
"""Check color contrast using image analysis"""
print(" 🎨 Checking color contrast...")
if self.quick_mode:
print(" ⏩ Skipping detailed contrast analysis (quick mode)")
return
try:
# Reduced DPI from 150 to 100 for faster processing
images = convert_from_path(str(self.pdf_path), dpi=100, first_page=1, last_page=min(3, len(self.pdf_reader.pages)))
for i, image in enumerate(images):
contrast_results = ColorContrastChecker.check_image_contrast(image)
if 'error' in contrast_results:
continue
# Check for significant issues
if contrast_results['fail_aa_normal_percent'] > 15:
self.add_issue(
Severity.ERROR,
"Color Contrast",
f"Page {i+1}: {contrast_results['fail_aa_normal_percent']:.1f}% of samples fail WCAG AA (4.5:1)",
wcag_criterion="1.4.3",
recommendation="Review and increase color contrast to meet WCAG AA standards",
page_number=i+1,
details=contrast_results
)
elif contrast_results['fail_aa_normal_percent'] > 5:
self.add_issue(
Severity.WARNING,
"Color Contrast",
f"Page {i+1}: {contrast_results['fail_aa_normal_percent']:.1f}% of samples have low contrast",
wcag_criterion="1.4.3",
recommendation="Use Colour Contrast Analyser to verify specific areas",
page_number=i+1,
details=contrast_results
)
except Exception as e:
print(f" ⚠️ Contrast check skipped: {str(e)}")
def _check_readability(self):
"""Check content readability"""
# Extract all text
all_text = ""
for page in self.pdf_plumber.pages:
text = page.extract_text()
if text:
all_text += text + "\n"
if len(all_text) < 100:
return
analysis = ReadabilityAnalyzer.analyze(all_text)
if 'error' in analysis:
return
# Check Flesch Reading Ease
if analysis['flesch_reading_ease'] < 60:
severity = Severity.ERROR if analysis['flesch_reading_ease'] < 30 else Severity.WARNING
self.add_issue(
severity,
"Readability",
f"Content is difficult to read (Flesch score: {analysis['flesch_reading_ease']}/100)",
wcag_criterion="3.1.5",
recommendation="Simplify language to reach 8th-9th grade level (target score: 60+)",
details=analysis
)
# Check grade level
if analysis['flesch_kincaid_grade'] > 10:
self.add_issue(
Severity.WARNING,
"Readability",
f"Content requires grade {analysis['flesch_kincaid_grade']} reading level",
wcag_criterion="3.1.5",
recommendation="Target grade 8-10 for general audiences",
details=analysis
)
# Check long sentences
if analysis['long_sentences_count'] > 5:
self.add_issue(
Severity.INFO,
"Readability",
f"{analysis['long_sentences_count']} sentences exceed 25 words",
wcag_criterion="3.1.5",
recommendation="Break long sentences for better comprehension"
)
def _check_links(self):
"""Check link quality"""
unclear_patterns = [
r'\bclick here\b',
r'\bhere\b',
r'\blink\b',
r'\bread more\b',
r'\bmore\b',
r'\bthis\b',
]
for i, page in enumerate(self.pdf_plumber.pages):
text = page.extract_text()
if not text:
continue
# Find URLs
urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', text)
# Check for unclear link text
for pattern in unclear_patterns:
if re.search(pattern, text, re.IGNORECASE):
self.add_issue(
Severity.WARNING,
"Link Text",
f"Page {i+1}: Potentially unclear link text detected",
wcag_criterion="2.4.4",
recommendation="Use descriptive link text that makes sense out of context",
page_number=i+1
)
break
def _check_headings(self):
    """Report whether a structure tree exists for heading verification.

    The heading hierarchy itself is not parsed; with a structure tree
    present, only a manual-verification note is added.
    """
    catalog = self.pdf_reader.trailer.get("/Root", {})
    if "/StructTreeRoot" in catalog:
        # Parsing the heading structure is complex and PDF-specific,
        # so only recommend a manual check.
        self.add_issue(
            Severity.INFO,
            "Headings",
            "Structure tree present - manual verification of heading hierarchy recommended",
            wcag_criterion="1.3.1",
            recommendation="Use Adobe Acrobat to verify H1-H6 hierarchy"
        )
    else:
        self.add_issue(
            Severity.ERROR,
            "Headings",
            "No structure tree - cannot verify heading hierarchy",
            wcag_criterion="1.3.1",
            recommendation="Tag document with proper heading structure"
        )
def _check_forms(self):
"""Check form field accessibility"""
catalog = self.pdf_reader.trailer.get("/Root", {})
if "/AcroForm" not in catalog:
return
acro_form = catalog["/AcroForm"]
if "/Fields" not in acro_form:
return
fields = acro_form["/Fields"]
field_issues = []
for field in fields:
field = field.get_object()
field_name = field.get("/T", "Unnamed")
has_tooltip = "/TU" in field
if not has_tooltip:
field_issues.append(field_name)
if field_issues:
self.add_issue(
Severity.ERROR,
"Forms",
f"{len(field_issues)} form field(s) missing descriptions/tooltips",
wcag_criterion="3.3.2, 4.1.2",
recommendation="Add tooltip descriptions to all form fields",
details={'fields': field_issues}
)
else:
self.add_issue(
Severity.SUCCESS,
"Forms",
f"All {len(fields)} form fields have descriptions",
wcag_criterion="3.3.2"
)
def _check_tables(self):
    """Add a review warning for every page with detected tables.

    When no page contains a table, a single INFO entry is added instead.
    """
    pages_with_tables = 0
    for page_no, page in enumerate(self.pdf_plumber.pages, start=1):
        found = page.extract_tables()
        if not found:
            continue
        pages_with_tables += 1
        self.add_issue(
            Severity.WARNING,
            "Tables",
            f"Page {page_no}: Contains {len(found)} table(s) - verify structure",
            wcag_criterion="1.3.1",
            recommendation="Ensure tables have proper headers and structure tags",
            page_number=page_no
        )
    if pages_with_tables == 0:
        self.add_issue(
            Severity.INFO,
            "Tables",
            "No tables detected",
            wcag_criterion="1.3.1"
        )
def _check_reading_order(self):
    """Report whether a structure tree exists to define the reading order."""
    catalog = self.pdf_reader.trailer.get("/Root", {})
    if "/StructTreeRoot" in catalog:
        severity = Severity.INFO
        description = "Structure tree present - verify reading order with screen reader"
        recommendation = "Test with NVDA or JAWS to verify logical reading order"
    else:
        severity = Severity.ERROR
        description = "No structure tree - reading order cannot be determined"
        recommendation = "Tag document to establish proper reading order"
    self.add_issue(
        severity,
        "Reading Order",
        description,
        wcag_criterion="1.3.2",
        recommendation=recommendation
    )
def _check_fonts(self):
    """Check font embedding.

    Walks the ``/Font`` resources of every page and decides, per font,
    whether a font program is embedded. A font is treated as embedded when:

    * its ``/FontDescriptor`` contains ``/FontFile``, ``/FontFile2`` or
      ``/FontFile3`` (the PDF specification places these keys in the font
      descriptor, not in the font dictionary);
    * it is a composite font whose ``/DescendantFonts`` entry has such a
      descriptor;
    * it is a ``/Type3`` font, whose glyphs are defined inside the PDF
      itself and which therefore has no font file;
    * one of the font-file keys sits directly on the font dictionary
      (kept so that anything the previous check accepted still passes).

    Each non-embedded font is counted once, keyed by ``/BaseFont`` (falling
    back to the resource name), even when several pages use it. A single
    WARNING is recorded when at least one font is not embedded.
    """
    font_file_keys = ("/FontFile", "/FontFile2", "/FontFile3")

    def resolve(obj):
        """Follow an indirect reference when the object supports it."""
        getter = getattr(obj, "get_object", None)
        return getter() if callable(getter) else obj

    def has_font_program(font_dict):
        """Return True if the font or its descriptor carries a font file."""
        if not isinstance(font_dict, dict):
            return False
        if any(key in font_dict for key in font_file_keys):
            return True
        descriptor = resolve(font_dict.get("/FontDescriptor"))
        return isinstance(descriptor, dict) and any(
            key in descriptor for key in font_file_keys
        )

    def is_embedded(font_dict):
        """Return True if the font needs no external font program."""
        if resolve(font_dict.get("/Subtype")) == "/Type3":
            return True
        if has_font_program(font_dict):
            return True
        # Composite (Type0) fonts keep the descriptor on the descendant.
        descendants = resolve(font_dict.get("/DescendantFonts"))
        if not isinstance(descendants, (list, tuple)):
            return False
        return any(has_font_program(resolve(item)) for item in descendants)

    non_embedded_fonts = set()
    for page in self.pdf_reader.pages:
        resources = resolve(page.get("/Resources", {}))
        if not isinstance(resources, dict) or "/Font" not in resources:
            continue
        fonts = resolve(resources["/Font"])
        if not isinstance(fonts, dict):
            continue
        for font_name, font_obj in fonts.items():
            font_dict = resolve(font_obj)
            if not isinstance(font_dict, dict) or is_embedded(font_dict):
                continue
            base_font = resolve(font_dict.get("/BaseFont"))
            non_embedded_fonts.add(str(base_font or font_name))

    non_embedded_count = len(non_embedded_fonts)
    if non_embedded_count > 0:
        self.add_issue(
            Severity.WARNING,
            "Fonts",
            f"{non_embedded_count} fonts not embedded",
            wcag_criterion="1.4.4",
            recommendation="Embed all fonts for consistent rendering",
            details={'fonts': sorted(non_embedded_fonts)}
        )
def _check_security(self):
    """Check security settings.

    Records a WARNING when the reader reports the document as encrypted;
    unencrypted documents produce no issue.
    """
    encrypted = self.pdf_reader.is_encrypted
    if not encrypted:
        return

    self.add_issue(
        Severity.WARNING,
        "Security",
        "Document is encrypted",
        recommendation="Ensure assistive technology can access content"
    )
def _check_bookmarks(self):
    """Check navigation bookmarks.

    Records SUCCESS when the reader exposes a non-empty outline. When the
    outline is empty, an INFO issue is recorded only for documents with
    more than five pages; shorter documents produce no issue.
    """
    reader = self.pdf_reader
    bookmarks = reader.outline
    page_count = len(reader.pages)

    if bookmarks:
        self.add_issue(
            Severity.SUCCESS,
            "Navigation",
            "Document has navigation bookmarks",
            wcag_criterion="2.4.5"
        )
        return

    if page_count > 5:
        self.add_issue(
            Severity.INFO,
            "Navigation",
            "No bookmarks found",
            wcag_criterion="2.4.5",
            recommendation=f"Add bookmarks for {page_count}-page document to aid navigation"
        )
# ==================== HELPER METHODS ====================
def _extract_image_from_page(self, page, img_info) -> Optional[bytes]:
    """Render the region of ``page`` described by ``img_info`` as JPEG bytes.

    ``img_info`` must provide ``x0``, ``top``, ``x1`` and ``bottom``; the
    page is cropped to that box, rendered at resolution 150 and saved as a
    JPEG with quality 85.

    Returns the encoded bytes, or ``None`` if any step raises.
    """
    try:
        bounding_box = (
            img_info['x0'],
            img_info['top'],
            img_info['x1'],
            img_info['bottom'],
        )
        rendered = page.crop(bounding_box).to_image(resolution=150).original

        jpeg_stream = BytesIO()
        rendered.save(jpeg_stream, format='JPEG', quality=85)
        return jpeg_stream.getvalue()
    except Exception:
        # Best effort: callers receive None when extraction fails.
        return None
def _generate_page_images(self, output_dir: Path, dpi: int = 150):
    """Render every page of the PDF to a PNG file in ``output_dir``.

    Does nothing when ``self.generate_images`` is falsy. Otherwise the
    directory is created if needed, ``dpi`` is stored on
    ``self.page_image_dpi``, and each page is saved as ``page_<n>.png`` with
    the file name recorded in ``self.page_images`` under its 1-based page
    number. A missing ``pdf2image`` package or any failure during rendering
    is reported on stdout and does not raise.
    """
    if not self.generate_images:
        return

    print(f"\n📸 Generating page images for visual display...")

    try:
        from pdf2image import convert_from_path
    except ImportError:
        print(f" ⚠️ pdf2image not available - skipping page image generation")
        return

    try:
        output_dir.mkdir(parents=True, exist_ok=True)

        # Remember the DPI so consumers can scale PDF coordinates to pixels.
        self.page_image_dpi = dpi

        rendered_pages = convert_from_path(str(self.pdf_path), dpi=dpi, fmt='png')
        total = len(rendered_pages)

        for index, rendered in enumerate(rendered_pages):
            number = index + 1
            filename = f"page_{number}.png"
            rendered.save(output_dir / filename, 'PNG')
            self.page_images[number] = filename
            print(f" ✅ Page {number}/{total}")

        print(f" ✅ Generated {total} page images at {dpi} DPI")
    except Exception as exc:
        print(f" ⚠️ Could not generate page images: {str(exc)}")
# ==================== REPORTING ====================
def _generate_summary(self) -> Dict[str, Any]:
    """Build the report dictionary for the current check results.

    The accessibility score starts at 100 and loses 25 per critical issue,
    10 per error, 5 per warning and 2 per info issue, clamped to 0..100.
    ``datetime`` values in ``self.stats`` are converted to ISO strings so
    the result can be passed to ``json.dumps``.
    """
    def tally(level):
        """Count recorded issues with the given severity."""
        return sum(1 for issue in self.issues if issue.severity == level)

    severity_counts = {
        'critical': tally(Severity.CRITICAL),
        'error': tally(Severity.ERROR),
        'warning': tally(Severity.WARNING),
        'info': tally(Severity.INFO),
        'success': tally(Severity.SUCCESS)
    }

    # Success entries carry no penalty.
    penalty = (
        severity_counts['critical'] * 25
        + severity_counts['error'] * 10
        + severity_counts['warning'] * 5
        + severity_counts['info'] * 2
    )
    score = max(0, min(100, 100 - penalty))

    stats_serializable = {
        key: value.isoformat() if isinstance(value, datetime) else value
        for key, value in self.stats.items()
    }

    return {
        'filename': self.pdf_path.name,
        'total_pages': len(self.pdf_reader.pages),
        'accessibility_score': score,
        'severity_counts': severity_counts,
        'total_issues': len(self.issues),
        'stats': stats_serializable,
        # Map of page number -> image file name.
        'page_images': self.page_images,
        # DPI of the page images; 150 when none was recorded.
        'page_image_dpi': getattr(self, 'page_image_dpi', 150),
        'checks_performed': [
            {
                'name': result.check_name,
                'passed': result.passed,
                'duration': result.duration
            }
            for result in self.check_results
        ],
        'issues': [issue.to_dict() for issue in self.issues]
    }
def generate_json_report(self) -> str:
    """Return the summary from ``_generate_summary`` as JSON with 2-space indentation."""
    return json.dumps(self._generate_summary(), indent=2)
def main():
    """Command-line entry point.

    Parses the arguments, runs every check on the given PDF, and then either
    writes the JSON report (plus page images in a sibling
    ``<output stem>_images`` directory) when ``--output`` is given, or
    prints a short summary to stdout.
    """
    import argparse

    cli = argparse.ArgumentParser(
        description="Enterprise PDF Accessibility Checker",
        epilog="Environment variables can be set in a .env file (see .env.example)"
    )
    cli.add_argument("pdf_file", help="PDF file to check")
    cli.add_argument("--google-credentials", help="Path to Google Cloud credentials JSON (or set GOOGLE_APPLICATION_CREDENTIALS in .env)")
    cli.add_argument("--google-key", help="Google API key string (or set GOOGLE_API_KEY in .env)")
    cli.add_argument("--anthropic-key", help="Anthropic API key (or set ANTHROPIC_API_KEY in .env)")
    cli.add_argument("--output", "-o", help="Output JSON file")
    cli.add_argument("--quick", action="store_true", help="Quick mode - skip expensive checks (OCR, AI image analysis, color contrast)")
    args = cli.parse_args()

    def cli_or_env(cli_value, env_name):
        """Prefer the command-line value, else read the environment variable."""
        return cli_value or os.getenv(env_name)

    config = {
        'google_credentials_path': cli_or_env(args.google_credentials, 'GOOGLE_APPLICATION_CREDENTIALS'),
        'google_api_key': cli_or_env(args.google_key, 'GOOGLE_API_KEY'),
        'anthropic_api_key': cli_or_env(args.anthropic_key, 'ANTHROPIC_API_KEY')
    }

    if args.quick:
        print("⚡ Quick mode enabled - skipping expensive checks\n")

    checker = EnterprisePDFChecker(args.pdf_file, config, quick_mode=args.quick)
    summary = checker.check_all()

    # Page images are rendered before the report so it can list them.
    if args.output:
        output_path = Path(args.output)
        images_dir = output_path.parent / f"{output_path.stem}_images"
        checker._generate_page_images(images_dir)

    report = checker.generate_json_report()

    if not args.output:
        print("\n" + "=" * 60)
        print("SUMMARY")
        print("=" * 60)
        print(f"Score: {summary['accessibility_score']}/100")
        for label, key in (("Critical", "critical"), ("Errors", "error"), ("Warnings", "warning")):
            print(f"{label}: {summary['severity_counts'][key]}")
        print(f"API Calls: {summary['stats']['api_calls']}")
        print(f"Cost: ${summary['stats']['total_cost_estimate']:.2f}")
        return

    with open(args.output, 'w') as report_file:
        report_file.write(report)
    print(f"\n📄 Report saved: {args.output}")
    if checker.page_images:
        print(f"📸 Page images saved to: {images_dir}")
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()