ppt-tool/backend/api/middlewares/auth_middleware.py
Vadym Samoilenko f2f729a50b Switch Azure AD auth to MSAL SPA (browser-side token exchange)
- Replace server-side ConfidentialClientApplication + OAuth callback
  with MSAL browser popup flow (PKCE, no client_secret required)
- Backend: add POST /sso-token endpoint that validates Azure AD ID token
  via Microsoft JWKS, issues session cookie; remove /login + /callback
- Frontend: install @azure/msal-browser + @azure/msal-react, wrap app
  with MsalProvider, login page uses loginPopup() → sends id_token to backend
- Pass NEXT_PUBLIC_AZURE_* env vars through next.config.mjs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 12:34:52 +00:00

84 lines
2.5 KiB
Python

import uuid
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from services.auth_service import AuthService
from services.database import async_session_maker
from models.sql.user import UserModel
# Paths that skip authentication (exact or prefix match)
PUBLIC_PATHS = [
"/api/v1/auth/dev-status",
"/api/v1/auth/dev-login",
"/api/v1/auth/sso-token",
"/docs",
"/openapi.json",
"/api/health",
]
auth_service = AuthService()
class AuthMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
path = request.url.path
# Skip auth for public paths
for public_path in PUBLIC_PATHS:
if path == public_path or path.startswith(public_path + "/"):
request.state.user = None
return await call_next(request)
# Skip auth for non-API paths (Next.js frontend routes)
if not path.startswith("/api/"):
request.state.user = None
return await call_next(request)
# Extract token from cookie or Authorization header
token = request.cookies.get("session_token")
if not token:
auth_header = request.headers.get("Authorization")
if auth_header and auth_header.startswith("Bearer "):
token = auth_header[7:]
if not token:
return JSONResponse(
status_code=401,
content={"detail": "Authentication required"},
)
# Validate JWT
claims = auth_service.validate_token(token)
if not claims:
return JSONResponse(
status_code=401,
content={"detail": "Invalid or expired token"},
)
# Load user from DB
try:
user_id = uuid.UUID(claims["sub"])
async with async_session_maker() as session:
user = await session.get(UserModel, user_id)
except (ValueError, KeyError):
return JSONResponse(
status_code=401,
content={"detail": "Invalid token payload"},
)
if not user:
return JSONResponse(
status_code=401,
content={"detail": "User not found"},
)
if not user.is_active:
return JSONResponse(
status_code=401,
content={"detail": "Account deactivated"},
)
request.state.user = user
return await call_next(request)