diff --git a/.gitignore b/.gitignore index 1e00a3d..03211d0 100644 --- a/.gitignore +++ b/.gitignore @@ -62,3 +62,14 @@ output-dev/ fix_apache_proxy.sh diagnose_auth_issue.sh + +# Local working docs and runtime state +CAPABILITY_LIST.md +COMPLIANCE_DOCUMENTATION.md +IMPLEMENTATION_TEST_RESULTS.md +LOREAL_STATIC_GENERAL_IMPLEMENTATION.md +OCR_CALIBRATION_TODO.md +backend/debug_mode.txt +backend/media_plans/ +backend/usage_logs/ +backend/user_access.json diff --git a/backend/api_server.py b/backend/api_server.py index 1ef12b9..70f789f 100755 --- a/backend/api_server.py +++ b/backend/api_server.py @@ -1726,6 +1726,10 @@ def start_analysis(): else: client = 'general' + access_err = _require_client_access(client) + if access_err: + return access_err + # Log analysis start try: from usage_tracker import log_analysis_start @@ -2078,8 +2082,12 @@ def start_analysis(): }), 500 @app.route('/output//', methods=['GET']) +@auth.require_auth def serve_client_output_file(client, filename): """Serve saved output files from client-specific folders""" + access_err = _require_client_access(client) + if access_err: + return access_err try: file_path = os.path.join(app.config['OUTPUT_FOLDER'], client, filename) if os.path.exists(file_path): @@ -2121,6 +2129,7 @@ def serve_output_file(filename): return jsonify({'error': str(e)}), 500 @app.route('/api/output_files', methods=['GET']) +@auth.require_auth def list_output_files(): """List saved output files filtered by client, sorted by creation date (newest first)""" try: @@ -2128,6 +2137,11 @@ def list_output_files(): client_filter = request.args.get('client', None) print(f"DEBUG: list_output_files called with client_filter='{client_filter}'") + if client_filter: + access_err = _require_client_access(client_filter) + if access_err: + return access_err + # Run cleanup of files older than 14 days cleanup_old_files(max_age_days=14) @@ -2318,14 +2332,34 @@ def get_available_profiles(): @app.route('/api/clients', methods=['GET']) def get_clients_endpoint(): - """Get all available clients and their configurations""" + """Get clients visible to the current user. Admins see all; others see only their grants.""" from client_config import get_all_clients + from user_access import get_user_clients, is_admin try: - clients = get_all_clients() + all_clients = get_all_clients() + + # Resolve the current user's email if authenticated. Unauthenticated + # callers get an empty list so the UI can prompt for sign-in without + # leaking the full client catalogue. + user_email = '' + try: + auth_result = app.auth_middleware.is_authenticated() + if auth_result.get('authenticated'): + user_email = auth_result.get('user', {}).get('email', '') + except Exception: + pass + + if not user_email: + return jsonify({'status': 'success', 'clients': {}, 'is_admin': False}) + + allowed_ids = get_user_clients(user_email) + filtered = {cid: all_clients[cid] for cid in allowed_ids if cid in all_clients} + return jsonify({ 'status': 'success', - 'clients': clients + 'clients': filtered, + 'is_admin': is_admin(user_email) }) except Exception as e: return jsonify({ @@ -4205,6 +4239,10 @@ def upload_media_plan(): if not client_id: return jsonify({'status': 'error', 'message': 'client_id is required'}), 400 + access_err = _require_client_access(client_id) + if access_err: + return access_err + # Save the Excel file timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') safe_name = re.sub(r'[^a-zA-Z0-9._-]', '_', file.filename) @@ -4261,12 +4299,17 @@ def upload_media_plan(): @app.route('/api/media_plan', methods=['GET']) +@auth.require_auth def get_media_plan(): """Get the active media plan for a client""" client_id = request.args.get('client', '').strip().lower() if not client_id: return jsonify({'status': 'error', 'message': 'client parameter required'}), 400 + access_err = _require_client_access(client_id) + if access_err: + return access_err + db = _load_media_plans_db() plan_info = db.get(client_id) if not plan_info: @@ -4287,6 +4330,9 @@ def get_media_plan(): @auth.require_auth def delete_media_plan(client_id): """Delete the media plan for a client""" + access_err = _require_client_access(client_id) + if access_err: + return access_err db = _load_media_plans_db() plan_info = db.get(client_id) if not plan_info: @@ -4473,6 +4519,10 @@ def get_client_usage_stats(): if not client: return jsonify({'status': 'error', 'message': 'client parameter required'}), 400 + access_err = _require_client_access(client) + if access_err: + return access_err + start_date = request.args.get('start_date') end_date = request.args.get('end_date') @@ -4533,7 +4583,7 @@ def get_client_usage_stats(): @auth.require_auth def check_admin(): """Check if the current user is an admin""" - from client_config import is_admin + from user_access import is_admin user_email = getattr(g, 'user', {}).get('email', '') return jsonify({'is_admin': is_admin(user_email)}) @@ -4542,7 +4592,7 @@ def check_admin(): @auth.require_auth def get_admin_users(): """Get all users who have accessed the platform (admin only)""" - from client_config import is_admin + from user_access import is_admin user_email = getattr(g, 'user', {}).get('email', '') if not is_admin(user_email): return jsonify({'status': 'error', 'message': 'Admin access required'}), 403 @@ -4621,6 +4671,165 @@ def get_admin_users(): return jsonify({'status': 'error', 'message': str(e)}), 500 +def _require_admin(): + """Return (user_email, None) if admin, else (None, error_response).""" + from user_access import is_admin + user_email = getattr(g, 'user', {}).get('email', '') + if not is_admin(user_email): + return None, (jsonify({'status': 'error', 'message': 'Admin access required'}), 403) + return user_email, None + + +def _require_client_access(client_id): + """ + Validate the authed user can access `client_id`. + Returns None on success, or a (response, status) tuple to short-circuit the endpoint. + """ + if not client_id: + return None # endpoint didn't scope by client + from user_access import get_user_clients + user_email = getattr(g, 'user', {}).get('email', '') + if not user_email: + return jsonify({'status': 'error', 'message': 'Authentication required'}), 401 + allowed = get_user_clients(user_email) + if client_id not in allowed: + return jsonify({ + 'status': 'error', + 'code': 'client_access_denied', + 'message': f'You do not have access to client "{client_id}"' + }), 403 + return None + + +@app.route('/api/admin/user_access', methods=['GET']) +@auth.require_auth +def get_user_access_list(): + """List all users with their client grants. Joins login-log users with explicit grants.""" + _, err = _require_admin() + if err: + return err + + try: + from user_access import list_access_entries + from generate_usage_report import load_logs + + access = list_access_entries() + entries_by_email = {e['email'].lower(): e for e in access['entries']} + + # Enrich with login-log data (name + last_active) for users who have signed in + login_users = {} + for log_entry in load_logs(): + email = log_entry.get('user_email') + if not email: + continue + lower = email.lower() + ts = log_entry.get('timestamp', '') + if lower not in login_users or ts > login_users[lower].get('last_active', ''): + login_users[lower] = { + 'email': email, + 'name': log_entry.get('user_name', ''), + 'last_active': ts + } + + # Merge: everyone in login logs + everyone with an explicit grant + all_emails = set(login_users.keys()) | set(entries_by_email.keys()) + merged = [] + for lower in all_emails: + entry = entries_by_email.get(lower, { + 'email': login_users.get(lower, {}).get('email', lower), + 'clients': access['default_clients'], + 'is_admin': False, + 'updated_at': None, + 'updated_by': None + }) + login = login_users.get(lower, {}) + merged.append({ + **entry, + 'name': login.get('name', ''), + 'last_active': login.get('last_active', ''), + 'has_explicit_grant': lower in entries_by_email + }) + + merged.sort(key=lambda x: (not x.get('is_admin'), x.get('email', '').lower())) + + return jsonify({ + 'status': 'success', + 'default_clients': access['default_clients'], + 'users': merged + }) + except Exception as e: + print(f"Error listing user access: {e}") + import traceback + traceback.print_exc() + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@app.route('/api/admin/user_access/', methods=['PUT']) +@auth.require_auth +def set_user_access(target_email): + """Set the list of clients a user can see. Body: {"clients": ["general", ...]}""" + actor_email, err = _require_admin() + if err: + return err + + data = request.get_json(silent=True) or {} + clients = data.get('clients') + if not isinstance(clients, list): + return jsonify({'status': 'error', 'message': 'clients must be a list'}), 400 + + try: + from user_access import set_user_clients + from usage_tracker import log_access_change + audit = set_user_clients(target_email, clients, actor_email) + log_access_change(audit) + return jsonify({'status': 'success', 'audit': audit}) + except ValueError as ve: + return jsonify({'status': 'error', 'message': str(ve)}), 400 + except Exception as e: + print(f"Error setting user access: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@app.route('/api/admin/user_access//promote', methods=['POST']) +@auth.require_auth +def promote_user_admin(target_email): + """Promote a user to admin.""" + actor_email, err = _require_admin() + if err: + return err + + try: + from user_access import promote_admin + from usage_tracker import log_access_change + audit = promote_admin(target_email, actor_email) + log_access_change(audit) + return jsonify({'status': 'success', 'audit': audit}) + except ValueError as ve: + return jsonify({'status': 'error', 'message': str(ve)}), 400 + except Exception as e: + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@app.route('/api/admin/user_access//demote', methods=['POST']) +@auth.require_auth +def demote_user_admin(target_email): + """Remove admin role from a user. Blocked if it would leave zero admins.""" + actor_email, err = _require_admin() + if err: + return err + + try: + from user_access import demote_admin + from usage_tracker import log_access_change + audit = demote_admin(target_email, actor_email) + log_access_change(audit) + return jsonify({'status': 'success', 'audit': audit}) + except ValueError as ve: + return jsonify({'status': 'error', 'message': str(ve)}), 400 + except Exception as e: + return jsonify({'status': 'error', 'message': str(e)}), 500 + + @app.route('/api/consolidate_reports', methods=['POST']) @auth.require_auth def consolidate_reports(): diff --git a/backend/client_config.py b/backend/client_config.py index 15b480d..310ae81 100644 --- a/backend/client_config.py +++ b/backend/client_config.py @@ -121,13 +121,9 @@ def get_profiles_with_visibility(client_id): return available_profiles -# Admin user configuration -ADMIN_USERS = [ - 'nick.viljoen@brandtech.plus', -] - +# Admin membership now lives in backend/user_access.json. +# Kept as a thin shim so any older callers keep working. def is_admin(email): - """Check if a user email is an admin""" - if not email: - return False - return email.lower() in [e.lower() for e in ADMIN_USERS] + """Deprecated — delegates to user_access.is_admin().""" + from user_access import is_admin as _is_admin + return _is_admin(email) diff --git a/backend/usage_tracker.py b/backend/usage_tracker.py index d428959..d2ad38d 100644 --- a/backend/usage_tracker.py +++ b/backend/usage_tracker.py @@ -158,6 +158,22 @@ def log_user_login(user_info): return log_entry +def log_access_change(audit_entry): + """ + Log an access grant/revoke/promote/demote event. + + Args: + audit_entry: dict from user_access.set_user_clients / promote_admin / demote_admin + """ + log_entry = { + 'event': 'access_change', + 'timestamp': datetime.now().isoformat(), + **audit_entry + } + _write_log_entry(log_entry) + return log_entry + + def _calculate_analysis_cost(results): """ Calculate cost based on actual token usage from LLM responses diff --git a/backend/user_access.py b/backend/user_access.py new file mode 100644 index 0000000..cef0be5 --- /dev/null +++ b/backend/user_access.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python3 +""" +User access control module. + +Stores per-user client visibility grants and admin role in user_access.json. +Users not listed fall back to default_clients (typically ['general']). +Admins see all clients regardless of their grant list. +""" + +import os +import json +from datetime import datetime +from threading import Lock + +ACCESS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'user_access.json') +BOOTSTRAP_ADMIN = 'nick.viljoen@brandtech.plus' + +_lock = Lock() + + +def _default_data(): + return { + 'version': 1, + 'default_clients': ['general'], + 'admins': [BOOTSTRAP_ADMIN], + 'users': {} + } + + +def _load(): + """Load access data. Creates the file with bootstrap defaults if missing or corrupt.""" + if not os.path.exists(ACCESS_FILE): + data = _default_data() + _save(data) + print(f"[user_access] Initialized {ACCESS_FILE} with {BOOTSTRAP_ADMIN} as admin") + return data + + try: + with open(ACCESS_FILE, 'r') as f: + data = json.load(f) + # Fill in any missing top-level keys so old files keep working + defaults = _default_data() + for key, value in defaults.items(): + if key not in data: + data[key] = value + return data + except (json.JSONDecodeError, OSError) as e: + print(f"[user_access] ERROR reading {ACCESS_FILE}: {e} — falling back to defaults") + return _default_data() + + +def _save(data): + with open(ACCESS_FILE, 'w') as f: + json.dump(data, f, indent=2) + + +def _normalize(email): + return (email or '').strip().lower() + + +def get_user_clients(email): + """Return list of client IDs the user can see. Admins see all.""" + email = _normalize(email) + if not email: + return [] + + with _lock: + data = _load() + + # Admins see everything + if email in [_normalize(a) for a in data.get('admins', [])]: + from client_config import get_all_clients + return list(get_all_clients().keys()) + + # Explicit grant + users = data.get('users', {}) + for stored_email, entry in users.items(): + if _normalize(stored_email) == email: + return list(entry.get('clients', [])) + + # Fall back to default + return list(data.get('default_clients', [])) + + +def set_user_clients(target_email, clients, actor_email): + """Grant exactly this set of clients to a user. Returns the audit entry.""" + target_email = _normalize(target_email) + actor_email = _normalize(actor_email) + if not target_email: + raise ValueError("target_email is required") + + # Validate clients exist + from client_config import get_all_clients + valid_clients = set(get_all_clients().keys()) + for client_id in clients: + if client_id not in valid_clients: + raise ValueError(f"Unknown client: {client_id}") + + with _lock: + data = _load() + + # Find existing entry (case-insensitive) or add new one + existing_key = None + for stored_email in data.get('users', {}).keys(): + if _normalize(stored_email) == target_email: + existing_key = stored_email + break + + clients_before = list(data['users'].get(existing_key, {}).get('clients', [])) if existing_key else list(data.get('default_clients', [])) + + if existing_key and existing_key != target_email: + del data['users'][existing_key] + + data['users'][target_email] = { + 'clients': list(clients), + 'updated_at': datetime.now().isoformat(), + 'updated_by': actor_email + } + _save(data) + + return { + 'action': 'grant' if len(clients) >= len(clients_before) else 'revoke', + 'target_email': target_email, + 'actor_email': actor_email, + 'clients_before': clients_before, + 'clients_after': list(clients) + } + + +def is_admin(email): + """Check if a user is an admin.""" + email = _normalize(email) + if not email: + return False + with _lock: + data = _load() + return email in [_normalize(a) for a in data.get('admins', [])] + + +def promote_admin(target_email, actor_email): + """Promote a user to admin. Returns audit entry.""" + target_email = _normalize(target_email) + actor_email = _normalize(actor_email) + if not target_email: + raise ValueError("target_email is required") + + with _lock: + data = _load() + admins_lower = [_normalize(a) for a in data.get('admins', [])] + if target_email in admins_lower: + return { + 'action': 'promote_admin', + 'target_email': target_email, + 'actor_email': actor_email, + 'no_op': True + } + data.setdefault('admins', []).append(target_email) + _save(data) + + return { + 'action': 'promote_admin', + 'target_email': target_email, + 'actor_email': actor_email + } + + +def demote_admin(target_email, actor_email): + """ + Demote an admin. At least one admin must remain. + Blocks self-demote if the actor is the last admin. + """ + target_email = _normalize(target_email) + actor_email = _normalize(actor_email) + if not target_email: + raise ValueError("target_email is required") + + with _lock: + data = _load() + admins = data.get('admins', []) + admins_lower = [_normalize(a) for a in admins] + + if target_email not in admins_lower: + return { + 'action': 'demote_admin', + 'target_email': target_email, + 'actor_email': actor_email, + 'no_op': True + } + + if len(admins) <= 1: + raise ValueError("Cannot demote the last admin. Promote another user first.") + + data['admins'] = [a for a in admins if _normalize(a) != target_email] + _save(data) + + return { + 'action': 'demote_admin', + 'target_email': target_email, + 'actor_email': actor_email + } + + +def list_access_entries(): + """ + Return all explicit access entries plus admin flags. + Used by the admin panel to render the users table. + """ + with _lock: + data = _load() + + admins_lower = [_normalize(a) for a in data.get('admins', [])] + entries = [] + for email, entry in data.get('users', {}).items(): + entries.append({ + 'email': email, + 'clients': list(entry.get('clients', [])), + 'is_admin': _normalize(email) in admins_lower, + 'updated_at': entry.get('updated_at'), + 'updated_by': entry.get('updated_by') + }) + + # Include admins who have no explicit grant row yet + explicit_emails = {_normalize(e['email']) for e in entries} + for admin_email in data.get('admins', []): + if _normalize(admin_email) not in explicit_emails: + entries.append({ + 'email': admin_email, + 'clients': [], + 'is_admin': True, + 'updated_at': None, + 'updated_by': None + }) + + return { + 'default_clients': list(data.get('default_clients', [])), + 'entries': entries + } + + +def get_access_snapshot(): + """Raw view of the file — used by audit logging for clients_before.""" + with _lock: + return _load() diff --git a/web_ui.html b/web_ui.html index 477b2d9..5bbfedd 100644 --- a/web_ui.html +++ b/web_ui.html @@ -938,50 +938,92 @@