PDF-accessibility-saas/backend/app/routers/billing.py
Vadym Samoilenko fc6f4a12e6 Phase 2+3: FastAPI backend + multi-tenancy schema
Backend (replaces PHP api.php + auth.php):
- FastAPI app with routers: jobs, auth, billing
- Supabase JWT authentication in deps.py
- Celery + Redis job queue (process_pdf_task)
- MinIO S3-compatible storage service
- PDF checker wrapper (delegates to enterprise_pdf_checker.py)
- Stripe billing: checkout, portal, webhook handler

Multi-tenancy (Phase 3):
- Alembic migration 001: workspaces, workspace_members, jobs, usage_events
- Row-Level Security on all tenant tables via app.workspace_id session var
- Monthly quota enforcement per workspace (402 on exceeded)
- Plan tiers: free(5) / pro(100) / business(unlimited)

Config:
- pydantic-settings based config.py (no hardcoded values)
- docker-compose.yml rewritten: postgres, redis, minio, api, celery

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 14:46:05 +01:00

130 lines
4.7 KiB
Python

"""Billing router — Stripe webhook + subscription info."""
import stripe
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.db import get_db
from app.deps import CurrentUser, get_current_user
import structlog
# Application settings, loaded once at import time via get_settings().
settings = get_settings()
# Every endpoint in this module is mounted under /api/v1/billing.
router = APIRouter(prefix="/api/v1/billing", tags=["billing"])
logger = structlog.get_logger()
# Monthly quota per plan tier. None means unlimited; the webhook handler
# substitutes 999999 for it when writing workspaces.monthly_quota.
PLAN_QUOTAS = {"free": 5, "pro": 100, "business": None} # None = unlimited
@router.get("/subscription")
async def get_subscription(
    user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Report the plan tier and monthly quota of the caller's workspace.

    Args:
        user: Authenticated caller; its ``workspace_id`` selects the row.
        db: Async database session.

    Returns:
        A dict with the keys ``plan_tier`` and ``monthly_quota``.

    Raises:
        HTTPException: 404 when no workspace row matches the caller's
            ``workspace_id``.
    """
    query = text("SELECT plan_tier, monthly_quota, stripe_customer_id FROM workspaces WHERE id = :wid")
    result = await db.execute(query, {"wid": user.workspace_id})
    record = result.fetchone()
    if record:
        return {
            "plan_tier": record.plan_tier,
            "monthly_quota": record.monthly_quota,
        }
    raise HTTPException(status_code=404, detail="Workspace not found")
@router.post("/checkout")
async def create_checkout_session(
    price_id: str,
    user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Create a Stripe Checkout Session for a subscription upgrade.

    Args:
        price_id: Stripe price to subscribe to; must be the configured pro
            or business price.
        user: Authenticated caller; supplies ``workspace_id`` and ``email``.
        db: Async database session.

    Returns:
        A dict with the key ``checkout_url`` holding the hosted checkout URL.

    Raises:
        HTTPException: 400 when ``price_id`` is not one of the configured
            prices; 404 when the caller's workspace does not exist.
    """
    # Function-scope import: the Stripe client call below is synchronous
    # network I/O and must not run on the event loop.
    from fastapi.concurrency import run_in_threadpool

    stripe.api_key = settings.stripe_secret_key
    if price_id not in (settings.stripe_price_pro, settings.stripe_price_business):
        raise HTTPException(status_code=400, detail="Invalid price ID")

    row = await db.execute(
        text("SELECT stripe_customer_id FROM workspaces WHERE id = :wid"),
        {"wid": user.workspace_id},
    )
    workspace = row.fetchone()
    if not workspace:
        # Without a workspace row the webhook's UPDATE would match nothing,
        # so the customer would pay without ever being upgraded.
        raise HTTPException(status_code=404, detail="Workspace not found")

    params = {
        "mode": "subscription",
        "line_items": [{"price": price_id, "quantity": 1}],
        "success_url": f"{settings.app_url}/settings/billing?success=1",
        "cancel_url": f"{settings.app_url}/pricing",
        # Stripe metadata values are strings; coerce in case the ID is a UUID.
        "metadata": {"workspace_id": str(user.workspace_id)},
    }
    # Reuse the existing Stripe customer when there is one; otherwise
    # pre-fill the email so Stripe creates a new customer.
    if workspace.stripe_customer_id:
        params["customer"] = workspace.stripe_customer_id
    else:
        params["customer_email"] = user.email

    session = await run_in_threadpool(stripe.checkout.Session.create, **params)
    return {"checkout_url": session.url}
@router.post("/portal")
async def create_portal_session(
    user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Create a Stripe billing-portal session for the caller's workspace.

    Args:
        user: Authenticated caller; its ``workspace_id`` selects the row.
        db: Async database session.

    Returns:
        A dict with the key ``portal_url`` holding the hosted portal URL.

    Raises:
        HTTPException: 400 when the workspace is missing or has no Stripe
            customer ID stored.
    """
    # Function-scope import: the Stripe client call below is synchronous
    # network I/O and must not run on the event loop.
    from fastapi.concurrency import run_in_threadpool

    stripe.api_key = settings.stripe_secret_key
    row = await db.execute(
        text("SELECT stripe_customer_id FROM workspaces WHERE id = :wid"),
        {"wid": user.workspace_id},
    )
    workspace = row.fetchone()
    if not workspace or not workspace.stripe_customer_id:
        raise HTTPException(status_code=400, detail="No billing account found")

    portal = await run_in_threadpool(
        stripe.billing_portal.Session.create,
        customer=workspace.stripe_customer_id,
        return_url=f"{settings.app_url}/settings/billing",
    )
    return {"portal_url": portal.url}
async def _checkout_price_id(session):
    """Return the price ID bought in a Checkout Session, or None if unknown."""
    # Function-scope import: the Stripe client call below is synchronous
    # network I/O and must not run on the event loop.
    from fastapi.concurrency import run_in_threadpool

    line_items = session.get("line_items")
    if not line_items:
        # line_items is an expandable field that Stripe omits from the
        # Checkout Session object by default, so it has to be fetched.
        line_items = await run_in_threadpool(
            stripe.checkout.Session.list_line_items, session["id"], limit=1
        )
    data = line_items["data"]
    if not data:
        return None
    price = data[0].get("price")
    return price["id"] if price else None


@router.post("/webhook")
async def stripe_webhook(request: Request, db: AsyncSession = Depends(get_db)):
    """Handle Stripe webhook events that change a workspace's plan.

    ``checkout.session.completed`` upgrades the workspace named in the
    session metadata to the tier matching the purchased price.
    ``customer.subscription.deleted`` and ``customer.subscription.paused``
    move the workspace owning the Stripe customer back to the free tier.
    All other event types are acknowledged without action.

    Args:
        request: Incoming request; the raw body and ``stripe-signature``
            header are used for signature verification.
        db: Async database session.

    Returns:
        ``{"received": True}`` once the event has been processed or ignored.

    Raises:
        HTTPException: 400 when the payload is malformed or the signature
            does not verify.
    """
    payload = await request.body()
    sig = request.headers.get("stripe-signature", "")
    stripe.api_key = settings.stripe_secret_key
    try:
        event = stripe.Webhook.construct_event(payload, sig, settings.stripe_webhook_secret)
    except ValueError:
        # construct_event raises ValueError when the body is not valid JSON.
        raise HTTPException(status_code=400, detail="Invalid payload")
    except stripe.error.SignatureVerificationError:
        raise HTTPException(status_code=400, detail="Invalid signature")

    event_type = event["type"]
    if event_type == "checkout.session.completed":
        session = event["data"]["object"]
        workspace_id = (session.get("metadata") or {}).get("workspace_id")
        if not workspace_id:
            # Nothing to upgrade; acknowledge so Stripe does not keep retrying.
            logger.warning("checkout_missing_workspace_id", session_id=session.get("id"))
            return {"received": True}

        price_id = await _checkout_price_id(session)
        tier_by_price = {
            settings.stripe_price_pro: "pro",
            settings.stripe_price_business: "business",
        }
        # Never default to a paid tier: an unresolved price must not grant one.
        tier = tier_by_price.get(price_id) if price_id else None
        if tier is None:
            logger.error(
                "checkout_unknown_price",
                workspace_id=workspace_id,
                price_id=price_id,
            )
            return {"received": True}

        quota = PLAN_QUOTAS[tier]
        if quota is None:
            # Unlimited plans are stored as a large sentinel quota.
            quota = 999999
        await db.execute(
            text("""
                UPDATE workspaces
                SET plan_tier=:tier, monthly_quota=:quota,
                    stripe_customer_id=:cid, stripe_subscription_id=:sid
                WHERE id=:wid
            """),
            {
                "tier": tier,
                "quota": quota,
                "cid": session["customer"],
                "sid": session["subscription"],
                "wid": workspace_id,
            },
        )
        await db.commit()
        logger.info("subscription_upgraded", workspace_id=workspace_id, tier=tier)
    elif event_type in ("customer.subscription.deleted", "customer.subscription.paused"):
        sub = event["data"]["object"]
        customer_id = sub["customer"]
        await db.execute(
            text(
                "UPDATE workspaces SET plan_tier=:tier, monthly_quota=:quota "
                "WHERE stripe_customer_id=:cid"
            ),
            # Take the free quota from PLAN_QUOTAS so it cannot drift from the table.
            {"tier": "free", "quota": PLAN_QUOTAS["free"], "cid": customer_id},
        )
        await db.commit()
        logger.info("subscription_downgraded", customer_id=customer_id)
    return {"received": True}