Merges ac-helper (PHP Activation Calendar) and brief-extractor (Python AI) into a single Docker app with a React/TypeScript frontend.

Features:
- Brief upload → AI extraction → review → Activation Calendar import
- Handsontable v17 spreadsheet with dependent dropdowns (148 categories)
- AI natural language commands via Gemini (YOLO mode, voice input)
- Azure AD MSAL SPA PKCE authentication, user roles (user/admin)
- CSV Activation Calendar export
- Real-time WebSocket job progress
- Admin: user management, dropdown Excel upload
- Multi-stage Dockerfile, docker-compose, nginx proxy instructions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
123 lines
4.2 KiB
Python
123 lines
4.2 KiB
Python
"""
|
|
Authentication middleware — decorators for Quart routes.
|
|
Includes @auth_required and @admin_required, plus @dev_mode_bypass (a legacy alias of @auth_required).
|
|
"""
|
|
|
|
import logging
|
|
from functools import wraps
|
|
from typing import Optional, Dict, Any, Callable
|
|
|
|
from quart import request, jsonify, g
|
|
|
|
from .msal_auth import msal_auth
|
|
from .user_store import upsert_user, get_user
|
|
from ..config_runtime import server_config
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def _extract_token_user() -> Optional[Dict[str, Any]]:
    """Pull the access token off the current request and validate it.

    The token is taken from the ``Authorization: Bearer <token>`` header when
    that header is present; otherwise the ``_token`` query parameter is used.

    Returns:
        Whatever ``msal_auth.validate_token`` returns for the token, or
        ``None`` when the request carries no token at all.
    """
    bearer_prefix = 'Bearer '
    header_value = request.headers.get('Authorization', '')

    if header_value.startswith(bearer_prefix):
        raw_token = header_value[len(bearer_prefix):]
    else:
        # Browser download links (window.open) cannot set headers, so those
        # requests pass the token in the query string instead.
        raw_token = request.args.get('_token', '')

    if raw_token:
        return await msal_auth.validate_token(raw_token)
    return None
|
|
|
|
|
|
async def _resolve_user(token_user: Dict) -> Dict:
    """Combine validated token claims with the locally stored user record.

    The claims' ``oid``, ``preferred_username`` and ``name`` are passed to
    ``upsert_user`` (which, per the original note, creates the record on first
    login), and the stored ``role`` / ``active`` values are layered on top of
    the claims.

    Args:
        token_user: Validated token claims; must contain ``'oid'``.

    Returns:
        A new dict holding every claim plus ``'role'`` (default ``'user'``)
        and ``'active'`` (default ``True``).

    Raises:
        KeyError: If ``token_user`` has no ``'oid'`` entry.
    """
    record = upsert_user(
        token_user['oid'],
        token_user.get('preferred_username', ''),
        token_user.get('name', ''),
    )

    # Copy first so the caller's claims mapping is left untouched.
    enriched = dict(token_user)
    enriched['role'] = record.get('role', 'user')
    enriched['active'] = record.get('active', True)
    return enriched
|
|
|
|
|
|
def auth_required(f: Callable) -> Callable:
    """Decorator: only run the route for an authenticated, active user.

    On success ``g.current_user`` is populated before ``f`` is awaited.

    * With ``server_config.DEV_MODE`` on, no token is checked: the configured
      dev identity is installed as the current user and upserted into the
      user store.
    * Otherwise the request token is validated; a missing/invalid token
      yields a 401 JSON response and a deactivated account yields a 403.
    """

    @wraps(f)
    async def wrapper(*args, **kwargs):
        if not server_config.DEV_MODE:
            claims = await _extract_token_user()
            if not claims:
                return jsonify({'error': 'unauthorized', 'message': 'Authentication required'}), 401

            resolved = await _resolve_user(claims)
            if not resolved.get('active', True):
                return jsonify({'error': 'forbidden', 'message': 'Account deactivated'}), 403

            g.current_user = resolved
            return await f(*args, **kwargs)

        # Dev mode: fabricate the user from configuration.
        dev_role = server_config.DEV_USER_ROLE
        dev_id = server_config.DEV_USER_ID
        dev_email = server_config.DEV_USER_EMAIL
        dev_name = server_config.DEV_USER_NAME

        g.current_user = {
            'oid': dev_id,
            'preferred_username': dev_email,
            'name': dev_name,
            'role': dev_role,
            'active': True,
        }
        # Make sure the dev user also exists in the store.
        upsert_user(dev_id, dev_email, dev_name, role=dev_role)

        return await f(*args, **kwargs)

    return wrapper
|
|
|
|
|
|
# Keep old name for compatibility with brief-extractor blueprints.
# This is the very same function object as auth_required, so despite the
# name it still enforces token authentication whenever DEV_MODE is off.
dev_mode_bypass = auth_required
|
|
|
|
|
|
def admin_required(f: Callable) -> Callable:
    """Decorator: only run the route for an authenticated, active admin.

    Authentication is delegated to ``auth_required`` rather than duplicated
    here, so this decorator is self-sufficient: it does NOT need to be
    stacked with ``@auth_required`` (stacking still works, it merely
    authenticates twice).

    Responses produced before ``f`` runs:

    * 401 ``unauthorized`` – no/invalid token (from ``auth_required``).
    * 403 ``forbidden`` / "Account deactivated" (from ``auth_required``).
    * 403 ``forbidden`` / "Admin access required" – the authenticated user's
      ``role`` is not ``'admin'``. This also applies in dev mode when
      ``server_config.DEV_USER_ROLE`` is not ``'admin'``.

    On success ``g.current_user`` is set and ``f`` is awaited with the
    original arguments.
    """

    @wraps(f)
    async def admin_gate(*args, **kwargs):
        # auth_required has already populated g.current_user at this point.
        if g.current_user.get('role') != 'admin':
            return jsonify({'error': 'forbidden', 'message': 'Admin access required'}), 403
        return await f(*args, **kwargs)

    # Wrap the role gate with the shared authentication logic so both
    # decorators stay in sync. functools.wraps is applied at each layer, so
    # the returned callable still carries f's __name__ for route registration.
    return auth_required(admin_gate)
|
|
|
|
|
|
def get_user_id() -> str:
    """Return the current user's ``oid``, or ``'anonymous'`` if unavailable.

    ``'anonymous'`` is returned when ``g`` has no ``current_user``, when it is
    falsy, or when it lacks an ``'oid'`` entry.
    """
    current = getattr(g, 'current_user', None)
    if not current:
        return 'anonymous'
    return current.get('oid', 'anonymous')
|
|
|
|
|
|
async def get_current_user() -> Optional[Dict[str, Any]]:
    """Return ``g.current_user`` if it has been set, otherwise ``None``."""
    current = getattr(g, 'current_user', None)
    return current
|