Default-deny access model with admin grant/revoke via new User Access tab. /api/clients filters by user grants; client-scoped endpoints enforce access server-side. Admin role and client grants persist in user_access.json with audit trail in usage logs. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
243 lines
7.2 KiB
Python
243 lines
7.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
User access control module.
|
|
|
|
Stores per-user client visibility grants and admin role in user_access.json.
|
|
Users not listed fall back to default_clients (typically ['general']).
|
|
Admins see all clients regardless of their grant list.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
from datetime import datetime
|
|
from threading import Lock
|
|
|
|
# The access document lives next to this module, independent of the
# process's current working directory.
ACCESS_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    'user_access.json',
)

# Address seeded into the 'admins' list when a fresh access document is built.
BOOTSTRAP_ADMIN = 'nick.viljoen@brandtech.plus'

# Guards reads and writes of ACCESS_FILE between threads of this process.
_lock = Lock()
|
|
|
|
|
|
def _default_data():
    """Build a fresh access document seeded with the bootstrap admin.

    A new dict with new inner containers is created on every call, so the
    result can be mutated without affecting later calls.
    """
    document = {}
    document['version'] = 1
    document['default_clients'] = ['general']
    document['admins'] = [BOOTSTRAP_ADMIN]
    document['users'] = {}
    return document
|
|
|
|
|
|
def _load():
    """Load the access document from ACCESS_FILE.

    If the file does not exist, it is created with bootstrap defaults and
    those defaults are returned.

    If the file exists but cannot be read, is not valid JSON/text, or does not
    hold a JSON object at the top level, the error is printed and in-memory
    defaults are returned. This function does not rewrite the file in that
    case.

    Any top-level key missing from an otherwise valid document is filled in
    from the defaults so older files keep working.

    Returns:
        dict: the access document.
    """
    if not os.path.exists(ACCESS_FILE):
        data = _default_data()
        _save(data)
        print(f"[user_access] Initialized {ACCESS_FILE} with {BOOTSTRAP_ADMIN} as admin")
        return data

    try:
        with open(ACCESS_FILE, 'r') as f:
            data = json.load(f)
        # A list/null/scalar parses fine but would blow up with TypeError in
        # the back-fill loop below; treat it like any other corrupt file.
        if not isinstance(data, dict):
            raise ValueError(
                f"expected a JSON object at top level, got {type(data).__name__}"
            )
    # ValueError covers json.JSONDecodeError, UnicodeDecodeError and the
    # explicit shape check above.
    except (ValueError, OSError) as e:
        print(f"[user_access] ERROR reading {ACCESS_FILE}: {e} — falling back to defaults")
        return _default_data()

    # Fill in any missing top-level keys so old files keep working
    for key, value in _default_data().items():
        data.setdefault(key, value)
    return data
|
|
|
|
|
|
def _save(data):
    """Write *data* to ACCESS_FILE as indented JSON, replacing it atomically.

    The document is first written to a sibling temporary file and then moved
    over ACCESS_FILE with ``os.replace``. An interrupted write therefore
    leaves the previous file intact instead of a truncated one that the
    loader would treat as corrupt.

    The temporary file name is fixed; the callers visible in this module
    invoke ``_save`` while holding ``_lock``, so threads of one process do not
    collide on it.

    Raises:
        OSError: if the temporary file cannot be written or moved into place.
        TypeError: if *data* is not JSON-serializable (from ``json.dump``).
    """
    tmp_path = f"{ACCESS_FILE}.tmp"
    try:
        with open(tmp_path, 'w') as f:
            json.dump(data, f, indent=2)
            # Push the bytes to disk before the rename makes them visible.
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, ACCESS_FILE)
    except Exception:
        # Best-effort cleanup of the partial temp file; the original error
        # is what the caller needs to see.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
|
|
|
|
|
|
def _normalize(email):
    """Return *email* stripped of surrounding whitespace and lower-cased.

    Any falsy input (``None``, ``''``) yields the empty string.
    """
    if not email:
        return ''
    return email.strip().lower()
