hm_ems_report/server.py
Vadym Samoilenko ad56b35835 Configure APPLICATION_ROOT for /hm-ems-report prefix
Flask now knows it's mounted at /hm-ems-report and builds correct URLs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 22:01:11 +00:00

457 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Flask server for H&M EMS Product Review Tool.
Serves the review UI and provides API endpoints for:
- Listing available JSON campaign files
- Loading campaign data with image mappings
- Approving/saving edits back to the master JSON with change logging
Configuration (environment variables):
IMAGE_BASE_PATH root folder containing year/campaign/Automation_LR images
(defaults to ./campaign_images)
MASTER_JSON_DIR folder containing campaign JSON files
(defaults to ./Master_Json)
PORT server port (defaults to 5000)
Usage:
python3 server.py
Then open http://localhost:5000
"""
import os
import json
import threading
from datetime import datetime
from functools import wraps
from flask import Flask, jsonify, request, send_from_directory, abort, session, redirect, url_for, render_template_string
from werkzeug.middleware.proxy_fix import ProxyFix
from html_generator import (
LANGUAGE_DISPLAY_NAMES,
_get_campaign_prefix,
_get_main_image_filename,
_get_all_image_filenames,
)
# ========== Config ==========
# Directory containing this file; the default data folders live beside it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Folder holding the campaign JSON files and their changelogs.
MASTER_JSON_DIR = os.environ.get(
    "MASTER_JSON_DIR",
    os.path.join(BASE_DIR, "Master_Json"),
)
# Root of the year/campaign/Automation_LR image tree.
IMAGE_BASE_PATH = os.environ.get(
    "IMAGE_BASE_PATH",
    os.path.join(BASE_DIR, "campaign_images"),
)
# Thread lock for safe JSON read/write
file_lock = threading.Lock()
app = Flask(__name__, static_folder="static")
# NOTE: the fallback below is a fixed value visible in source control; set
# SECRET_KEY in the environment for any real deployment.
app.secret_key = os.environ.get("SECRET_KEY", "hm-ems-secret-key-change-in-production")
app.config['APPLICATION_ROOT'] = '/hm-ems-report'
# Configure for running behind Apache proxy
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
# Login credentials. They can be overridden through HM_EMS_USERNAME and
# HM_EMS_PASSWORD so that deployments do not have to rely on the values
# committed here; the previous literals remain as fallbacks for compatibility.
USERNAME = os.environ.get("HM_EMS_USERNAME", "admin3_M")
PASSWORD = os.environ.get("HM_EMS_PASSWORD", "Pa$$w0rd2026_!")
# Authentication decorator
def login_required(f):
    """Wrap view *f* so that it only runs when the session is logged in.

    When the session has no truthy 'logged_in' entry, the wrapper answers
    instead of the view: a 401 JSON error for API/AJAX-style requests, or a
    redirect to the login page for everything else.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        if session.get('logged_in'):
            return f(*args, **kwargs)
        # For AJAX/API requests, return 401 JSON instead of redirect
        wants_json = (
            request.path.startswith('/api/')
            or request.is_json
            or request.headers.get('X-Requested-With') == 'XMLHttpRequest'
        )
        if wants_json:
            return jsonify({"error": "Unauthorized"}), 401
        return redirect(app.config['APPLICATION_ROOT'] + '/login')
    return guarded
