#!/usr/bin/env python3
"""
User access control module.

Stores per-user client visibility grants and admin role in user_access.json.
Users not listed fall back to default_clients (typically ['general']).
Admins see all clients regardless of their grant list.
"""

import os
import json
from datetime import datetime
from threading import Lock

ACCESS_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), 'user_access.json'
)
BOOTSTRAP_ADMIN = 'nick.viljoen@brandtech.plus'

# Guards every load / load-modify-save cycle on ACCESS_FILE in this process.
# It is a plain (non-reentrant) Lock, so _load() and _save() must never
# acquire it themselves.
_lock = Lock()


def _default_data():
    """Return a fresh copy of the bootstrap access structure."""
    return {
        'version': 1,
        'default_clients': ['general'],
        'admins': [BOOTSTRAP_ADMIN],
        'users': {}
    }


def _load():
    """Load access data from ACCESS_FILE.

    If the file is missing, it is created with the bootstrap defaults.
    If it cannot be read, cannot be decoded/parsed, or its top-level value
    is not a JSON object, an error is printed and in-memory defaults are
    returned. The file on disk is NOT rewritten in that case, but the next
    successful _save() will replace it.

    Missing top-level keys are filled in from the defaults so that older
    files keep working.
    """
    if not os.path.exists(ACCESS_FILE):
        data = _default_data()
        _save(data)
        print(f"[user_access] Initialized {ACCESS_FILE} with {BOOTSTRAP_ADMIN} as admin")
        return data

    try:
        with open(ACCESS_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if not isinstance(data, dict):
            # Valid JSON but the wrong shape; treat it like a corrupt file
            # instead of failing later with a TypeError.
            raise ValueError(
                f"expected a JSON object, got {type(data).__name__}"
            )
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"[user_access] ERROR reading {ACCESS_FILE}: {e} — falling back to defaults")
        return _default_data()

    for key, value in _default_data().items():
        data.setdefault(key, value)
    return data


def _save(data):
    """Write *data* to ACCESS_FILE as indented JSON.

    The data is written to a sibling temp file first and then moved into
    place with os.replace, so a failure part-way through serialization
    leaves the previous access file intact. A leftover temp file from a
    failed write is simply overwritten by the next save.
    """
    tmp_path = f"{ACCESS_FILE}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, ACCESS_FILE)


def _normalize(email):
    """Return *email* stripped and lower-cased; None/empty becomes ''."""
    return (email or '').strip().lower()


def _admin_emails(data):
    """Return the set of normalized admin emails stored in *data*."""
    return {_normalize(a) for a in data.get('admins', [])}


def get_user_clients(email):
    """Return list of client IDs the user can see. Admins see all.

    Resolution order: empty email -> []; admin -> every client known to
    client_config; explicit grant (matched case-insensitively) -> that
    grant; otherwise the file's default_clients.
    """
    email = _normalize(email)
    if not email:
        return []

    with _lock:
        data = _load()

    # Admins see everything
    if email in _admin_emails(data):
        # Function-scope import, as in the rest of this module
        # (presumably to avoid a circular import — confirm before hoisting).
        from client_config import get_all_clients
        return list(get_all_clients().keys())

    # Explicit grant
    for stored_email, entry in data.get('users', {}).items():
        if _normalize(stored_email) == email:
            return list(entry.get('clients', []))

    # Fall back to default
    return list(data.get('default_clients', []))