|
|
|
|
|
|
def get_user_clients(email):
    """Return the client IDs visible to *email*.

    Resolution order:
      1. empty/blank email -> ``[]``
      2. email listed under 'admins' -> every key of
         ``client_config.get_all_clients()``
      3. email has an entry under 'users' (matched case-insensitively) ->
         that entry's 'clients' list
      4. otherwise -> the document's 'default_clients'

    A new list is returned in every case.
    """
    wanted = _normalize(email)
    if not wanted:
        return []

    with _lock:
        access = _load()

    # Admins see everything
    admin_emails = [_normalize(admin) for admin in access.get('admins', [])]
    if wanted in admin_emails:
        from client_config import get_all_clients
        return list(get_all_clients().keys())

    # Explicit grant: first stored key that normalizes to the requested email
    for stored, entry in access.get('users', {}).items():
        if _normalize(stored) != wanted:
            continue
        return list(entry.get('clients', []))

    # Fall back to default
    return list(access.get('default_clients', []))
|
|
|
|
|
|
def set_user_clients(target_email, clients, actor_email):
    """Grant exactly this set of clients to a user and persist the change.

    Args:
        target_email: user whose grant list is replaced. Stored trimmed and
            lower-cased.
        clients: iterable of client IDs. It is consumed exactly once (so
            one-shot iterators are safe) and duplicates are dropped, keeping
            first-occurrence order.
        actor_email: who performed the change; recorded as 'updated_by'.

    Returns:
        dict: audit entry with 'action' ('grant' or 'revoke'),
        'target_email', 'actor_email', 'clients_before' and 'clients_after'.
        'clients_before' is the user's previous explicit list, or the
        document's 'default_clients' if the user had no entry.

    Raises:
        ValueError: if target_email is blank, or a client ID is not a key of
            ``client_config.get_all_clients()``.
    """
    target_email = _normalize(target_email)
    actor_email = _normalize(actor_email)
    if not target_email:
        raise ValueError("target_email is required")

    # Materialize once: the value is validated, stored, measured and echoed
    # back, which would silently store [] for a generator. De-duplicating
    # also keeps the size comparison below meaningful.
    clients = list(dict.fromkeys(clients))

    # Validate clients exist
    from client_config import get_all_clients
    valid_clients = set(get_all_clients().keys())
    for client_id in clients:
        if client_id not in valid_clients:
            raise ValueError(f"Unknown client: {client_id}")

    with _lock:
        data = _load()
        users = data.setdefault('users', {})

        # Every stored key that refers to this user, whatever its casing.
        matching_keys = [key for key in users if _normalize(key) == target_email]

        if matching_keys:
            clients_before = list(users[matching_keys[0]].get('clients', []))
        else:
            clients_before = list(data.get('default_clients', []))

        # Collapse all case variants into the single normalized key; a
        # leftover variant could otherwise be matched first on lookup and
        # shadow the entry written below. An already-normalized key is kept
        # in place so its position in the file does not change.
        for key in matching_keys:
            if key != target_email:
                del users[key]

        users[target_email] = {
            'clients': list(clients),
            'updated_at': datetime.now().isoformat(),
            'updated_by': actor_email
        }
        _save(data)

    return {
        # NOTE(review): the label is decided by list size only, so a
        # same-size swap of clients is reported as 'grant'. Left unchanged
        # because consumers of the audit entry are not visible here.
        'action': 'grant' if len(clients) >= len(clients_before) else 'revoke',
        'target_email': target_email,
        'actor_email': actor_email,
        'clients_before': clients_before,
        'clients_after': list(clients)
    }
|
|
|
|
|
|
def is_admin(email):
    """Report whether *email* appears (case-insensitively) in the 'admins' list.

    A blank or missing email is never an admin and does not touch the file.
    """
    candidate = _normalize(email)
    if candidate:
        with _lock:
            access = _load()
        admin_emails = [_normalize(admin) for admin in access.get('admins', [])]
        return candidate in admin_emails
    return False
