marriott-box-image-video-ta.../auth.py
DJP 1f2c2ff8e1 Multi-token + fuzzy search; admin-only Run Now / Backfill
Search:
- Previously /api/events did one ILIKE %q% across the columns, so
  "female city" required the literal substring "female city" to
  appear somewhere. Now the query is tokenised on whitespace; every
  token must match somewhere (AND), and each token matches either
  by substring (ILIKE) across the searched columns OR by trigram
  similarity (pg_trgm) against a concatenated text blob with a 0.3
  threshold — handles typos like "femalle" → "female".
- Results ranked by summed similarity score across all tokens, then
  recency. Empty query falls back to "newest 100".
- schema.sql: CREATE EXTENSION IF NOT EXISTS pg_trgm (idempotent;
  applied by ensure_schema on api startup).

Admin gating:
- auth.py: User now carries `is_admin`. Computed from a
  comma-separated ADMIN_EMAILS env var (case-insensitive match
  against `preferred_username`/`upn`/`email` claim). New
  `require_admin` FastAPI dependency 403s non-admins.
- In DEV_AUTH_BYPASS mode the dev user is admin by default; flip
  DEV_AUTH_IS_ADMIN=false to test the read-only UX without enabling
  SSO.
- POST /api/runs and POST /api/backfill now gated by require_admin.
- /api/me carries is_admin so the SPA can hide the destructive
  buttons for non-admins.

Frontend:
- App.tsx fetches /api/me on mount and hides Run Now + Backfill
  unless `is_admin` is true. Non-admins still see search + results +
  recent-runs table.

docker-compose / .env.example: thread ADMIN_EMAILS +
DEV_AUTH_IS_ADMIN into the api container.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 15:51:50 -04:00

147 lines
4.9 KiB
Python

"""
Azure AD (Entra ID) bearer-token auth for the FastAPI backend.
- DEV_AUTH_BYPASS=true → skip all validation, return a fixed dev user.
- Otherwise: extract Bearer token, fetch the tenant's JWKS once and cache it,
verify the JWT signature, and check `aud` matches AZURE_CLIENT_ID.
"""
import os
import time
from typing import Optional
import httpx
import jwt
from fastapi import Depends, Header, HTTPException, status
from jwt import PyJWKClient
# Spellings (after strip + lowercase) that switch a boolean env var on.
_TRUTHY = ("1", "true", "yes")


def _env_flag(name: str, default: str = "") -> bool:
    """Return True when env var *name* (or *default* if unset) is a truthy spelling."""
    raw = os.getenv(name, default)
    return raw.strip().lower() in _TRUTHY


# Tenant and app-registration identifiers; empty string when not configured.
AZURE_TENANT_ID = os.getenv("AZURE_TENANT_ID", "").strip()
AZURE_CLIENT_ID = os.getenv("AZURE_CLIENT_ID", "").strip()

# When on, require_auth skips token validation and returns the dev user.
DEV_AUTH_BYPASS = _env_flag("DEV_AUTH_BYPASS")

# Admin allowlist parsed from the comma-separated ADMIN_EMAILS variable.
# Entries are trimmed and lowercased so lookups are case-insensitive; blank
# entries are dropped. Admins may call the destructive endpoints (Run now,
# Backfill); everyone else is read-only.
_ADMIN_EMAILS = set(
    filter(
        None,
        (piece.strip().lower() for piece in os.getenv("ADMIN_EMAILS", "").split(",")),
    )
)

# The bypass dev user is an admin unless DEV_AUTH_IS_ADMIN is set to a
# non-truthy value — lets the non-admin UX be exercised without MSAL.
_DEV_AUTH_IS_ADMIN = _env_flag("DEV_AUTH_IS_ADMIN", "true")

# Signing-key endpoint for the tenant; None when no tenant is configured.
if AZURE_TENANT_ID:
    JWKS_URL = f"https://login.microsoftonline.com/{AZURE_TENANT_ID}/discovery/v2.0/keys"
else:
    JWKS_URL = None

# Issuer values accepted in the `iss` claim: the v2.0 form, then the v1.0 form.
ISSUERS = (
    f"https://login.microsoftonline.com/{AZURE_TENANT_ID}/v2.0",
    f"https://sts.windows.net/{AZURE_TENANT_ID}/",
)
# Process-wide JWKS client, created on first use by _get_jwks_client().
_jwks_client: Optional[PyJWKClient] = None


def _get_jwks_client() -> PyJWKClient:
    """
    Return the shared PyJWKClient, constructing it on the first call.

    Raises:
        HTTPException: 500 when JWKS_URL is unset, i.e. AZURE_TENANT_ID was
            not configured on the server.
    """
    global _jwks_client
    # Fast path: a client was already built by an earlier call.
    if _jwks_client is not None:
        return _jwks_client
    if JWKS_URL:
        _jwks_client = PyJWKClient(JWKS_URL)
        return _jwks_client
    raise HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail="AZURE_TENANT_ID not configured on the server",
    )
