ac-tool/backend/server/auth/middleware.py
Vadym Samoilenko 8da149b84e Migrate storage from JSON files to PostgreSQL (asyncpg)
- Add asyncpg connection pool (db/pool.py) with JSONB codec registration
- Add schema.sql with users, clients, dropdown_categories, export_templates, sheets tables
- Add migrate_json.py one-time migration script for existing JSON data
- Rewrite user_store, sheets/manager, api/clients, api/dropdowns, api/export as async DB-backed
- Update all callers (auth, sheets, admin, ai_command, export) to await async functions
- Add postgres:16-alpine service to docker-compose with named volume and health check
- App container depends_on postgres; DATABASE_URL injected via env
- Schema applied automatically on startup; global categories seeded if DB is empty

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 19:51:37 +00:00

140 lines
4.7 KiB
Python

"""
Authentication middleware — decorators for Quart routes.
Includes @auth_required, @admin_required, @dev_mode_bypass.
"""
import logging
from functools import wraps
from typing import Optional, Dict, Any, Callable
from quart import request, jsonify, g
from .msal_auth import msal_auth
from .user_store import upsert_user, get_user
from ..config_runtime import server_config
logger = logging.getLogger(__name__)
def _check_emergency_token(token: str) -> Optional[Dict[str, Any]]:
    """Return synthetic user claims when *token* equals the emergency token.

    Args:
        token: The credential presented by the caller (untrusted input).

    Returns:
        A claims dict with ``oid``, ``preferred_username`` and ``name`` built
        from ``server_config.EMERGENCY_USER_*`` when ``EMERGENCY_TOKEN`` is
        configured and matches; otherwise ``None``. No role is attached here.
    """
    import hmac

    expected = server_config.EMERGENCY_TOKEN
    if not expected or not token:
        return None

    # hmac.compare_digest() raises TypeError for str operands that contain
    # non-ASCII characters. The token is caller-controlled, so compare the
    # encoded bytes instead; 'surrogatepass' keeps encode() total for any str.
    presented_bytes = token.encode('utf-8', 'surrogatepass')
    expected_bytes = expected.encode('utf-8', 'surrogatepass')
    if not hmac.compare_digest(presented_bytes, expected_bytes):
        return None

    email = server_config.EMERGENCY_USER_EMAIL
    return {
        'oid': f'emergency-{email}',
        'preferred_username': email,
        'name': server_config.EMERGENCY_USER_NAME,
    }
async def _extract_token_user() -> Optional[Dict[str, Any]]:
    """Locate the caller's token and turn it into user claims.

    The token is taken from an ``Authorization: Bearer <token>`` header when
    present, otherwise from the ``_token`` query parameter. The emergency
    token is checked first; anything else is passed to
    ``msal_auth.validate_token``.

    Returns:
        The claims for the token, or ``None`` when no token was supplied.
        For non-emergency tokens the result is whatever
        ``msal_auth.validate_token`` returns.
    """
    bearer_prefix = 'Bearer '
    header_value = request.headers.get('Authorization', '')
    token = (
        header_value[len(bearer_prefix):]
        if header_value.startswith(bearer_prefix)
        else request.args.get('_token', '')
    )
    if not token:
        return None

    # Emergency credentials short-circuit the MSAL round trip.
    return _check_emergency_token(token) or await msal_auth.validate_token(token)
async def _resolve_user(token_user: Dict) -> Dict:
    """Combine token claims with the stored user record.

    Calls ``upsert_user`` with the ``oid``, ``preferred_username`` and
    ``name`` claims, then overlays the stored ``role`` and ``active`` values
    on a copy of the claims.

    Args:
        token_user: Claims of the authenticated caller; must contain ``oid``.

    Returns:
        A new dict holding every claim plus ``role`` (default ``'user'``) and
        ``active`` (default ``True``).

    Raises:
        KeyError: If ``token_user`` has no ``oid`` entry.
    """
    record = await upsert_user(
        token_user['oid'],
        token_user.get('preferred_username', ''),
        token_user.get('name', ''),
    )
    merged = dict(token_user)
    merged['role'] = record.get('role', 'user')
    merged['active'] = record.get('active', True)
    return merged
def auth_required(f: Callable) -> Callable:
    """Decorate an async Quart view so it only runs for an authenticated user.

    On success ``g.current_user`` holds the user's claims together with
    ``role`` and ``active``. Outside DEV_MODE the wrapper responds with
    401 when no valid token is presented and 403 when the account is
    inactive. In DEV_MODE the configured development identity is used and
    upserted instead of validating a token.
    """
    @wraps(f)
    async def guarded(*args, **kwargs):
        if not server_config.DEV_MODE:
            claims = await _extract_token_user()
            if not claims:
                return jsonify({'error': 'unauthorized', 'message': 'Authentication required'}), 401

            account = await _resolve_user(claims)
            if not account.get('active', True):
                return jsonify({'error': 'forbidden', 'message': 'Account deactivated'}), 403

            g.current_user = account
            return await f(*args, **kwargs)

        # DEV_MODE: skip token validation and act as the configured dev user.
        dev_role = server_config.DEV_USER_ROLE
        dev_identity = {
            'oid': server_config.DEV_USER_ID,
            'preferred_username': server_config.DEV_USER_EMAIL,
            'name': server_config.DEV_USER_NAME,
            'role': dev_role,
            'active': True,
        }
        g.current_user = dev_identity
        await upsert_user(
            server_config.DEV_USER_ID,
            server_config.DEV_USER_EMAIL,
            server_config.DEV_USER_NAME,
            role=dev_role,
        )
        return await f(*args, **kwargs)

    return guarded
# Backward-compatible alias: the old name is kept for the brief-extractor
# blueprints. It is the very same decorator as `auth_required`, so token
# validation is only skipped when server_config.DEV_MODE is enabled.
dev_mode_bypass = auth_required
def admin_required(f: Callable) -> Callable:
    """Decorate an async Quart view so it only runs for an admin user.

    Authentication is delegated to ``auth_required`` (token validation,
    DEV_MODE identity, deactivated-account check, ``g.current_user``), so
    this decorator works on its own and does not need to be stacked with
    ``@auth_required``. After authentication succeeds, the wrapper responds
    with 403 unless ``g.current_user['role']`` is ``'admin'``.
    """
    @wraps(f)
    async def admin_gate(*args, **kwargs):
        # Runs only after auth_required has populated g.current_user.
        if g.current_user.get('role') != 'admin':
            return jsonify({'error': 'forbidden', 'message': 'Admin access required'}), 403
        return await f(*args, **kwargs)

    # Reuse the single authentication implementation instead of keeping a
    # second copy here that could drift from auth_required.
    return auth_required(admin_gate)
def get_user_id() -> str:
    """Return the ``oid`` of ``g.current_user``, or ``'anonymous'``.

    ``'anonymous'`` is returned when no user is set on ``g``, when the user
    value is empty, or when it has no ``oid`` entry.
    """
    current = getattr(g, 'current_user', None)
    if not current:
        return 'anonymous'
    return current.get('oid', 'anonymous')
async def get_current_user() -> Optional[Dict[str, Any]]:
    """Return ``g.current_user``, or ``None`` when no user has been set."""
    try:
        return g.current_user
    except AttributeError:
        # Nothing has stored a user on `g` for this request.
        return None