PDF-accessibility-saas/backend/app/services/queue.py
Vadym Samoilenko fc6f4a12e6 Phase 2+3: FastAPI backend + multi-tenancy schema
Backend (replaces PHP api.php + auth.php):
- FastAPI app with routers: jobs, auth, billing
- Supabase JWT authentication in deps.py
- Celery + Redis job queue (process_pdf_task)
- MinIO S3-compatible storage service
- PDF checker wrapper (delegates to enterprise_pdf_checker.py)
- Stripe billing: checkout, portal, webhook handler

Multi-tenancy (Phase 3):
- Alembic migration 001: workspaces, workspace_members, jobs, usage_events
- Row-Level Security on all tenant tables via app.workspace_id session var
- Monthly quota enforcement per workspace (402 on exceeded)
- Plan tiers: free(5) / pro(100) / business(unlimited)

Config:
- pydantic-settings based config.py (no hardcoded values)
- docker-compose.yml rewritten: postgres, redis, minio, api, celery

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 14:46:05 +01:00
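The commit message describes monthly quota enforcement per workspace (HTTP 402 when exceeded) with plan tiers free(5) / pro(100) / business(unlimited). That enforcement is not in this file. The sketch below shows one way such a check could look, assuming a pure function called before a job is enqueued. `PLAN_LIMITS`, `QuotaExceeded` and `check_quota` are hypothetical names; only the limits and the 402 status come from the commit message.

```python
from http import HTTPStatus
from typing import Optional

# Hypothetical plan -> monthly PDF-check allowance; None means unlimited.
# The numbers come from the commit message, the names are assumptions.
PLAN_LIMITS: dict[str, Optional[int]] = {
    "free": 5,
    "pro": 100,
    "business": None,
}


class QuotaExceeded(Exception):
    """Raised when a workspace has used up its monthly allowance."""

    status_code = HTTPStatus.PAYMENT_REQUIRED  # 402


def check_quota(plan: str, used_this_month: int) -> None:
    """Raise QuotaExceeded if one more check would go over the plan's limit."""
    limit = PLAN_LIMITS[plan]
    if limit is not None and used_this_month >= limit:
        raise QuotaExceeded(
            f"{plan} plan allows {limit} checks per month; {used_this_month} already used"
        )
```

One plausible source for `used_this_month` is a count of this month's `usage_events` rows with `event_type = 'pdf_checked'`, since the task below records one such row per completed check. A FastAPI route would then translate `QuotaExceeded` into a 402 response.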

"""Celery app + task definitions."""
import asyncio
import json

from celery import Celery

from app.config import get_settings

settings = get_settings()

celery_app = Celery(
    "pdf_accessibility",
    broker=settings.redis_url,
    backend=settings.redis_url,
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    task_track_started=True,
    task_acks_late=True,  # acknowledge only after the task returns, not on receipt
    worker_prefetch_multiplier=1,  # one PDF at a time per worker
)


@celery_app.task(bind=True, max_retries=2, name="tasks.process_pdf")
def process_pdf_task(self, job_id: str, storage_key: str, filename: str, workspace_id: str):
    """Download PDF from MinIO, run checker, store result, update job status."""
    # Imported lazily so DB and storage code load only when a task actually runs.
    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

    from app.services import storage, checker as checker_svc

    async def _run():
        # asyncio.run() starts a fresh event loop for every task, so the engine and its
        # connection pool are created inside that loop and disposed before it closes.
        engine = create_async_engine(settings.database_url)
        Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
        try:
            async with Session() as db:
                # Mark processing
                await db.execute(
                    text("UPDATE jobs SET status='processing', updated_at=NOW() WHERE id=:id"),
                    {"id": job_id},
                )
                await db.commit()

            try:
                # Download inside the try so storage errors also mark the job as failed.
                pdf_bytes = storage.download_bytes(storage_key)
                result = checker_svc.run_check(pdf_bytes, filename)
                score = result.get("accessibility_score", 0)
                async with Session() as db:
                    # CAST(:result AS jsonb) rather than :result::jsonb, because text()
                    # does not bind a parameter name that is directly followed by "::".
                    await db.execute(
                        text("""
                            UPDATE jobs
                            SET status='completed', result=CAST(:result AS jsonb),
                                accessibility_score=:score, completed_at=NOW(), updated_at=NOW()
                            WHERE id=:id
                        """),
                        {"result": json.dumps(result), "score": score, "id": job_id},
                    )
                    await db.execute(
                        text(
                            "INSERT INTO usage_events (workspace_id, event_type, job_id) "
                            "VALUES (:wid, 'pdf_checked', :jid)"
                        ),
                        {"wid": workspace_id, "jid": job_id},
                    )
                    await db.commit()
            except Exception as exc:
                async with Session() as db:
                    await db.execute(
                        text("UPDATE jobs SET status='failed', error_message=:err, updated_at=NOW() WHERE id=:id"),
                        {"err": str(exc), "id": job_id},
                    )
                    await db.commit()
                # Re-queue after 30 s; once max_retries is exhausted, Celery raises exc instead.
                raise self.retry(exc=exc, countdown=30)
        finally:
            await engine.dispose()

    asyncio.run(_run())
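Why the jsonb cast is written as `CAST(:result AS jsonb)`: SQLAlchemy's `text()` locates bind parameters with a regular expression that rejects a name immediately followed by a colon. The sketch below assumes the pattern shown matches the one in your installed SQLAlchemy version (it is modeled on the `TextClause` pattern, not imported from it). It shows that with `:result::jsonb` the regex backtracks and registers a truncated name, so the `result` value passed at execution time is never bound. `bind_names` is a hypothetical helper for illustration.

```python
import re

# Modeled on the bind-parameter pattern of SQLAlchemy's text() construct (an assumption;
# check your version): ":name" not preceded by ":", a word char or a backslash,
# and not followed by ":".
BIND_PARAM = re.compile(r"(?<![:\w\x5c]):(\w+)(?!:)")


def bind_names(sql: str) -> list[str]:
    """Return the parameter names a text()-style parser would register."""
    return BIND_PARAM.findall(sql)


suffix_cast = "UPDATE jobs SET result=:result::jsonb WHERE id=:id"
function_cast = "UPDATE jobs SET result=CAST(:result AS jsonb) WHERE id=:id"

print(bind_names(suffix_cast))    # ['resul', 'id']  (backtracking truncates the name)
print(bind_names(function_cast))  # ['result', 'id']
```

The `(?!:)` lookahead exists so that Postgres casts such as `'x'::text` are not mistaken for parameters; the side effect is that a parameter followed directly by `::` is mangled, which the `CAST(...)` form sidesteps.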