""" Flask server for H&M EMS Product Review Tool. Serves the review UI and provides API endpoints for: - Listing available JSON campaign files - Loading campaign data with image mappings - Approving/saving edits back to the master JSON with change logging Configuration (environment variables): IMAGE_BASE_PATH – root folder containing year/campaign/Automation_LR images (defaults to ./campaign_images) MASTER_JSON_DIR – folder containing campaign JSON files (defaults to ./Master_Json) PORT – server port (defaults to 5000) Usage: python3 server.py Then open http://localhost:5000 """ import os import json import threading from datetime import datetime from functools import wraps from flask import Flask, jsonify, request, send_from_directory, abort, session, redirect, url_for, render_template_string from werkzeug.middleware.proxy_fix import ProxyFix from html_generator import ( LANGUAGE_DISPLAY_NAMES, _get_campaign_prefix, _get_main_image_filename, _get_all_image_filenames, ) # ========== Config ========== BASE_DIR = os.path.dirname(os.path.abspath(__file__)) MASTER_JSON_DIR = os.environ.get( "MASTER_JSON_DIR", os.path.join(BASE_DIR, "Master_Json"), ) IMAGE_BASE_PATH = os.environ.get( "IMAGE_BASE_PATH", os.path.join(BASE_DIR, "campaign_images"), ) # Thread lock for safe JSON read/write file_lock = threading.Lock() app = Flask(__name__, static_folder="static") app.secret_key = os.environ.get("SECRET_KEY", "hm-ems-secret-key-change-in-production") app.config['APPLICATION_ROOT'] = '/hm-ems-report' # Configure for running behind Apache proxy app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) # Login credentials USERNAME = "admin3_M" PASSWORD = "Pa$$w0rd2026_!" # Authentication decorator def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if not session.get('logged_in'): # For AJAX/API requests, return 401 JSON instead of redirect if request.path.startswith('/api/') or request.is_json or request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({"error": "Unauthorized"}), 401 return redirect(app.config['APPLICATION_ROOT'] + '/login') return f(*args, **kwargs) return decorated_function # ========== Login ========== LOGIN_HTML = """ H&M EMS - Login
{% if error %}
{{ error }}
{% endif %}
""" @app.route("/login", methods=["GET", "POST"]) def login(): error = None if request.method == "POST": username = request.form.get("username") password = request.form.get("password") if username == USERNAME and password == PASSWORD: session['logged_in'] = True return redirect(app.config['APPLICATION_ROOT'] + '/') else: error = "Invalid username or password" return render_template_string(LOGIN_HTML, error=error) @app.route("/logout") def logout(): session.pop('logged_in', None) return redirect(app.config['APPLICATION_ROOT'] + '/login') # ========== Static UI ========== @app.route("/") @login_required def index(): return send_from_directory("static", "index.html") # ========== API: List available JSON files ========== @app.route("/api/files") @login_required def list_files(): files = [] for f in sorted(os.listdir(MASTER_JSON_DIR)): if f.endswith(".json") and not f.endswith("_changelog.json"): campaign = _get_campaign_prefix(f) files.append({"filename": f, "campaign": campaign}) return jsonify(files) # ========== API: Load campaign data ========== @app.route("/api/load/") @login_required def load_campaign(filename): # Security: strip path components filename = os.path.basename(filename) filepath = os.path.join(MASTER_JSON_DIR, filename) if not os.path.isfile(filepath): abort(404, description=f"File not found: {filename}") with open(filepath, "r", encoding="utf-8") as f: data = json.load(f) campaign_prefix = _get_campaign_prefix(filename) # Derive year from Season field (first 4 chars, e.g. "202502" -> "2025") season = data[0].get("Season", "") if data else "" year = season[:4] if len(season) >= 4 else "2025" image_source_dir = os.path.join(IMAGE_BASE_PATH, year, campaign_prefix, "Automation_LR") # Build image map: Article id -> list of server image URLs image_map = {} for record in data: article_id = record.get("Article id", "") if article_id in image_map: continue img_names = _get_all_image_filenames(record.get("Filename", "")) urls = [] for img_name in img_names: if os.path.isfile(os.path.join(image_source_dir, img_name)): urls.append(f"/hm-ems-report/images/{year}/{campaign_prefix}/Automation_LR/{img_name}") image_map[article_id] = urls if urls else None # Build language display names for languages present in data languages = sorted(set( r.get("Language", "").lower() for r in data if r.get("Language") )) lang_display = {} for lang in languages: lang_display[lang] = LANGUAGE_DISPLAY_NAMES.get(lang, lang) # Derive campaign info campaign_name = data[0].get("Campaign", campaign_prefix) if data else campaign_prefix season = data[0].get("Season", "") if data else "" # Load existing approvals from changelog approvals = _load_approvals(filename) return jsonify({ "data": data, "image_map": image_map, "lang_display": lang_display, "campaign": campaign_name, "season": season, "filename": filename, "approvals": approvals, }) # ========== API: Approve / save edits ========== @app.route("/api/approve", methods=["POST"]) @login_required def approve(): body = request.get_json() if not body: abort(400, description="Missing JSON body") filename = os.path.basename(body.get("filename", "")) article_id = body.get("article_id", "") language = body.get("language", "") edits = body.get("edits", {}) action = body.get("action", "approve") # "approve" or "unapprove" if not filename or not article_id or not language: abort(400, description="Missing required fields: filename, article_id, language") filepath = os.path.join(MASTER_JSON_DIR, filename) if not os.path.isfile(filepath): abort(404, description=f"File not found: {filename}") with file_lock: # Load master JSON with open(filepath, "r", encoding="utf-8") as f: data = json.load(f) # Find matching record record = None for r in data: if (r.get("Article id") == article_id and r.get("Language", "").lower() == language.lower()): record = r break if not record: abort(404, description=f"Record not found: {article_id} / {language}") # Build changelog entry changelog_entry = { "timestamp": datetime.now().isoformat(), "article_id": article_id, "language": language, "country": record.get("Country", ""), "product_id": record.get("Product id", ""), "action": action, "edits": [], } # Apply edits if approving if action == "approve" and edits: for field in ["Product name", "Price"]: if field in edits and edits[field] is not None: old_val = record.get(field, "") new_val = edits[field] if str(old_val) != str(new_val): changelog_entry["edits"].append({ "field": field, "old": old_val, "new": new_val, }) record[field] = new_val # Write master JSON back with open(filepath, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) # Write changelog _append_changelog(filename, changelog_entry) return jsonify({"status": "ok", "action": action}) # ========== API: Unapprove ========== @app.route("/api/unapprove", methods=["POST"]) def unapprove(): body = request.get_json() if not body: abort(400) body["action"] = "unapprove" # Reuse the approve logic for logging return _handle_approval(body) # ========== Image serving ========== @app.route("/images///") def serve_image(year, campaign_prefix, filename): # Security: no path traversal year = os.path.basename(year) campaign_prefix = os.path.basename(campaign_prefix) filename = os.path.basename(filename) image_dir = os.path.join(IMAGE_BASE_PATH, year, campaign_prefix, "Automation_LR") if not os.path.isdir(image_dir): abort(404) return send_from_directory(image_dir, filename) # ========== Changelog helpers ========== def _changelog_path(filename): stem = os.path.splitext(filename)[0] return os.path.join(MASTER_JSON_DIR, f"{stem}_changelog.json") def _append_changelog(filename, entry): path = _changelog_path(filename) if os.path.isfile(path): with open(path, "r", encoding="utf-8") as f: log = json.load(f) else: log = [] log.append(entry) with open(path, "w", encoding="utf-8") as f: json.dump(log, f, indent=2, ensure_ascii=False) def _load_approvals(filename): """Load approval state from changelog - returns dict of approved keys.""" path = _changelog_path(filename) approvals = {} if not os.path.isfile(path): return approvals with open(path, "r", encoding="utf-8") as f: log = json.load(f) for entry in log: key = entry["article_id"] + "::" + entry["language"].lower() if entry.get("action") == "approve": approvals[key] = True elif entry.get("action") == "unapprove": approvals.pop(key, None) return approvals def _handle_approval(body): """Shared logic for approve/unapprove.""" filename = os.path.basename(body.get("filename", "")) article_id = body.get("article_id", "") language = body.get("language", "") action = body.get("action", "approve") if not filename or not article_id or not language: abort(400) filepath = os.path.join(MASTER_JSON_DIR, filename) if not os.path.isfile(filepath): abort(404) with file_lock: changelog_entry = { "timestamp": datetime.now().isoformat(), "article_id": article_id, "language": language, "action": action, "edits": [], } _append_changelog(filename, changelog_entry) return jsonify({"status": "ok", "action": action}) # ========== Run ========== if __name__ == "__main__": port = int(os.environ.get("PORT", 5000)) print(f"Master JSON folder: {MASTER_JSON_DIR}") print(f"Image base path: {IMAGE_BASE_PATH}") print(f"Starting server at http://localhost:{port}") app.run(host="0.0.0.0", port=port, debug=False)