pdf-accessibility/worker.py
Vadym Samoilenko 112719b2c5 Add Docker stack, frontend redesign, and visual page inspector fix
- Redesigned frontend with Outfit/Figtree typography, coral accent palette,
  noise texture, glassmorphism header, and staggered animations
- Split monolithic index.html into modular JS (app, api, upload, batch,
  results, page-viewer, utils) and extracted CSS
- Fixed worker.py to generate page images for Visual Page Inspector
- Added Docker Compose stack (web, worker, redis, postgres)
- Added batch upload, HTML report export, rate limiting, and Redis queue
- Extended test suite with checker, remediation, worker, and DB tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 18:12:44 +00:00

163 lines
5.1 KiB
Python

#!/usr/bin/env python3
"""
PDF Accessibility Checker — Redis Queue Worker
Daemon that:
1. Connects to Redis + PostgreSQL
2. BRPOP from pdf:queue (blocking wait)
3. Runs EnterprisePDFChecker on the PDF
4. Stores results in PostgreSQL + JSON file
5. Loops until SIGTERM or SIGINT is received, finishing the current job first
"""
import os
import sys
import json
import signal
import time
import logging
from pathlib import Path
from redis_queue import pop_job, set_job_status
from db_manager import create_job, update_job_status, log_audit
# Root logging setup: timestamp, logger name, level, then the message.
logging.basicConfig(
    format='%(asctime)s [%(name)s] %(levelname)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('worker')

# Working directories; each can be overridden through the environment
# variable of the same name.
RESULTS_DIR = Path(os.environ.get('RESULTS_DIR', '/app/results'))
UPLOADS_DIR = Path(os.environ.get('UPLOADS_DIR', '/app/uploads'))

# Set to True by the signal handler; the main loop checks it between jobs.
shutdown_requested = False
def handle_signal(signum, frame):
    """Request a graceful shutdown of the worker loop.

    Used as a signal handler: ``signum`` and ``frame`` are the standard
    handler arguments and are not inspected. The handler only raises the
    module-level ``shutdown_requested`` flag and logs the request.
    """
    global shutdown_requested
    shutdown_requested = True
    logger.info("Shutdown signal received, finishing current job...")
# Route both termination signals through the same graceful-shutdown handler.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, handle_signal)
def _write_error_log(job_id, error_msg):
    """Write *error_msg* to ``<RESULTS_DIR>/<job_id>.error.log``."""
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    error_log = RESULTS_DIR / f"{job_id}.error.log"
    # Explicit UTF-8 so a non-ASCII exception message cannot fail the write
    # under a C/POSIX locale.
    with open(error_log, 'w', encoding='utf-8') as f:
        f.write(error_msg)


def _report_failure(job_id, error_msg, processing_time):
    """Record a failed job in every sink, isolating each sink's errors.

    Each step runs in its own try/except so that one unavailable backend
    (for example the database) does not prevent the remaining sinks from
    learning that the job failed.
    """
    short_msg = error_msg[:500]
    steps = (
        ('database status',
         lambda: update_job_status(job_id, 'failed',
                                   processing_time=processing_time)),
        ('queue status',
         lambda: set_job_status(job_id, 'failed', 0, short_msg)),
        ('audit log',
         lambda: log_audit(job_id, 'check_failed', {'error': short_msg})),
        # Error log file is kept for backward compatibility.
        ('error log file',
         lambda: _write_error_log(job_id, error_msg)),
    )
    for label, step in steps:
        try:
            step()
        except Exception as report_err:
            logger.warning(
                "Job %s: could not record failure in %s: %s",
                job_id, label, report_err
            )


def process_job(job_data: dict):
    """Process a single PDF check job.

    Args:
        job_data: Job description popped from the queue. Keys read here:
            ``job_id`` and ``pdf_path`` (required), ``options`` (optional
            dict; only ``quick_mode`` is consulted) and
            ``original_filename`` (optional; defaults to the basename of
            ``pdf_path``).

    Raises:
        KeyError: If ``job_id`` or ``pdf_path`` is missing from *job_data*.

    Any exception raised while checking the PDF or saving its results is
    caught, logged with a traceback, and reported as a ``failed`` job; it
    is not re-raised. An exception from the initial ``processing`` status
    update (before the check starts) propagates to the caller.
    """
    job_id = job_data['job_id']
    pdf_path = job_data['pdf_path']
    options = job_data.get('options', {})
    logger.info("Processing job %s: %s", job_id, pdf_path)

    # Create DB record before processing
    try:
        filename = job_data.get('original_filename', os.path.basename(pdf_path))
        create_job(job_id, filename)
    except Exception as e:
        logger.warning("DB create_job failed (non-fatal): %s", e)

    set_job_status(job_id, 'processing', 5, 'Starting PDF analysis')
    start_time = time.time()
    try:
        # Imported lazily so that an import failure marks this job as failed.
        from enterprise_pdf_checker import EnterprisePDFChecker

        # Build config from environment
        config = {
            'anthropic_api_key': os.getenv('ANTHROPIC_API_KEY'),
            'google_api_key': os.getenv('GOOGLE_API_KEY'),
        }
        quick_mode = options.get('quick_mode', False)

        set_job_status(job_id, 'processing', 10, 'Initializing checker')
        checker = EnterprisePDFChecker(pdf_path, config, quick_mode=quick_mode)

        set_job_status(job_id, 'processing', 20, 'Running accessibility checks')
        checker.check_all()

        set_job_status(job_id, 'processing', 85, 'Generating page images')
        # Make sure the results directory exists before anything is written
        # into it (a fresh volume or custom RESULTS_DIR may not have it yet).
        RESULTS_DIR.mkdir(parents=True, exist_ok=True)

        # Generate page images for visual inspector
        output_path = RESULTS_DIR / f"{job_id}.result.json"
        images_dir = RESULTS_DIR / f"{job_id}.result_images"
        checker._generate_page_images(images_dir)
        processing_time = time.time() - start_time

        set_job_status(job_id, 'processing', 90, 'Saving results')
        # Get full results including page_images after generation
        results = checker.to_dict()

        # Write JSON result file (for backward compatibility with api.php)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, default=str)

        # Extract summary fields
        score = results.get('accessibility_score', 0)
        grade = results.get('grade', 'F')
        issues = results.get('issues', [])
        total_issues = len(issues)
        critical_count = sum(1 for i in issues if i.get('severity') == 'CRITICAL')
        error_count = sum(1 for i in issues if i.get('severity') == 'ERROR')
        warning_count = sum(1 for i in issues if i.get('severity') == 'WARNING')

        # Update PostgreSQL
        update_job_status(
            job_id, 'completed',
            result_json=results,
            score=score,
            grade=grade,
            total_issues=total_issues,
            critical_count=critical_count,
            error_count=error_count,
            warning_count=warning_count,
            processing_time=processing_time
        )
        set_job_status(job_id, 'completed', 100, 'Done')

        # The job is already stored and marked completed at this point, so an
        # audit-log failure must not flip it to 'failed'.
        try:
            log_audit(job_id, 'check_completed', {
                'score': score, 'grade': grade,
                'processing_time': round(processing_time, 2)
            })
        except Exception as audit_err:
            logger.warning(
                "Job %s: audit log failed (non-fatal): %s", job_id, audit_err
            )

        logger.info(
            "Job %s completed: score=%s grade=%s issues=%d (%.1fs)",
            job_id, score, grade, total_issues, processing_time
        )
    except Exception as e:
        processing_time = time.time() - start_time
        # Some exceptions stringify to ''; fall back to the class name so
        # the recorded failure message is never empty.
        error_msg = str(e) or type(e).__name__
        # logger.exception logs at ERROR level and appends the traceback.
        logger.exception("Job %s failed: %s", job_id, error_msg)
        _report_failure(job_id, error_msg, processing_time)
def main():
    """Run the worker loop until a shutdown is requested.

    Repeatedly pops a job from the queue (waiting up to 5 seconds per
    attempt) and processes it. Any other error is logged and followed by a
    2-second pause before the loop continues; ``KeyboardInterrupt`` ends
    the loop immediately.
    """
    logger.info("Worker starting — waiting for jobs on Redis queue")
    while True:
        if shutdown_requested:
            break
        try:
            job = pop_job(timeout=5)
            if not job:
                # Timed out with nothing queued; poll again.
                continue
            process_job(job)
        except KeyboardInterrupt:
            break
        except Exception as exc:
            logger.error("Worker error: %s", exc)
            # Brief back-off before the next attempt.
            time.sleep(2)
    logger.info("Worker shutting down gracefully")
# Start the worker loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()