solventum-image-metadata/backend/app/api/auth.py
SamoilenkoVadym 5f5c04471c feat(sso): migrate to client-side MSAL flow without client secret
Backend changes:
- Add PyJWT for Azure AD id_token validation
- Add validate_azure_id_token() function in core/auth.py
- Replace /microsoft/login and /microsoft/callback with /microsoft/login POST
- New endpoint validates id_token from frontend (no Graph API calls)
- Support PublicClientApplication (no client secret needed)

Frontend changes:
- Add @azure/msal-browser and @azure/msal-react dependencies
- Create msalConfig.ts with MSAL configuration
- Wrap App with MsalProvider
- Update LoginPage to use useMsal hook and loginPopup
- Remove OAuthCallback handler (MSAL handles redirect)
- Frontend gets id_token from Microsoft, sends to backend

Benefits:
- Works without AZURE_CLIENT_SECRET (matches apac-ops-bot)
- More secure (no secret in backend)
- Simpler backend (just JWT validation)
- Better UX (MSAL handles popups, silent refresh)

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2026-02-09 17:25:34 +00:00

347 lines
9 KiB
Python

"""
Authentication API Endpoints
Handles login, logout, token refresh, and Microsoft SSO.
"""
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from typing import Optional
import msal
import os
from app.core.database import get_db, UserRepository, AuditLogRepository
from app.core.auth import (
verify_password,
hash_password,
create_tokens_response,
verify_refresh_token,
get_current_user_id,
validate_azure_id_token
)
from app.core.redis_client import RedisSessionStore
# Router on which the endpoint decorators in this module register their routes.
router = APIRouter()
# ===== Request/Response Models =====
class LoginRequest(BaseModel):
    """Request body carrying username/password credentials.

    Used by the /login endpoint and reused by /register.
    """
    # Username passed to UserRepository.get_by_username.
    username: str
    # Plain-text password; /login verifies it, /register hashes it.
    password: str
class LoginResponse(BaseModel):
    """Response body returned by /login and /microsoft/login.

    The token fields are filled from the dict returned by
    create_tokens_response(); ``user`` comes from ``user.to_dict()``.
    """
    access_token: str
    # Also handed to the Redis session store when the session is created.
    refresh_token: str
    token_type: str
    # presumably the access-token lifetime in seconds — confirm in
    # create_tokens_response()
    expires_in: int
    user: dict
class TokenRefreshRequest(BaseModel):
    """Request body for /token/refresh."""
    # Refresh token that is passed to verify_refresh_token().
    refresh_token: str
class LogoutRequest(BaseModel):
    """Request body for /logout."""
    # Optional Redis session to delete; when omitted (None) the logout
    # endpoint skips the session deletion step.
    session_id: Optional[str] = None
class MicrosoftLoginRequest(BaseModel):
    """Request body for /microsoft/login (client-side MSAL flow)."""
    # id_token obtained by the frontend; passed to validate_azure_id_token().
    id_token: str
