amazon-transcreation/backend/app/auth/router.py
Vadym Samoilenko f60b7261b5 Add Azure AD MSAL SSO (SPA token exchange)
- Backend: Azure AD JWKS validator with 24h cache, new POST /api/v1/auth/sso/login
  endpoint, sso_login() in AuthService with auto-provisioning, password_hash made
  nullable, auth_provider column added, Alembic migration c1d2e3f4a5b6
- Frontend: @azure/msal-browser, msal.ts config singleton, ssoLogin() API function,
  login page updated with SSO button and redirect callback handling
- Deploy: frontend Dockerfile and docker-compose.prod.yml updated to bake Azure AD
  vars into the image at build time; deploy.sh validates SSO config on init/deploy

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 18:08:46 +01:00

108 lines
3.8 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.auth.schemas import (
LoginRequest,
RefreshRequest,
SSOLoginRequest,
TokenResponse,
UserClaims,
)
from app.auth.service import AuthService
from app.config import settings
from app.dependencies import get_current_user, get_db
from app.services.audit_service import AuditService
router = APIRouter(prefix="/auth", tags=["auth"])
auth_service = AuthService()
audit_service = AuditService()
@router.post("/login", response_model=TokenResponse)
async def login(
body: LoginRequest,
request: Request,
db: AsyncSession = Depends(get_db),
) -> TokenResponse:
"""Authenticate user and return access + refresh tokens."""
result = await auth_service.login(body.email, body.password, db)
if result is None:
await audit_service.log(
db, action="login_failed", entity_type="user", entity_id=body.email,
details={"reason": "Invalid credentials"},
ip_address=request.client.host if request.client else None,
)
await db.commit()
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password",
)
# Extract user_id from the access token claims
claims = auth_service.validate_token(result["access_token"])
user_id = claims["sub"] if claims else body.email
await audit_service.log(
db, action="login", entity_type="user", entity_id=str(user_id),
user_id=user_id if claims else None,
details={"email": body.email},
ip_address=request.client.host if request.client else None,
)
await db.commit()
return TokenResponse(**result)
@router.post("/sso/login", response_model=TokenResponse)
async def sso_login(
body: SSOLoginRequest,
request: Request,
db: AsyncSession = Depends(get_db),
) -> TokenResponse:
"""Authenticate via Azure AD ID token (SPA flow) and return app tokens."""
if not settings.AZURE_AD_SSO_ENABLED:
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail="SSO is not enabled on this server",
)
result = await auth_service.sso_login(body.token, db)
if result is None:
await audit_service.log(
db, action="sso_login_failed", entity_type="user", entity_id="unknown",
details={"reason": "SSO authentication failed"},
ip_address=request.client.host if request.client else None,
)
await db.commit()
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="SSO authentication failed",
)
claims = auth_service.validate_token(result["access_token"])
user_id = claims["sub"] if claims else "unknown"
await audit_service.log(
db, action="sso_login", entity_type="user", entity_id=str(user_id),
user_id=user_id if claims else None,
details={"provider": "azure_ad"},
ip_address=request.client.host if request.client else None,
)
await db.commit()
return TokenResponse(**result)
@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(body: RefreshRequest) -> TokenResponse:
"""Exchange a valid refresh token for a new token pair."""
result = auth_service.refresh_tokens(body.refresh_token)
if result is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired refresh token",
)
return TokenResponse(**result)
@router.get("/me", response_model=UserClaims)
async def get_me(
current_user: dict = Depends(get_current_user),
) -> UserClaims:
"""Return the current authenticated user's claims."""
return UserClaims(**current_user)