- Redesigned frontend with Outfit/Figtree typography, coral accent palette, noise texture, glassmorphism header, and staggered animations - Split monolithic index.html into modular JS (app, api, upload, batch, results, page-viewer, utils) and extracted CSS - Fixed worker.py to generate page images for Visual Page Inspector - Added Docker Compose stack (web, worker, redis, postgres) - Added batch upload, HTML report export, rate limiting, and Redis queue - Extended test suite with checker, remediation, worker, and DB tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
163 lines
5.1 KiB
Python
163 lines
5.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
PDF Accessibility Checker — Redis Queue Worker

Daemon that:
1. Connects to Redis + PostgreSQL
2. Waits for jobs with a blocking BRPOP on pdf:queue
3. Runs EnterprisePDFChecker on the PDF
4. Stores results in PostgreSQL + a JSON file
5. Loops until SIGTERM or SIGINT
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import signal
|
|
import time
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
from redis_queue import pop_job, set_job_status
|
|
from db_manager import create_job, update_job_status, log_audit
|
|
|
|
# Process-wide logging: INFO and above, one line per record in the form
# "timestamp [logger name] LEVEL: message".
logging.basicConfig(
    format='%(asctime)s [%(name)s] %(levelname)s: %(message)s',
    level=logging.INFO,
)

# Logger shared by every function in this module.
logger = logging.getLogger('worker')
|
|
|
|
# Directories are overridable through the environment; the defaults are
# absolute paths under /app.
RESULTS_DIR = Path(os.environ.get('RESULTS_DIR', '/app/results'))
UPLOADS_DIR = Path(os.environ.get('UPLOADS_DIR', '/app/uploads'))

# Set to True by handle_signal(); main() checks it between jobs.
shutdown_requested = False
|
|
|
|
|
|
def handle_signal(signum, frame):
    """Signal handler that requests a graceful shutdown of the worker.

    Logs the request and sets the module-level ``shutdown_requested`` flag,
    which ``main()`` checks before waiting for the next job.

    Args:
        signum: Signal number passed in by the ``signal`` module (unused).
        frame: Interrupted stack frame passed in by the ``signal`` module
            (unused).
    """
    global shutdown_requested

    logger.info("Shutdown signal received, finishing current job...")
    shutdown_requested = True
