Flask now knows it's mounted at /hm-ems-report and builds correct URLs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
457 lines
14 KiB
Python
457 lines
14 KiB
Python
"""
|
||
Flask server for H&M EMS Product Review Tool.
|
||
|
||
Serves the review UI and provides API endpoints for:
|
||
- Listing available JSON campaign files
|
||
- Loading campaign data with image mappings
|
||
- Approving/saving edits back to the master JSON with change logging
|
||
|
||
Configuration (environment variables):
|
||
IMAGE_BASE_PATH – root folder containing year/campaign/Automation_LR images
|
||
(defaults to ./campaign_images)
|
||
MASTER_JSON_DIR – folder containing campaign JSON files
|
||
(defaults to ./Master_Json)
|
||
PORT – server port (defaults to 5000)
|
||
|
||
Usage:
|
||
python3 server.py
|
||
Then open http://localhost:5000
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import threading
|
||
from datetime import datetime
|
||
from functools import wraps
|
||
from flask import Flask, jsonify, request, send_from_directory, abort, session, redirect, url_for, render_template_string
|
||
from werkzeug.middleware.proxy_fix import ProxyFix
|
||
|
||
from html_generator import (
|
||
LANGUAGE_DISPLAY_NAMES,
|
||
_get_campaign_prefix,
|
||
_get_main_image_filename,
|
||
_get_all_image_filenames,
|
||
)
|
||
|
||
# ========== Config ==========
|
||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
MASTER_JSON_DIR = os.environ.get(
|
||
"MASTER_JSON_DIR",
|
||
os.path.join(BASE_DIR, "Master_Json"),
|
||
)
|
||
IMAGE_BASE_PATH = os.environ.get(
|
||
"IMAGE_BASE_PATH",
|
||
os.path.join(BASE_DIR, "campaign_images"),
|
||
)
|
||
|
||
# Thread lock for safe JSON read/write
|
||
file_lock = threading.Lock()
|
||
|
||
app = Flask(__name__, static_folder="static")
|
||
app.secret_key = os.environ.get("SECRET_KEY", "hm-ems-secret-key-change-in-production")
|
||
app.config['APPLICATION_ROOT'] = '/hm-ems-report'
|
||
|
||
# Configure for running behind Apache proxy
|
||
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
|
||
|
||
# Login credentials
|
||
USERNAME = "admin3_M"
|
||
PASSWORD = "Pa$$w0rd2026_!"
|
||
|
||
|
||
# Authentication decorator
|
||
def login_required(f):
|
||
@wraps(f)
|
||
def decorated_function(*args, **kwargs):
|
||
if not session.get('logged_in'):
|
||
# For AJAX/API requests, return 401 JSON instead of redirect
|
||
if request.path.startswith('/api/') or request.is_json or request.headers.get('X-Requested-With') == 'XMLHttpRequest':
|
||
return jsonify({"error": "Unauthorized"}), 401
|
||
return redirect(app.config['APPLICATION_ROOT'] + '/login')
|
||
return f(*args, **kwargs)
|
||
return decorated_function
|
||
|
||
|
||
# ========== Login ==========
|
||
LOGIN_HTML = """
|
||
<!DOCTYPE html>
|
||
<html lang="en">
|
||
<head>
|
||
<meta charset="UTF-8">
|
||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||
<title>H&M EMS - Login</title>
|
||
<style>
|
||
* { margin: 0; padding: 0; box-sizing: border-box; }
|
||
body {
|
||
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Arial, sans-serif;
|
||
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
|
||
min-height: 100vh;
|
||
display: flex;
|
||
align-items: center;
|
||
justify-content: center;
|
||
}
|
||
.login-container {
|
||
background: white;
|
||
padding: 40px;
|
||
border-radius: 12px;
|
||
box-shadow: 0 10px 40px rgba(0,0,0,0.2);
|
||
width: 100%;
|
||
max-width: 400px;
|
||
}
|
||
.logo {
|
||
text-align: center;
|
||
margin-bottom: 30px;
|
||
}
|
||
.logo h1 {
|
||
color: #cc0000;
|
||
font-size: 32px;
|
||
font-weight: 700;
|
||
letter-spacing: 2px;
|
||
}
|
||
.logo p {
|
||
color: #666;
|
||
font-size: 14px;
|
||
margin-top: 5px;
|
||
}
|
||
.form-group {
|
||
margin-bottom: 20px;
|
||
}
|
||
label {
|
||
display: block;
|
||
margin-bottom: 8px;
|
||
color: #333;
|
||
font-weight: 500;
|
||
font-size: 14px;
|
||
}
|
||
input[type="text"],
|
||
input[type="password"] {
|
||
width: 100%;
|
||
padding: 12px 15px;
|
||
border: 2px solid #e0e0e0;
|
||
border-radius: 6px;
|
||
font-size: 14px;
|
||
transition: border-color 0.3s;
|
||
}
|
||
input[type="text"]:focus,
|
||
input[type="password"]:focus {
|
||
outline: none;
|
||
border-color: #667eea;
|
||
}
|
||
.btn-login {
|
||
width: 100%;
|
||
padding: 12px;
|
||
background: #cc0000;
|
||
color: white;
|
||
border: none;
|
||
border-radius: 6px;
|
||
font-size: 16px;
|
||
font-weight: 600;
|
||
cursor: pointer;
|
||
transition: background 0.3s;
|
||
}
|
||
.btn-login:hover {
|
||
background: #a30000;
|
||
}
|
||
.error {
|
||
background: #fee;
|
||
color: #c33;
|
||
padding: 12px;
|
||
border-radius: 6px;
|
||
margin-bottom: 20px;
|
||
font-size: 14px;
|
||
text-align: center;
|
||
}
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<div class="login-container">
|
||
<div class="logo">
|
||
<h1>H&M</h1>
|
||
<p>EMS Product Review Tool</p>
|
||
</div>
|
||
{% if error %}
|
||
<div class="error">{{ error }}</div>
|
||
{% endif %}
|
||
<form method="POST">
|
||
<div class="form-group">
|
||
<label for="username">Username</label>
|
||
<input type="text" id="username" name="username" required autofocus>
|
||
</div>
|
||
<div class="form-group">
|
||
<label for="password">Password</label>
|
||
<input type="password" id="password" name="password" required>
|
||
</div>
|
||
<button type="submit" class="btn-login">Sign In</button>
|
||
</form>
|
||
</div>
|
||
</body>
|
||
</html>
|
||
"""
|
||
|
||
@app.route("/login", methods=["GET", "POST"])
|
||
def login():
|
||
error = None
|
||
if request.method == "POST":
|
||
username = request.form.get("username")
|
||
password = request.form.get("password")
|
||
|
||
if username == USERNAME and password == PASSWORD:
|
||
session['logged_in'] = True
|
||
return redirect(app.config['APPLICATION_ROOT'] + '/')
|
||
else:
|
||
error = "Invalid username or password"
|
||
|
||
return render_template_string(LOGIN_HTML, error=error)
|
||
|
||
|
||
@app.route("/logout")
|
||
def logout():
|
||
session.pop('logged_in', None)
|
||
return redirect(app.config['APPLICATION_ROOT'] + '/login')
|
||
|
||
|
||
# ========== Static UI ==========
|
||
@app.route("/")
|
||
@login_required
|
||
def index():
|
||
return send_from_directory("static", "index.html")
|
||
|
||
|
||
# ========== API: List available JSON files ==========
|
||
@app.route("/api/files")
|
||
@login_required
|
||
def list_files():
|
||
files = []
|
||
for f in sorted(os.listdir(MASTER_JSON_DIR)):
|
||
if f.endswith(".json") and not f.endswith("_changelog.json"):
|
||
campaign = _get_campaign_prefix(f)
|
||
files.append({"filename": f, "campaign": campaign})
|
||
return jsonify(files)
|
||
|
||
|
||
# ========== API: Load campaign data ==========
|
||
@app.route("/api/load/<filename>")
|
||
@login_required
|
||
def load_campaign(filename):
|
||
# Security: strip path components
|
||
filename = os.path.basename(filename)
|
||
filepath = os.path.join(MASTER_JSON_DIR, filename)
|
||
|
||
if not os.path.isfile(filepath):
|
||
abort(404, description=f"File not found: {filename}")
|
||
|
||
with open(filepath, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
|
||
campaign_prefix = _get_campaign_prefix(filename)
|
||
|
||
# Derive year from Season field (first 4 chars, e.g. "202502" -> "2025")
|
||
season = data[0].get("Season", "") if data else ""
|
||
year = season[:4] if len(season) >= 4 else "2025"
|
||
|
||
image_source_dir = os.path.join(IMAGE_BASE_PATH, year, campaign_prefix, "Automation_LR")
|
||
|
||
# Build image map: Article id -> list of server image URLs
|
||
image_map = {}
|
||
for record in data:
|
||
article_id = record.get("Article id", "")
|
||
if article_id in image_map:
|
||
continue
|
||
img_names = _get_all_image_filenames(record.get("Filename", ""))
|
||
urls = []
|
||
for img_name in img_names:
|
||
if os.path.isfile(os.path.join(image_source_dir, img_name)):
|
||
urls.append(f"/hm-ems-report/images/{year}/{campaign_prefix}/Automation_LR/{img_name}")
|
||
image_map[article_id] = urls if urls else None
|
||
|
||
# Build language display names for languages present in data
|
||
languages = sorted(set(
|
||
r.get("Language", "").lower() for r in data if r.get("Language")
|
||
))
|
||
lang_display = {}
|
||
for lang in languages:
|
||
lang_display[lang] = LANGUAGE_DISPLAY_NAMES.get(lang, lang)
|
||
|
||
# Derive campaign info
|
||
campaign_name = data[0].get("Campaign", campaign_prefix) if data else campaign_prefix
|
||
season = data[0].get("Season", "") if data else ""
|
||
|
||
# Load existing approvals from changelog
|
||
approvals = _load_approvals(filename)
|
||
|
||
return jsonify({
|
||
"data": data,
|
||
"image_map": image_map,
|
||
"lang_display": lang_display,
|
||
"campaign": campaign_name,
|
||
"season": season,
|
||
"filename": filename,
|
||
"approvals": approvals,
|
||
})
|
||
|
||
|
||
# ========== API: Approve / save edits ==========
|
||
@app.route("/api/approve", methods=["POST"])
|
||
@login_required
|
||
def approve():
|
||
body = request.get_json()
|
||
if not body:
|
||
abort(400, description="Missing JSON body")
|
||
|
||
filename = os.path.basename(body.get("filename", ""))
|
||
article_id = body.get("article_id", "")
|
||
language = body.get("language", "")
|
||
edits = body.get("edits", {})
|
||
action = body.get("action", "approve") # "approve" or "unapprove"
|
||
|
||
if not filename or not article_id or not language:
|
||
abort(400, description="Missing required fields: filename, article_id, language")
|
||
|
||
filepath = os.path.join(MASTER_JSON_DIR, filename)
|
||
if not os.path.isfile(filepath):
|
||
abort(404, description=f"File not found: {filename}")
|
||
|
||
with file_lock:
|
||
# Load master JSON
|
||
with open(filepath, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
|
||
# Find matching record
|
||
record = None
|
||
for r in data:
|
||
if (r.get("Article id") == article_id and
|
||
r.get("Language", "").lower() == language.lower()):
|
||
record = r
|
||
break
|
||
|
||
if not record:
|
||
abort(404, description=f"Record not found: {article_id} / {language}")
|
||
|
||
# Build changelog entry
|
||
changelog_entry = {
|
||
"timestamp": datetime.now().isoformat(),
|
||
"article_id": article_id,
|
||
"language": language,
|
||
"country": record.get("Country", ""),
|
||
"product_id": record.get("Product id", ""),
|
||
"action": action,
|
||
"edits": [],
|
||
}
|
||
|
||
# Apply edits if approving
|
||
if action == "approve" and edits:
|
||
for field in ["Product name", "Price"]:
|
||
if field in edits and edits[field] is not None:
|
||
old_val = record.get(field, "")
|
||
new_val = edits[field]
|
||
if str(old_val) != str(new_val):
|
||
changelog_entry["edits"].append({
|
||
"field": field,
|
||
"old": old_val,
|
||
"new": new_val,
|
||
})
|
||
record[field] = new_val
|
||
|
||
# Write master JSON back
|
||
with open(filepath, "w", encoding="utf-8") as f:
|
||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||
|
||
# Write changelog
|
||
_append_changelog(filename, changelog_entry)
|
||
|
||
return jsonify({"status": "ok", "action": action})
|
||
|
||
|
||
# ========== API: Unapprove ==========
|
||
@app.route("/api/unapprove", methods=["POST"])
|
||
def unapprove():
|
||
body = request.get_json()
|
||
if not body:
|
||
abort(400)
|
||
body["action"] = "unapprove"
|
||
# Reuse the approve logic for logging
|
||
return _handle_approval(body)
|
||
|
||
|
||
# ========== Image serving ==========
|
||
@app.route("/images/<year>/<campaign_prefix>/<path:filename>")
|
||
def serve_image(year, campaign_prefix, filename):
|
||
# Security: no path traversal
|
||
year = os.path.basename(year)
|
||
campaign_prefix = os.path.basename(campaign_prefix)
|
||
filename = os.path.basename(filename)
|
||
image_dir = os.path.join(IMAGE_BASE_PATH, year, campaign_prefix, "Automation_LR")
|
||
if not os.path.isdir(image_dir):
|
||
abort(404)
|
||
return send_from_directory(image_dir, filename)
|
||
|
||
|
||
# ========== Changelog helpers ==========
|
||
def _changelog_path(filename):
|
||
stem = os.path.splitext(filename)[0]
|
||
return os.path.join(MASTER_JSON_DIR, f"{stem}_changelog.json")
|
||
|
||
|
||
def _append_changelog(filename, entry):
|
||
path = _changelog_path(filename)
|
||
if os.path.isfile(path):
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
log = json.load(f)
|
||
else:
|
||
log = []
|
||
log.append(entry)
|
||
with open(path, "w", encoding="utf-8") as f:
|
||
json.dump(log, f, indent=2, ensure_ascii=False)
|
||
|
||
|
||
def _load_approvals(filename):
|
||
"""Load approval state from changelog - returns dict of approved keys."""
|
||
path = _changelog_path(filename)
|
||
approvals = {}
|
||
if not os.path.isfile(path):
|
||
return approvals
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
log = json.load(f)
|
||
for entry in log:
|
||
key = entry["article_id"] + "::" + entry["language"].lower()
|
||
if entry.get("action") == "approve":
|
||
approvals[key] = True
|
||
elif entry.get("action") == "unapprove":
|
||
approvals.pop(key, None)
|
||
return approvals
|
||
|
||
|
||
def _handle_approval(body):
|
||
"""Shared logic for approve/unapprove."""
|
||
filename = os.path.basename(body.get("filename", ""))
|
||
article_id = body.get("article_id", "")
|
||
language = body.get("language", "")
|
||
action = body.get("action", "approve")
|
||
|
||
if not filename or not article_id or not language:
|
||
abort(400)
|
||
|
||
filepath = os.path.join(MASTER_JSON_DIR, filename)
|
||
if not os.path.isfile(filepath):
|
||
abort(404)
|
||
|
||
with file_lock:
|
||
changelog_entry = {
|
||
"timestamp": datetime.now().isoformat(),
|
||
"article_id": article_id,
|
||
"language": language,
|
||
"action": action,
|
||
"edits": [],
|
||
}
|
||
_append_changelog(filename, changelog_entry)
|
||
|
||
return jsonify({"status": "ok", "action": action})
|
||
|
||
|
||
# ========== Run ==========
|
||
if __name__ == "__main__":
|
||
port = int(os.environ.get("PORT", 5000))
|
||
print(f"Master JSON folder: {MASTER_JSON_DIR}")
|
||
print(f"Image base path: {IMAGE_BASE_PATH}")
|
||
print(f"Starting server at http://localhost:{port}")
|
||
app.run(host="0.0.0.0", port=port, debug=False)
|