""" Admin API — user management and dropdown Excel upload. All routes require admin role. """ import json import logging import os from quart import Blueprint, jsonify, request from ..auth.middleware import admin_required from ..auth.user_store import list_users, set_role, set_active from ..api.dropdowns import save_dropdowns, parse_excel_dropdowns, detect_excel_mapping from ..api.clients import load_clients, get_client_by_id from ..api.export import detect_csv_template, load_export_template, save_export_template, _client_template_path, INTERNAL_FIELDS logger = logging.getLogger(__name__) admin_bp = Blueprint('admin', __name__, url_prefix='/api/admin') @admin_bp.route('/users', methods=['GET']) @admin_required async def get_users(): return jsonify({'users': list_users()}) @admin_bp.route('/users/', methods=['PATCH']) @admin_required async def update_user(user_id: str): body = await request.get_json() or {} user = None if 'role' in body: user = set_role(user_id, body['role']) if user is None: return jsonify({'error': 'invalid_role_or_not_found'}), 400 if 'active' in body: user = set_active(user_id, bool(body['active'])) if user is None: return jsonify({'error': 'not_found'}), 404 return jsonify({'success': True, 'user': user}) def _read_xlsx_file(file) -> bytes: return file.read() def _extract_mapping(form) -> dict | None: """Extract mapping override from multipart form fields (name_col, status_col, media_col).""" try: if 'name_col' in form and 'status_col' in form and 'media_col' in form: return { 'name_col': int(form['name_col']), 'status_col': int(form['status_col']), 'media_col': int(form['media_col']), } except (ValueError, KeyError): pass return None async def _parse_uploaded_xlsx(files, form=None) -> tuple[list, str | None]: """Returns (categories, error_message). error_message is None on success.""" file = files.get('file') if not file: return [], 'no_file' if not (file.filename or '').lower().endswith('.xlsx'): return [], 'Only .xlsx files accepted' try: data = _read_xlsx_file(file) mapping = _extract_mapping(form) if form else None categories = parse_excel_dropdowns(data, mapping=mapping) if not categories: return [], 'No categories found in file' return categories, None except Exception as e: logger.error(f"Dropdown parse error: {e}", exc_info=True) return [], str(e) @admin_bp.route('/dropdowns/detect-mapping', methods=['POST']) @admin_required async def detect_mapping(): """Detect column mapping from an uploaded .xlsx without saving. Returns headers + mapping.""" files = await request.files file = files.get('file') if not file: return jsonify({'error': 'no_file'}), 400 if not (file.filename or '').lower().endswith('.xlsx'): return jsonify({'error': 'Only .xlsx files accepted'}), 400 try: data = _read_xlsx_file(file) result = detect_excel_mapping(data) return jsonify(result) except Exception as e: logger.error(f"Mapping detection error: {e}", exc_info=True) return jsonify({'error': str(e)}), 400 @admin_bp.route('/dropdowns/upload', methods=['POST']) @admin_required async def upload_dropdowns(): """Upload a new .xlsx to update global dropdown categories.""" files = await request.files form = await request.form categories, err = await _parse_uploaded_xlsx(files, form) if err: return jsonify({'error': err}), 400 save_dropdowns(categories) active_count = sum(1 for c in categories if c['status'] == 'Active') return jsonify({'success': True, 'total': len(categories), 'active': active_count, 'archived': len(categories) - active_count}) @admin_bp.route('/dropdowns/preview', methods=['POST']) @admin_required async def preview_dropdowns(): """Preview parsed categories from an uploaded file without saving.""" files = await request.files form = await request.form categories, err = await _parse_uploaded_xlsx(files, form) if err: return jsonify({'error': err}), 400 return jsonify({'categories': categories, 'total': len(categories)}) # ── Per-client dropdown endpoints ───────────────────────────────────────────── @admin_bp.route('/clients//dropdowns/detect-mapping', methods=['POST']) @admin_required async def detect_client_mapping(client_id: str): """Detect column mapping from a per-client .xlsx upload.""" files = await request.files file = files.get('file') if not file: return jsonify({'error': 'no_file'}), 400 if not (file.filename or '').lower().endswith('.xlsx'): return jsonify({'error': 'Only .xlsx files accepted'}), 400 try: data = _read_xlsx_file(file) result = detect_excel_mapping(data) return jsonify(result) except Exception as e: logger.error(f"Mapping detection error: {e}", exc_info=True) return jsonify({'error': str(e)}), 400 @admin_bp.route('/clients//dropdowns/upload', methods=['POST']) @admin_required async def upload_client_dropdowns(client_id: str): """Upload a per-client .xlsx dropdown file. Falls back to global if not set.""" if not get_client_by_id(client_id): return jsonify({'error': 'client_not_found'}), 404 files = await request.files form = await request.form categories, err = await _parse_uploaded_xlsx(files, form) if err: return jsonify({'error': err}), 400 save_dropdowns(categories, client_id=client_id) # Mark client as having custom dropdowns clients = load_clients() for c in clients: if c['id'] == client_id: c['hasCustomDropdowns'] = True break from ..api.clients import _save_clients _save_clients(clients) active_count = sum(1 for c in categories if c['status'] == 'Active') return jsonify({'success': True, 'total': len(categories), 'active': active_count, 'archived': len(categories) - active_count}) @admin_bp.route('/clients//dropdowns/preview', methods=['POST']) @admin_required async def preview_client_dropdowns(client_id: str): """Preview per-client dropdown file without saving.""" files = await request.files form = await request.form categories, err = await _parse_uploaded_xlsx(files, form) if err: return jsonify({'error': err}), 400 return jsonify({'categories': categories, 'total': len(categories)}) @admin_bp.route('/clients//dropdowns', methods=['DELETE']) @admin_required async def delete_client_dropdowns(client_id: str): """Remove per-client dropdown override — reverts to global.""" from ..config_runtime import server_config path = os.path.join(server_config.CLIENTS_DROPDOWNS_DIR, f"{client_id}.json") if os.path.exists(path): os.remove(path) clients = load_clients() for c in clients: if c['id'] == client_id: c['hasCustomDropdowns'] = False break from ..api.clients import _save_clients _save_clients(clients) return jsonify({'success': True}) # ── Export template endpoints ────────────────────────────────────────────────── @admin_bp.route('/export-template', methods=['GET']) @admin_required async def get_global_export_template(): return jsonify({'template': load_export_template(), 'fields': INTERNAL_FIELDS}) @admin_bp.route('/export-template/detect', methods=['POST']) @admin_required async def detect_global_export_template(): files = await request.files file = files.get('file') if not file: return jsonify({'error': 'no_file'}), 400 if not (file.filename or '').lower().endswith('.csv'): return jsonify({'error': 'Only .csv files accepted'}), 400 try: data = file.read() result = detect_csv_template(data) result['fields'] = INTERNAL_FIELDS return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 400 @admin_bp.route('/export-template', methods=['POST']) @admin_required async def save_global_export_template(): body = await request.get_json() or {} template = body.get('template') if not template or not isinstance(template, list): return jsonify({'error': 'invalid_template'}), 400 save_export_template(template) return jsonify({'success': True, 'columns': len(template)}) @admin_bp.route('/export-template', methods=['DELETE']) @admin_required async def delete_global_export_template(): from ..config_runtime import server_config if os.path.exists(server_config.EXPORT_TEMPLATE_FILE): os.remove(server_config.EXPORT_TEMPLATE_FILE) return jsonify({'success': True}) @admin_bp.route('/clients//export-template', methods=['GET']) @admin_required async def get_client_export_template(client_id: str): if not get_client_by_id(client_id): return jsonify({'error': 'client_not_found'}), 404 path = _client_template_path(client_id) has_custom = os.path.exists(path) return jsonify({ 'template': load_export_template(client_id), 'hasCustomTemplate': has_custom, 'fields': INTERNAL_FIELDS, }) @admin_bp.route('/clients//export-template/detect', methods=['POST']) @admin_required async def detect_client_export_template(client_id: str): files = await request.files file = files.get('file') if not file: return jsonify({'error': 'no_file'}), 400 if not (file.filename or '').lower().endswith('.csv'): return jsonify({'error': 'Only .csv files accepted'}), 400 try: data = file.read() result = detect_csv_template(data) result['fields'] = INTERNAL_FIELDS return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 400 @admin_bp.route('/clients//export-template', methods=['POST']) @admin_required async def save_client_export_template(client_id: str): if not get_client_by_id(client_id): return jsonify({'error': 'client_not_found'}), 404 body = await request.get_json() or {} template = body.get('template') if not template or not isinstance(template, list): return jsonify({'error': 'invalid_template'}), 400 save_export_template(template, client_id=client_id) return jsonify({'success': True, 'columns': len(template)}) @admin_bp.route('/clients//export-template', methods=['DELETE']) @admin_required async def delete_client_export_template(client_id: str): path = _client_template_path(client_id) if os.path.exists(path): os.remove(path) return jsonify({'success': True})