API endpoints now return 401 Unauthorized instead of redirecting to login page when session is invalid. JavaScript detects 401 and redirects the browser to login. This fixes the 'Unexpected token DOCTYPE' error when session expires. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
452 lines
14 KiB
Python
452 lines
14 KiB
Python
"""
|
||
Flask server for H&M EMS Product Review Tool.
|
||
|
||
Serves the review UI and provides API endpoints for:
|
||
- Listing available JSON campaign files
|
||
- Loading campaign data with image mappings
|
||
- Approving/saving edits back to the master JSON with change logging
|
||
|
||
Configuration (environment variables):
|
||
IMAGE_BASE_PATH – root folder containing year/campaign/Automation_LR images
|
||
(defaults to ./campaign_images)
|
||
MASTER_JSON_DIR – folder containing campaign JSON files
|
||
(defaults to ./Master_Json)
|
||
PORT – server port (defaults to 5000)
|
||
|
||
Usage:
|
||
python3 server.py
|
||
Then open http://localhost:5000
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import threading
|
||
from datetime import datetime
|
||
from functools import wraps
|
||
from flask import Flask, jsonify, request, send_from_directory, abort, session, redirect, url_for, render_template_string
|
||
|
||
from html_generator import (
|
||
LANGUAGE_DISPLAY_NAMES,
|
||
_get_campaign_prefix,
|
||
_get_main_image_filename,
|
||
_get_all_image_filenames,
|
||
)
|
||
|
||
# ========== Config ==========
|
||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
MASTER_JSON_DIR = os.environ.get(
|
||
"MASTER_JSON_DIR",
|
||
os.path.join(BASE_DIR, "Master_Json"),
|
||
)
|
||
IMAGE_BASE_PATH = os.environ.get(
|
||
"IMAGE_BASE_PATH",
|
||
os.path.join(BASE_DIR, "campaign_images"),
|
||
)
|
||
|
||
# Thread lock for safe JSON read/write
|
||
file_lock = threading.Lock()
|
||
|
||
app = Flask(__name__, static_folder="static")
|
||
app.secret_key = os.environ.get("SECRET_KEY", "hm-ems-secret-key-change-in-production")
|
||
|
||
# Login credentials
|
||
USERNAME = "admin3_M"
|
||
PASSWORD = "Pa$$w0rd2026_!"
|
||
|
||
|
||
# Authentication decorator
|
||
def login_required(f):
|
||
@wraps(f)
|
||
def decorated_function(*args, **kwargs):
|
||
if not session.get('logged_in'):
|
||
# For AJAX/API requests, return 401 instead of redirect
|
||
if request.path.startswith('/api/') or request.is_json or request.headers.get('X-Requested-With') == 'XMLHttpRequest':
|
||
abort(401)
|
||
return redirect(url_for('login'))
|
||
return f(*args, **kwargs)
|
||
return decorated_function
|
||
|
||
|
||
# ========== Login ==========
|
||
LOGIN_HTML = """
|
||
<!DOCTYPE html>
|
||
<html lang="en">
|
||
<head>
|
||
<meta charset="UTF-8">
|
||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||
<title>H&M EMS - Login</title>
|
||
<style>
|
||
* { margin: 0; padding: 0; box-sizing: border-box; }
|
||
body {
|
||
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Arial, sans-serif;
|
||
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
|
||
min-height: 100vh;
|
||
display: flex;
|
||
align-items: center;
|
||
justify-content: center;
|
||
}
|
||
.login-container {
|
||
background: white;
|
||
padding: 40px;
|
||
border-radius: 12px;
|
||
box-shadow: 0 10px 40px rgba(0,0,0,0.2);
|
||
width: 100%;
|
||
max-width: 400px;
|
||
}
|
||
.logo {
|
||
text-align: center;
|
||
margin-bottom: 30px;
|
||
}
|
||
.logo h1 {
|
||
color: #cc0000;
|
||
font-size: 32px;
|
||
font-weight: 700;
|
||
letter-spacing: 2px;
|
||
}
|
||
.logo p {
|
||
color: #666;
|
||
font-size: 14px;
|
||
margin-top: 5px;
|
||
}
|
||
.form-group {
|
||
margin-bottom: 20px;
|
||
}
|
||
label {
|
||
display: block;
|
||
margin-bottom: 8px;
|
||
color: #333;
|
||
font-weight: 500;
|
||
font-size: 14px;
|
||
}
|
||
input[type="text"],
|
||
input[type="password"] {
|
||
width: 100%;
|
||
padding: 12px 15px;
|
||
border: 2px solid #e0e0e0;
|
||
border-radius: 6px;
|
||
font-size: 14px;
|
||
transition: border-color 0.3s;
|
||
}
|
||
input[type="text"]:focus,
|
||
input[type="password"]:focus {
|
||
outline: none;
|
||
border-color: #667eea;
|
||
}
|
||
.btn-login {
|
||
width: 100%;
|
||
padding: 12px;
|
||
background: #cc0000;
|
||
color: white;
|
||
border: none;
|
||
border-radius: 6px;
|
||
font-size: 16px;
|
||
font-weight: 600;
|
||
cursor: pointer;
|
||
transition: background 0.3s;
|
||
}
|
||
.btn-login:hover {
|
||
background: #a30000;
|
||
}
|
||
.error {
|
||
background: #fee;
|
||
color: #c33;
|
||
padding: 12px;
|
||
border-radius: 6px;
|
||
margin-bottom: 20px;
|
||
font-size: 14px;
|
||
text-align: center;
|
||
}
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<div class="login-container">
|
||
<div class="logo">
|
||
<h1>H&M</h1>
|
||
<p>EMS Product Review Tool</p>
|
||
</div>
|
||
{% if error %}
|
||
<div class="error">{{ error }}</div>
|
||
{% endif %}
|
||
<form method="POST">
|
||
<div class="form-group">
|
||
<label for="username">Username</label>
|
||
<input type="text" id="username" name="username" required autofocus>
|
||
</div>
|
||
<div class="form-group">
|
||
<label for="password">Password</label>
|
||
<input type="password" id="password" name="password" required>
|
||
</div>
|
||
<button type="submit" class="btn-login">Sign In</button>
|
||
</form>
|
||
</div>
|
||
</body>
|
||
</html>
|
||
"""
|
||
|
||
@app.route("/login", methods=["GET", "POST"])
|
||
def login():
|
||
error = None
|
||
if request.method == "POST":
|
||
username = request.form.get("username")
|
||
password = request.form.get("password")
|
||
|
||
if username == USERNAME and password == PASSWORD:
|
||
session['logged_in'] = True
|
||
return redirect(url_for('index'))
|
||
else:
|
||
error = "Invalid username or password"
|
||
|
||
return render_template_string(LOGIN_HTML, error=error)
|
||
|
||
|
||
@app.route("/logout")
|
||
def logout():
|
||
session.pop('logged_in', None)
|
||
return redirect(url_for('login'))
|
||
|
||
|
||
# ========== Static UI ==========
|
||
@app.route("/")
|
||
@login_required
|
||
def index():
|
||
return send_from_directory("static", "index.html")
|
||
|
||
|
||
# ========== API: List available JSON files ==========
|
||
@app.route("/api/files")
|
||
@login_required
|
||
def list_files():
|
||
files = []
|
||
for f in sorted(os.listdir(MASTER_JSON_DIR)):
|
||
if f.endswith(".json") and not f.endswith("_changelog.json"):
|
||
campaign = _get_campaign_prefix(f)
|
||
files.append({"filename": f, "campaign": campaign})
|
||
return jsonify(files)
|
||
|
||
|
||
# ========== API: Load campaign data ==========
|
||
@app.route("/api/load/<filename>")
|
||
@login_required
|
||
def load_campaign(filename):
|
||
# Security: strip path components
|
||
filename = os.path.basename(filename)
|
||
filepath = os.path.join(MASTER_JSON_DIR, filename)
|
||
|
||
if not os.path.isfile(filepath):
|
||
abort(404, description=f"File not found: {filename}")
|
||
|
||
with open(filepath, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
|
||
campaign_prefix = _get_campaign_prefix(filename)
|
||
|
||
# Derive year from Season field (first 4 chars, e.g. "202502" -> "2025")
|
||
season = data[0].get("Season", "") if data else ""
|
||
year = season[:4] if len(season) >= 4 else "2025"
|
||
|
||
image_source_dir = os.path.join(IMAGE_BASE_PATH, year, campaign_prefix, "Automation_LR")
|
||
|
||
# Build image map: Article id -> list of server image URLs
|
||
image_map = {}
|
||
for record in data:
|
||
article_id = record.get("Article id", "")
|
||
if article_id in image_map:
|
||
continue
|
||
img_names = _get_all_image_filenames(record.get("Filename", ""))
|
||
urls = []
|
||
for img_name in img_names:
|
||
if os.path.isfile(os.path.join(image_source_dir, img_name)):
|
||
urls.append(f"/hm-ems-report/images/{year}/{campaign_prefix}/Automation_LR/{img_name}")
|
||
image_map[article_id] = urls if urls else None
|
||
|
||
# Build language display names for languages present in data
|
||
languages = sorted(set(
|
||
r.get("Language", "").lower() for r in data if r.get("Language")
|
||
))
|
||
lang_display = {}
|
||
for lang in languages:
|
||
lang_display[lang] = LANGUAGE_DISPLAY_NAMES.get(lang, lang)
|
||
|
||
# Derive campaign info
|
||
campaign_name = data[0].get("Campaign", campaign_prefix) if data else campaign_prefix
|
||
season = data[0].get("Season", "") if data else ""
|
||
|
||
# Load existing approvals from changelog
|
||
approvals = _load_approvals(filename)
|
||
|
||
return jsonify({
|
||
"data": data,
|
||
"image_map": image_map,
|
||
"lang_display": lang_display,
|
||
"campaign": campaign_name,
|
||
"season": season,
|
||
"filename": filename,
|
||
"approvals": approvals,
|
||
})
|
||
|
||
|
||
# ========== API: Approve / save edits ==========
|
||
@app.route("/api/approve", methods=["POST"])
|
||
@login_required
|
||
def approve():
|
||
body = request.get_json()
|
||
if not body:
|
||
abort(400, description="Missing JSON body")
|
||
|
||
filename = os.path.basename(body.get("filename", ""))
|
||
article_id = body.get("article_id", "")
|
||
language = body.get("language", "")
|
||
edits = body.get("edits", {})
|
||
action = body.get("action", "approve") # "approve" or "unapprove"
|
||
|
||
if not filename or not article_id or not language:
|
||
abort(400, description="Missing required fields: filename, article_id, language")
|
||
|
||
filepath = os.path.join(MASTER_JSON_DIR, filename)
|
||
if not os.path.isfile(filepath):
|
||
abort(404, description=f"File not found: {filename}")
|
||
|
||
with file_lock:
|
||
# Load master JSON
|
||
with open(filepath, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
|
||
# Find matching record
|
||
record = None
|
||
for r in data:
|
||
if (r.get("Article id") == article_id and
|
||
r.get("Language", "").lower() == language.lower()):
|
||
record = r
|
||
break
|
||
|
||
if not record:
|
||
abort(404, description=f"Record not found: {article_id} / {language}")
|
||
|
||
# Build changelog entry
|
||
changelog_entry = {
|
||
"timestamp": datetime.now().isoformat(),
|
||
"article_id": article_id,
|
||
"language": language,
|
||
"country": record.get("Country", ""),
|
||
"product_id": record.get("Product id", ""),
|
||
"action": action,
|
||
"edits": [],
|
||
}
|
||
|
||
# Apply edits if approving
|
||
if action == "approve" and edits:
|
||
for field in ["Product name", "Price"]:
|
||
if field in edits and edits[field] is not None:
|
||
old_val = record.get(field, "")
|
||
new_val = edits[field]
|
||
if str(old_val) != str(new_val):
|
||
changelog_entry["edits"].append({
|
||
"field": field,
|
||
"old": old_val,
|
||
"new": new_val,
|
||
})
|
||
record[field] = new_val
|
||
|
||
# Write master JSON back
|
||
with open(filepath, "w", encoding="utf-8") as f:
|
||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||
|
||
# Write changelog
|
||
_append_changelog(filename, changelog_entry)
|
||
|
||
return jsonify({"status": "ok", "action": action})
|
||
|
||
|
||
# ========== API: Unapprove ==========
|
||
@app.route("/api/unapprove", methods=["POST"])
|
||
def unapprove():
|
||
body = request.get_json()
|
||
if not body:
|
||
abort(400)
|
||
body["action"] = "unapprove"
|
||
# Reuse the approve logic for logging
|
||
return _handle_approval(body)
|
||
|
||
|
||
# ========== Image serving ==========
|
||
@app.route("/images/<year>/<campaign_prefix>/<path:filename>")
|
||
def serve_image(year, campaign_prefix, filename):
|
||
# Security: no path traversal
|
||
year = os.path.basename(year)
|
||
campaign_prefix = os.path.basename(campaign_prefix)
|
||
filename = os.path.basename(filename)
|
||
image_dir = os.path.join(IMAGE_BASE_PATH, year, campaign_prefix, "Automation_LR")
|
||
if not os.path.isdir(image_dir):
|
||
abort(404)
|
||
return send_from_directory(image_dir, filename)
|
||
|
||
|
||
# ========== Changelog helpers ==========
|
||
def _changelog_path(filename):
|
||
stem = os.path.splitext(filename)[0]
|
||
return os.path.join(MASTER_JSON_DIR, f"{stem}_changelog.json")
|
||
|
||
|
||
def _append_changelog(filename, entry):
|
||
path = _changelog_path(filename)
|
||
if os.path.isfile(path):
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
log = json.load(f)
|
||
else:
|
||
log = []
|
||
log.append(entry)
|
||
with open(path, "w", encoding="utf-8") as f:
|
||
json.dump(log, f, indent=2, ensure_ascii=False)
|
||
|
||
|
||
def _load_approvals(filename):
|
||
"""Load approval state from changelog - returns dict of approved keys."""
|
||
path = _changelog_path(filename)
|
||
approvals = {}
|
||
if not os.path.isfile(path):
|
||
return approvals
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
log = json.load(f)
|
||
for entry in log:
|
||
key = entry["article_id"] + "::" + entry["language"].lower()
|
||
if entry.get("action") == "approve":
|
||
approvals[key] = True
|
||
elif entry.get("action") == "unapprove":
|
||
approvals.pop(key, None)
|
||
return approvals
|
||
|
||
|
||
def _handle_approval(body):
|
||
"""Shared logic for approve/unapprove."""
|
||
filename = os.path.basename(body.get("filename", ""))
|
||
article_id = body.get("article_id", "")
|
||
language = body.get("language", "")
|
||
action = body.get("action", "approve")
|
||
|
||
if not filename or not article_id or not language:
|
||
abort(400)
|
||
|
||
filepath = os.path.join(MASTER_JSON_DIR, filename)
|
||
if not os.path.isfile(filepath):
|
||
abort(404)
|
||
|
||
with file_lock:
|
||
changelog_entry = {
|
||
"timestamp": datetime.now().isoformat(),
|
||
"article_id": article_id,
|
||
"language": language,
|
||
"action": action,
|
||
"edits": [],
|
||
}
|
||
_append_changelog(filename, changelog_entry)
|
||
|
||
return jsonify({"status": "ok", "action": action})
|
||
|
||
|
||
# ========== Run ==========
|
||
if __name__ == "__main__":
|
||
port = int(os.environ.get("PORT", 5000))
|
||
print(f"Master JSON folder: {MASTER_JSON_DIR}")
|
||
print(f"Image base path: {IMAGE_BASE_PATH}")
|
||
print(f"Starting server at http://localhost:{port}")
|
||
app.run(host="0.0.0.0", port=port, debug=False)
|