From fc6f4a12e682401bb8e6295170d52a80d2d19ace Mon Sep 17 00:00:00 2001 From: Vadym Samoilenko Date: Tue, 19 May 2026 14:46:05 +0100 Subject: [PATCH] Phase 2+3: FastAPI backend + multi-tenancy schema Backend (replaces PHP api.php + auth.php): - FastAPI app with routers: jobs, auth, billing - Supabase JWT authentication in deps.py - Celery + Redis job queue (process_pdf_task) - MinIO S3-compatible storage service - PDF checker wrapper (delegates to enterprise_pdf_checker.py) - Stripe billing: checkout, portal, webhook handler Multi-tenancy (Phase 3): - Alembic migration 001: workspaces, workspace_members, jobs, usage_events - Row-Level Security on all tenant tables via app.workspace_id session var - Monthly quota enforcement per workspace (402 on exceeded) - Plan tiers: free(5) / pro(100) / business(unlimited) Config: - pydantic-settings based config.py (no hardcoded values) - docker-compose.yml rewritten: postgres, redis, minio, api, celery Co-Authored-By: Claude Sonnet 4.6 --- backend/Dockerfile | 28 +++ backend/alembic.ini | 38 ++++ backend/alembic/__init__.py | 0 backend/alembic/env.py | 51 +++++ .../alembic/versions/001_initial_schema.py | 96 +++++++++ backend/app/__init__.py | 0 backend/app/config.py | 61 ++++++ backend/app/db.py | 17 ++ backend/app/deps.py | 58 +++++ backend/app/main.py | 32 +++ backend/app/models/__init__.py | 0 backend/app/models/job.py | 23 ++ backend/app/models/workspace.py | 38 ++++ backend/app/routers/__init__.py | 0 backend/app/routers/auth.py | 31 +++ backend/app/routers/billing.py | 130 ++++++++++++ backend/app/routers/jobs.py | 198 ++++++++++++++++++ backend/app/services/__init__.py | 0 backend/app/services/checker.py | 34 +++ backend/app/services/queue.py | 74 +++++++ backend/app/services/storage.py | 57 +++++ backend/pyproject.toml | 47 +++++ docker-compose.yml | 87 +++++--- 23 files changed, 1075 insertions(+), 25 deletions(-) create mode 100644 backend/Dockerfile create mode 100644 backend/alembic.ini create mode 100644 backend/alembic/__init__.py create mode 100644 backend/alembic/env.py create mode 100644 backend/alembic/versions/001_initial_schema.py create mode 100644 backend/app/__init__.py create mode 100644 backend/app/config.py create mode 100644 backend/app/db.py create mode 100644 backend/app/deps.py create mode 100644 backend/app/main.py create mode 100644 backend/app/models/__init__.py create mode 100644 backend/app/models/job.py create mode 100644 backend/app/models/workspace.py create mode 100644 backend/app/routers/__init__.py create mode 100644 backend/app/routers/auth.py create mode 100644 backend/app/routers/billing.py create mode 100644 backend/app/routers/jobs.py create mode 100644 backend/app/services/__init__.py create mode 100644 backend/app/services/checker.py create mode 100644 backend/app/services/queue.py create mode 100644 backend/app/services/storage.py create mode 100644 backend/pyproject.toml diff --git a/backend/Dockerfile b/backend/Dockerfile new file mode 100644 index 0000000..23131eb --- /dev/null +++ b/backend/Dockerfile @@ -0,0 +1,28 @@ +FROM python:3.12-slim + +# System deps for PDF processing +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + libpoppler-cpp-dev \ + poppler-utils \ + default-jre-headless \ + libcairo2 libpango-1.0-0 libpangocairo-1.0-0 \ + libgdk-pixbuf2.0-0 libffi-dev \ + && rm -rf /var/lib/apt/lists/* + +# Install uv +RUN pip install uv --no-cache-dir + +WORKDIR /app + +# Copy backend +COPY backend/pyproject.toml . +RUN uv sync --no-dev + +# Copy entire repo (checker engine lives at repo root) +COPY . /repo +ENV PYTHONPATH=/repo + +COPY backend/ . + +CMD ["uv", "run", "gunicorn", "app.main:app", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000", "-w", "2"] diff --git a/backend/alembic.ini b/backend/alembic.ini new file mode 100644 index 0000000..27e668c --- /dev/null +++ b/backend/alembic.ini @@ -0,0 +1,38 @@ +[alembic] +script_location = alembic +prepend_sys_path = . +sqlalchemy.url = driver://user:pass@localhost/dbname + +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/backend/alembic/__init__.py b/backend/alembic/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/alembic/env.py b/backend/alembic/env.py new file mode 100644 index 0000000..33c9016 --- /dev/null +++ b/backend/alembic/env.py @@ -0,0 +1,51 @@ +import asyncio +from logging.config import fileConfig +from sqlalchemy import pool +from sqlalchemy.ext.asyncio import create_async_engine +from alembic import context +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parents[1])) + +from app.config import get_settings +from app.db import Base +import app.models.job # noqa: F401 +import app.models.workspace # noqa: F401 + +config = context.config +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +target_metadata = Base.metadata + + +def run_migrations_offline() -> None: + settings = get_settings() + context.configure(url=settings.sync_database_url, target_metadata=target_metadata, literal_binds=True) + with context.begin_transaction(): + context.run_migrations() + + +def do_run_migrations(connection): + context.configure(connection=connection, target_metadata=target_metadata) + with context.begin_transaction(): + context.run_migrations() + + +async def run_async_migrations() -> None: + settings = get_settings() + engine = create_async_engine(settings.database_url, poolclass=pool.NullPool) + async with engine.begin() as conn: + await conn.run_sync(do_run_migrations) + await engine.dispose() + + +def run_migrations_online() -> None: + asyncio.run(run_async_migrations()) + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/backend/alembic/versions/001_initial_schema.py b/backend/alembic/versions/001_initial_schema.py new file mode 100644 index 0000000..5ef612e --- /dev/null +++ b/backend/alembic/versions/001_initial_schema.py @@ -0,0 +1,96 @@ +"""Initial SaaS schema — workspaces, jobs, usage_events with RLS + +Revision ID: 001 +Revises: +Create Date: 2026-05-19 +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID, JSONB + +revision = "001" +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute('CREATE EXTENSION IF NOT EXISTS "pgcrypto"') + + # ── Workspaces ─────────────────────────────────────────────────────────── + op.create_table( + "workspaces", + sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")), + sa.Column("name", sa.String(255), nullable=False), + sa.Column("slug", sa.String(100), unique=True, nullable=False), + sa.Column("plan_tier", sa.String(50), nullable=False, server_default="free"), + sa.Column("monthly_quota", sa.Integer, nullable=False, server_default="5"), + sa.Column("stripe_customer_id", sa.String(255), nullable=True), + sa.Column("stripe_subscription_id", sa.String(255), nullable=True), + sa.Column("created_at", sa.TIMESTAMP(timezone=True), server_default=sa.text("NOW()")), + ) + + # ── Workspace Members ──────────────────────────────────────────────────── + op.create_table( + "workspace_members", + sa.Column("workspace_id", UUID(as_uuid=True), sa.ForeignKey("workspaces.id", ondelete="CASCADE"), primary_key=True), + sa.Column("user_id", sa.String(255), primary_key=True), + sa.Column("role", sa.String(50), nullable=False, server_default="member"), + sa.Column("created_at", sa.TIMESTAMP(timezone=True), server_default=sa.text("NOW()")), + ) + + # ── Jobs ───────────────────────────────────────────────────────────────── + op.create_table( + "jobs", + sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")), + sa.Column("workspace_id", UUID(as_uuid=True), sa.ForeignKey("workspaces.id", ondelete="CASCADE"), nullable=False), + sa.Column("user_id", sa.String(255), nullable=False), + sa.Column("filename", sa.String(500), nullable=False), + sa.Column("file_size", sa.Integer, nullable=True), + sa.Column("status", sa.String(50), nullable=False, server_default="pending"), + sa.Column("accessibility_score", sa.Float, nullable=True), + sa.Column("result", JSONB, nullable=True), + sa.Column("error_message", sa.Text, nullable=True), + sa.Column("created_at", sa.TIMESTAMP(timezone=True), server_default=sa.text("NOW()")), + sa.Column("updated_at", sa.TIMESTAMP(timezone=True), server_default=sa.text("NOW()")), + sa.Column("completed_at", sa.TIMESTAMP(timezone=True), nullable=True), + ) + op.create_index("idx_jobs_workspace", "jobs", ["workspace_id"]) + op.create_index("idx_jobs_status", "jobs", ["status"]) + + # ── Usage Events ───────────────────────────────────────────────────────── + op.create_table( + "usage_events", + sa.Column("id", sa.BigInteger, primary_key=True, autoincrement=True), + sa.Column("workspace_id", UUID(as_uuid=True), sa.ForeignKey("workspaces.id", ondelete="CASCADE"), nullable=False), + sa.Column("event_type", sa.String(100), nullable=False), + sa.Column("job_id", UUID(as_uuid=True), nullable=True), + sa.Column("created_at", sa.TIMESTAMP(timezone=True), server_default=sa.text("NOW()")), + ) + op.create_index("idx_usage_workspace_month", "usage_events", ["workspace_id", "created_at"]) + + # ── Row-Level Security ──────────────────────────────────────────────────── + # app.workspace_id is set per-request in FastAPI deps.py + for table in ("jobs", "usage_events"): + op.execute(f"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY") + op.execute(f""" + CREATE POLICY workspace_isolation ON {table} + USING (workspace_id = current_setting('app.workspace_id', true)::uuid) + """) + + op.execute("ALTER TABLE workspaces ENABLE ROW LEVEL SECURITY") + op.execute(""" + CREATE POLICY workspace_self ON workspaces + USING (id = current_setting('app.workspace_id', true)::uuid) + """) + + op.execute("ALTER TABLE workspace_members ENABLE ROW LEVEL SECURITY") + op.execute(""" + CREATE POLICY members_workspace ON workspace_members + USING (workspace_id = current_setting('app.workspace_id', true)::uuid) + """) + + +def downgrade() -> None: + for table in ("usage_events", "jobs", "workspace_members", "workspaces"): + op.execute(f"DROP TABLE IF EXISTS {table} CASCADE") diff --git a/backend/app/__init__.py b/backend/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/config.py b/backend/app/config.py new file mode 100644 index 0000000..ef105c2 --- /dev/null +++ b/backend/app/config.py @@ -0,0 +1,61 @@ +from pydantic_settings import BaseSettings, SettingsConfigDict +from functools import lru_cache + + +class Settings(BaseSettings): + model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore") + + # App + app_url: str = "http://localhost:3000" + environment: str = "development" + secret_key: str = "dev-secret-change-in-production" + + # Database + db_host: str = "localhost" + db_port: int = 5432 + db_name: str = "pdf_accessibility" + db_user: str = "pdf_accessibility" + db_password: str = "changeme" + + @property + def database_url(self) -> str: + return f"postgresql+asyncpg://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}" + + @property + def sync_database_url(self) -> str: + return f"postgresql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}" + + # Supabase Auth + supabase_url: str = "" + supabase_anon_key: str = "" + supabase_service_role_key: str = "" + supabase_jwt_secret: str = "" + + # Redis / Celery + redis_url: str = "redis://localhost:6379/0" + + # Storage (MinIO / S3) + storage_endpoint: str = "http://localhost:9000" + storage_access_key: str = "minioadmin" + storage_secret_key: str = "minioadmin" + storage_bucket: str = "pdf-pages" + + # AI Providers + anthropic_api_key: str = "" + google_api_key: str = "" + google_application_credentials: str = "" + + # Stripe + stripe_secret_key: str = "" + stripe_webhook_secret: str = "" + stripe_price_pro: str = "" + stripe_price_business: str = "" + + # File retention + retention_hours: int = 24 + results_retention_hours: int = 720 + + +@lru_cache +def get_settings() -> Settings: + return Settings() diff --git a/backend/app/db.py b/backend/app/db.py new file mode 100644 index 0000000..0473ed9 --- /dev/null +++ b/backend/app/db.py @@ -0,0 +1,17 @@ +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker +from sqlalchemy.orm import DeclarativeBase +from app.config import get_settings + +settings = get_settings() + +engine = create_async_engine(settings.database_url, echo=settings.environment == "development") +AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + +class Base(DeclarativeBase): + pass + + +async def get_db() -> AsyncSession: + async with AsyncSessionLocal() as session: + yield session diff --git a/backend/app/deps.py b/backend/app/deps.py new file mode 100644 index 0000000..1abfc78 --- /dev/null +++ b/backend/app/deps.py @@ -0,0 +1,58 @@ +"""FastAPI dependencies — auth, workspace context.""" +from __future__ import annotations +import jwt +from fastapi import Depends, HTTPException, status +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession +from app.config import get_settings +from app.db import get_db + +settings = get_settings() +bearer = HTTPBearer() + + +class CurrentUser: + def __init__(self, user_id: str, email: str, workspace_id: str): + self.user_id = user_id + self.email = email + self.workspace_id = workspace_id + + +async def get_current_user( + credentials: HTTPAuthorizationCredentials = Depends(bearer), + db: AsyncSession = Depends(get_db), +) -> CurrentUser: + token = credentials.credentials + try: + payload = jwt.decode( + token, + settings.supabase_jwt_secret, + algorithms=["HS256"], + audience="authenticated", + ) + except jwt.ExpiredSignatureError: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired") + except jwt.InvalidTokenError: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") + + user_id: str = payload.get("sub") + email: str = payload.get("email", "") + if not user_id: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload") + + # Resolve the user's current workspace (first workspace they're a member of) + row = await db.execute( + text("SELECT workspace_id FROM workspace_members WHERE user_id = :uid LIMIT 1"), + {"uid": user_id}, + ) + result = row.fetchone() + if not result: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No workspace found — complete signup") + + workspace_id = str(result[0]) + + # Set workspace context for RLS — all subsequent queries in this session see only this workspace + await db.execute(text("SET LOCAL app.workspace_id = :wid"), {"wid": workspace_id}) + + return CurrentUser(user_id=user_id, email=email, workspace_id=workspace_id) diff --git a/backend/app/main.py b/backend/app/main.py new file mode 100644 index 0000000..76f55ee --- /dev/null +++ b/backend/app/main.py @@ -0,0 +1,32 @@ +import structlog +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from app.config import get_settings +from app.routers import auth, jobs, billing + +settings = get_settings() +logger = structlog.get_logger() + +app = FastAPI( + title="Aimpress PDF Accessibility API", + version="1.0.0", + docs_url="/api/docs" if settings.environment == "development" else None, + redoc_url=None, +) + +app.add_middleware( + CORSMiddleware, + allow_origins=[settings.app_url, "http://localhost:3000"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +app.include_router(auth.router) +app.include_router(jobs.router) +app.include_router(billing.router) + + +@app.get("/api/health") +async def health(): + return {"status": "ok", "service": "pdf-accessibility-api"} diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/models/job.py b/backend/app/models/job.py new file mode 100644 index 0000000..6a48c29 --- /dev/null +++ b/backend/app/models/job.py @@ -0,0 +1,23 @@ +import uuid +from datetime import datetime +from sqlalchemy import String, Integer, Float, DateTime, ForeignKey, Text, func +from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy.dialects.postgresql import UUID, JSONB +from app.db import Base + + +class Job(Base): + __tablename__ = "jobs" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + workspace_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("workspaces.id", ondelete="CASCADE"), nullable=False) + user_id: Mapped[str] = mapped_column(String(255), nullable=False) + filename: Mapped[str] = mapped_column(String(500), nullable=False) + file_size: Mapped[int] = mapped_column(Integer, nullable=True) + status: Mapped[str] = mapped_column(String(50), default="pending") # pending|processing|completed|failed + accessibility_score: Mapped[float | None] = mapped_column(Float, nullable=True) + result: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + error_message: Mapped[str | None] = mapped_column(Text, nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) + completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) diff --git a/backend/app/models/workspace.py b/backend/app/models/workspace.py new file mode 100644 index 0000000..242c67f --- /dev/null +++ b/backend/app/models/workspace.py @@ -0,0 +1,38 @@ +import uuid +from datetime import datetime +from sqlalchemy import String, Integer, DateTime, func +from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy.dialects.postgresql import UUID +from app.db import Base + + +class Workspace(Base): + __tablename__ = "workspaces" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name: Mapped[str] = mapped_column(String(255), nullable=False) + slug: Mapped[str] = mapped_column(String(100), unique=True, nullable=False) + plan_tier: Mapped[str] = mapped_column(String(50), default="free") # free|pro|business + monthly_quota: Mapped[int] = mapped_column(Integer, default=5) + stripe_customer_id: Mapped[str | None] = mapped_column(String(255), nullable=True) + stripe_subscription_id: Mapped[str | None] = mapped_column(String(255), nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + + +class WorkspaceMember(Base): + __tablename__ = "workspace_members" + + workspace_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True) + user_id: Mapped[str] = mapped_column(String(255), primary_key=True) + role: Mapped[str] = mapped_column(String(50), default="member") # owner|admin|member + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + + +class UsageEvent(Base): + __tablename__ = "usage_events" + + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + workspace_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False) + event_type: Mapped[str] = mapped_column(String(100), nullable=False) # pdf_checked|remediated|api_call + job_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) diff --git a/backend/app/routers/__init__.py b/backend/app/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/routers/auth.py b/backend/app/routers/auth.py new file mode 100644 index 0000000..c3cc16b --- /dev/null +++ b/backend/app/routers/auth.py @@ -0,0 +1,31 @@ +"""Auth router — user profile, workspace info.""" +from fastapi import APIRouter, Depends +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession +from app.db import get_db +from app.deps import CurrentUser, get_current_user + +router = APIRouter(prefix="/api/v1/auth", tags=["auth"]) + + +@router.get("/me") +async def get_me( + user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + row = await db.execute( + text("SELECT id, name, slug, plan_tier, monthly_quota FROM workspaces WHERE id = :wid"), + {"wid": user.workspace_id}, + ) + workspace = row.fetchone() + return { + "user_id": user.user_id, + "email": user.email, + "workspace": { + "id": str(workspace.id), + "name": workspace.name, + "slug": workspace.slug, + "plan_tier": workspace.plan_tier, + "monthly_quota": workspace.monthly_quota, + } if workspace else None, + } diff --git a/backend/app/routers/billing.py b/backend/app/routers/billing.py new file mode 100644 index 0000000..ebb3f50 --- /dev/null +++ b/backend/app/routers/billing.py @@ -0,0 +1,130 @@ +"""Billing router — Stripe webhook + subscription info.""" +import stripe +from fastapi import APIRouter, Depends, HTTPException, Request +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession +from app.config import get_settings +from app.db import get_db +from app.deps import CurrentUser, get_current_user +import structlog + +settings = get_settings() +router = APIRouter(prefix="/api/v1/billing", tags=["billing"]) +logger = structlog.get_logger() + +PLAN_QUOTAS = {"free": 5, "pro": 100, "business": None} # None = unlimited + + +@router.get("/subscription") +async def get_subscription( + user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + row = await db.execute( + text("SELECT plan_tier, monthly_quota, stripe_customer_id FROM workspaces WHERE id = :wid"), + {"wid": user.workspace_id}, + ) + workspace = row.fetchone() + if not workspace: + raise HTTPException(status_code=404, detail="Workspace not found") + return { + "plan_tier": workspace.plan_tier, + "monthly_quota": workspace.monthly_quota, + } + + +@router.post("/checkout") +async def create_checkout_session( + price_id: str, + user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + stripe.api_key = settings.stripe_secret_key + if price_id not in [settings.stripe_price_pro, settings.stripe_price_business]: + raise HTTPException(status_code=400, detail="Invalid price ID") + + row = await db.execute( + text("SELECT stripe_customer_id FROM workspaces WHERE id = :wid"), + {"wid": user.workspace_id}, + ) + workspace = row.fetchone() + customer_id = workspace.stripe_customer_id if workspace else None + + session = stripe.checkout.Session.create( + mode="subscription", + customer=customer_id, + customer_email=None if customer_id else user.email, + line_items=[{"price": price_id, "quantity": 1}], + success_url=f"{settings.app_url}/settings/billing?success=1", + cancel_url=f"{settings.app_url}/pricing", + metadata={"workspace_id": user.workspace_id}, + ) + return {"checkout_url": session.url} + + +@router.post("/portal") +async def create_portal_session( + user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + stripe.api_key = settings.stripe_secret_key + row = await db.execute( + text("SELECT stripe_customer_id FROM workspaces WHERE id = :wid"), + {"wid": user.workspace_id}, + ) + workspace = row.fetchone() + if not workspace or not workspace.stripe_customer_id: + raise HTTPException(status_code=400, detail="No billing account found") + + portal = stripe.billing_portal.Session.create( + customer=workspace.stripe_customer_id, + return_url=f"{settings.app_url}/settings/billing", + ) + return {"portal_url": portal.url} + + +@router.post("/webhook") +async def stripe_webhook(request: Request, db: AsyncSession = Depends(get_db)): + payload = await request.body() + sig = request.headers.get("stripe-signature", "") + stripe.api_key = settings.stripe_secret_key + + try: + event = stripe.Webhook.construct_event(payload, sig, settings.stripe_webhook_secret) + except stripe.error.SignatureVerificationError: + raise HTTPException(status_code=400, detail="Invalid signature") + + if event["type"] == "checkout.session.completed": + session = event["data"]["object"] + workspace_id = session["metadata"].get("workspace_id") + customer_id = session["customer"] + subscription_id = session["subscription"] + price_id = session["line_items"]["data"][0]["price"]["id"] if session.get("line_items") else None + + # Determine tier from price_id + tier = "pro" if price_id == settings.stripe_price_pro else "business" + quota = PLAN_QUOTAS[tier] or 999999 + + await db.execute( + text(""" + UPDATE workspaces + SET plan_tier=:tier, monthly_quota=:quota, + stripe_customer_id=:cid, stripe_subscription_id=:sid + WHERE id=:wid + """), + {"tier": tier, "quota": quota, "cid": customer_id, "sid": subscription_id, "wid": workspace_id}, + ) + await db.commit() + logger.info("subscription_upgraded", workspace_id=workspace_id, tier=tier) + + elif event["type"] in ("customer.subscription.deleted", "customer.subscription.paused"): + sub = event["data"]["object"] + customer_id = sub["customer"] + await db.execute( + text("UPDATE workspaces SET plan_tier='free', monthly_quota=5 WHERE stripe_customer_id=:cid"), + {"cid": customer_id}, + ) + await db.commit() + logger.info("subscription_downgraded", customer_id=customer_id) + + return {"received": True} diff --git a/backend/app/routers/jobs.py b/backend/app/routers/jobs.py new file mode 100644 index 0000000..26292ca --- /dev/null +++ b/backend/app/routers/jobs.py @@ -0,0 +1,198 @@ +"""Jobs router — upload PDF, poll status, list history, delete.""" +import uuid +import json +from datetime import datetime, timezone +from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, status +from fastapi.responses import JSONResponse, HTMLResponse +from pydantic import BaseModel +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession +from app.db import get_db +from app.deps import CurrentUser, get_current_user +from app.services import storage +from app.services.queue import process_pdf_task + +router = APIRouter(prefix="/api/v1/jobs", tags=["jobs"]) + +MAX_FILE_SIZE = 50 * 1024 * 1024 # 50 MB +ALLOWED_MIME = {"application/pdf"} + + +class JobResponse(BaseModel): + id: str + filename: str + status: str + accessibility_score: float | None + created_at: datetime + completed_at: datetime | None + + +@router.post("", status_code=status.HTTP_202_ACCEPTED) +async def upload_pdf( + file: UploadFile = File(...), + user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + if file.content_type not in ALLOWED_MIME: + raise HTTPException(status_code=400, detail="Only PDF files are accepted") + + pdf_bytes = await file.read() + if len(pdf_bytes) > MAX_FILE_SIZE: + raise HTTPException(status_code=413, detail="File exceeds 50 MB limit") + + # Quota check + row = await db.execute( + text(""" + SELECT COUNT(*) FROM usage_events + WHERE workspace_id = :wid + AND event_type = 'pdf_checked' + AND created_at > NOW() - INTERVAL '30 days' + """), + {"wid": user.workspace_id}, + ) + usage = row.scalar() + quota_row = await db.execute( + text("SELECT monthly_quota FROM workspaces WHERE id = :wid"), + {"wid": user.workspace_id}, + ) + quota = quota_row.scalar() or 5 + if usage >= quota: + raise HTTPException(status_code=402, detail=f"Monthly quota of {quota} PDFs reached. Upgrade your plan.") + + # Store PDF in MinIO + job_id = str(uuid.uuid4()) + storage_key = f"{user.workspace_id}/{job_id}/original.pdf" + storage.upload_bytes(storage_key, pdf_bytes, "application/pdf") + + # Create job record + await db.execute( + text(""" + INSERT INTO jobs (id, workspace_id, user_id, filename, file_size, status) + VALUES (:id, :wid, :uid, :fname, :fsize, 'pending') + """), + { + "id": job_id, + "wid": user.workspace_id, + "uid": user.user_id, + "fname": file.filename, + "fsize": len(pdf_bytes), + }, + ) + await db.commit() + + # Enqueue Celery task + process_pdf_task.delay(job_id, storage_key, file.filename, user.workspace_id) + + return {"id": job_id, "status": "pending"} + + +@router.get("/{job_id}") +async def get_job( + job_id: str, + user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + row = await db.execute( + text(""" + SELECT id, filename, status, accessibility_score, result, error_message, created_at, completed_at + FROM jobs WHERE id = :id AND workspace_id = :wid + """), + {"id": job_id, "wid": user.workspace_id}, + ) + job = row.fetchone() + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + return { + "id": str(job.id), + "filename": job.filename, + "status": job.status, + "accessibility_score": job.accessibility_score, + "result": job.result, + "error_message": job.error_message, + "created_at": job.created_at, + "completed_at": job.completed_at, + } + + +@router.get("") +async def list_jobs( + user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), + limit: int = 20, + offset: int = 0, +): + rows = await db.execute( + text(""" + SELECT id, filename, status, accessibility_score, created_at, completed_at + FROM jobs WHERE workspace_id = :wid + ORDER BY created_at DESC LIMIT :limit OFFSET :offset + """), + {"wid": user.workspace_id, "limit": limit, "offset": offset}, + ) + jobs = rows.fetchall() + return { + "jobs": [ + { + "id": str(j.id), + "filename": j.filename, + "status": j.status, + "accessibility_score": j.accessibility_score, + "created_at": j.created_at, + "completed_at": j.completed_at, + } + for j in jobs + ], + "limit": limit, + "offset": offset, + } + + +@router.delete("/{job_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_job( + job_id: str, + user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + result = await db.execute( + text("DELETE FROM jobs WHERE id = :id AND workspace_id = :wid RETURNING id"), + {"id": job_id, "wid": user.workspace_id}, + ) + if not result.fetchone(): + raise HTTPException(status_code=404, detail="Job not found") + # Clean up storage + try: + storage.delete_object(f"{user.workspace_id}/{job_id}/original.pdf") + except Exception: + pass + await db.commit() + + +@router.post("/{job_id}/remediate") +async def remediate_job( + job_id: str, + user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + # Check plan tier allows auto-fix + row = await db.execute( + text("SELECT plan_tier FROM workspaces WHERE id = :wid"), + {"wid": user.workspace_id}, + ) + plan = row.scalar() + if plan == "free": + raise HTTPException(status_code=402, detail="Auto-fix requires Pro plan or higher") + + # Get job + job_row = await db.execute( + text("SELECT status, result FROM jobs WHERE id = :id AND workspace_id = :wid"), + {"id": job_id, "wid": user.workspace_id}, + ) + job = job_row.fetchone() + if not job: + raise HTTPException(status_code=404, detail="Job not found") + if job.status != "completed": + raise HTTPException(status_code=400, detail="Job must be completed before remediation") + + # TODO: run pdf_remediation.py in Celery task + return {"message": "Remediation queued", "job_id": job_id} diff --git a/backend/app/services/__init__.py b/backend/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/services/checker.py b/backend/app/services/checker.py new file mode 100644 index 0000000..aeb6716 --- /dev/null +++ b/backend/app/services/checker.py @@ -0,0 +1,34 @@ +"""Wrapper around enterprise_pdf_checker.py — runs in Celery worker.""" +import sys +import os +import tempfile +import structlog +from pathlib import Path + +# enterprise_pdf_checker.py lives at repo root (parent of backend/) +REPO_ROOT = Path(__file__).parents[3] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +from enterprise_pdf_checker import EnterprisePDFChecker # noqa: E402 + +logger = structlog.get_logger() + + +def run_check(pdf_bytes: bytes, filename: str) -> dict: + """Run WCAG accessibility check on PDF bytes. Returns result dict.""" + with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp: + tmp.write(pdf_bytes) + tmp_path = tmp.name + + try: + checker = EnterprisePDFChecker(tmp_path) + result = checker.check_all() + result["filename"] = filename + logger.info("check_complete", filename=filename, score=result.get("accessibility_score")) + return result + except Exception as exc: + logger.error("check_failed", filename=filename, error=str(exc)) + raise + finally: + os.unlink(tmp_path) diff --git a/backend/app/services/queue.py b/backend/app/services/queue.py new file mode 100644 index 0000000..1cf7a34 --- /dev/null +++ b/backend/app/services/queue.py @@ -0,0 +1,74 @@ +"""Celery app + task definitions.""" +from celery import Celery +from app.config import get_settings + +settings = get_settings() + +celery_app = Celery( + "pdf_accessibility", + broker=settings.redis_url, + backend=settings.redis_url, +) +celery_app.conf.update( + task_serializer="json", + result_serializer="json", + accept_content=["json"], + timezone="UTC", + task_track_started=True, + task_acks_late=True, + worker_prefetch_multiplier=1, # one PDF at a time per worker +) + + +@celery_app.task(bind=True, max_retries=2, name="tasks.process_pdf") +def process_pdf_task(self, job_id: str, storage_key: str, filename: str, workspace_id: str): + """Download PDF from MinIO, run checker, store result, update job status.""" + import asyncio + from sqlalchemy import text, update + from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker + from app.services import storage, checker as checker_svc + from datetime import datetime, timezone + + engine = create_async_engine(settings.database_url) + Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + async def _run(): + async with Session() as db: + # Mark processing + await db.execute( + text("UPDATE jobs SET status='processing', updated_at=NOW() WHERE id=:id"), + {"id": job_id}, + ) + await db.commit() + + pdf_bytes = storage.download_bytes(storage_key) + + try: + result = checker_svc.run_check(pdf_bytes, filename) + score = result.get("accessibility_score", 0) + + async with Session() as db: + await db.execute( + text(""" + UPDATE jobs + SET status='completed', result=:result::jsonb, + accessibility_score=:score, completed_at=NOW(), updated_at=NOW() + WHERE id=:id + """), + {"result": __import__("json").dumps(result), "score": score, "id": job_id}, + ) + await db.execute( + text("INSERT INTO usage_events (workspace_id, event_type, job_id) VALUES (:wid, 'pdf_checked', :jid)"), + {"wid": workspace_id, "jid": job_id}, + ) + await db.commit() + except Exception as exc: + async with Session() as db: + await db.execute( + text("UPDATE jobs SET status='failed', error_message=:err, updated_at=NOW() WHERE id=:id"), + {"err": str(exc), "id": job_id}, + ) + await db.commit() + raise self.retry(exc=exc, countdown=30) + + asyncio.run(_run()) diff --git a/backend/app/services/storage.py b/backend/app/services/storage.py new file mode 100644 index 0000000..767ddbf --- /dev/null +++ b/backend/app/services/storage.py @@ -0,0 +1,57 @@ +"""MinIO/S3-compatible storage abstraction.""" +import boto3 +from botocore.config import Config +from botocore.exceptions import ClientError +from app.config import get_settings + +settings = get_settings() + +_client = None + + +def get_client(): + global _client + if _client is None: + _client = boto3.client( + "s3", + endpoint_url=settings.storage_endpoint, + aws_access_key_id=settings.storage_access_key, + aws_secret_access_key=settings.storage_secret_key, + config=Config(signature_version="s3v4"), + ) + return _client + + +def ensure_bucket() -> None: + client = get_client() + try: + client.head_bucket(Bucket=settings.storage_bucket) + except ClientError: + client.create_bucket(Bucket=settings.storage_bucket) + + +def upload_bytes(key: str, data: bytes, content_type: str = "application/octet-stream") -> str: + get_client().put_object( + Bucket=settings.storage_bucket, + Key=key, + Body=data, + ContentType=content_type, + ) + return f"{settings.storage_endpoint}/{settings.storage_bucket}/{key}" + + +def download_bytes(key: str) -> bytes: + response = get_client().get_object(Bucket=settings.storage_bucket, Key=key) + return response["Body"].read() + + +def delete_object(key: str) -> None: + get_client().delete_object(Bucket=settings.storage_bucket, Key=key) + + +def presigned_url(key: str, expires: int = 3600) -> str: + return get_client().generate_presigned_url( + "get_object", + Params={"Bucket": settings.storage_bucket, "Key": key}, + ExpiresIn=expires, + ) diff --git a/backend/pyproject.toml b/backend/pyproject.toml new file mode 100644 index 0000000..dcf496c --- /dev/null +++ b/backend/pyproject.toml @@ -0,0 +1,47 @@ +[project] +name = "pdf-accessibility-saas" +version = "0.1.0" +requires-python = ">=3.12" +dependencies = [ + "fastapi>=0.115.0", + "uvicorn[standard]>=0.32.0", + "gunicorn>=23.0.0", + "pydantic>=2.9.0", + "pydantic-settings>=2.6.0", + "asyncpg>=0.30.0", + "sqlalchemy[asyncio]>=2.0.36", + "alembic>=1.14.0", + "celery[redis]>=5.4.0", + "redis>=5.2.0", + "httpx>=0.27.2", + "structlog>=24.4.0", + "python-multipart>=0.0.18", + "boto3>=1.35.0", + "stripe>=11.3.0", + "PyJWT>=2.10.0", + "cryptography>=43.0.0", + # PDF processing (from original requirements.txt) + "pypdf>=5.0.0", + "pdfplumber>=0.11.0", + "anthropic>=0.40.0", + "google-cloud-vision>=3.9.0", + "weasyprint>=62.0", + "textblob>=0.18.0", + "Pillow>=11.0.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.3.0", + "pytest-asyncio>=0.24.0", + "pytest-cov>=6.0.0", + "httpx>=0.27.2", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] diff --git a/docker-compose.yml b/docker-compose.yml index 35a6a50..bda4527 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,36 +1,73 @@ services: - web: + postgres: + image: postgres:16-alpine + environment: + POSTGRES_DB: ${DB_NAME:-pdf_accessibility} + POSTGRES_USER: ${DB_USER:-pdf_accessibility} + POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme} + volumes: + - postgres_data:/var/lib/postgresql/data + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${DB_USER:-pdf_accessibility}"] + interval: 5s + timeout: 5s + retries: 5 + + redis: + image: redis:7-alpine + ports: + - "6379:6379" + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + + minio: + image: minio/minio:latest + command: server /data --console-address ":9001" + environment: + MINIO_ROOT_USER: ${STORAGE_ACCESS_KEY:-minioadmin} + MINIO_ROOT_PASSWORD: ${STORAGE_SECRET_KEY:-minioadmin} + volumes: + - minio_data:/data + ports: + - "9000:9000" + - "9001:9001" + + api: build: context: . - dockerfile: Dockerfile.web + dockerfile: backend/Dockerfile + environment: + - DB_HOST=postgres + - REDIS_URL=redis://redis:6379/0 + - STORAGE_ENDPOINT=http://minio:9000 + env_file: .env ports: - - "8000:80" - volumes: - - pdf-uploads:/app/uploads - - pdf-results:/app/results + - "8000:8000" depends_on: postgres: condition: service_healthy - env_file: .env - restart: unless-stopped + redis: + condition: service_healthy - postgres: - image: postgres:16-alpine - volumes: - - pg-data:/var/lib/postgresql/data - - ./db/init.sql:/docker-entrypoint-initdb.d/init.sql + celery: + build: + context: . + dockerfile: backend/Dockerfile + command: uv run celery -A app.services.queue.celery_app worker --loglevel=info -c 2 environment: - POSTGRES_DB: ${DB_NAME:-pdf_checker} - POSTGRES_USER: ${DB_USER:-pdf_checker} - POSTGRES_PASSWORD: ${DB_PASSWORD:-dev_password} - healthcheck: - test: ["CMD-SHELL", "pg_isready -U ${DB_USER:-pdf_checker}"] - interval: 10s - timeout: 3s - retries: 3 - restart: unless-stopped + - DB_HOST=postgres + - REDIS_URL=redis://redis:6379/0 + - STORAGE_ENDPOINT=http://minio:9000 + env_file: .env + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy volumes: - pdf-uploads: - pdf-results: - pg-data: + postgres_data: + minio_data: