#!/usr/bin/env python3
"""
PDF Accessibility Checker — Redis Queue Worker

Daemon that:
1. Connects to Redis + PostgreSQL
2. BRPOP from pdf:queue (blocking wait)
3. Runs EnterprisePDFChecker on the PDF
4. Stores results in PostgreSQL + JSON file
5. Loops until SIGTERM
"""

import os
import sys
import json
import signal
import time
import logging
from pathlib import Path

from redis_queue import pop_job, set_job_status
from db_manager import create_job, update_job_status, log_audit

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(name)s] %(levelname)s: %(message)s'
)
logger = logging.getLogger('worker')

RESULTS_DIR = Path(os.getenv('RESULTS_DIR', '/app/results'))
UPLOADS_DIR = Path(os.getenv('UPLOADS_DIR', '/app/uploads'))

# Flipped by handle_signal(); polled by main() between jobs.
shutdown_requested = False


def handle_signal(signum, frame):
    """Signal handler: request a shutdown once the current job is finished.

    Args:
        signum: Signal number delivered by the interpreter (unused).
        frame: Current stack frame at delivery time (unused).
    """
    global shutdown_requested
    logger.info("Shutdown signal received, finishing current job...")
    shutdown_requested = True


signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)


def _count_severity(issues, severity):
    """Return how many entries of *issues* carry the given ``severity``."""
    return sum(1 for issue in issues if issue.get('severity') == severity)


def _write_error_log(job_id, error_msg):
    """Write *error_msg* to ``<RESULTS_DIR>/<job_id>.error.log``."""
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    error_log = RESULTS_DIR / f"{job_id}.error.log"
    # Explicit UTF-8: exception text may contain non-ASCII characters and the
    # process locale is not guaranteed to be able to encode them.
    with open(error_log, 'w', encoding='utf-8') as f:
        f.write(error_msg)


def _report_failure(job_id, error_msg, processing_time):
    """Record a failed job in PostgreSQL, Redis, the audit log and on disk.

    Every step is attempted independently: an outage in one backend must not
    prevent the others from learning that the job failed. A step that raises
    is logged with its traceback and otherwise ignored.
    """
    short_msg = error_msg[:500]
    steps = (
        ('database status',
         lambda: update_job_status(job_id, 'failed',
                                   processing_time=processing_time)),
        ('queue status',
         lambda: set_job_status(job_id, 'failed', 0, short_msg)),
        ('audit log',
         lambda: log_audit(job_id, 'check_failed', {'error': short_msg})),
        # Error log file kept for backward compatibility
        ('error log file',
         lambda: _write_error_log(job_id, error_msg)),
    )
    for label, step in steps:
        try:
            step()
        except Exception:
            logger.exception(
                "Job %s: could not record failure (%s)", job_id, label
            )


def process_job(job_data: dict) -> None:
    """Process a single PDF check job.

    Reads ``job_id`` and ``pdf_path`` (both required) plus the optional
    ``options`` and ``original_filename`` keys from *job_data*, runs
    ``EnterprisePDFChecker`` on the PDF, writes the result JSON under
    ``RESULTS_DIR`` and records the outcome via ``update_job_status``,
    ``set_job_status`` and ``log_audit``.

    Any exception raised while checking or storing results is caught, logged
    with its traceback, and reported as a ``failed`` job; it is not re-raised.

    Args:
        job_data: Job description popped from the queue.

    Raises:
        KeyError: If ``job_id`` or ``pdf_path`` is missing from *job_data*.
    """
    job_id = job_data['job_id']
    pdf_path = job_data['pdf_path']
    options = job_data.get('options', {})

    logger.info("Processing job %s: %s", job_id, pdf_path)

    # Create DB record before processing
    try:
        filename = job_data.get('original_filename', os.path.basename(pdf_path))
        create_job(job_id, filename)
    except Exception as e:
        logger.warning("DB create_job failed (non-fatal): %s", e)

    set_job_status(job_id, 'processing', 5, 'Starting PDF analysis')

    start_time = time.time()

    try:
        # Imported here so that an import failure is reported as a job failure.
        from enterprise_pdf_checker import EnterprisePDFChecker

        # Build config from environment
        config = {
            'anthropic_api_key': os.getenv('ANTHROPIC_API_KEY'),
            'google_api_key': os.getenv('GOOGLE_API_KEY'),
        }

        quick_mode = options.get('quick_mode', False)

        set_job_status(job_id, 'processing', 10, 'Initializing checker')
        checker = EnterprisePDFChecker(pdf_path, config, quick_mode=quick_mode)

        set_job_status(job_id, 'processing', 20, 'Running accessibility checks')
        checker.check_all()

        set_job_status(job_id, 'processing', 85, 'Generating page images')

        # Make sure the output directory exists before anything is written.
        RESULTS_DIR.mkdir(parents=True, exist_ok=True)

        # Generate page images for visual inspector
        output_path = RESULTS_DIR / f"{job_id}.result.json"
        images_dir = RESULTS_DIR / f"{job_id}.result_images"
        checker._generate_page_images(images_dir)

        processing_time = time.time() - start_time

        set_job_status(job_id, 'processing', 90, 'Saving results')

        # Get full results including page_images after generation
        results = checker.to_dict()

        # Write JSON result file (for backward compatibility with api.php)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, default=str)

        # Extract summary fields
        score = results.get('accessibility_score', 0)
        grade = results.get('grade', 'F')
        issues = results.get('issues', [])
        total_issues = len(issues)

        # Update PostgreSQL
        update_job_status(
            job_id, 'completed',
            result_json=results,
            score=score,
            grade=grade,
            total_issues=total_issues,
            critical_count=_count_severity(issues, 'CRITICAL'),
            error_count=_count_severity(issues, 'ERROR'),
            warning_count=_count_severity(issues, 'WARNING'),
            processing_time=processing_time
        )

        set_job_status(job_id, 'completed', 100, 'Done')
        log_audit(job_id, 'check_completed', {
            'score': score,
            'grade': grade,
            'processing_time': round(processing_time, 2)
        })

        logger.info(
            "Job %s completed: score=%s grade=%s issues=%d (%.1fs)",
            job_id, score, grade, total_issues, processing_time
        )

    except Exception as e:
        processing_time = time.time() - start_time
        error_msg = str(e)
        # logger.exception keeps the traceback, which str(e) alone discards.
        logger.exception("Job %s failed: %s", job_id, error_msg)
        _report_failure(job_id, error_msg, processing_time)


def main():
    """Pop jobs from the queue and process them until shutdown is requested.

    Unexpected errors from the queue or from job processing are logged and
    followed by a short pause so a persistent outage does not spin the loop.
    """
    logger.info("Worker starting — waiting for jobs on Redis queue")

    while not shutdown_requested:
        try:
            job_data = pop_job(timeout=5)
            if job_data:
                process_job(job_data)
        except KeyboardInterrupt:
            break
        except Exception as e:
            logger.exception("Worker error: %s", e)
            time.sleep(2)

    logger.info("Worker shutting down gracefully")


if __name__ == '__main__':
    main()