def set_user_clients(target_email, clients, actor_email):
    """Grant exactly this set of clients to a user. Returns the audit entry.

    Raises ValueError if target_email is empty or any client ID is not
    known to client_config. The stored key is the normalized email; any
    existing keys that differ only by case/whitespace are replaced.
    """
    target_email = _normalize(target_email)
    actor_email = _normalize(actor_email)
    if not target_email:
        raise ValueError("target_email is required")

    # Materialize once: the value is iterated for validation, stored, and
    # echoed in the audit entry, which would exhaust a one-shot iterable.
    clients = list(clients)

    # Validate clients exist
    from client_config import get_all_clients
    valid_clients = set(get_all_clients().keys())
    for client_id in clients:
        if client_id not in valid_clients:
            raise ValueError(f"Unknown client: {client_id}")

    with _lock:
        data = _load()
        users = data.setdefault('users', {})

        # All stored keys that refer to this user (case-insensitive).
        matching_keys = [k for k in users if _normalize(k) == target_email]
        if matching_keys:
            clients_before = list(users[matching_keys[0]].get('clients', []))
        else:
            clients_before = list(data.get('default_clients', []))

        # Drop every variant so a stale duplicate cannot shadow the new
        # grant when get_user_clients scans the table.
        for key in matching_keys:
            del users[key]

        users[target_email] = {
            'clients': list(clients),
            'updated_at': datetime.now().isoformat(),
            'updated_by': actor_email
        }
        _save(data)

    return {
        'action': 'grant' if len(clients) >= len(clients_before) else 'revoke',
        'target_email': target_email,
        'actor_email': actor_email,
        'clients_before': clients_before,
        'clients_after': list(clients)
    }


def is_admin(email):
    """Check if a user is an admin (case-insensitive; empty -> False)."""
    email = _normalize(email)
    if not email:
        return False
    with _lock:
        data = _load()
    return email in _admin_emails(data)


def promote_admin(target_email, actor_email):
    """Promote a user to admin. Returns audit entry.

    If the user is already an admin nothing is written and the audit
    entry carries 'no_op': True. Raises ValueError if target_email is empty.
    """
    target_email = _normalize(target_email)
    actor_email = _normalize(actor_email)
    if not target_email:
        raise ValueError("target_email is required")

    with _lock:
        data = _load()
        if target_email in _admin_emails(data):
            return {
                'action': 'promote_admin',
                'target_email': target_email,
                'actor_email': actor_email,
                'no_op': True
            }
        data.setdefault('admins', []).append(target_email)
        _save(data)

    return {
        'action': 'promote_admin',
        'target_email': target_email,
        'actor_email': actor_email
    }


def demote_admin(target_email, actor_email):
    """
    Demote an admin. At least one admin must remain.
    Blocks self-demote if the actor is the last admin.

    If the target is not an admin nothing is written and the audit entry
    carries 'no_op': True. Raises ValueError if target_email is empty or
    the demotion would leave no admin.
    """
    target_email = _normalize(target_email)
    actor_email = _normalize(actor_email)
    if not target_email:
        raise ValueError("target_email is required")

    with _lock:
        data = _load()
        admins = data.get('admins', [])
        if target_email not in _admin_emails(data):
            return {
                'action': 'demote_admin',
                'target_email': target_email,
                'actor_email': actor_email,
                'no_op': True
            }

        # Decide based on who would actually remain, not on the raw list
        # length: case-variant duplicates of the target are all removed by
        # the filter below, and blank entries never pass is_admin().
        remaining = [a for a in admins if _normalize(a) != target_email]
        if not any(_normalize(a) for a in remaining):
            raise ValueError("Cannot demote the last admin. Promote another user first.")

        data['admins'] = remaining
        _save(data)

    return {
        'action': 'demote_admin',
        'target_email': target_email,
        'actor_email': actor_email
    }


def list_access_entries():
    """
    Return all explicit access entries plus admin flags.
    Used by the admin panel to render the users table.

    The result has 'default_clients' and 'entries'; admins without an
    explicit grant row are appended with an empty client list.
    """
    with _lock:
        data = _load()

    admin_emails = _admin_emails(data)
    entries = [
        {
            'email': email,
            'clients': list(entry.get('clients', [])),
            'is_admin': _normalize(email) in admin_emails,
            'updated_at': entry.get('updated_at'),
            'updated_by': entry.get('updated_by')
        }
        for email, entry in data.get('users', {}).items()
    ]

    # Include admins who have no explicit grant row yet
    explicit_emails = {_normalize(e['email']) for e in entries}
    for admin_email in data.get('admins', []):
        if _normalize(admin_email) not in explicit_emails:
            entries.append({
                'email': admin_email,
                'clients': [],
                'is_admin': True,
                'updated_at': None,
                'updated_by': None
            })

    return {
        'default_clients': list(data.get('default_clients', [])),
        'entries': entries
    }


def get_access_snapshot():
    """Raw view of the file — used by audit logging for clients_before."""
    with _lock:
        return _load()