"""
Authentication middleware — decorators for Quart routes.

Includes @auth_required, @admin_required, @dev_mode_bypass.
"""
import hmac
import logging
from functools import wraps
from typing import Optional, Dict, Any, Callable

from quart import request, jsonify, g

from .msal_auth import msal_auth
from .user_store import upsert_user, get_user
from ..config_runtime import server_config

logger = logging.getLogger(__name__)


def _check_emergency_token(token: str) -> Optional[Dict[str, Any]]:
    """Return synthetic emergency-user claims if *token* matches EMERGENCY_TOKEN.

    Returns None when no emergency token is configured, when *token* is
    empty, or when the two values differ. The returned dict carries only
    'oid', 'preferred_username' and 'name'; it contains no role (the role is
    attached later by _resolve_user from the user store).
    """
    expected = server_config.EMERGENCY_TOKEN
    if not expected or not token:
        return None

    # Compare as bytes: hmac.compare_digest() raises TypeError for str
    # arguments containing non-ASCII characters, and the token is
    # caller-controlled, so a str comparison could turn a bad token into an
    # unhandled 500 instead of a clean authentication failure.
    if not hmac.compare_digest(token.encode('utf-8'), expected.encode('utf-8')):
        return None

    email = server_config.EMERGENCY_USER_EMAIL
    return {
        'oid': f'emergency-{email}',
        'preferred_username': email,
        'name': server_config.EMERGENCY_USER_NAME,
    }


async def _extract_token_user() -> Optional[Dict[str, Any]]:
    """Extract and validate the caller's token.

    The token is read from the ``Authorization: Bearer ...`` header, falling
    back to the ``?_token=`` query parameter when that header is absent or
    not a Bearer header. The emergency token is checked first; anything else
    is handed to ``msal_auth.validate_token``.

    Returns None when no token is supplied; otherwise returns the emergency
    claims or whatever ``msal_auth.validate_token`` returns.
    """
    auth_header = request.headers.get('Authorization', '')
    if auth_header.startswith('Bearer '):
        token = auth_header[7:]  # strip the 'Bearer ' prefix
    else:
        token = request.args.get('_token', '')
    if not token:
        return None

    emergency = _check_emergency_token(token)
    if emergency:
        return emergency
    return await msal_auth.validate_token(token)


async def _resolve_user(token_user: Dict) -> Dict:
    """Merge token claims with the DB user store.

    Upserts the user record via ``upsert_user`` (keyed by the 'oid' claim)
    and returns the token claims enriched with the stored 'role' (default
    'user') and 'active' flag (default True).

    Raises:
        KeyError: if *token_user* has no 'oid' claim.
    """
    user_id = token_user['oid']
    email = token_user.get('preferred_username', '')
    name = token_user.get('name', '')
    stored = await upsert_user(user_id, email, name)
    return {
        **token_user,
        'role': stored.get('role', 'user'),
        'active': stored.get('active', True),
    }


async def _authenticate_request() -> Optional[Any]:
    """Authenticate the current request and populate ``g.current_user``.

    Shared by auth_required and admin_required so both apply exactly the
    same rules.

    Returns:
        None on success (``g.current_user`` is set), otherwise the
        ``(response, status)`` tuple the route wrapper should return as-is.
    """
    if server_config.DEV_MODE:
        # Dev mode: skip token validation and act as the configured dev user.
        role = server_config.DEV_USER_ROLE
        g.current_user = {
            'oid': server_config.DEV_USER_ID,
            'preferred_username': server_config.DEV_USER_EMAIL,
            'name': server_config.DEV_USER_NAME,
            'role': role,
            'active': True,
        }
        await upsert_user(
            server_config.DEV_USER_ID,
            server_config.DEV_USER_EMAIL,
            server_config.DEV_USER_NAME,
            role=role,
        )
        return None

    token_user = await _extract_token_user()
    if not token_user:
        return jsonify({'error': 'unauthorized', 'message': 'Authentication required'}), 401

    user = await _resolve_user(token_user)
    if not user.get('active', True):
        return jsonify({'error': 'forbidden', 'message': 'Account deactivated'}), 403

    g.current_user = user
    return None


def auth_required(f: Callable) -> Callable:
    """Require a valid Bearer token. Sets g.current_user.

    Responds 401 when no valid token is supplied and 403 when the account is
    deactivated. When DEV_MODE is enabled the configured dev user is used
    instead of validating a token.
    """
    @wraps(f)
    async def wrapper(*args, **kwargs):
        denied = await _authenticate_request()
        if denied is not None:
            return denied
        return await f(*args, **kwargs)
    return wrapper


# Keep old name for compatibility with brief-extractor blueprints
dev_mode_bypass = auth_required


def admin_required(f: Callable) -> Callable:
    """Require an authenticated user whose role is 'admin'. Sets g.current_user.

    Performs the same authentication as @auth_required on its own, so it
    does not need to be stacked with it. Responds 401/403 exactly like
    @auth_required, plus 403 when the resolved role is not 'admin' (this
    also applies to the dev user in DEV_MODE).
    """
    @wraps(f)
    async def wrapper(*args, **kwargs):
        denied = await _authenticate_request()
        if denied is not None:
            return denied
        if g.current_user.get('role') != 'admin':
            return jsonify({'error': 'forbidden', 'message': 'Admin access required'}), 403
        return await f(*args, **kwargs)
    return wrapper


def get_user_id() -> str:
    """Return the current user's 'oid', or 'anonymous' if none is set on ``g``."""
    user = getattr(g, 'current_user', None)
    return user.get('oid', 'anonymous') if user else 'anonymous'


async def get_current_user() -> Optional[Dict[str, Any]]:
    """Return ``g.current_user``, or None when no user has been set."""
    return getattr(g, 'current_user', None)