# ========== Login ==========
# Template source for the sign-in page; the login view passes it to
# render_template_string together with an `error` value.
# - When `error` is truthy its text is shown in the `.error` box above the form.
# - The form declares method="POST" and no action attribute, and submits the
#   `username` and `password` fields that the login view reads.
# All styling is inline so the page needs no static assets.
LOGIN_HTML = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>H&M EMS - Login</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Arial, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
}
.login-container {
background: white;
padding: 40px;
border-radius: 12px;
box-shadow: 0 10px 40px rgba(0,0,0,0.2);
width: 100%;
max-width: 400px;
}
.logo {
text-align: center;
margin-bottom: 30px;
}
.logo h1 {
color: #cc0000;
font-size: 32px;
font-weight: 700;
letter-spacing: 2px;
}
.logo p {
color: #666;
font-size: 14px;
margin-top: 5px;
}
.form-group {
margin-bottom: 20px;
}
label {
display: block;
margin-bottom: 8px;
color: #333;
font-weight: 500;
font-size: 14px;
}
input[type="text"],
input[type="password"] {
width: 100%;
padding: 12px 15px;
border: 2px solid #e0e0e0;
border-radius: 6px;
font-size: 14px;
transition: border-color 0.3s;
}
input[type="text"]:focus,
input[type="password"]:focus {
outline: none;
border-color: #667eea;
}
.btn-login {
width: 100%;
padding: 12px;
background: #cc0000;
color: white;
border: none;
border-radius: 6px;
font-size: 16px;
font-weight: 600;
cursor: pointer;
transition: background 0.3s;
}
.btn-login:hover {
background: #a30000;
}
.error {
background: #fee;
color: #c33;
padding: 12px;
border-radius: 6px;
margin-bottom: 20px;
font-size: 14px;
text-align: center;
}
</style>
</head>
<body>
<div class="login-container">
<div class="logo">
<h1>H&M</h1>
<p>EMS Product Review Tool</p>
</div>
{% if error %}
<div class="error">{{ error }}</div>
{% endif %}
<form method="POST">
<div class="form-group">
<label for="username">Username</label>
<input type="text" id="username" name="username" required autofocus>
</div>
<div class="form-group">
<label for="password">Password</label>
<input type="password" id="password" name="password" required>
</div>
<button type="submit" class="btn-login">Sign In</button>
</form>
</div>
</body>
</html>
"""
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the sign-in form (GET) or check submitted credentials (POST).

    On a POST whose username and password both match USERNAME / PASSWORD the
    session is flagged as logged in and the browser is redirected to the
    application root. In every other case the login page is rendered, with an
    error message after a failed attempt.
    """
    # Imported locally so this view does not depend on a file-level import.
    import hmac

    error = None
    if request.method == "POST":
        # A missing form field comes back as None; treat it as empty so it is
        # simply rejected by the comparison below.
        username = request.form.get("username") or ""
        password = request.form.get("password") or ""
        # Compare in constant time so response timing does not reveal how much
        # of a guess matched. Bytes are used because compare_digest only
        # accepts str arguments when they are pure ASCII.
        user_ok = hmac.compare_digest(username.encode("utf-8"), USERNAME.encode("utf-8"))
        pass_ok = hmac.compare_digest(password.encode("utf-8"), PASSWORD.encode("utf-8"))
        # Both comparisons are evaluated before combining them, so a wrong
        # username is not rejected faster than a wrong password.
        if user_ok and pass_ok:
            session['logged_in'] = True
            return redirect(app.config['APPLICATION_ROOT'] + '/')
        error = "Invalid username or password"
    return render_template_string(LOGIN_HTML, error=error)
@app.route("/logout")
def logout():
    """Drop the session's 'logged_in' flag and redirect to the login page."""
    session.pop('logged_in', None)
    login_url = app.config['APPLICATION_ROOT'] + '/login'
    return redirect(login_url)
# ========== Static UI ==========
@app.route("/")
@login_required
def index():
    """Serve the review UI entry page, index.html from the static folder."""
    static_dir, entry_page = "static", "index.html"
    return send_from_directory(static_dir, entry_page)
# ========== API: List available JSON files ==========
@app.route("/api/files")
@login_required
def list_files():
    """Return the campaign JSON files found in MASTER_JSON_DIR.

    The response is a JSON array, sorted by filename, of objects with the keys
    "filename" and "campaign" (the prefix derived by _get_campaign_prefix).
    Files ending in "_changelog.json" are left out.
    """
    campaign_files = [
        {"filename": name, "campaign": _get_campaign_prefix(name)}
        for name in sorted(os.listdir(MASTER_JSON_DIR))
        if name.endswith(".json") and not name.endswith("_changelog.json")
    ]
    return jsonify(campaign_files)