|
|
|
|
|
|
def promote_admin(target_email, actor_email):
    """Add a user to the 'admins' list and persist the change.

    Returns an audit dict with 'action', 'target_email' and 'actor_email'.
    When the user is already an admin nothing is written and the dict also
    carries ``'no_op': True``.

    Raises:
        ValueError: if target_email is blank.
    """
    target = _normalize(target_email)
    actor = _normalize(actor_email)
    if not target:
        raise ValueError("target_email is required")

    audit = {
        'action': 'promote_admin',
        'target_email': target,
        'actor_email': actor
    }

    with _lock:
        access = _load()
        current_admins = [_normalize(admin) for admin in access.get('admins', [])]
        if target in current_admins:
            # Already an admin: flag the audit entry, leave the file alone.
            audit['no_op'] = True
        else:
            access.setdefault('admins', []).append(target)
            _save(access)

    return audit
|
|
|
|
|
|
def demote_admin(target_email, actor_email):
    """
    Demote an admin. At least one admin must remain.

    Every entry of the 'admins' list that matches target_email
    case-insensitively is removed. The demotion is refused if that would
    leave no usable admin address, which also blocks self-demotion by the
    last admin.

    Returns an audit dict with 'action', 'target_email' and 'actor_email';
    when the target is not an admin nothing is written and the dict also
    carries ``'no_op': True``.

    Raises:
        ValueError: if target_email is blank, or if the demotion would leave
            the system without any admin.
    """
    target_email = _normalize(target_email)
    actor_email = _normalize(actor_email)
    if not target_email:
        raise ValueError("target_email is required")

    with _lock:
        data = _load()
        admins = data.get('admins', [])

        if target_email not in [_normalize(a) for a in admins]:
            return {
                'action': 'demote_admin',
                'target_email': target_email,
                'actor_email': actor_email,
                'no_op': True
            }

        remaining = [a for a in admins if _normalize(a) != target_email]

        # Check what is actually left rather than the raw list length: the
        # list may hold duplicates or case variants of the target, all of
        # which the filter above removes. Blank entries cannot match any
        # user, so they do not count as a remaining admin.
        if not any(_normalize(a) for a in remaining):
            raise ValueError("Cannot demote the last admin. Promote another user first.")

        data['admins'] = remaining
        _save(data)

    return {
        'action': 'demote_admin',
        'target_email': target_email,
        'actor_email': actor_email
    }
|
|
|
|
|
|
def list_access_entries():
    """
    Collect one row per user with an explicit grant, followed by one row per
    admin that has no explicit grant, together with the default client list.

    Each row carries 'email', 'clients', 'is_admin', 'updated_at' and
    'updated_by'. Admin-only rows have an empty 'clients' list and ``None``
    for both update fields.
    """
    with _lock:
        access = _load()

    admin_list = access.get('admins', [])
    admin_emails = [_normalize(admin) for admin in admin_list]

    rows = [
        {
            'email': stored_email,
            'clients': list(grant.get('clients', [])),
            'is_admin': _normalize(stored_email) in admin_emails,
            'updated_at': grant.get('updated_at'),
            'updated_by': grant.get('updated_by')
        }
        for stored_email, grant in access.get('users', {}).items()
    ]

    # Include admins who have no explicit grant row yet
    already_listed = {_normalize(row['email']) for row in rows}
    rows.extend(
        {
            'email': admin_email,
            'clients': [],
            'is_admin': True,
            'updated_at': None,
            'updated_by': None
        }
        for admin_email in admin_list
        if _normalize(admin_email) not in already_listed
    )

    return {
        'default_clients': list(access.get('default_clients', [])),
        'entries': rows
    }
|
|
|
|
|
|
def get_access_snapshot():
    """Return the access document as loaded from disk, read under ``_lock``.

    Intended for audit logging that needs the state before a change.
    """
    with _lock:
        snapshot = _load()
    return snapshot
|