Backend (replaces PHP api.php + auth.php): - FastAPI app with routers: jobs, auth, billing - Supabase JWT authentication in deps.py - Celery + Redis job queue (process_pdf_task) - MinIO S3-compatible storage service - PDF checker wrapper (delegates to enterprise_pdf_checker.py) - Stripe billing: checkout, portal, webhook handler Multi-tenancy (Phase 3): - Alembic migration 001: workspaces, workspace_members, jobs, usage_events - Row-Level Security on all tenant tables via app.workspace_id session var - Monthly quota enforcement per workspace (402 on exceeded) - Plan tiers: free(5) / pro(100) / business(unlimited) Config: - pydantic-settings based config.py (no hardcoded values) - docker-compose.yml rewritten: postgres, redis, minio, api, celery Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
130 lines
4.7 KiB
Python
130 lines
4.7 KiB
Python
"""Billing router — Stripe webhook + subscription info."""
|
|
import stripe
|
|
from fastapi import APIRouter, Depends, HTTPException, Request
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from app.config import get_settings
|
|
from app.db import get_db
|
|
from app.deps import CurrentUser, get_current_user
|
|
import structlog
|
|
|
|
settings = get_settings()
|
|
router = APIRouter(prefix="/api/v1/billing", tags=["billing"])
|
|
logger = structlog.get_logger()
|
|
|
|
PLAN_QUOTAS = {"free": 5, "pro": 100, "business": None} # None = unlimited
|
|
|
|
|
|
@router.get("/subscription")
|
|
async def get_subscription(
|
|
user: CurrentUser = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
row = await db.execute(
|
|
text("SELECT plan_tier, monthly_quota, stripe_customer_id FROM workspaces WHERE id = :wid"),
|
|
{"wid": user.workspace_id},
|
|
)
|
|
workspace = row.fetchone()
|
|
if not workspace:
|
|
raise HTTPException(status_code=404, detail="Workspace not found")
|
|
return {
|
|
"plan_tier": workspace.plan_tier,
|
|
"monthly_quota": workspace.monthly_quota,
|
|
}
|
|
|
|
|
|
@router.post("/checkout")
|
|
async def create_checkout_session(
|
|
price_id: str,
|
|
user: CurrentUser = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
stripe.api_key = settings.stripe_secret_key
|
|
if price_id not in [settings.stripe_price_pro, settings.stripe_price_business]:
|
|
raise HTTPException(status_code=400, detail="Invalid price ID")
|
|
|
|
row = await db.execute(
|
|
text("SELECT stripe_customer_id FROM workspaces WHERE id = :wid"),
|
|
{"wid": user.workspace_id},
|
|
)
|
|
workspace = row.fetchone()
|
|
customer_id = workspace.stripe_customer_id if workspace else None
|
|
|
|
session = stripe.checkout.Session.create(
|
|
mode="subscription",
|
|
customer=customer_id,
|
|
customer_email=None if customer_id else user.email,
|
|
line_items=[{"price": price_id, "quantity": 1}],
|
|
success_url=f"{settings.app_url}/settings/billing?success=1",
|
|
cancel_url=f"{settings.app_url}/pricing",
|
|
metadata={"workspace_id": user.workspace_id},
|
|
)
|
|
return {"checkout_url": session.url}
|
|
|
|
|
|
@router.post("/portal")
|
|
async def create_portal_session(
|
|
user: CurrentUser = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
stripe.api_key = settings.stripe_secret_key
|
|
row = await db.execute(
|
|
text("SELECT stripe_customer_id FROM workspaces WHERE id = :wid"),
|
|
{"wid": user.workspace_id},
|
|
)
|
|
workspace = row.fetchone()
|
|
if not workspace or not workspace.stripe_customer_id:
|
|
raise HTTPException(status_code=400, detail="No billing account found")
|
|
|
|
portal = stripe.billing_portal.Session.create(
|
|
customer=workspace.stripe_customer_id,
|
|
return_url=f"{settings.app_url}/settings/billing",
|
|
)
|
|
return {"portal_url": portal.url}
|
|
|
|
|
|
@router.post("/webhook")
|
|
async def stripe_webhook(request: Request, db: AsyncSession = Depends(get_db)):
|
|
payload = await request.body()
|
|
sig = request.headers.get("stripe-signature", "")
|
|
stripe.api_key = settings.stripe_secret_key
|
|
|
|
try:
|
|
event = stripe.Webhook.construct_event(payload, sig, settings.stripe_webhook_secret)
|
|
except stripe.error.SignatureVerificationError:
|
|
raise HTTPException(status_code=400, detail="Invalid signature")
|
|
|
|
if event["type"] == "checkout.session.completed":
|
|
session = event["data"]["object"]
|
|
workspace_id = session["metadata"].get("workspace_id")
|
|
customer_id = session["customer"]
|
|
subscription_id = session["subscription"]
|
|
price_id = session["line_items"]["data"][0]["price"]["id"] if session.get("line_items") else None
|
|
|
|
# Determine tier from price_id
|
|
tier = "pro" if price_id == settings.stripe_price_pro else "business"
|
|
quota = PLAN_QUOTAS[tier] or 999999
|
|
|
|
await db.execute(
|
|
text("""
|
|
UPDATE workspaces
|
|
SET plan_tier=:tier, monthly_quota=:quota,
|
|
stripe_customer_id=:cid, stripe_subscription_id=:sid
|
|
WHERE id=:wid
|
|
"""),
|
|
{"tier": tier, "quota": quota, "cid": customer_id, "sid": subscription_id, "wid": workspace_id},
|
|
)
|
|
await db.commit()
|
|
logger.info("subscription_upgraded", workspace_id=workspace_id, tier=tier)
|
|
|
|
elif event["type"] in ("customer.subscription.deleted", "customer.subscription.paused"):
|
|
sub = event["data"]["object"]
|
|
customer_id = sub["customer"]
|
|
await db.execute(
|
|
text("UPDATE workspaces SET plan_tier='free', monthly_quota=5 WHERE stripe_customer_id=:cid"),
|
|
{"cid": customer_id},
|
|
)
|
|
await db.commit()
|
|
logger.info("subscription_downgraded", customer_id=customer_id)
|
|
|
|
return {"received": True}
|