# ========== API: Load campaign data ==========
@app.route("/api/load/<filename>")
@login_required
def load_campaign(filename):
    """Return one campaign's records plus the lookups the UI needs.

    Args:
        filename: Name of a JSON file inside MASTER_JSON_DIR. Any directory
            part is discarded before the file is looked up.

    Returns:
        A JSON object with the keys "data" (the file's records), "image_map"
        (Article id -> list of image URLs, or null when none exist on disk),
        "lang_display" (lower-cased language code -> display name),
        "campaign", "season", "filename" and "approvals".

    Raises:
        404 (via abort) when the file does not exist.
    """
    # Security: strip path components
    filename = os.path.basename(filename)
    filepath = os.path.join(MASTER_JSON_DIR, filename)
    if not os.path.isfile(filepath):
        abort(404, description=f"File not found: {filename}")

    # Read under file_lock (the lock meant for JSON read/write) so this
    # request cannot parse a file that another thread is in the middle of
    # rewriting. The changelog is read in the same critical section.
    with file_lock:
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        # Load existing approvals from changelog
        approvals = _load_approvals(filename)

    campaign_prefix = _get_campaign_prefix(filename)

    # Campaign-level fields are taken from the first record, if there is one.
    first_record = data[0] if data else {}
    season = first_record.get("Season", "")
    campaign_name = first_record.get("Campaign", campaign_prefix)
    # Derive year from Season field (first 4 chars, e.g. "202502" -> "2025");
    # a Season shorter than four characters falls back to "2025".
    year = season[:4] if len(season) >= 4 else "2025"
    image_source_dir = os.path.join(IMAGE_BASE_PATH, year, campaign_prefix, "Automation_LR")

    # Build image URLs from the configured mount point rather than repeating
    # the prefix as a literal.
    url_root = app.config['APPLICATION_ROOT']

    # Build image map: Article id -> list of server image URLs
    image_map = {}
    for record in data:
        article_id = record.get("Article id", "")
        if article_id in image_map:
            # Only the first record seen for an article determines its images.
            continue
        urls = [
            f"{url_root}/images/{year}/{campaign_prefix}/Automation_LR/{img_name}"
            for img_name in _get_all_image_filenames(record.get("Filename", ""))
            if os.path.isfile(os.path.join(image_source_dir, img_name))
        ]
        image_map[article_id] = urls or None

    # Build language display names for languages present in data
    languages = sorted({
        r.get("Language", "").lower() for r in data if r.get("Language")
    })
    lang_display = {
        lang: LANGUAGE_DISPLAY_NAMES.get(lang, lang) for lang in languages
    }

    return jsonify({
        "data": data,
        "image_map": image_map,
        "lang_display": lang_display,
        "campaign": campaign_name,
        "season": season,
        "filename": filename,
        "approvals": approvals,
    })