class User:
    """
    The caller identity handed to route handlers by the auth dependencies.

    All constructor arguments are keyword-only.

    Attributes:
        oid: Identifier of the caller.
        name: Display name.
        email: Email / username of the caller.
        dev: True for the fixed dev-bypass user.
        is_admin: True when the caller may use admin-only endpoints.
    """

    def __init__(
        self,
        *,
        oid: str,
        name: str,
        email: str,
        dev: bool = False,
        is_admin: bool = False,
    ):
        self.oid, self.name, self.email = oid, name, email
        self.dev, self.is_admin = dev, is_admin

    def to_dict(self):
        """Return the user's fields as a plain dict."""
        return dict(
            oid=self.oid,
            name=self.name,
            email=self.email,
            dev=self.dev,
            is_admin=self.is_admin,
        )
def _is_admin_email(email: str) -> bool:
    """Return True when *email*, trimmed and lowercased, is in the admin allowlist."""
    # A missing or empty email can never be an admin.
    if not email:
        return False
    normalized = email.strip().lower()
    return normalized in _ADMIN_EMAILS
def _bypass_user() -> User:
    """
    Build the fixed user returned while DEV_AUTH_BYPASS is active.

    Name and email are read from DEV_AUTH_NAME / DEV_AUTH_EMAIL on each call,
    with built-in defaults. The user is an admin when _DEV_AUTH_IS_ADMIN is
    set or when the email is on the admin allowlist.
    """
    dev_email = os.getenv("DEV_AUTH_EMAIL", "dev@oliver.agency")
    dev_name = os.getenv("DEV_AUTH_NAME", "Dev User")
    admin = _DEV_AUTH_IS_ADMIN or _is_admin_email(dev_email)
    return User(oid="dev-bypass", name=dev_name, email=dev_email, dev=True, is_admin=admin)
def require_auth(authorization: Optional[str] = Header(default=None)) -> User:
    """
    FastAPI dependency: authenticate the caller from the Authorization header.

    Honors DEV_AUTH_BYPASS for local/dev use; in that mode no token is
    inspected and the fixed dev user is returned.

    Args:
        authorization: Raw ``Authorization`` header value, expected to be
            ``Bearer <jwt>`` (the scheme is matched case-insensitively).

    Returns:
        A User built from the validated token's claims. ``is_admin`` is set
        when the caller's email is on the ADMIN_EMAILS allowlist.

    Raises:
        HTTPException: 401 when the header is missing or not a Bearer token,
            when the token fails validation, or when no signing key matches
            it; 500 when AZURE_TENANT_ID / AZURE_CLIENT_ID are not
            configured; 503 when the JWKS endpoint cannot be reached.
    """
    if DEV_AUTH_BYPASS:
        return _bypass_user()

    if not authorization or not authorization.lower().startswith("bearer "):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing bearer token")
    token = authorization.split(" ", 1)[1].strip()

    if not AZURE_TENANT_ID or not AZURE_CLIENT_ID:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Server missing AZURE_TENANT_ID / AZURE_CLIENT_ID",
        )

    try:
        signing_key = _get_jwks_client().get_signing_key_from_jwt(token).key
        # Accept either v2.0 or v1.0 issuer URLs.
        claims = jwt.decode(
            token,
            signing_key,
            algorithms=["RS256"],
            audience=AZURE_CLIENT_ID,
            issuer=list(ISSUERS),
            options={"verify_aud": True, "verify_iss": True, "verify_exp": True},
        )
    except jwt.InvalidTokenError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Invalid token: {e}"
        ) from e
    # PyJWKClient fetches keys with urllib and reports failures through its
    # own exception types (siblings of InvalidTokenError under PyJWTError),
    # never httpx errors. The connection subclass must be listed first.
    except jwt.exceptions.PyJWKClientConnectionError as e:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"JWKS fetch failed: {e}"
        ) from e
    except jwt.exceptions.PyJWKClientError as e:
        # E.g. the token's `kid` matches no key published by the tenant.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Invalid token: {e}"
        ) from e

    # Trailing `or` defaults keep these values strings even when a claim is
    # present but null or empty.
    # NOTE(review): Microsoft documents `preferred_username` as mutable and
    # not suitable for authorization decisions — confirm that using it for
    # the admin allowlist is acceptable for this tenant.
    email = claims.get("preferred_username") or claims.get("upn") or claims.get("email") or ""
    return User(
        oid=claims.get("oid") or claims.get("sub") or "unknown",
        name=claims.get("name") or "",
        email=email,
        is_admin=_is_admin_email(email),
    )
def require_admin(user: User = Depends(require_auth)) -> User:
    """
    FastAPI dependency that lets only admins through.

    Returns the authenticated user when ``user.is_admin`` is true (an
    ADMIN_EMAILS member or a bypass-admin dev user); otherwise raises a
    403 HTTPException.
    """
    if user.is_admin:
        return user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin-only endpoint",
    )
def maybe_auth_info():
    """
    Summarize the auth configuration for the /api/health diagnostic.

    Reports whether dev bypass is on, whether the tenant and client IDs are
    set, and how many admin emails are on the allowlist (a count, not a flag).
    """
    info = {}
    info["dev_bypass"] = DEV_AUTH_BYPASS
    info["tenant_configured"] = bool(AZURE_TENANT_ID)
    info["client_configured"] = bool(AZURE_CLIENT_ID)
    info["admin_emails_configured"] = len(_ADMIN_EMAILS)
    return info