#!/usr/bin/env python3
|
|
"""
|
|
PDF Accessibility Auto-Remediation Module
|
|
|
|
Automatically fixes common accessibility issues:
|
|
- Add metadata (title, author, subject)
|
|
- Set document language
|
|
- Mark as tagged
|
|
- Generate basic bookmarks
|
|
- Embed fonts (when possible)
|
|
"""
|
|
|
|
import json
import os
import re
import subprocess
import sys
from pathlib import Path
from typing import Dict, Any, List, Optional

from pypdf import PdfReader, PdfWriter
from pypdf.generic import NameObject, TextStringObject, DictionaryObject, BooleanObject

# Setup logging
from logger_config import setup_logger
|
|
logger = setup_logger(__name__, "pdf_remediation.log")
|
|
|
|
|
|
class VeraPDFValidator:
    """Wrapper for veraPDF validation.

    Invokes the veraPDF command-line tool with the PDF/UA-1 flavour and JSON
    output, then condenses the report into a flat summary dictionary.
    """

    def __init__(self, verapdf_path: str = "verapdf"):
        """Store the executable to run.

        Args:
            verapdf_path: Name or path of the veraPDF executable.
        """
        self.verapdf_path = verapdf_path

    def validate(self, pdf_path: str, timeout: int = 30) -> Dict[str, Any]:
        """Run veraPDF validation and return structured results.

        Args:
            pdf_path: Path of the PDF to validate.
            timeout: Seconds to wait for veraPDF before giving up.

        Returns:
            On success, a dict with the keys ``compliant``, ``passed_rules``,
            ``failed_rules``, ``passed_checks``, ``failed_checks``, ``errors``
            (one entry per rule whose status is ``FAILED``) and ``raw_data``
            (the parsed report). On any failure, a dict with the single key
            ``error``. This method never raises.
        """
        try:
            result = subprocess.run(
                [self.verapdf_path, '-f', 'ua1', '--format', 'json', pdf_path],
                capture_output=True,
                text=True,
                timeout=timeout,
            )
            exited_nonzero = result.returncode != 0
            tool_failure = {'error': f'veraPDF failed: {result.stderr}'}

            # A non-zero exit status is not necessarily a crash: veraPDF also
            # uses it to signal a non-compliant document while still writing
            # the report. Only treat it as a tool failure when no usable
            # report came back.
            try:
                data = json.loads(result.stdout)
            except ValueError:
                if exited_nonzero:
                    return tool_failure
                raise

            jobs = data.get('report', {}).get('jobs', [])
            if not jobs:
                if exited_nonzero:
                    return tool_failure
                return {'error': 'No validation results'}

            # The result is normally a list with one entry per profile; also
            # tolerate a single object or an empty list.
            raw_validation = jobs[0].get('validationResult')
            if isinstance(raw_validation, list):
                validation = raw_validation[0] if raw_validation else None
            else:
                validation = raw_validation
            if not validation:
                if exited_nonzero:
                    return tool_failure
                validation = {}

            details = validation.get('details', {})

            # Keep only the rules that failed, renamed to snake_case keys.
            errors = [
                {
                    'clause': rule.get('clause'),
                    'description': rule.get('description'),
                    'test_number': rule.get('testNumber'),
                    'failed_checks': rule.get('failedChecks', 0),
                    'specification': rule.get('specification'),
                    'checks': rule.get('checks', []),
                }
                for rule in details.get('ruleSummaries', [])
                if rule.get('ruleStatus') == 'FAILED'
            ]

            passed_rules = details.get('passedRules', 0)
            failed_rules = details.get('failedRules', 0)

            return {
                # Require at least one passed rule so an empty report is not
                # mistaken for a compliant document.
                'compliant': passed_rules > 0 and failed_rules == 0,
                'passed_rules': passed_rules,
                'failed_rules': failed_rules,
                'passed_checks': details.get('passedChecks', 0),
                'failed_checks': details.get('failedChecks', 0),
                'errors': errors,
                'raw_data': data,
            }

        except subprocess.TimeoutExpired:
            return {'error': 'veraPDF timeout'}
        except Exception as e:
            # Best-effort boundary: callers only ever inspect the 'error' key.
            return {'error': f'veraPDF validation failed: {str(e)}'}