# ========== API: Approve / save edits ==========
@app.route("/api/approve", methods=["POST"])
@login_required
def approve():
    """Record an approval for one record and apply any submitted edits.

    Expects a JSON object with:
        filename:   campaign JSON file inside MASTER_JSON_DIR (required)
        article_id: value matched against each record's "Article id" (required)
        language:   matched case-insensitively against "Language" (required)
        edits:      optional object; only "Product name" and "Price" are used
        action:     "approve" (default) or "unapprove"

    For an "approve" action, each supplied edit whose string form differs from
    the stored value is written to the record and listed in the changelog
    entry. A changelog entry is appended for every successful call.

    Returns:
        JSON {"status": "ok", "action": <action>}.

    Raises:
        400 (via abort) for a missing/malformed body or required field,
        404 (via abort) when the file or the record cannot be found.
    """
    body = request.get_json()
    if not body:
        abort(400, description="Missing JSON body")
    if not isinstance(body, dict):
        # A JSON array or scalar has no fields to read.
        abort(400, description="JSON body must be an object")

    raw_filename = body.get("filename", "")
    article_id = body.get("article_id", "")
    language = body.get("language", "")
    edits = body.get("edits", {})
    action = body.get("action", "approve")  # "approve" or "unapprove"

    # filename is passed to os.path and language is lower-cased below, so both
    # must be strings; reject other JSON types instead of failing with a 500.
    if not isinstance(raw_filename, str) or not isinstance(language, str):
        abort(400, description="filename and language must be strings")
    filename = os.path.basename(raw_filename)
    if not filename or not article_id or not language:
        abort(400, description="Missing required fields: filename, article_id, language")
    # Empty/absent edits are allowed; anything else must be an object so the
    # per-field lookups below are well defined.
    if edits and not isinstance(edits, dict):
        abort(400, description="edits must be an object")

    filepath = os.path.join(MASTER_JSON_DIR, filename)
    if not os.path.isfile(filepath):
        abort(404, description=f"File not found: {filename}")

    with file_lock:
        # Load master JSON
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)

        # Find matching record (first match wins). A record whose Language is
        # null or missing is treated as having no language.
        wanted_language = language.lower()
        record = next(
            (
                r for r in data
                if r.get("Article id") == article_id
                and str(r.get("Language") or "").lower() == wanted_language
            ),
            None,
        )
        if record is None:
            abort(404, description=f"Record not found: {article_id} / {language}")

        # Build changelog entry
        changelog_entry = {
            "timestamp": datetime.now().isoformat(),
            "article_id": article_id,
            "language": language,
            "country": record.get("Country", ""),
            "product_id": record.get("Product id", ""),
            "action": action,
            "edits": [],
        }

        # Apply edits if approving
        if action == "approve" and edits:
            for field in ("Product name", "Price"):
                new_val = edits.get(field)
                if new_val is None:
                    continue
                old_val = record.get(field, "")
                # Compare string forms so that e.g. 10 and "10" count as equal.
                if str(old_val) != str(new_val):
                    changelog_entry["edits"].append({
                        "field": field,
                        "old": old_val,
                        "new": new_val,
                    })
                    record[field] = new_val

        # Write master JSON back, but only when a field actually changed.
        # The new content goes to a temporary file that then replaces the
        # original, so a failure part-way through the write cannot leave a
        # truncated master file behind.
        if changelog_entry["edits"]:
            tmp_path = filepath + ".tmp"
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, filepath)

        # Write changelog
        _append_changelog(filename, changelog_entry)

    return jsonify({"status": "ok", "action": action})
# ========== API: Unapprove ==========
@app.route("/api/unapprove", methods=["POST"])
@login_required
def unapprove():
    """Log an "unapprove" action for the record named in the JSON body.

    The body is forwarded to _handle_approval with its "action" forced to
    "unapprove". This endpoint changes the changelog, so it requires a
    logged-in session like the other /api routes.

    Raises:
        400 (via abort) when the body is missing or is not a JSON object.
    """
    body = request.get_json()
    # The body must be a non-empty object: "action" is assigned on it below.
    if not body or not isinstance(body, dict):
        abort(400)
    body["action"] = "unapprove"
    # Reuse the approve logic for logging
    return _handle_approval(body)
# ========== Image serving ==========
# NOTE(review): unlike the /api routes, this view is not wrapped in
# login_required - confirm that serving images without a session is intended.
@app.route("/images/<year>/<campaign_prefix>/<path:filename>")
def serve_image(year, campaign_prefix, filename):
    """Serve one image from <IMAGE_BASE_PATH>/<year>/<campaign_prefix>/Automation_LR.

    Args:
        year: First directory level under IMAGE_BASE_PATH.
        campaign_prefix: Second directory level.
        filename: Remaining URL path; only its final component is used, so
            "Automation_LR/x.jpg" and "x.jpg" resolve to the same file.

    Raises:
        404 (via abort) when a path segment is unusable or the image
        directory does not exist.
    """
    # Security: no path traversal
    year = os.path.basename(year)
    campaign_prefix = os.path.basename(campaign_prefix)
    filename = os.path.basename(filename)
    # basename() returns "." and ".." unchanged, so a segment such as ".."
    # would still climb out of IMAGE_BASE_PATH; reject those explicitly.
    unsafe_segments = ("", ".", "..")
    if year in unsafe_segments or campaign_prefix in unsafe_segments:
        abort(404)
    image_dir = os.path.join(IMAGE_BASE_PATH, year, campaign_prefix, "Automation_LR")
    if not os.path.isdir(image_dir):
        abort(404)
    return send_from_directory(image_dir, filename)