# ===== Local Authentication Endpoints =====
@router.post("/login", response_model=LoginResponse)
async def login(
    login_data: LoginRequest,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Local authentication - username/password login.

    Looks the user up by username, verifies the password against the stored
    hash, issues application JWT tokens, creates a session in Redis, calls
    UserRepository.update_last_login and writes an audit log entry.

    Args:
        login_data: Username and password supplied by the client.
        request: Incoming HTTP request; provides the client address, the
            User-Agent header and the Redis store on ``app.state``.
        db: Database session.

    Returns:
        LoginResponse with the JWT tokens and the user's info.

    Raises:
        HTTPException: 401 if the user does not exist, has no password hash
            (e.g. an SSO-only account) or the password does not match;
            403 if the account is disabled.
    """
    user = await UserRepository.get_by_username(db, login_data.username)

    # One generic 401 for "no such user", "no local password" and
    # "wrong password" so the response does not reveal which one failed.
    if (
        not user
        or not user.password_hash
        or not verify_password(login_data.password, user.password_hash)
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid username or password"
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is disabled"
        )

    # request.client is None when the server cannot supply a peer address;
    # fall back to a placeholder instead of failing with AttributeError.
    client_host = request.client.host if request.client else "unknown"

    # Create JWT tokens
    tokens = create_tokens_response(user.id)

    # Create user session in Redis. The returned session id is not used
    # here: LoginResponse has no field that could carry it to the client.
    redis: RedisSessionStore = request.app.state.redis
    await redis.create_user_session(
        user_id=user.id,
        refresh_token=tokens["refresh_token"],
        ip_address=client_host,
        user_agent=request.headers.get("user-agent", "")
    )

    # Update last login
    await UserRepository.update_last_login(db, user.id)

    # Log action
    await AuditLogRepository.log_action(
        db,
        user_id=user.id,
        action="login",
        details=f"Login from {client_host}"
    )

    return LoginResponse(
        **tokens,
        user=user.to_dict()
    )
@router.post("/token/refresh")
async def refresh_access_token(
    refresh_data: TokenRefreshRequest,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Refresh access token using refresh token.

    Args:
        refresh_data: Request body containing the refresh token.
        request: Incoming HTTP request (currently unused; kept so the
            endpoint signature stays stable).
        db: Database session.

    Returns:
        Dict with the fields from create_tokens_response() plus a ``user``
        entry holding ``user.to_dict()``.

    Raises:
        HTTPException: 401 if the refresh token is rejected by
            verify_refresh_token(), or if the user no longer exists or is
            inactive.
    """
    # Verify refresh token; any HTTPException from the verifier is
    # normalised to a single 401, keeping the original as the cause.
    try:
        user_id = verify_refresh_token(refresh_data.refresh_token)
    except HTTPException as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token"
        ) from exc

    # Check if user still exists and is active
    user = await UserRepository.get_by_id(db, user_id)
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found or inactive"
        )

    # Create new tokens
    tokens = create_tokens_response(user.id)

    # TODO: the Redis session is NOT updated with the new refresh token,
    # and the presented refresh token is not checked against a live
    # session in this function. Wire this up once RedisSessionStore
    # exposes a suitable update/lookup operation.
    return {
        **tokens,
        "user": user.to_dict()
    }
@router.post("/logout")
async def logout(
    logout_data: LogoutRequest,
    request: Request,
    user_id: int = Depends(get_current_user_id),
    db: AsyncSession = Depends(get_db)
):
    """
    Logout user - invalidate session in Redis.

    Args:
        logout_data: Request body with the optional session id to delete.
        request: Incoming HTTP request; provides the client address and the
            Redis store on ``app.state``.
        user_id: Id of the authenticated user, from get_current_user_id.
        db: Database session.

    Returns:
        Dict with a confirmation ``message``.
    """
    # Delete user session from Redis (skipped when no session id was sent)
    redis: RedisSessionStore = request.app.state.redis
    if logout_data.session_id:
        # NOTE(review): session_id is client-supplied and is not checked
        # against user_id in this function — confirm that ownership is
        # enforced elsewhere (e.g. inside delete_user_session).
        await redis.delete_user_session(logout_data.session_id)

    # request.client is None when the server cannot supply a peer address;
    # fall back to a placeholder instead of failing with AttributeError.
    client_host = request.client.host if request.client else "unknown"

    # Log action
    await AuditLogRepository.log_action(
        db,
        user_id=user_id,
        action="logout",
        details=f"Logout from {client_host}"
    )

    return {"message": "Logged out successfully"}
