"""Celery app + task definitions.""" from celery import Celery from app.config import get_settings settings = get_settings() celery_app = Celery( "pdf_accessibility", broker=settings.redis_url, backend=settings.redis_url, ) celery_app.conf.update( task_serializer="json", result_serializer="json", accept_content=["json"], timezone="UTC", task_track_started=True, task_acks_late=True, worker_prefetch_multiplier=1, # one PDF at a time per worker ) @celery_app.task(bind=True, max_retries=2, name="tasks.process_pdf") def process_pdf_task(self, job_id: str, storage_key: str, filename: str, workspace_id: str): """Download PDF from MinIO, run checker, store result, update job status.""" import asyncio from sqlalchemy import text, update from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from app.services import storage, checker as checker_svc from datetime import datetime, timezone engine = create_async_engine(settings.database_url) Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async def _run(): async with Session() as db: # Mark processing await db.execute( text("UPDATE jobs SET status='processing', updated_at=NOW() WHERE id=:id"), {"id": job_id}, ) await db.commit() pdf_bytes = storage.download_bytes(storage_key) try: result = checker_svc.run_check(pdf_bytes, filename) score = result.get("accessibility_score", 0) async with Session() as db: await db.execute( text(""" UPDATE jobs SET status='completed', result=:result::jsonb, accessibility_score=:score, completed_at=NOW(), updated_at=NOW() WHERE id=:id """), {"result": __import__("json").dumps(result), "score": score, "id": job_id}, ) await db.execute( text("INSERT INTO usage_events (workspace_id, event_type, job_id) VALUES (:wid, 'pdf_checked', :jid)"), {"wid": workspace_id, "jid": job_id}, ) await db.commit() except Exception as exc: async with Session() as db: await db.execute( text("UPDATE jobs SET status='failed', error_message=:err, updated_at=NOW() WHERE id=:id"), {"err": str(exc), "id": job_id}, ) await db.commit() raise self.retry(exc=exc, countdown=30) asyncio.run(_run())