""" Authentication API Endpoints Handles login, logout, token refresh, and Microsoft SSO. """ from fastapi import APIRouter, Depends, HTTPException, status, Request from fastapi.responses import JSONResponse from sqlalchemy.ext.asyncio import AsyncSession from pydantic import BaseModel from typing import Optional import msal import os from app.core.database import get_db, UserRepository, AuditLogRepository from app.core.auth import ( verify_password, hash_password, create_tokens_response, verify_refresh_token, get_current_user_id, validate_azure_id_token ) from app.core.redis_client import RedisSessionStore router = APIRouter() # ===== Request/Response Models ===== class LoginRequest(BaseModel): username: str password: str class LoginResponse(BaseModel): access_token: str refresh_token: str token_type: str expires_in: int user: dict class TokenRefreshRequest(BaseModel): refresh_token: str class LogoutRequest(BaseModel): session_id: Optional[str] = None class MicrosoftLoginRequest(BaseModel): id_token: str # ===== Local Authentication Endpoints ===== @router.post("/login", response_model=LoginResponse) async def login( login_data: LoginRequest, request: Request, db: AsyncSession = Depends(get_db) ): """ Local authentication - username/password login. Returns JWT tokens + user info. """ # Get user from database user = await UserRepository.get_by_username(db, login_data.username) # Validate user exists and password correct if not user or not user.password_hash: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password" ) if not verify_password(login_data.password, user.password_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password" ) # Check if user is active if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User account is disabled" ) # Create JWT tokens tokens = create_tokens_response(user.id) # Create user session in Redis redis: RedisSessionStore = request.app.state.redis session_id = await redis.create_user_session( user_id=user.id, refresh_token=tokens["refresh_token"], ip_address=request.client.host, user_agent=request.headers.get("user-agent", "") ) # Update last login await UserRepository.update_last_login(db, user.id) # Log action await AuditLogRepository.log_action( db, user_id=user.id, action="login", details=f"Login from {request.client.host}" ) return LoginResponse( **tokens, user=user.to_dict() ) @router.post("/token/refresh") async def refresh_access_token( refresh_data: TokenRefreshRequest, request: Request, db: AsyncSession = Depends(get_db) ): """ Refresh access token using refresh token. """ # Verify refresh token try: user_id = verify_refresh_token(refresh_data.refresh_token) except HTTPException as e: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token" ) # Check if user still exists and is active user = await UserRepository.get_by_id(db, user_id) if not user or not user.is_active: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or inactive" ) # Create new tokens tokens = create_tokens_response(user.id) # Update Redis session with new refresh token redis: RedisSessionStore = request.app.state.redis # Note: We keep the old session_id but update the refresh token # In production, you might want to rotate session_id as well return { **tokens, "user": user.to_dict() } @router.post("/logout") async def logout( logout_data: LogoutRequest, request: Request, user_id: int = Depends(get_current_user_id), db: AsyncSession = Depends(get_db) ): """ Logout user - invalidate session in Redis. """ # Delete user session from Redis redis: RedisSessionStore = request.app.state.redis if logout_data.session_id: await redis.delete_user_session(logout_data.session_id) # Log action await AuditLogRepository.log_action( db, user_id=user_id, action="logout", details=f"Logout from {request.client.host}" ) return {"message": "Logged out successfully"} # ===== Microsoft SSO Endpoints (Client-Side Flow) ===== # Microsoft OAuth configuration AZURE_CLIENT_ID = os.getenv("AZURE_CLIENT_ID") AZURE_TENANT_ID = os.getenv("AZURE_TENANT_ID") @router.post("/microsoft/login", response_model=LoginResponse) async def login_with_microsoft( login_data: MicrosoftLoginRequest, request: Request, db: AsyncSession = Depends(get_db) ): """ Authenticate with Microsoft id_token (client-side MSAL flow). Frontend uses @azure/msal-browser to get id_token from Microsoft, then sends it here for validation. Backend validates the JWT signature and creates application JWT tokens for session management. Args: login_data: Request containing id_token from Microsoft request: HTTP request for client info db: Database session Returns: LoginResponse with application JWT tokens and user info Raises: HTTPException: If id_token is invalid or SSO not configured """ if not AZURE_CLIENT_ID or not AZURE_TENANT_ID: raise HTTPException( status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Microsoft SSO not configured" ) # Validate id_token (JWT from Azure AD) user_claims = validate_azure_id_token( login_data.id_token, AZURE_CLIENT_ID, AZURE_TENANT_ID ) # Extract user details from token claims username = user_claims.get("preferred_username") or user_claims.get("email") email = user_claims.get("email") full_name = user_claims.get("name") if not username: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not extract username from id_token" ) # Create or update user in database user = await UserRepository.get_by_username(db, username) if not user: # Create new SSO user user = await UserRepository.create_user( db, username=username, password_hash=None, # SSO users don't have passwords email=email, full_name=full_name, auth_method="sso" ) # Check if user is active if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User account is disabled" ) # Create JWT tokens (for our app, not Azure tokens) tokens = create_tokens_response(user.id) # Create user session in Redis redis: RedisSessionStore = request.app.state.redis session_id = await redis.create_user_session( user_id=user.id, refresh_token=tokens["refresh_token"], ip_address=request.client.host, user_agent=request.headers.get("user-agent", "") ) # Update last login await UserRepository.update_last_login(db, user.id) # Log action await AuditLogRepository.log_action( db, user_id=user.id, action="sso_login", details=f"SSO login (client-side MSAL) from {request.client.host}" ) return LoginResponse( **tokens, user=user.to_dict() ) # ===== User Info Endpoint ===== @router.get("/me") async def get_current_user( user_id: int = Depends(get_current_user_id), db: AsyncSession = Depends(get_db) ): """ Get current user info from JWT token. """ user = await UserRepository.get_by_id(db, user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) return user.to_dict() # ===== Admin Endpoints (for testing) ===== @router.post("/register") async def register_user( login_data: LoginRequest, db: AsyncSession = Depends(get_db) ): """ Register new user (for testing/development). In production, disable this or add admin auth. """ # Check if user already exists existing_user = await UserRepository.get_by_username(db, login_data.username) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists" ) # Create new user password_hashed = hash_password(login_data.password) user = await UserRepository.create_user( db, username=login_data.username, password_hash=password_hashed, email=None, full_name=None, auth_method="local" ) return { "message": "User created successfully", "user": user.to_dict() }