# ===== Microsoft SSO Endpoints (Client-Side Flow) =====
# Azure AD identifiers for Microsoft SSO, read from the environment once at
# import time. Each is None when its variable is unset.
AZURE_CLIENT_ID = os.environ.get("AZURE_CLIENT_ID")
AZURE_TENANT_ID = os.environ.get("AZURE_TENANT_ID")
@router.post("/microsoft/login", response_model=LoginResponse)
async def login_with_microsoft(
    login_data: MicrosoftLoginRequest,
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Authenticate with Microsoft id_token (client-side MSAL flow).

    Frontend uses @azure/msal-browser to get id_token from Microsoft,
    then sends it here. Validation is delegated to
    validate_azure_id_token(); on success the matching user is looked up
    (or created on first login) and application JWT tokens are issued for
    session management.

    Args:
        login_data: Request containing id_token from Microsoft
        request: HTTP request for client info
        db: Database session

    Returns:
        LoginResponse with application JWT tokens and user info

    Raises:
        HTTPException: 501 if AZURE_CLIENT_ID / AZURE_TENANT_ID are not
            configured; 401 if no username can be taken from the token
            claims; 403 if the account is disabled. Errors raised by
            validate_azure_id_token() propagate unchanged.
    """
    if not AZURE_CLIENT_ID or not AZURE_TENANT_ID:
        raise HTTPException(
            status_code=status.HTTP_501_NOT_IMPLEMENTED,
            detail="Microsoft SSO not configured"
        )

    # Validate id_token (JWT from Azure AD)
    user_claims = validate_azure_id_token(
        login_data.id_token,
        AZURE_CLIENT_ID,
        AZURE_TENANT_ID
    )

    # Extract user details from token claims
    username = user_claims.get("preferred_username") or user_claims.get("email")
    email = user_claims.get("email")
    full_name = user_claims.get("name")

    if not username:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not extract username from id_token"
        )

    # NOTE(review): accounts are matched on `preferred_username`, which
    # Microsoft documents as mutable and unsuitable for authorization
    # decisions. The lookup can also hit a pre-existing local account with
    # the same username. Consider keying SSO users on the immutable
    # `oid`/`sub` claim and checking the account's auth method.
    user = await UserRepository.get_by_username(db, username)

    if not user:
        # Create new SSO user
        user = await UserRepository.create_user(
            db,
            username=username,
            password_hash=None,  # SSO users don't have passwords
            email=email,
            full_name=full_name,
            auth_method="sso"
        )

    # Check if user is active
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is disabled"
        )

    # request.client is None when the server cannot supply a peer address;
    # fall back to a placeholder instead of failing with AttributeError.
    client_host = request.client.host if request.client else "unknown"

    # Create JWT tokens (for our app, not Azure tokens)
    tokens = create_tokens_response(user.id)

    # Create user session in Redis. The returned session id is not used
    # here: LoginResponse has no field that could carry it to the client.
    redis: RedisSessionStore = request.app.state.redis
    await redis.create_user_session(
        user_id=user.id,
        refresh_token=tokens["refresh_token"],
        ip_address=client_host,
        user_agent=request.headers.get("user-agent", "")
    )

    # Update last login
    await UserRepository.update_last_login(db, user.id)

    # Log action
    await AuditLogRepository.log_action(
        db,
        user_id=user.id,
        action="sso_login",
        details=f"SSO login (client-side MSAL) from {client_host}"
    )

    return LoginResponse(
        **tokens,
        user=user.to_dict()
    )
# ===== User Info Endpoint =====
@router.get("/me")
async def get_current_user(
    user_id: int = Depends(get_current_user_id),
    db: AsyncSession = Depends(get_db)
):
    """
    Return the info of the currently authenticated user.

    Args:
        user_id: Id resolved by the get_current_user_id dependency.
        db: Database session.

    Returns:
        The user's ``to_dict()`` representation.

    Raises:
        HTTPException: 404 if no user with that id is found.
    """
    account = await UserRepository.get_by_id(db, user_id)
    if account:
        return account.to_dict()

    # The token referred to a user id that is no longer in the database.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found",
    )
# ===== Admin Endpoints (for testing) =====
@router.post("/register")
async def register_user(
    login_data: LoginRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    Create a local user account (intended for testing/development).

    This endpoint carries no authentication dependency; in production it
    should be disabled or placed behind admin auth.

    Args:
        login_data: Username and plain-text password for the new account.
        db: Database session.

    Returns:
        Dict with a confirmation ``message`` and the new ``user`` as a dict.

    Raises:
        HTTPException: 400 if the username is already taken.
    """
    requested_name = login_data.username

    # Refuse duplicates before doing any hashing work.
    if await UserRepository.get_by_username(db, requested_name):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already exists",
        )

    # Only the hash is handed to the repository, never the raw password.
    new_user = await UserRepository.create_user(
        db,
        username=requested_name,
        password_hash=hash_password(login_data.password),
        email=None,
        full_name=None,
        auth_method="local",
    )

    response = {"message": "User created successfully"}
    response["user"] = new_user.to_dict()
    return response