ai_qc/backend/user_access.py
nickviljoen 6592c38b0a Add per-user client access control and admin management
Default-deny access model with admin grant/revoke via new User Access
tab. /api/clients filters by user grants; client-scoped endpoints
enforce access server-side. Admin role and client grants persist in
user_access.json with audit trail in usage logs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 12:33:09 +02:00

243 lines
7.2 KiB
Python

#!/usr/bin/env python3
"""
User access control module.
Stores per-user client visibility grants and admin role in user_access.json.
Users not listed fall back to default_clients (typically ['general']).
Admins see all clients regardless of their grant list.
"""
import os
import json
from datetime import datetime
from threading import Lock
# Absolute path of the JSON store; it sits in the same directory as this module.
ACCESS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'user_access.json')
# Email placed in the 'admins' list whenever default data is generated (see _default_data).
BOOTSTRAP_ADMIN = 'nick.viljoen@brandtech.plus'
# Module-wide lock the public functions take around _load()/_save() calls.
_lock = Lock()
def _default_data():
    """Build a fresh copy of the bootstrap access structure.

    Returns:
        dict: schema version 1, 'general' as the only default client,
        BOOTSTRAP_ADMIN as the only admin, and no per-user entries. A new
        dict (with new inner containers) is created on every call.
    """
    return dict(
        version=1,
        default_clients=['general'],
        admins=[BOOTSTRAP_ADMIN],
        users={},
    )
