ppt-tool/backend/api/v1/auth/router.py
Vadym Samoilenko f2f729a50b Switch Azure AD auth to MSAL SPA (browser-side token exchange)
- Replace server-side ConfidentialClientApplication + OAuth callback
  with MSAL browser popup flow (PKCE, no client_secret required)
- Backend: add POST /sso-token endpoint that validates Azure AD ID token
  via Microsoft JWKS, issues session cookie; remove /login + /callback
- Frontend: install @azure/msal-browser + @azure/msal-react, wrap app
  with MsalProvider, login page uses loginPopup() → sends id_token to backend
- Pass NEXT_PUBLIC_AZURE_* env vars through next.config.mjs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 12:34:52 +00:00

139 lines
4.1 KiB
Python

import os
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from jose import JWTError
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from services.database import get_async_session
from services.auth_service import AuthService
from api.middlewares.rate_limit_middleware import limiter
AUTH_ROUTER = APIRouter(prefix="/api/v1/auth", tags=["Auth"])
auth_service = AuthService()
class SSOTokenRequest(BaseModel):
id_token: str
class DevLoginRequest(BaseModel):
email: str
password: str
@AUTH_ROUTER.get("/dev-status")
async def dev_status():
"""Check auth modes available."""
return {
"dev_mode": auth_service.is_dev_mode,
"dev_login_enabled": auth_service.dev_login_enabled,
"azure_enabled": not auth_service.is_dev_mode,
}
@AUTH_ROUTER.post("/sso-token")
@limiter.limit("10/minute")
async def sso_token(
request: Request,
body: SSOTokenRequest,
session: AsyncSession = Depends(get_async_session),
):
"""Exchange Azure AD MSAL id_token for a session cookie (SPA flow)."""
if auth_service.is_dev_mode:
raise HTTPException(status_code=404, detail="Azure AD not configured")
try:
claims = await auth_service.validate_azure_token(body.id_token)
user = await auth_service.get_or_create_user(claims, session)
token = auth_service.create_session_jwt(user)
is_https = os.environ.get("COOKIE_SECURE", "false").lower() == "true"
response = JSONResponse(content={
"id": str(user.id),
"email": user.email,
"display_name": user.display_name,
"role": user.role,
})
response.set_cookie(
key="session_token",
value=token,
httponly=True,
secure=is_https,
samesite="lax",
max_age=86400,
)
return response
except JWTError as e:
raise HTTPException(status_code=401, detail=f"Invalid token: {e}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Authentication failed: {e}")
@AUTH_ROUTER.post("/dev-login")
@limiter.limit("3/minute")
async def dev_login(
request: Request,
body: DevLoginRequest,
session: AsyncSession = Depends(get_async_session),
):
"""Dev login with email and password. Available when DEV_AUTH_PASSWORD is set."""
if not auth_service.dev_login_enabled:
raise HTTPException(status_code=404, detail="Dev login not available")
user = await auth_service.dev_login(body.email, body.password, session)
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
token = auth_service.create_session_jwt(user)
is_https = os.environ.get("COOKIE_SECURE", "false").lower() == "true"
response = JSONResponse(
content={
"id": str(user.id),
"email": user.email,
"display_name": user.display_name,
"role": user.role,
}
)
response.set_cookie(
key="session_token",
value=token,
httponly=True,
secure=is_https,
samesite="lax",
max_age=86400,
)
return response
@AUTH_ROUTER.get("/me")
async def get_current_user_info(
request: Request,
session: AsyncSession = Depends(get_async_session),
):
"""Return current authenticated user info."""
user = getattr(request.state, "user", None)
if not user:
raise HTTPException(status_code=401, detail="Not authenticated")
from services.access_service import get_accessible_client_ids
client_ids = await get_accessible_client_ids(user, session)
primary_client_id = str(client_ids[0]) if client_ids else None
return {
"id": str(user.id),
"email": user.email,
"displayName": user.display_name,
"role": user.role,
"clientId": primary_client_id,
}
@AUTH_ROUTER.post("/logout")
async def logout():
"""Clear session cookie."""
response = JSONResponse(content={"message": "Logged out"})
response.delete_cookie("session_token")
return response