|
|
|
|
|
|
class PDFRemediator:
    """Automatically fix common PDF accessibility issues.

    Typical use: construct with a path, call ``analyze_and_suggest_fixes()``
    to see what is missing, then ``apply_fixes()`` with the chosen fix ids to
    write a remediated copy.
    """

    # Upper bound on the number of automatically generated bookmarks.
    _MAX_AUTO_BOOKMARKS = 10
    # File stems that look like temp-file names (e.g. tmp9h15ocsl).
    _TEMP_STEM_PATTERN = re.compile(r'^tmp[a-zA-Z0-9]{5,}$')

    def __init__(self, pdf_path: str):
        """Open ``pdf_path`` for reading and prepare an empty writer.

        Args:
            pdf_path: Path of the PDF to analyze and remediate.
        """
        self.pdf_path = Path(pdf_path)
        self.reader = PdfReader(str(pdf_path))
        self.writer = PdfWriter()
        self.fixes_applied: List[str] = []

    def analyze_and_suggest_fixes(self) -> Dict[str, Any]:
        """Analyze PDF and return suggested fixes.

        Returns:
            A dict with the keys ``metadata``, ``language``, ``tagging`` and
            ``bookmarks``; each value is a (possibly empty) list of fix
            descriptors with the keys ``id``, ``description``, ``severity``,
            ``auto_fixable`` and ``suggestion``.
        """
        return {
            'metadata': self._check_metadata_fixes(),
            'language': self._check_language_fixes(),
            'tagging': self._check_tagging_fixes(),
            'bookmarks': self._check_bookmark_fixes(),
        }

    def apply_fixes(self, fixes_to_apply: List[str], output_path: Optional[str] = None,
                    custom_values: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Apply selected fixes and save to new PDF.

        Args:
            fixes_to_apply: Fix ids to run, in order. Recognized ids are
                ``add_title``, ``add_author``, ``add_subject``,
                ``set_language``, ``mark_tagged`` and ``add_bookmarks``;
                unrecognized ids are ignored.
            output_path: Destination file. Defaults to
                ``<stem>_remediated.pdf`` next to the input file.
            custom_values: Optional overrides under the keys ``title``,
                ``author``, ``subject`` and ``language``.

        Returns:
            A dict with ``output_path``, ``fixes_applied`` (human-readable
            messages) and ``success`` (always ``True`` when this returns).
        """
        if not output_path:
            output_path = str(self.pdf_path.parent / f"{self.pdf_path.stem}_remediated.pdf")

        if custom_values is None:
            custom_values = {}

        # Start every run from an empty writer and log so that calling this
        # method twice does not duplicate pages or repeat earlier messages.
        self.writer = PdfWriter()
        self.fixes_applied = []

        # Clone the PDF
        for page in self.reader.pages:
            self.writer.add_page(page)

        # Copy existing metadata first so individual fixes can override it
        if self.reader.metadata:
            self.writer.add_metadata(self.reader.metadata)

        handlers = {
            'add_title': lambda: self._fix_add_title(custom_values.get('title')),
            'add_author': lambda: self._fix_add_author(custom_values.get('author')),
            'add_subject': lambda: self._fix_add_subject(custom_values.get('subject')),
            # Fall back to en-US when the override is missing or empty.
            'set_language': lambda: self._fix_set_language(custom_values.get('language') or 'en-US'),
            'mark_tagged': self._fix_mark_tagged,
            'add_bookmarks': self._fix_add_bookmarks,
        }

        # Apply each fix
        for fix in fixes_to_apply:
            handler = handlers.get(fix)
            if handler is not None:
                handler()

        # Save fixed PDF
        with open(output_path, 'wb') as f:
            self.writer.write(f)

        return {
            'output_path': output_path,
            'fixes_applied': self.fixes_applied,
            'success': True,
        }

    # ==================== INTERNAL HELPERS ====================

    @staticmethod
    def _resolve(obj: Any) -> Any:
        """Return ``obj.get_object()`` when available, else ``obj`` itself.

        Dictionary values read with ``.get()`` may still be indirect
        references; this dereferences them before they are inspected.
        """
        getter = getattr(obj, "get_object", None)
        return getter() if callable(getter) else obj

    def _catalog(self) -> Any:
        """Return the resolved ``/Root`` entry of the trailer, or ``{}``."""
        return self._resolve(self.reader.trailer.get("/Root", {}))

    def _bookmark_page_indices(self, total_pages: int) -> range:
        """Return evenly spaced page indices, at most ``_MAX_AUTO_BOOKMARKS``.

        Uses ceiling division for the step; a floored step would yield more
        than the maximum whenever the page count is not a multiple of it.
        """
        step = max(1, -(-total_pages // self._MAX_AUTO_BOOKMARKS))
        return range(0, total_pages, step)

    # ==================== ANALYSIS METHODS ====================

    def _check_metadata_fixes(self) -> List[Dict[str, Any]]:
        """Check what metadata fixes are needed.

        Returns one descriptor for each of title, author and subject that is
        missing or blank in the reader's metadata.
        """
        meta = self.reader.metadata
        fixes = []

        def is_blank(field: str) -> bool:
            value = getattr(meta, field, None) if meta else None
            return not value or not value.strip()

        if is_blank('title'):
            fixes.append({
                'id': 'add_title',
                'description': 'Add document title',
                'severity': 'ERROR',
                'auto_fixable': True,
                'suggestion': self._suggest_title(),
            })

        if is_blank('author'):
            fixes.append({
                'id': 'add_author',
                'description': 'Add author information',
                'severity': 'WARNING',
                'auto_fixable': True,
                'suggestion': 'Unknown Author',
            })

        if is_blank('subject'):
            fixes.append({
                'id': 'add_subject',
                'description': 'Add document subject/description',
                'severity': 'INFO',
                'auto_fixable': True,
                'suggestion': self._suggest_subject(),
            })

        return fixes

    def _check_language_fixes(self) -> List[Dict[str, Any]]:
        """Check if language needs to be set.

        Flags the document when the catalog has no ``/Lang`` entry or the
        entry is blank.
        """
        lang = self._resolve(self._catalog().get("/Lang"))

        if lang is None or not str(lang).strip():
            return [{
                'id': 'set_language',
                'description': 'Set document language',
                'severity': 'ERROR',
                'auto_fixable': True,
                'suggestion': 'en-US',
            }]

        return []

    def _check_tagging_fixes(self) -> List[Dict[str, Any]]:
        """Check if PDF needs to be marked as tagged.

        Reports a CRITICAL, non-auto-fixable item when ``/MarkInfo`` is
        absent, and an ERROR item when it exists but ``/Marked`` is not true.
        """
        catalog = self._catalog()

        if "/MarkInfo" not in catalog:
            return [{
                'id': 'mark_tagged',
                'description': 'Mark document as tagged (if tags exist)',
                'severity': 'CRITICAL',
                'auto_fixable': False,  # Can set flag, but can't create tags
                'suggestion': 'Can mark as tagged, but tags must be added manually with Adobe Acrobat',
            }]

        mark_info = self._resolve(catalog.get("/MarkInfo", {}))
        marked = self._resolve(mark_info.get("/Marked", False)) if hasattr(mark_info, "get") else False
        # PDF boolean wrappers may be truthy as objects even when they hold
        # False, so compare the wrapped ``value`` when there is one.
        if not bool(getattr(marked, "value", marked)):
            return [{
                'id': 'mark_tagged',
                'description': 'Update MarkInfo to indicate document is tagged',
                'severity': 'ERROR',
                'auto_fixable': True,
                'suggestion': 'Set /Marked to true (only if structure tags exist)',
            }]

        return []

    def _check_bookmark_fixes(self) -> List[Dict[str, Any]]:
        """Check if bookmarks should be added.

        Suggests bookmarks only for documents longer than five pages that
        have no outline. The advertised count matches what
        ``_fix_add_bookmarks`` will create.
        """
        outlines = self.reader.outline
        total_pages = len(self.reader.pages)

        if not outlines and total_pages > 5:
            planned = len(self._bookmark_page_indices(total_pages))
            return [{
                'id': 'add_bookmarks',
                'description': f'Add navigation bookmarks for {total_pages}-page document',
                'severity': 'INFO',
                'auto_fixable': True,
                'suggestion': f'Generate {planned} automatic bookmarks',
            }]

        return []

    # ==================== SUGGESTION METHODS ====================

    def _suggest_title(self) -> str:
        """Generate a suggested title from content or filename.

        Temp-style file names are useless as titles, so for those the first
        line longer than three characters on the first two pages is used
        (truncated to 100 characters), falling back to "Untitled Document".
        Any other file name is turned into title case with ``_`` and ``-``
        replaced by spaces.
        """
        stem = self.pdf_path.stem

        if self._TEMP_STEM_PATTERN.match(stem):
            try:
                for page in self.reader.pages[:2]:
                    text = page.extract_text()
                    if not text:
                        continue
                    candidates = [line.strip() for line in text.split('\n') if len(line.strip()) > 3]
                    if candidates:
                        return candidates[0][:100]
            except Exception:
                # Text extraction is best-effort; fall through to the default.
                pass
            return "Untitled Document"

        return stem.replace('_', ' ').replace('-', ' ').title()

    def _suggest_subject(self) -> str:
        """Generate a suggested subject from the first sentence of page one.

        Whitespace (including line breaks) is collapsed to single spaces and
        the result is truncated to 100 characters. Returns "PDF Document"
        when no usable text can be extracted.
        """
        try:
            text = self.reader.pages[0].extract_text()
            if text:
                first_sentence = " ".join(text.split('.')[0].split())
                candidate = first_sentence[:100].strip()
                if candidate:
                    return candidate
        except Exception:
            # Missing pages or extraction problems: use the generic fallback.
            pass

        return "PDF Document"

    # ==================== FIX METHODS ====================

    def _fix_add_title(self, title: Optional[str] = None):
        """Add document title, using a suggested one when none is given."""
        if not title:
            title = self._suggest_title()

        self.writer.add_metadata({'/Title': title})
        self.fixes_applied.append(f"Added title: '{title}'")

    def _fix_add_author(self, author: Optional[str] = None):
        """Add author information, defaulting to "Unknown Author"."""
        if not author:
            author = "Unknown Author"

        self.writer.add_metadata({'/Author': author})
        self.fixes_applied.append(f"Added author: '{author}'")

    def _fix_add_subject(self, subject: Optional[str] = None):
        """Add document subject, using a suggested one when none is given."""
        if not subject:
            subject = self._suggest_subject()

        self.writer.add_metadata({'/Subject': subject})
        self.fixes_applied.append(f"Added subject: '{subject}'")

    def _fix_set_language(self, language: str = "en-US"):
        """Set document language by writing ``/Lang`` into the catalog."""
        catalog = self.writer._root_object
        catalog[NameObject("/Lang")] = TextStringObject(language)
        self.fixes_applied.append(f"Set language to: {language}")

    def _fix_mark_tagged(self):
        """Mark document as tagged (WARNING: only if tags actually exist!).

        Sets ``/MarkInfo /Marked`` to true. An existing ``/MarkInfo``
        dictionary is updated in place so its other entries are kept.
        """
        catalog = self.writer._root_object

        existing = self._resolve(catalog.get("/MarkInfo")) if "/MarkInfo" in catalog else None
        if isinstance(existing, DictionaryObject):
            mark_info = existing
        else:
            mark_info = DictionaryObject()
            catalog[NameObject("/MarkInfo")] = mark_info

        mark_info[NameObject("/Marked")] = BooleanObject(True)
        self.fixes_applied.append("Marked document as tagged (verify tags exist!)")

    def _fix_add_bookmarks(self):
        """Add basic bookmarks based on page numbers.

        Creates one "Page N" outline item at evenly spaced pages, never more
        than ``_MAX_AUTO_BOOKMARKS`` in total.
        """
        page_indices = self._bookmark_page_indices(len(self.reader.pages))

        for index in page_indices:
            self.writer.add_outline_item(
                title=f"Page {index + 1}",
                page_number=index,
            )

        self.fixes_applied.append(f"Added {len(page_indices)} bookmarks")
|
|
|
|
|
|
def main():
    """CLI interface for remediation.

    Analyzes the given PDF, prints the available fixes to stderr, applies
    either every auto-fixable suggestion (``--all``) or only the fixes
    requested through individual options, and writes the remediated file.
    When stderr is a terminal the result is also validated with veraPDF.

    Exit status: 0 on success or when nothing needs fixing; 1 when the PDF
    cannot be analyzed, no fix was selected, the output directory cannot be
    created, or remediation fails.
    """
    import argparse

    parser = argparse.ArgumentParser(description="PDF Accessibility Auto-Remediation")
    parser.add_argument("pdf_file", help="PDF file to remediate")
    parser.add_argument("--output", "-o", help="Output PDF file")
    parser.add_argument("--title", help="Document title to add")
    parser.add_argument("--author", help="Author to add")
    parser.add_argument("--subject", help="Subject/description to add")
    # Default is None (not 'en-US') so an explicit "--language en-US" can be
    # told apart from the option being omitted.
    parser.add_argument("--language", default=None, help="Document language (default: en-US)")
    parser.add_argument("--add-bookmarks", action="store_true", help="Add automatic bookmarks")
    parser.add_argument("--mark-tagged", action="store_true", help="Mark as tagged (WARNING: only if tags exist!)")
    parser.add_argument("--all", action="store_true", help="Apply all safe fixes")

    args = parser.parse_args()
    emit = sys.stderr.write

    emit("PDF Accessibility Remediation\n")
    emit(f"File: {args.pdf_file}\n")
    emit(f"{'='*60}\n\n")

    # Analyze
    try:
        remediator = PDFRemediator(args.pdf_file)
        suggestions = remediator.analyze_and_suggest_fixes()
    except Exception as e:
        # Report unreadable or missing input as a message, not a traceback.
        emit(f"Error: Cannot analyze '{args.pdf_file}': {e}\n")
        sys.exit(1)

    emit("Analysis Complete\n")
    emit(f"{'='*60}\n")

    all_suggestions = []
    for category, fixes in suggestions.items():
        if not fixes:
            continue
        emit(f"\n{category.upper()} Fixes Available:\n")
        for fix in fixes:
            fixable_marker = "[auto]" if fix['auto_fixable'] else "[manual]"
            emit(f" {fixable_marker} {fix['description']}\n")
            emit(f" Severity: {fix['severity']}\n")
            emit(f" Suggestion: {fix['suggestion']}\n")
            all_suggestions.append(fix['id'])

    # Determine which fixes to apply
    fixes_to_apply = []
    custom_values = {}

    if args.all:
        # Apply all auto-fixable issues
        for fixes in suggestions.values():
            for fix in fixes:
                if not fix['auto_fixable']:
                    continue
                fixes_to_apply.append(fix['id'])

                # Use CLI values if provided, otherwise use suggestions
                if fix['id'] == 'add_title' and args.title:
                    custom_values['title'] = args.title
                elif fix['id'] == 'add_author' and args.author:
                    custom_values['author'] = args.author
                elif fix['id'] == 'add_subject' and args.subject:
                    custom_values['subject'] = args.subject
                elif fix['id'] == 'set_language':
                    custom_values['language'] = args.language or 'en-US'
    else:
        # Apply only what was explicitly requested
        if args.title:
            fixes_to_apply.append('add_title')
            custom_values['title'] = args.title
        if args.author:
            fixes_to_apply.append('add_author')
            custom_values['author'] = args.author
        if args.subject:
            fixes_to_apply.append('add_subject')
            custom_values['subject'] = args.subject
        if args.language:  # Any explicitly given language, including en-US
            fixes_to_apply.append('set_language')
            custom_values['language'] = args.language
        if args.add_bookmarks:
            fixes_to_apply.append('add_bookmarks')
        if args.mark_tagged:
            fixes_to_apply.append('mark_tagged')

    if not fixes_to_apply:
        if not all_suggestions:
            # Nothing detected and nothing requested: a clean document.
            emit("\nNo automatic fixes needed!\n")
            sys.exit(0)
        emit("\nNo fixes specified. Use --all or specify individual fixes.\n")
        emit(" Example: python pdf_remediation.py file.pdf --title 'My Document' --language en-US\n")
        sys.exit(1)

    # Validate output path parent directory exists (or create it)
    output_path = args.output
    if output_path:
        output_dir = Path(output_path).parent
        if not output_dir.exists():
            try:
                output_dir.mkdir(parents=True, exist_ok=True)
                emit(f"Created output directory: {output_dir}\n")
            except OSError as e:
                emit(f"Error: Cannot create output directory '{output_dir}': {e}\n")
                sys.exit(1)

    # Apply fixes
    emit(f"\n{'='*60}\n")
    emit("Applying Fixes...\n")
    emit(f"{'='*60}\n\n")

    try:
        result = remediator.apply_fixes(fixes_to_apply, output_path, custom_values)
    except Exception as e:
        # Writing can fail (permissions, malformed input); exit cleanly.
        emit(f"Remediation failed: {e}\n")
        sys.exit(1)

    if not result['success']:
        emit("Remediation failed\n")
        sys.exit(1)

    emit("Remediation Complete!\n")
    emit(f"\nOutput: {result['output_path']}\n")
    emit("\nFixes Applied:\n")
    for fix in result['fixes_applied']:
        emit(f" - {fix}\n")

    # Optionally run veraPDF validation on result.
    # Only if running interactively (not from web); isatty() on the stream
    # avoids fileno(), which a replaced stderr may not support.
    if sys.stderr.isatty():
        emit(f"\n{'='*60}\n")
        emit("Validating Remediated PDF with veraPDF...\n")
        emit(f"{'='*60}\n\n")

        validator = VeraPDFValidator()
        validation = validator.validate(result['output_path'])

        if 'error' not in validation:
            compliant_str = "PASS" if validation['compliant'] else "FAIL"
            emit(f"PDF/UA Compliance: {compliant_str}\n")
            emit(f"Passed Rules: {validation['passed_rules']}\n")
            emit(f"Failed Rules: {validation['failed_rules']}\n")

            remaining = validation['errors']
            if remaining:
                emit(f"\nRemaining Issues ({len(remaining)}):\n")
                for i, error in enumerate(remaining[:10], 1):
                    # A rule summary may carry no description at all.
                    description = error.get('description') or ''
                    emit(f" {i}. Clause {error.get('clause')}: {description[:80]}...\n")

                if len(remaining) > 10:
                    emit(f" ... and {len(remaining) - 10} more\n")

    sys.exit(0)
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|