""" Authentication Middleware for Flask application. Python equivalent of AuthMiddleware.php from MSAL specification. """ import os import json from datetime import datetime, timedelta from functools import wraps from typing import Dict, Any, Optional, Tuple from flask import request, jsonify, make_response, g from jwt_validator import JWTValidator class AuthMiddleware: """Authentication middleware for Flask with Azure AD JWT validation and httpOnly cookie management.""" def __init__(self, app=None, tenant_id: str = None, client_id: str = None): self.tenant_id = tenant_id or os.getenv('AZURE_TENANT_ID', 'e519c2e6-bc6d-4fdf-8d9c-923c2f002385') self.client_id = client_id or os.getenv('AZURE_CLIENT_ID', '9079054c-9620-4757-a256-23413042f1ef') self.jwt_validator = JWTValidator(self.tenant_id, self.client_id) self.cookie_name = 'ai_qc_auth_token' if app: self.init_app(app) def init_app(self, app): """Initialize the middleware with Flask app.""" self.app = app app.auth_middleware = self # Set secure cookie defaults based on environment app.config.setdefault('SESSION_COOKIE_SECURE', os.getenv('FLASK_ENV') == 'production') app.config.setdefault('SESSION_COOKIE_HTTPONLY', True) app.config.setdefault('SESSION_COOKIE_SAMESITE', 'Lax') def require_auth(self, f): """ Decorator to require authentication for protected routes. Similar to AuthMiddleware->requireAuth() in PHP version. """ @wraps(f) def decorated_function(*args, **kwargs): auth_result = self.is_authenticated() if not auth_result['authenticated']: return jsonify({ 'error': 'Authentication required', 'message': auth_result['error'], 'authenticated': False }), 401 # Store user info in Flask's g object for use in route handlers g.user = auth_result['user'] g.token_payload = auth_result['payload'] return f(*args, **kwargs) return decorated_function def is_authenticated(self) -> Dict[str, Any]: """ Check if current request is authenticated. Returns dict with authentication status and user info. """ try: # Try to get token from httpOnly cookie token = request.cookies.get(self.cookie_name) if not token: return { 'authenticated': False, 'error': 'No authentication token found', 'user': None, 'payload': None } # Validate token payload = self.jwt_validator.validate_token(token) # Check if token is expired if self.jwt_validator.is_token_expired(payload): return { 'authenticated': False, 'error': 'Authentication token has expired', 'user': None, 'payload': None } # Extract user information user_info = self.jwt_validator.get_user_info(payload) return { 'authenticated': True, 'error': None, 'user': user_info, 'payload': payload } except Exception as e: return { 'authenticated': False, 'error': f'Token validation failed: {str(e)}', 'user': None, 'payload': None } def set_auth_token(self, token: str): """ Validate and store authentication token in httpOnly cookie. Returns Flask response with cookie set. """ try: # Validate token before storing payload = self.jwt_validator.validate_token(token) # Create response with httpOnly cookie response = make_response(jsonify({ 'success': True, 'message': 'Authentication successful', 'authenticated': True, 'user': self.jwt_validator.get_user_info(payload) })) # Set httpOnly cookie with security flags self._set_secure_cookie(response, token, payload) return response except Exception as e: return make_response(jsonify({ 'success': False, 'error': f'Token validation failed: {str(e)}', 'authenticated': False }), 401) def clear_auth_token(self): """Clear authentication cookie and return response.""" response = make_response(jsonify({ 'success': True, 'message': 'Logged out successfully', 'authenticated': False })) # Clear the authentication cookie response.set_cookie( self.cookie_name, '', expires=0, path='/', domain='', secure=self._is_secure_context(), httponly=True, samesite='Lax' ) return response def _set_secure_cookie(self, response, token: str, payload: Dict[str, Any]): """Set httpOnly cookie with proper security flags.""" # Calculate expiration time (24 hours or token expiration, whichever is sooner) token_exp = payload.get('exp') max_age = 24 * 60 * 60 # 24 hours in seconds if token_exp: current_time = datetime.utcnow().timestamp() token_remaining = token_exp - current_time max_age = min(max_age, int(token_remaining)) # Set secure cookie response.set_cookie( self.cookie_name, token, max_age=max_age, path='/', domain='', secure=self._is_secure_context(), httponly=True, samesite='Lax' ) def _is_secure_context(self) -> bool: """Determine if we're in a secure context (HTTPS).""" # Check various indicators of HTTPS if request.is_secure: return True # Check for common proxy headers if request.headers.get('X-Forwarded-Proto') == 'https': return True if request.headers.get('X-Forwarded-SSL') == 'on': return True # Check Flask environment if os.getenv('FLASK_ENV') == 'production': return True return False def get_auth_status(self) -> Dict[str, Any]: """Get current authentication status for API endpoint.""" auth_result = self.is_authenticated() response_data = { 'authenticated': auth_result['authenticated'], 'user': auth_result['user'] } if not auth_result['authenticated']: response_data['error'] = auth_result['error'] return response_data def validate_and_refresh_token(self) -> Dict[str, Any]: """ Validate current token and check if refresh is needed. This method can be called periodically to ensure token validity. """ auth_result = self.is_authenticated() if not auth_result['authenticated']: return auth_result # Check if token is close to expiration (within 5 minutes) payload = auth_result['payload'] exp = payload.get('exp') if exp: current_time = datetime.utcnow().timestamp() time_to_expire = exp - current_time if time_to_expire < 300: # 5 minutes return { 'authenticated': True, 'user': auth_result['user'], 'payload': payload, 'refresh_needed': True, 'expires_in': int(time_to_expire) } return { 'authenticated': True, 'user': auth_result['user'], 'payload': payload, 'refresh_needed': False }