hm_ems_report/server.py
Vadym Samoilenko 14d67b5f2b Add production deployment setup for Ubuntu with Apache
Configured application for deployment with Apache reverse proxy and systemd service. The app runs without Docker, using Python venv and Gunicorn.

Key changes:
- deploy.sh: Idempotent deployment script with backup, health checks
- systemd/hm-ems.service: Systemd unit for auto-start/restart
- DEPLOY.md: Complete deployment and troubleshooting guide
- .env.example: Configuration template
- Updated API paths: /api/* → /hm-ems/api/* for Apache routing
- Updated image URLs: /images/* → /hm-ems/images/* for Apache alias
- Updated CLAUDE.md: Added production deployment section

Architecture:
- Flask/Gunicorn on localhost:5000 (systemd service)
- Apache reverse proxy for /hm-ems/api/*
- Apache serves static files from /var/www/html/hm-ems-report/
- Apache serves images from /opt/hm-ems-data/campaign_images/
- Data persistence in /opt/hm-ems-data/ (JSON + images)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 21:30:54 +00:00

291 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Flask server for H&M EMS Product Review Tool.
Serves the review UI and provides API endpoints for:
- Listing available JSON campaign files
- Loading campaign data with image mappings
- Approving/saving edits back to the master JSON with change logging
Configuration (environment variables):
IMAGE_BASE_PATH root folder containing year/campaign/Automation_LR images
(defaults to ./campaign_images)
MASTER_JSON_DIR folder containing campaign JSON files
(defaults to ./Master_Json)
PORT server port (defaults to 5000)
Usage:
python3 server.py
Then open http://localhost:5000
"""
import os
import json
import threading
from datetime import datetime
from flask import Flask, jsonify, request, send_from_directory, abort
from html_generator import (
LANGUAGE_DISPLAY_NAMES,
_get_campaign_prefix,
_get_main_image_filename,
_get_all_image_filenames,
)
# ========== Config ==========
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MASTER_JSON_DIR = os.environ.get(
"MASTER_JSON_DIR",
os.path.join(BASE_DIR, "Master_Json"),
)
IMAGE_BASE_PATH = os.environ.get(
"IMAGE_BASE_PATH",
os.path.join(BASE_DIR, "campaign_images"),
)
# Thread lock for safe JSON read/write
file_lock = threading.Lock()
app = Flask(__name__, static_folder="static")
# ========== Static UI ==========
@app.route("/")
def index():
return send_from_directory("static", "index.html")
# ========== API: List available JSON files ==========
@app.route("/api/files")
def list_files():
files = []
for f in sorted(os.listdir(MASTER_JSON_DIR)):
if f.endswith(".json") and not f.endswith("_changelog.json"):
campaign = _get_campaign_prefix(f)
files.append({"filename": f, "campaign": campaign})
return jsonify(files)
# ========== API: Load campaign data ==========
@app.route("/api/load/<filename>")
def load_campaign(filename):
# Security: strip path components
filename = os.path.basename(filename)
filepath = os.path.join(MASTER_JSON_DIR, filename)
if not os.path.isfile(filepath):
abort(404, description=f"File not found: {filename}")
with open(filepath, "r", encoding="utf-8") as f:
data = json.load(f)
campaign_prefix = _get_campaign_prefix(filename)
# Derive year from Season field (first 4 chars, e.g. "202502" -> "2025")
season = data[0].get("Season", "") if data else ""
year = season[:4] if len(season) >= 4 else "2025"
image_source_dir = os.path.join(IMAGE_BASE_PATH, year, campaign_prefix, "Automation_LR")
# Build image map: Article id -> list of server image URLs
image_map = {}
for record in data:
article_id = record.get("Article id", "")
if article_id in image_map:
continue
img_names = _get_all_image_filenames(record.get("Filename", ""))
urls = []
for img_name in img_names:
if os.path.isfile(os.path.join(image_source_dir, img_name)):
urls.append(f"/hm-ems/images/{year}/{campaign_prefix}/{img_name}")
image_map[article_id] = urls if urls else None
# Build language display names for languages present in data
languages = sorted(set(
r.get("Language", "").lower() for r in data if r.get("Language")
))
lang_display = {}
for lang in languages:
lang_display[lang] = LANGUAGE_DISPLAY_NAMES.get(lang, lang)
# Derive campaign info
campaign_name = data[0].get("Campaign", campaign_prefix) if data else campaign_prefix
season = data[0].get("Season", "") if data else ""
# Load existing approvals from changelog
approvals = _load_approvals(filename)
return jsonify({
"data": data,
"image_map": image_map,
"lang_display": lang_display,
"campaign": campaign_name,
"season": season,
"filename": filename,
"approvals": approvals,
})
# ========== API: Approve / save edits ==========
@app.route("/api/approve", methods=["POST"])
def approve():
body = request.get_json()
if not body:
abort(400, description="Missing JSON body")
filename = os.path.basename(body.get("filename", ""))
article_id = body.get("article_id", "")
language = body.get("language", "")
edits = body.get("edits", {})
action = body.get("action", "approve") # "approve" or "unapprove"
if not filename or not article_id or not language:
abort(400, description="Missing required fields: filename, article_id, language")
filepath = os.path.join(MASTER_JSON_DIR, filename)
if not os.path.isfile(filepath):
abort(404, description=f"File not found: {filename}")
with file_lock:
# Load master JSON
with open(filepath, "r", encoding="utf-8") as f:
data = json.load(f)
# Find matching record
record = None
for r in data:
if (r.get("Article id") == article_id and
r.get("Language", "").lower() == language.lower()):
record = r
break
if not record:
abort(404, description=f"Record not found: {article_id} / {language}")
# Build changelog entry
changelog_entry = {
"timestamp": datetime.now().isoformat(),
"article_id": article_id,
"language": language,
"country": record.get("Country", ""),
"product_id": record.get("Product id", ""),
"action": action,
"edits": [],
}
# Apply edits if approving
if action == "approve" and edits:
for field in ["Product name", "Price"]:
if field in edits and edits[field] is not None:
old_val = record.get(field, "")
new_val = edits[field]
if str(old_val) != str(new_val):
changelog_entry["edits"].append({
"field": field,
"old": old_val,
"new": new_val,
})
record[field] = new_val
# Write master JSON back
with open(filepath, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# Write changelog
_append_changelog(filename, changelog_entry)
return jsonify({"status": "ok", "action": action})
# ========== API: Unapprove ==========
@app.route("/api/unapprove", methods=["POST"])
def unapprove():
body = request.get_json()
if not body:
abort(400)
body["action"] = "unapprove"
# Reuse the approve logic for logging
return _handle_approval(body)
# ========== Image serving ==========
@app.route("/images/<year>/<campaign_prefix>/<path:filename>")
def serve_image(year, campaign_prefix, filename):
# Security: no path traversal
year = os.path.basename(year)
campaign_prefix = os.path.basename(campaign_prefix)
filename = os.path.basename(filename)
image_dir = os.path.join(IMAGE_BASE_PATH, year, campaign_prefix, "Automation_LR")
if not os.path.isdir(image_dir):
abort(404)
return send_from_directory(image_dir, filename)
# ========== Changelog helpers ==========
def _changelog_path(filename):
stem = os.path.splitext(filename)[0]
return os.path.join(MASTER_JSON_DIR, f"{stem}_changelog.json")
def _append_changelog(filename, entry):
path = _changelog_path(filename)
if os.path.isfile(path):
with open(path, "r", encoding="utf-8") as f:
log = json.load(f)
else:
log = []
log.append(entry)
with open(path, "w", encoding="utf-8") as f:
json.dump(log, f, indent=2, ensure_ascii=False)
def _load_approvals(filename):
"""Load approval state from changelog - returns dict of approved keys."""
path = _changelog_path(filename)
approvals = {}
if not os.path.isfile(path):
return approvals
with open(path, "r", encoding="utf-8") as f:
log = json.load(f)
for entry in log:
key = entry["article_id"] + "::" + entry["language"].lower()
if entry.get("action") == "approve":
approvals[key] = True
elif entry.get("action") == "unapprove":
approvals.pop(key, None)
return approvals
def _handle_approval(body):
"""Shared logic for approve/unapprove."""
filename = os.path.basename(body.get("filename", ""))
article_id = body.get("article_id", "")
language = body.get("language", "")
action = body.get("action", "approve")
if not filename or not article_id or not language:
abort(400)
filepath = os.path.join(MASTER_JSON_DIR, filename)
if not os.path.isfile(filepath):
abort(404)
with file_lock:
changelog_entry = {
"timestamp": datetime.now().isoformat(),
"article_id": article_id,
"language": language,
"action": action,
"edits": [],
}
_append_changelog(filename, changelog_entry)
return jsonify({"status": "ok", "action": action})
# ========== Run ==========
if __name__ == "__main__":
port = int(os.environ.get("PORT", 5000))
print(f"Master JSON folder: {MASTER_JSON_DIR}")
print(f"Image base path: {IMAGE_BASE_PATH}")
print(f"Starting server at http://localhost:{port}")
app.run(host="0.0.0.0", port=port, debug=False)