- Add asyncpg connection pool (db/pool.py) with JSONB codec registration - Add schema.sql with users, clients, dropdown_categories, export_templates, sheets tables - Add migrate_json.py one-time migration script for existing JSON data - Rewrite user_store, sheets/manager, api/clients, api/dropdowns, api/export as async DB-backed - Update all callers (auth, sheets, admin, ai_command, export) to await async functions - Add postgres:16-alpine service to docker-compose with named volume and health check - App container depends_on postgres; DATABASE_URL injected via env - Schema applied automatically on startup; global categories seeded if DB is empty Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
140 lines
4.7 KiB
Python
140 lines
4.7 KiB
Python
"""
|
|
Authentication middleware — decorators for Quart routes.
|
|
Includes @auth_required, @admin_required, @dev_mode_bypass.
|
|
"""
|
|
|
|
import logging
|
|
from functools import wraps
|
|
from typing import Optional, Dict, Any, Callable
|
|
|
|
from quart import request, jsonify, g
|
|
|
|
from .msal_auth import msal_auth
|
|
from .user_store import upsert_user, get_user
|
|
from ..config_runtime import server_config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _check_emergency_token(token: str) -> Optional[Dict[str, Any]]:
|
|
"""If EMERGENCY_TOKEN is configured and matches, return a synthetic admin user."""
|
|
et = server_config.EMERGENCY_TOKEN
|
|
if not et or not token:
|
|
return None
|
|
import hmac
|
|
if hmac.compare_digest(token, et):
|
|
email = server_config.EMERGENCY_USER_EMAIL
|
|
return {
|
|
'oid': f'emergency-{email}',
|
|
'preferred_username': email,
|
|
'name': server_config.EMERGENCY_USER_NAME,
|
|
}
|
|
return None
|
|
|
|
|
|
async def _extract_token_user() -> Optional[Dict[str, Any]]:
|
|
"""Extract and validate Bearer token from Authorization header or ?_token= query param."""
|
|
auth_header = request.headers.get('Authorization', '')
|
|
if auth_header.startswith('Bearer '):
|
|
token = auth_header[7:]
|
|
else:
|
|
token = request.args.get('_token', '')
|
|
if not token:
|
|
return None
|
|
emergency = _check_emergency_token(token)
|
|
if emergency:
|
|
return emergency
|
|
return await msal_auth.validate_token(token)
|
|
|
|
|
|
async def _resolve_user(token_user: Dict) -> Dict:
|
|
"""
|
|
Merge token claims with DB user store.
|
|
Creates the user record on first login; enriches token info with role.
|
|
"""
|
|
user_id = token_user['oid']
|
|
email = token_user.get('preferred_username', '')
|
|
name = token_user.get('name', '')
|
|
|
|
stored = await upsert_user(user_id, email, name)
|
|
return {**token_user, 'role': stored.get('role', 'user'), 'active': stored.get('active', True)}
|
|
|
|
|
|
def auth_required(f: Callable) -> Callable:
|
|
"""Require a valid Bearer token. Sets g.current_user."""
|
|
@wraps(f)
|
|
async def wrapper(*args, **kwargs):
|
|
if server_config.DEV_MODE:
|
|
role = server_config.DEV_USER_ROLE
|
|
g.current_user = {
|
|
'oid': server_config.DEV_USER_ID,
|
|
'preferred_username': server_config.DEV_USER_EMAIL,
|
|
'name': server_config.DEV_USER_NAME,
|
|
'role': role,
|
|
'active': True,
|
|
}
|
|
await upsert_user(
|
|
server_config.DEV_USER_ID,
|
|
server_config.DEV_USER_EMAIL,
|
|
server_config.DEV_USER_NAME,
|
|
role=role,
|
|
)
|
|
else:
|
|
token_user = await _extract_token_user()
|
|
if not token_user:
|
|
return jsonify({'error': 'unauthorized', 'message': 'Authentication required'}), 401
|
|
user = await _resolve_user(token_user)
|
|
if not user.get('active', True):
|
|
return jsonify({'error': 'forbidden', 'message': 'Account deactivated'}), 403
|
|
g.current_user = user
|
|
|
|
return await f(*args, **kwargs)
|
|
return wrapper
|
|
|
|
|
|
# Keep old name for compatibility with brief-extractor blueprints
|
|
dev_mode_bypass = auth_required
|
|
|
|
|
|
def admin_required(f: Callable) -> Callable:
|
|
"""Require admin role. Must be used after @auth_required."""
|
|
@wraps(f)
|
|
async def wrapper(*args, **kwargs):
|
|
if server_config.DEV_MODE:
|
|
role = server_config.DEV_USER_ROLE
|
|
g.current_user = {
|
|
'oid': server_config.DEV_USER_ID,
|
|
'preferred_username': server_config.DEV_USER_EMAIL,
|
|
'name': server_config.DEV_USER_NAME,
|
|
'role': role,
|
|
'active': True,
|
|
}
|
|
await upsert_user(
|
|
server_config.DEV_USER_ID,
|
|
server_config.DEV_USER_EMAIL,
|
|
server_config.DEV_USER_NAME,
|
|
role=role,
|
|
)
|
|
else:
|
|
token_user = await _extract_token_user()
|
|
if not token_user:
|
|
return jsonify({'error': 'unauthorized', 'message': 'Authentication required'}), 401
|
|
user = await _resolve_user(token_user)
|
|
if not user.get('active', True):
|
|
return jsonify({'error': 'forbidden', 'message': 'Account deactivated'}), 403
|
|
g.current_user = user
|
|
|
|
if g.current_user.get('role') != 'admin':
|
|
return jsonify({'error': 'forbidden', 'message': 'Admin access required'}), 403
|
|
|
|
return await f(*args, **kwargs)
|
|
return wrapper
|
|
|
|
|
|
def get_user_id() -> str:
|
|
user = getattr(g, 'current_user', None)
|
|
return user.get('oid', 'anonymous') if user else 'anonymous'
|
|
|
|
|
|
async def get_current_user() -> Optional[Dict[str, Any]]:
|
|
return getattr(g, 'current_user', None)
|