def _load():
    """Load the access data from ACCESS_FILE.

    If the file does not exist it is created with the bootstrap defaults.
    If it exists but cannot be read, cannot be decoded/parsed as JSON, or
    does not hold a JSON object at the top level, an error is printed and
    the defaults are returned in memory only; the file on disk is left
    untouched by this function.

    Returns:
        dict: access data in which every top-level key produced by
        ``_default_data()`` is present.
    """
    if not os.path.exists(ACCESS_FILE):
        data = _default_data()
        _save(data)
        print(f"[user_access] Initialized {ACCESS_FILE} with {BOOTSTRAP_ADMIN} as admin")
        return data

    try:
        with open(ACCESS_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (ValueError, OSError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError
        # (undecodable bytes in a damaged file).
        print(f"[user_access] ERROR reading {ACCESS_FILE}: {e} — falling back to defaults")
        return _default_data()

    if not isinstance(data, dict):
        # Valid JSON but the wrong shape (e.g. a list or a bare string):
        # treat it like a corrupt file instead of failing on item assignment.
        print(
            f"[user_access] ERROR reading {ACCESS_FILE}: expected a JSON object, "
            f"got {type(data).__name__} — falling back to defaults"
        )
        return _default_data()

    # Fill in any missing top-level keys so old files keep working
    for key, value in _default_data().items():
        data.setdefault(key, value)
    return data
def _save(data):
    """Write *data* to ACCESS_FILE as indented JSON, replacing the file atomically.

    The JSON is written to a temporary file in the same directory and then
    moved over ACCESS_FILE with ``os.replace``. If serialization or the
    write fails part-way, the previous ACCESS_FILE is left intact instead
    of being truncated.

    Args:
        data: JSON-serializable access structure.

    Raises:
        OSError: if the temporary file cannot be written or moved.
        TypeError / ValueError: if *data* is not JSON-serializable.
    """
    # The pid suffix keeps separate processes from sharing a temp file.
    tmp_path = f"{ACCESS_FILE}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, 'w') as f:
            json.dump(data, f, indent=2)
            # Push the bytes to disk before the rename makes them visible.
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, ACCESS_FILE)
    finally:
        # After a successful replace the temp path is gone; this only
        # removes leftovers from a failed write.
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass  # best-effort cleanup; the original error (if any) still propagates
def _normalize(email):
    """Return *email* stripped of surrounding whitespace and lower-cased.

    Any falsy input (None, empty string) yields the empty string.
    """
    if not email:
        return ''
    trimmed = email.strip()
    return trimmed.lower()
def get_user_clients(email):
    """Return the list of client IDs visible to *email*.

    Resolution order:
      1. blank/missing email -> empty list;
      2. admin -> every key of ``client_config.get_all_clients()``;
      3. user with an entry under 'users' (matched case-insensitively)
         -> that entry's 'clients' list;
      4. anyone else -> the file's 'default_clients'.
    """
    wanted = _normalize(email)
    if not wanted:
        return []

    with _lock:
        access = _load()

    # Admins see everything
    admin_emails = {_normalize(a) for a in access.get('admins', [])}
    if wanted in admin_emails:
        from client_config import get_all_clients
        return list(get_all_clients().keys())

    # Explicit grant: the first stored key that normalizes to the email wins
    grants = [
        entry
        for stored_email, entry in access.get('users', {}).items()
        if _normalize(stored_email) == wanted
    ]
    if grants:
        return list(grants[0].get('clients', []))

    # Fall back to default
    return list(access.get('default_clients', []))
def set_user_clients(target_email, clients, actor_email):
    """Grant exactly this set of clients to a user and persist it.

    Args:
        target_email: user whose grants are replaced (matched
            case-insensitively; stored in normalized form).
        clients: iterable of client IDs. It is materialized once, so
            one-shot iterables work; duplicate IDs are dropped while
            keeping first-seen order.
        actor_email: user performing the change; recorded as 'updated_by'
            and in the returned audit entry.

    Returns:
        dict: audit entry with 'action', 'target_email', 'actor_email',
        'clients_before' and 'clients_after'. 'action' is 'grant' when the
        new list is at least as long as the previous one, else 'revoke'.
        For a user without an explicit entry, 'clients_before' is the
        file's 'default_clients'.

    Raises:
        ValueError: if target_email is blank or a client ID is not a key of
            ``client_config.get_all_clients()``.
    """
    target_email = _normalize(target_email)
    actor_email = _normalize(actor_email)
    if not target_email:
        raise ValueError("target_email is required")

    # Materialize before validating: iterating a generator for validation
    # would otherwise leave nothing to store afterwards.
    requested = list(dict.fromkeys(clients))

    # Validate clients exist
    from client_config import get_all_clients
    valid_clients = set(get_all_clients().keys())
    for client_id in requested:
        if client_id not in valid_clients:
            raise ValueError(f"Unknown client: {client_id}")

    with _lock:
        data = _load()
        users = data.setdefault('users', {})

        # Every stored key that refers to this user, whatever its casing
        matching_keys = [k for k in users if _normalize(k) == target_email]
        if matching_keys:
            clients_before = list(users[matching_keys[0]].get('clients', []))
        else:
            clients_before = list(data.get('default_clients', []))

        # Drop all case-variant keys so a stale duplicate cannot shadow the
        # new entry; the normalized key itself is overwritten in place.
        for key in matching_keys:
            if key != target_email:
                del users[key]

        users[target_email] = {
            'clients': list(requested),
            'updated_at': datetime.now().isoformat(),
            'updated_by': actor_email
        }
        _save(data)

    return {
        'action': 'grant' if len(requested) >= len(clients_before) else 'revoke',
        'target_email': target_email,
        'actor_email': actor_email,
        'clients_before': clients_before,
        'clients_after': list(requested)
    }
def is_admin(email):
    """Return True when *email* appears in the stored 'admins' list.

    Comparison ignores case and surrounding whitespace; a blank or missing
    email is never an admin.
    """
    candidate = _normalize(email)
    if candidate:
        with _lock:
            access = _load()
        known_admins = {_normalize(a) for a in access.get('admins', [])}
        return candidate in known_admins
    return False
def promote_admin(target_email, actor_email):
    """Add *target_email* to the admins list and return an audit entry.

    The audit entry carries 'action', 'target_email' and 'actor_email'.
    When the target is already an admin nothing is written and the entry
    additionally has 'no_op': True.

    Raises:
        ValueError: if target_email is blank.
    """
    target_email = _normalize(target_email)
    actor_email = _normalize(actor_email)
    if not target_email:
        raise ValueError("target_email is required")

    audit = {
        'action': 'promote_admin',
        'target_email': target_email,
        'actor_email': actor_email
    }
    with _lock:
        access = _load()
        current_admins = {_normalize(a) for a in access.get('admins', [])}
        if target_email in current_admins:
            audit['no_op'] = True
        else:
            access.setdefault('admins', []).append(target_email)
            _save(access)
    return audit
def demote_admin(target_email, actor_email):
    """
    Demote an admin. At least one admin must remain.

    The guard applies to whoever is being demoted: the change is refused
    when removing the target would leave no usable admin entry. Entries
    are compared case-insensitively, so several spellings of the same
    email count as one admin, and blank entries do not count at all.
    actor_email is only recorded in the returned audit entry.

    Returns:
        dict: audit entry with 'action', 'target_email' and 'actor_email';
        it also has 'no_op': True when the target was not an admin.

    Raises:
        ValueError: if target_email is blank or the target is the last admin.
    """
    target_email = _normalize(target_email)
    actor_email = _normalize(actor_email)
    if not target_email:
        raise ValueError("target_email is required")

    with _lock:
        data = _load()
        admins = data.get('admins', [])

        if not any(_normalize(a) == target_email for a in admins):
            return {
                'action': 'demote_admin',
                'target_email': target_email,
                'actor_email': actor_email,
                'no_op': True
            }

        remaining = [a for a in admins if _normalize(a) != target_email]
        # Judge by what is left after removal, not by the raw list length:
        # duplicates of the target vanish with it, and blank entries can
        # never satisfy is_admin().
        if not any(_normalize(a) for a in remaining):
            raise ValueError("Cannot demote the last admin. Promote another user first.")

        data['admins'] = remaining
        _save(data)

    return {
        'action': 'demote_admin',
        'target_email': target_email,
        'actor_email': actor_email
    }
def list_access_entries():
    """
    Collect every explicit user entry, flagged with admin status, followed
    by a placeholder row for each admin that has no explicit entry.
    Used by the admin panel to render the users table.

    Returns:
        dict: {'default_clients': [...], 'entries': [row, ...]} where each
        row has 'email', 'clients', 'is_admin', 'updated_at', 'updated_by'.
    """
    with _lock:
        access = _load()

    admin_list = access.get('admins', [])
    admin_set = {_normalize(a) for a in admin_list}

    rows = [
        {
            'email': stored_email,
            'clients': list(grant.get('clients', [])),
            'is_admin': _normalize(stored_email) in admin_set,
            'updated_at': grant.get('updated_at'),
            'updated_by': grant.get('updated_by')
        }
        for stored_email, grant in access.get('users', {}).items()
    ]

    # Include admins who have no explicit grant row yet
    with_grant = {_normalize(row['email']) for row in rows}
    rows.extend(
        {
            'email': admin_email,
            'clients': [],
            'is_admin': True,
            'updated_at': None,
            'updated_by': None
        }
        for admin_email in admin_list
        if _normalize(admin_email) not in with_grant
    )

    return {
        'default_clients': list(access.get('default_clients', [])),
        'entries': rows
    }
def get_access_snapshot():
    """Return the access data exactly as _load() yields it.

    Used by audit logging to capture clients_before.
    """
    with _lock:
        snapshot = _load()
    return snapshot