apac-ops-bot/backend/app/core/security.py
SamoilenkoVadym 8c770dbfa9 Implement MSAL authentication system with JWT sessions
Authentication Core:
- Security utilities: JWT token creation, validation, hashing
- AuthService: Azure AD token validation via Microsoft Graph API
- User session management with access/refresh tokens
- Token expiration handling (1 hour access, 7 days refresh)

API Endpoints:
- POST /api/v1/auth/login - Login with Azure AD MSAL token
- POST /api/v1/auth/refresh - Refresh access token
- POST /api/v1/auth/logout - Logout and invalidate session
- GET /api/v1/auth/me - Get current user info
- GET /api/v1/auth/health - Auth service health check

Middleware:
- get_current_user: Extract and validate user from Bearer token
- get_current_active_user: Ensure user is active
- get_current_admin_user: Require admin role
- get_optional_user: Optional authentication

Security Features:
- JWT with HS256 signing
- Token hashing with bcrypt for storage
- Session validation with expiration checks
- Microsoft Graph API integration for Azure AD validation
- IP address and user agent tracking
- Active session management

Schemas:
- LoginRequest/Response with tokens and user info
- RefreshTokenRequest/Response
- UserInfo for current user details
- LogoutResponse

Main App Updates:
- Connected auth router to /api/v1/auth
- All authentication endpoints now accessible

Dependencies Added:
- pyjwt for JWT handling
- httpx for async HTTP requests to Microsoft Graph

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-27 14:33:28 +00:00

137 lines
2.6 KiB
Python

"""
Security utilities
JWT token management and password hashing
"""
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict
import jwt
from passlib.context import CryptContext
from app.config import get_settings
logger = logging.getLogger(__name__)
settings = get_settings()
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
data: Dict,
expires_delta: Optional[timedelta] = None
) -> str:
"""
Create JWT access token
Args:
data: Token payload data
expires_delta: Optional expiration time delta
Returns:
Encoded JWT token
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(hours=1)
to_encode.update({"exp": expire, "iat": datetime.utcnow()})
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm="HS256"
)
return encoded_jwt
def create_refresh_token(data: Dict) -> str:
"""
Create JWT refresh token (longer expiration)
Args:
data: Token payload data
Returns:
Encoded JWT refresh token
"""
return create_access_token(
data,
expires_delta=timedelta(days=7)
)
def decode_token(token: str) -> Optional[Dict]:
"""
Decode and verify JWT token
Args:
token: JWT token string
Returns:
Decoded token payload or None if invalid
"""
try:
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=["HS256"]
)
return payload
except jwt.ExpiredSignatureError:
logger.warning("Token has expired")
return None
except jwt.InvalidTokenError as e:
logger.warning(f"Invalid token: {e}")
return None
def hash_token(token: str) -> str:
"""
Hash token for secure storage
Args:
token: Token string to hash
Returns:
Hashed token
"""
return pwd_context.hash(token)
def verify_token_hash(token: str, hashed: str) -> bool:
"""
Verify token against hash
Args:
token: Plain token string
hashed: Hashed token
Returns:
True if token matches hash
"""
return pwd_context.verify(token, hashed)
def get_token_subject(token: str) -> Optional[str]:
"""
Extract subject (user_id) from token
Args:
token: JWT token string
Returns:
User ID or None
"""
payload = decode_token(token)
if payload:
return payload.get("sub")
return None