Authentication Core: - Security utilities: JWT token creation, validation, hashing - AuthService: Azure AD token validation via Microsoft Graph API - User session management with access/refresh tokens - Token expiration handling (1 hour access, 7 days refresh) API Endpoints: - POST /api/v1/auth/login - Login with Azure AD MSAL token - POST /api/v1/auth/refresh - Refresh access token - POST /api/v1/auth/logout - Logout and invalidate session - GET /api/v1/auth/me - Get current user info - GET /api/v1/auth/health - Auth service health check Middleware: - get_current_user: Extract and validate user from Bearer token - get_current_active_user: Ensure user is active - get_current_admin_user: Require admin role - get_optional_user: Optional authentication Security Features: - JWT with HS256 signing - Token hashing with bcrypt for storage - Session validation with expiration checks - Microsoft Graph API integration for Azure AD validation - IP address and user agent tracking - Active session management Schemas: - LoginRequest/Response with tokens and user info - RefreshTokenRequest/Response - UserInfo for current user details - LogoutResponse Main App Updates: - Connected auth router to /api/v1/auth - All authentication endpoints now accessible Dependencies Added: - pyjwt for JWT handling - httpx for async HTTP requests to Microsoft Graph Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
130 lines
3.1 KiB
Python
130 lines
3.1 KiB
Python
"""
|
|
Authentication Middleware
|
|
|
|
FastAPI dependencies for authentication and authorization
|
|
"""
|
|
|
|
import logging
|
|
from typing import Optional
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.database import get_db
|
|
from app.services.auth_service import AuthService
|
|
from app.models.user import User
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# HTTP Bearer token security
|
|
security = HTTPBearer()
|
|
|
|
|
|
async def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
db: AsyncSession = Depends(get_db)
|
|
) -> User:
|
|
"""
|
|
Get current authenticated user from access token
|
|
|
|
Args:
|
|
credentials: HTTP authorization credentials
|
|
db: Database session
|
|
|
|
Returns:
|
|
Current user instance
|
|
|
|
Raises:
|
|
HTTPException: If authentication fails
|
|
"""
|
|
token = credentials.credentials
|
|
|
|
auth_service = AuthService(db)
|
|
|
|
# Get user from token
|
|
user = await auth_service.get_current_user(token)
|
|
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication credentials",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Verify session is valid
|
|
is_valid = await auth_service.verify_session(token)
|
|
|
|
if not is_valid:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Session expired or invalid",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
logger.debug(f"Authenticated user: {user.id}")
|
|
return user
|
|
|
|
|
|
async def get_current_active_user(
|
|
current_user: User = Depends(get_current_user)
|
|
) -> User:
|
|
"""
|
|
Get current active user (must be active)
|
|
|
|
Args:
|
|
current_user: Current user from get_current_user
|
|
|
|
Returns:
|
|
Current active user
|
|
|
|
Raises:
|
|
HTTPException: If user is inactive
|
|
"""
|
|
if not current_user.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="User account is inactive"
|
|
)
|
|
|
|
return current_user
|
|
|
|
|
|
async def get_current_admin_user(
|
|
current_user: User = Depends(get_current_active_user)
|
|
) -> User:
|
|
"""
|
|
Get current admin user (must have admin role)
|
|
|
|
Args:
|
|
current_user: Current active user
|
|
|
|
Returns:
|
|
Current admin user
|
|
|
|
Raises:
|
|
HTTPException: If user is not admin
|
|
"""
|
|
if current_user.role not in ["admin", "superadmin"]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Insufficient permissions. Admin access required."
|
|
)
|
|
|
|
return current_user
|
|
|
|
|
|
def get_optional_user(
|
|
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)
|
|
) -> Optional[str]:
|
|
"""
|
|
Get optional user token (doesn't raise exception if not authenticated)
|
|
|
|
Args:
|
|
credentials: Optional HTTP authorization credentials
|
|
|
|
Returns:
|
|
Access token or None
|
|
"""
|
|
if credentials:
|
|
return credentials.credentials
|
|
return None
|