# ========== Changelog helpers ==========
def _changelog_path(filename):
    """Return the changelog path for *filename*.

    The changelog lives in MASTER_JSON_DIR and is named after the file with
    its extension replaced by "_changelog.json".
    """
    stem, _extension = os.path.splitext(filename)
    changelog_name = "{}_changelog.json".format(stem)
    return os.path.join(MASTER_JSON_DIR, changelog_name)
def _append_changelog(filename, entry):
    """Append *entry* to the changelog belonging to *filename*.

    The changelog is a JSON array; it is created when it does not exist yet.
    An existing changelog that cannot be parsed raises rather than being
    overwritten.

    Args:
        filename: Campaign JSON filename whose changelog is updated.
        entry: JSON-serializable object to append.
    """
    path = _changelog_path(filename)
    # Open directly and handle absence, instead of checking first and opening
    # afterwards.
    try:
        with open(path, "r", encoding="utf-8") as f:
            log = json.load(f)
    except FileNotFoundError:
        log = []
    log.append(entry)
    # Write to a temporary file and swap it in, so a failure during the write
    # cannot leave a truncated changelog that later reads would fail to parse.
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(log, f, indent=2, ensure_ascii=False)
    os.replace(tmp_path, path)
def _load_approvals(filename):
    """Replay the changelog for *filename* and return the approved keys.

    Keys have the form "<article_id>::<language lower-cased>". Entries are
    processed in file order: an "approve" entry sets the key to True, an
    "unapprove" entry removes it, and any other action is ignored. Returns an
    empty dict when there is no changelog file.
    """
    path = _changelog_path(filename)
    if not os.path.isfile(path):
        return {}

    with open(path, "r", encoding="utf-8") as handle:
        history = json.load(handle)

    approved = {}
    for event in history:
        key = event["article_id"] + "::" + event["language"].lower()
        action = event.get("action")
        if action == "approve":
            approved[key] = True
        elif action == "unapprove":
            approved.pop(key, None)
    return approved
def _handle_approval(body):
    """Shared logic for approve/unapprove.

    Reads "filename", "article_id", "language" and "action" (default
    "approve") from *body*, then appends a changelog entry with an empty
    "edits" list while holding file_lock. The master JSON is not modified.

    Returns:
        JSON {"status": "ok", "action": <action>}.

    Raises:
        400 (via abort) when a required field is empty or missing,
        404 (via abort) when the campaign file does not exist.
    """
    filename = os.path.basename(body.get("filename", ""))
    article_id = body.get("article_id", "")
    language = body.get("language", "")
    action = body.get("action", "approve")

    if not (filename and article_id and language):
        abort(400)
    if not os.path.isfile(os.path.join(MASTER_JSON_DIR, filename)):
        abort(404)

    with file_lock:
        _append_changelog(filename, {
            "timestamp": datetime.now().isoformat(),
            "article_id": article_id,
            "language": language,
            "action": action,
            "edits": [],
        })
    return jsonify({"status": "ok", "action": action})
# ========== Run ==========
if __name__ == "__main__":
    # The listening port comes from the PORT environment variable (5000 when unset).
    listen_port = int(os.environ.get("PORT", 5000))
    startup_lines = (
        f"Master JSON folder: {MASTER_JSON_DIR}",
        f"Image base path: {IMAGE_BASE_PATH}",
        f"Starting server at http://localhost:{listen_port}",
    )
    for line in startup_lines:
        print(line)
    # Listen on all interfaces, with debug mode off.
    app.run(host="0.0.0.0", port=listen_port, debug=False)