"""
Flask server for H&M EMS Product Review Tool.
Serves the review UI and provides API endpoints for:
- Listing available JSON campaign files
- Loading campaign data with image mappings
- Approving/saving edits back to the master JSON with change logging
Configuration (environment variables):
IMAGE_BASE_PATH – root folder containing year/campaign/Automation_LR images
(defaults to ./campaign_images)
MASTER_JSON_DIR – folder containing campaign JSON files
(defaults to ./Master_Json)
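PORT – server port (defaults to 5000)
SECRET_KEY – Flask session signing key
(defaults to a built-in placeholder; set a real secret in production)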
Usage:
python3 server.py
Then open http://localhost:5000
"""
import os
import json
import threading
from datetime import datetime
from functools import wraps
from flask import Flask, jsonify, request, send_from_directory, abort, session, redirect, url_for, render_template_string
from werkzeug.middleware.proxy_fix import ProxyFix
from html_generator import (
LANGUAGE_DISPLAY_NAMES,
_get_campaign_prefix,
_get_main_image_filename,
_get_all_image_filenames,
)
# ========== Config ==========
# Directory containing this module; the default data folders live next to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Folder with the campaign JSON files (override with MASTER_JSON_DIR).
MASTER_JSON_DIR = os.getenv("MASTER_JSON_DIR", os.path.join(BASE_DIR, "Master_Json"))

# Root folder of the campaign images (override with IMAGE_BASE_PATH).
IMAGE_BASE_PATH = os.getenv("IMAGE_BASE_PATH", os.path.join(BASE_DIR, "campaign_images"))

# Lock held by the approval handlers while they read/modify/write JSON files.
file_lock = threading.Lock()

app = Flask(__name__, static_folder="static")
# Session cookies are signed with this key; the fallback is only a placeholder.
app.secret_key = os.getenv("SECRET_KEY", "hm-ems-secret-key-change-in-production")
# URL prefix used when this module builds redirect targets.
app.config["APPLICATION_ROOT"] = "/hm-ems-report"

# Running behind an Apache proxy: trust one hop of the forwarded
# for/proto/host/prefix headers.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
# Login credentials.
# SECURITY: these used to be fixed literals in the source. They can now be
# supplied through the HM_EMS_USERNAME / HM_EMS_PASSWORD environment variables;
# the literals below remain only as backward-compatible defaults and should be
# overridden (and the password rotated) in any real deployment.
USERNAME = os.environ.get("HM_EMS_USERNAME", "admin3_M")
PASSWORD = os.environ.get("HM_EMS_PASSWORD", "Pa$$w0rd2026_!")
# Authentication decorator
def login_required(f):
    """Wrap view ``f`` so it only runs when the session is marked logged in.

    Unauthenticated requests that look like API/AJAX calls (path under
    ``/api/``, JSON request, or ``X-Requested-With: XMLHttpRequest``) receive
    a 401 JSON response; all others are redirected to the login page.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        # Authenticated: hand straight over to the wrapped view.
        if session.get('logged_in'):
            return f(*args, **kwargs)

        expects_json = (
            request.path.startswith('/api/')
            or request.is_json
            or request.headers.get('X-Requested-With') == 'XMLHttpRequest'
        )
        if expects_json:
            # A redirect would be useless to a script; answer with 401 JSON.
            return jsonify({"error": "Unauthorized"}), 401
        return redirect(app.config['APPLICATION_ROOT'] + '/login')

    return guarded_view
# ========== Login ==========
# Jinja template for the login page, rendered with render_template_string.
# The only template variable it uses is ``error``, shown when it is set.
# NOTE(review): as it appears here the template contains no <form>/<input>
# markup, although the login view reads the "username" and "password" form
# fields - confirm the HTML was not lost from this literal.
LOGIN_HTML = """
H&M EMS - Login
H&M
EMS Product Review Tool
{% if error %}
{{ error }}
{% endif %}
"""
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login page and process submitted credentials.

    GET renders ``LOGIN_HTML``. POST compares the ``username`` and
    ``password`` form fields with ``USERNAME`` / ``PASSWORD``; on a match the
    session is flagged ``logged_in`` and the client is redirected to the
    application root, otherwise the page is rendered again with an error.
    """
    import hmac  # local import: only this view needs it

    error = None
    if request.method == "POST":
        # Absent fields become "" so the comparisons always receive strings.
        username = request.form.get("username", "")
        password = request.form.get("password", "")
        # compare_digest takes time independent of where the inputs differ,
        # unlike ``==``. Bytes are used because the str form only accepts
        # ASCII. Both checks run before being combined so a wrong username
        # does not short-circuit the password comparison.
        user_ok = hmac.compare_digest(username.encode("utf-8"), USERNAME.encode("utf-8"))
        pass_ok = hmac.compare_digest(password.encode("utf-8"), PASSWORD.encode("utf-8"))
        if user_ok and pass_ok:
            session['logged_in'] = True
            return redirect(app.config['APPLICATION_ROOT'] + '/')
        error = "Invalid username or password"
    return render_template_string(LOGIN_HTML, error=error)
@app.route("/logout")
def logout():
    """Drop the login flag from the session and redirect to the login page."""
    login_url = app.config['APPLICATION_ROOT'] + '/login'
    # pop with a default so logging out twice is harmless.
    session.pop('logged_in', None)
    return redirect(login_url)
# ========== Static UI ==========
@app.route("/")
@login_required
def index():
    """Serve the review UI page (``index.html`` from ``static``) to logged-in users."""
    ui_dir, ui_page = "static", "index.html"
    return send_from_directory(ui_dir, ui_page)
# ========== API: List available JSON files ==========
@app.route("/api/files")
@login_required
def list_files():
    """Return the campaign JSON files found in ``MASTER_JSON_DIR``.

    The response is a JSON array, sorted by filename, of objects with the
    keys ``filename`` and ``campaign`` (the value of
    ``_get_campaign_prefix(filename)``). Files ending in ``_changelog.json``
    are left out.
    """
    campaign_files = [
        {"filename": name, "campaign": _get_campaign_prefix(name)}
        for name in sorted(os.listdir(MASTER_JSON_DIR))
        if name.endswith(".json") and not name.endswith("_changelog.json")
    ]
    return jsonify(campaign_files)
# ========== API: Load campaign data ==========
# The rule must declare the ``filename`` variable the view takes; the ``path``
# converter also accepts slashes, which basename() below then strips.
@app.route("/api/load/<path:filename>")
@login_required
def load_campaign(filename):
    """Return one campaign file together with the data the review UI needs.

    Args:
        filename: Name of a JSON file inside ``MASTER_JSON_DIR``; any
            directory components are discarded.

    Returns:
        A JSON object with the keys ``data`` (the file's records),
        ``image_map`` (article id -> list of image URLs, or ``None`` when no
        image file exists), ``lang_display`` (language code -> display
        name), ``campaign``, ``season``, ``filename`` and ``approvals``.

    Aborts with 404 when the file does not exist.
    """
    # Security: strip path components
    filename = os.path.basename(filename)
    filepath = os.path.join(MASTER_JSON_DIR, filename)
    if not os.path.isfile(filepath):
        abort(404, description=f"File not found: {filename}")

    # approve() rewrites the master file and its changelog while holding
    # file_lock; read both under the same lock so a half-written file is
    # never parsed.
    with file_lock:
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        approvals = _load_approvals(filename)

    campaign_prefix = _get_campaign_prefix(filename)
    first_record = data[0] if data else {}

    # Campaign info comes from the first record.
    campaign_name = first_record.get("Campaign", campaign_prefix)
    season = first_record.get("Season", "")

    # Derive year from Season field (first 4 chars, e.g. "202502" -> "2025").
    # Season may be stored as a number or null, so normalise to text first.
    season_text = "" if season is None else str(season)
    year = season_text[:4] if len(season_text) >= 4 else "2025"
    image_source_dir = os.path.join(IMAGE_BASE_PATH, year, campaign_prefix, "Automation_LR")

    # Build image map: Article id -> list of server image URLs. Only the
    # first record seen for each article id is used.
    url_root = app.config['APPLICATION_ROOT']
    image_map = {}
    for record in data:
        article_id = record.get("Article id", "")
        if article_id in image_map:
            continue
        urls = [
            f"{url_root}/images/{year}/{campaign_prefix}/Automation_LR/{img_name}"
            for img_name in _get_all_image_filenames(record.get("Filename", ""))
            if os.path.isfile(os.path.join(image_source_dir, img_name))
        ]
        image_map[article_id] = urls or None

    # Build language display names for languages present in data
    languages = sorted({
        r.get("Language", "").lower() for r in data if r.get("Language")
    })
    lang_display = {
        lang: LANGUAGE_DISPLAY_NAMES.get(lang, lang) for lang in languages
    }

    return jsonify({
        "data": data,
        "image_map": image_map,
        "lang_display": lang_display,
        "campaign": campaign_name,
        "season": season,
        "filename": filename,
        "approvals": approvals,
    })
# ========== API: Approve / save edits ==========
@app.route("/api/approve", methods=["POST"])
@login_required
def approve():
    """Record an approval for one article/language and apply its edits.

    Expects a JSON object with ``filename``, ``article_id`` and ``language``
    (all non-empty strings), plus optional ``edits`` (object that may carry
    ``"Product name"`` and/or ``"Price"``) and ``action`` (``"approve"`` by
    default, or ``"unapprove"``).

    For ``"approve"``, edited fields whose string form differs from the
    stored value are written to the master JSON. Every call appends an entry
    to the file's changelog.

    Returns ``{"status": "ok", "action": <action>}``. Aborts with 400 on a
    missing or malformed body and with 404 when the file or the record
    cannot be found.
    """
    body = request.get_json()
    # A JSON array/string/number has no .get(); treat it like a missing body
    # instead of failing with an AttributeError.
    if not body or not isinstance(body, dict):
        abort(400, description="Missing JSON body")

    raw_filename = body.get("filename", "")
    article_id = body.get("article_id", "")
    language = body.get("language", "")
    edits = body.get("edits") or {}
    action = body.get("action", "approve")  # "approve" or "unapprove"

    # basename() and .lower() below need strings, and the changelog reader
    # concatenates these values, so reject other JSON types up front.
    if not all(isinstance(v, str) for v in (raw_filename, article_id, language)):
        abort(400, description="filename, article_id and language must be strings")
    if not isinstance(edits, dict):
        abort(400, description="edits must be an object")

    filename = os.path.basename(raw_filename)
    if not filename or not article_id or not language:
        abort(400, description="Missing required fields: filename, article_id, language")

    filepath = os.path.join(MASTER_JSON_DIR, filename)
    if not os.path.isfile(filepath):
        abort(404, description=f"File not found: {filename}")

    with file_lock:
        # Load master JSON
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)

        # Find matching record (language compared case-insensitively)
        wanted_language = language.lower()
        record = next(
            (
                r for r in data
                if r.get("Article id") == article_id
                and str(r.get("Language") or "").lower() == wanted_language
            ),
            None,
        )
        if record is None:
            abort(404, description=f"Record not found: {article_id} / {language}")

        # Build changelog entry
        changelog_entry = {
            "timestamp": datetime.now().isoformat(),
            "article_id": article_id,
            "language": language,
            "country": record.get("Country", ""),
            "product_id": record.get("Product id", ""),
            "action": action,
            "edits": [],
        }

        # Apply edits if approving; only these two fields are editable.
        if action == "approve" and edits:
            for field in ("Product name", "Price"):
                new_val = edits.get(field)
                if new_val is None:
                    continue
                old_val = record.get(field, "")
                # Compare as text so e.g. 19.99 and "19.99" count as equal.
                if str(old_val) != str(new_val):
                    changelog_entry["edits"].append({
                        "field": field,
                        "old": old_val,
                        "new": new_val,
                    })
                    record[field] = new_val

        # Write master JSON back only when a field actually changed, so a
        # plain approval never rewrites the master file.
        if changelog_entry["edits"]:
            with open(filepath, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)

        # Write changelog
        _append_changelog(filename, changelog_entry)

    return jsonify({"status": "ok", "action": action})
# ========== API: Unapprove ==========
@app.route("/api/unapprove", methods=["POST"])
@login_required  # this endpoint writes to the changelog, so it needs a session
def unapprove():
    """Log an ``unapprove`` action for one article/language.

    Expects a JSON object with ``filename``, ``article_id`` and ``language``.
    The ``action`` is forced to ``"unapprove"`` and the request is handed to
    ``_handle_approval``. Aborts with 400 when the body is missing or is not
    a JSON object.
    """
    body = request.get_json()
    # Item assignment below needs a dict; a JSON array or scalar is rejected.
    if not body or not isinstance(body, dict):
        abort(400)
    body["action"] = "unapprove"
    # Reuse the shared approval logic for logging
    return _handle_approval(body)
# ========== Image serving ==========
# The rule must declare the three variables the view takes. ``filename`` uses
# the ``path`` converter because the image URLs built by load_campaign have
# the form /images/<year>/<campaign_prefix>/Automation_LR/<image name>.
# NOTE(review): unlike the UI and API routes this view is not wrapped in
# login_required - confirm that serving images without a session is intended.
@app.route("/images/<year>/<campaign_prefix>/<path:filename>")
def serve_image(year, campaign_prefix, filename):
    """Serve one image from ``IMAGE_BASE_PATH/<year>/<campaign_prefix>/Automation_LR``.

    Args:
        year: Year folder name.
        campaign_prefix: Campaign folder name.
        filename: Image path from the URL; only its final component is used.

    Aborts with 404 when a folder component is empty, ``.`` or ``..``, or
    when the image directory does not exist.
    """
    # Security: no path traversal
    year = os.path.basename(year)
    campaign_prefix = os.path.basename(campaign_prefix)
    filename = os.path.basename(filename)
    # basename() leaves "." and ".." untouched, so refuse them explicitly;
    # otherwise the joined directory could climb out of IMAGE_BASE_PATH.
    for part in (year, campaign_prefix):
        if part in ("", ".", ".."):
            abort(404)
    image_dir = os.path.join(IMAGE_BASE_PATH, year, campaign_prefix, "Automation_LR")
    if not os.path.isdir(image_dir):
        abort(404)
    return send_from_directory(image_dir, filename)
# ========== Changelog helpers ==========
def _changelog_path(filename):
    """Return the changelog path for ``filename``.

    The changelog lives in ``MASTER_JSON_DIR`` and is named after the file
    with its extension replaced by ``_changelog.json``.
    """
    stem, _extension = os.path.splitext(filename)
    return os.path.join(MASTER_JSON_DIR, stem + "_changelog.json")
def _append_changelog(filename, entry):
    """Append ``entry`` to the changelog of ``filename``.

    The existing changelog is loaded when present, otherwise a new one is
    started; the full list is then written back as indented UTF-8 JSON.
    """
    path = _changelog_path(filename)
    history = []
    if os.path.isfile(path):
        with open(path, "r", encoding="utf-8") as src:
            history = json.load(src)
    history.append(entry)
    with open(path, "w", encoding="utf-8") as dst:
        json.dump(history, dst, indent=2, ensure_ascii=False)
def _load_approvals(filename):
    """Load approval state from changelog - returns dict of approved keys.

    Changelog entries are replayed in order: ``approve`` sets the key
    ``"<article_id>::<language lower-cased>"`` to ``True`` and ``unapprove``
    removes it; any other action is ignored. Entries that are not objects or
    that lack ``article_id`` / ``language`` are skipped, so a single bad
    entry cannot make the whole campaign fail to load.

    Returns an empty dict when no changelog exists.
    """
    path = _changelog_path(filename)
    approvals = {}
    if not os.path.isfile(path):
        return approvals
    with open(path, "r", encoding="utf-8") as f:
        log = json.load(f)
    for entry in log:
        if not isinstance(entry, dict):
            continue
        article_id = entry.get("article_id")
        language = entry.get("language")
        if article_id is None or language is None:
            continue
        # Formatting (rather than ``+``) tolerates ids or languages that were
        # logged as non-string JSON values.
        key = f"{article_id}::{str(language).lower()}"
        action = entry.get("action")
        if action == "approve":
            approvals[key] = True
        elif action == "unapprove":
            approvals.pop(key, None)
    return approvals
def _handle_approval(body):
    """Shared logic for approve/unapprove: log the action in the changelog.

    Args:
        body: Parsed JSON request body. Must be a dict with non-empty string
            values for ``filename``, ``article_id`` and ``language``;
            ``action`` defaults to ``"approve"``.

    Returns:
        The JSON response ``{"status": "ok", "action": <action>}``.

    Aborts with 400 when the body or a required field is missing or of the
    wrong type, and with 404 when the campaign file does not exist. The
    master JSON itself is not touched here.
    """
    if not isinstance(body, dict):
        abort(400)

    raw_filename = body.get("filename", "")
    article_id = body.get("article_id", "")
    language = body.get("language", "")
    action = body.get("action", "approve")

    # Non-string values would crash basename() here, or be written to the
    # changelog and later break the reader, which lower-cases and
    # concatenates these fields.
    if not all(isinstance(v, str) for v in (raw_filename, article_id, language)):
        abort(400)

    filename = os.path.basename(raw_filename)
    if not filename or not article_id or not language:
        abort(400)

    filepath = os.path.join(MASTER_JSON_DIR, filename)
    if not os.path.isfile(filepath):
        abort(404)

    with file_lock:
        changelog_entry = {
            "timestamp": datetime.now().isoformat(),
            "article_id": article_id,
            "language": language,
            "action": action,
            "edits": [],
        }
        _append_changelog(filename, changelog_entry)
    return jsonify({"status": "ok", "action": action})
# ========== Run ==========
if __name__ == "__main__":
    # The port comes from the PORT environment variable, falling back to 5000.
    port = int(os.getenv("PORT", 5000))
    startup_lines = (
        f"Master JSON folder: {MASTER_JSON_DIR}",
        f"Image base path: {IMAGE_BASE_PATH}",
        f"Starting server at http://localhost:{port}",
    )
    for line in startup_lines:
        print(line)
    # Listen on all interfaces with the debugger disabled.
    app.run(host="0.0.0.0", port=port, debug=False)