|
|
|
|
|
|
# Route both SIGTERM and SIGINT to the same graceful-shutdown handler.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, handle_signal)
|
|
|
|
|
|
def _best_effort(action, func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, logging instead of raising on failure.

    Used for bookkeeping steps whose failure must not stop the remaining
    steps from running. ``action`` is a short label for the log message.
    """
    try:
        func(*args, **kwargs)
    except Exception:
        logger.exception("%s failed (non-fatal)", action)


def _write_error_log(job_id, error_msg):
    """Write ``error_msg`` to ``RESULTS_DIR/<job_id>.error.log``."""
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    error_log = RESULTS_DIR / f"{job_id}.error.log"
    with open(error_log, 'w', encoding='utf-8') as f:
        f.write(error_msg)


def _record_failure(job_id, error_msg, processing_time):
    """Publish a job failure to PostgreSQL, Redis, the audit log and disk.

    Each step is attempted independently so that one unavailable backend
    (e.g. the database) cannot prevent the others from seeing the failure.
    """
    short_msg = error_msg[:500]
    _best_effort(
        "DB update_job_status", update_job_status,
        job_id, 'failed', processing_time=processing_time,
    )
    _best_effort(
        "Redis set_job_status", set_job_status,
        job_id, 'failed', 0, short_msg,
    )
    _best_effort(
        "Audit log", log_audit,
        job_id, 'check_failed', {'error': short_msg},
    )
    # Write error log for backward compatibility
    _best_effort("Error log write", _write_error_log, job_id, error_msg)


def process_job(job_data: dict):
    """Process a single PDF check job.

    Runs ``EnterprisePDFChecker`` on the PDF, generates page images, writes
    the result to ``RESULTS_DIR/<job_id>.result.json`` and records the outcome
    in PostgreSQL (``update_job_status``), Redis (``set_job_status``) and the
    audit log (``log_audit``). Any exception raised while checking or saving
    is caught, logged with its traceback, and recorded as a failed job.

    Args:
        job_data: Job description. Reads the required keys ``job_id`` and
            ``pdf_path`` and the optional keys ``options`` (of which only
            ``quick_mode`` is used) and ``original_filename``.

    Raises:
        KeyError: If ``job_id`` or ``pdf_path`` is missing from ``job_data``.
        Exception: Whatever the initial ``set_job_status`` call raises; it
            runs before the job's own error handling starts.
    """
    job_id = job_data['job_id']
    pdf_path = job_data['pdf_path']
    # Tolerate an explicit ``"options": null`` in the queued payload.
    options = job_data.get('options') or {}

    logger.info("Processing job %s: %s", job_id, pdf_path)

    # Create DB record before processing
    try:
        filename = job_data.get('original_filename', os.path.basename(pdf_path))
        create_job(job_id, filename)
    except Exception as e:
        logger.warning("DB create_job failed (non-fatal): %s", e)

    set_job_status(job_id, 'processing', 5, 'Starting PDF analysis')

    start_time = time.time()

    try:
        # Imported inside the try block so that an import failure is reported
        # as a failed job.
        from enterprise_pdf_checker import EnterprisePDFChecker

        # Build config from environment
        config = {
            'anthropic_api_key': os.getenv('ANTHROPIC_API_KEY'),
            'google_api_key': os.getenv('GOOGLE_API_KEY'),
        }

        quick_mode = options.get('quick_mode', False)
        set_job_status(job_id, 'processing', 10, 'Initializing checker')
        checker = EnterprisePDFChecker(pdf_path, config, quick_mode=quick_mode)

        set_job_status(job_id, 'processing', 20, 'Running accessibility checks')
        checker.check_all()

        set_job_status(job_id, 'processing', 85, 'Generating page images')

        # The results directory may not exist yet (fresh volume / first run).
        RESULTS_DIR.mkdir(parents=True, exist_ok=True)

        # Generate page images for visual inspector
        output_path = RESULTS_DIR / f"{job_id}.result.json"
        images_dir = RESULTS_DIR / f"{job_id}.result_images"
        checker._generate_page_images(images_dir)

        processing_time = time.time() - start_time
        set_job_status(job_id, 'processing', 90, 'Saving results')

        # Get full results including page_images after generation
        results = checker.to_dict()

        # Write JSON result file (for backward compatibility with api.php)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, default=str)

        # Extract summary fields
        score = results.get('accessibility_score', 0)
        grade = results.get('grade', 'F')
        issues = results.get('issues') or []
        total_issues = len(issues)
        critical_count = sum(1 for i in issues if i.get('severity') == 'CRITICAL')
        error_count = sum(1 for i in issues if i.get('severity') == 'ERROR')
        warning_count = sum(1 for i in issues if i.get('severity') == 'WARNING')

        # Update PostgreSQL
        update_job_status(
            job_id, 'completed',
            result_json=results,
            score=score,
            grade=grade,
            total_issues=total_issues,
            critical_count=critical_count,
            error_count=error_count,
            warning_count=warning_count,
            processing_time=processing_time,
        )
        set_job_status(job_id, 'completed', 100, 'Done')

    except Exception as e:
        processing_time = time.time() - start_time
        # Some exceptions stringify to ''; fall back to the class name so the
        # status message and error log are never empty.
        error_msg = str(e) or type(e).__name__
        logger.exception("Job %s failed: %s", job_id, error_msg)
        _record_failure(job_id, error_msg, processing_time)
        return

    # The job is already published as completed; an audit-log failure must not
    # flip it back to 'failed', so the audit entry is written best-effort.
    _best_effort(
        "Audit log", log_audit,
        job_id, 'check_completed', {
            'score': score, 'grade': grade,
            'processing_time': round(processing_time, 2),
        },
    )

    logger.info(
        "Job %s completed: score=%s grade=%s issues=%d (%.1fs)",
        job_id, score, grade, total_issues, processing_time
    )
|
|
|
|
|
|
def main():
    """Run the worker loop until a shutdown is requested.

    Repeatedly calls ``pop_job(timeout=5)`` and passes every truthy result to
    ``process_job``. The loop ends when ``shutdown_requested`` becomes true or
    a ``KeyboardInterrupt`` is raised. Any other exception is logged and
    followed by a two-second pause before the next attempt.
    """
    logger.info("Worker starting — waiting for jobs on Redis queue")

    while True:
        if shutdown_requested:
            break

        try:
            job = pop_job(timeout=5)
            if not job:
                # Nothing was returned for this attempt; check the flag again.
                continue
            process_job(job)
        except KeyboardInterrupt:
            break
        except Exception as exc:
            logger.error("Worker error: %s", exc)
            # Brief pause so a persistent failure does not spin the loop.
            time.sleep(2)

    logger.info("Worker shutting down gracefully")
|
|
|
|
|
|
# Start the worker loop only when this file is executed as a script.
if __name__ == "__main__":
    main()
|