- Activity log: import_log DB table, GET /activity endpoint, collapsible panel in Update Data tab showing who changed what score and when - Sync logging: Box syncs now also write to import_log with before/after scores - Multilingual: ES, PT, IT, PL UI strings in i18n/ui.js; content translations (pillars, scoring labels, About tab) in clients/adeo/config.json; language switcher upgraded from EN/FR toggle to 6-language dropdown - cfg() and pillarDisplayName() generalised to support any language (not just FR) - import_file.py: reads 'Final QA ...' column first (falls back to Score) so QA-updated XLSX files produce the correct overall score on import - convert_data.py/docker-compose: use ADEO_DATA_ROOT env var for Box path Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
374 lines
14 KiB
Python
374 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
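"""
Maturity Tool — Data Converter

Usage:
    python3 convert_data.py [client_id] [--entity ENTITY_ID]

Example:
    python3 convert_data.py adeo

client_id defaults to "adeo". With --entity, only that entity is re-read and
merged into the existing data file; all other entities are kept as they are.

Reads source CSVs/XLSXs for the given client and writes:
    clients/{client}/data.json
"""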
|
|
|
|
import csv
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import date
|
|
from pathlib import Path
|
|
|
|
# ── Paths ─────────────────────────────────────────────────────────────────────
|
|
# Directory that holds this script; client config/data paths hang off it.
SCRIPT_DIR = Path(__file__).parent

# Root folder of the source files. The ADEO_DATA_ROOT environment variable
# wins when set; the literal below is only the fallback.
_DEFAULT_BOX_ROOT = "/Users/phildore/Library/CloudStorage/Box-Box/ADEO_EXTERNAL_SHARE"
BOX_ROOT = Path(os.getenv("ADEO_DATA_ROOT", _DEFAULT_BOX_ROOT))
|
|
|
|
# ── Score helpers ──────────────────────────────────────────────────────────────
|
|
def parse_score(s):
    """Pull the first number out of a score cell.

    Accepts values such as '3', '3.5' or '3 (Master)'. The input is
    stringified first, so non-string cells are handled as well.

    Returns the number as a float, or None when the text holds no digits.
    """
    found = re.search(r"\d+(?:\.\d+)?", str(s))
    if found is None:
        return None
    return float(found.group())
|
|
|
|
def score_to_level(score, scoring):
    """Translate a numeric score into a 1-based tier index.

    The range scoring["min"]..scoring["max"] is cut into as many equal-width
    buckets as there are entries in scoring["labels"]. The result is the first
    bucket whose upper bound is >= score; scores above the last bound are
    clamped to the top tier. Returns None when score is None.
    """
    if score is None:
        return None

    lowest, highest = scoring["min"], scoring["max"]
    num_levels = len(scoring["labels"])
    width = (highest - lowest) / num_levels

    candidate_tiers = (
        tier
        for tier in range(1, num_levels + 1)
        if score <= lowest + width * tier
    )
    # Fall back to the top tier when no bucket bound covers the score.
    return next(candidate_tiers, num_levels)
|
|
|
|
def level_label(level, scoring):
    """Return the label configured for a tier index.

    Labels are looked up in scoring["labels"] under the stringified level.
    A None level, or a level without an entry, yields an empty string.
    """
    return "" if level is None else scoring["labels"].get(str(level), "")
|
|
|
|
# ── Column normalisation ───────────────────────────────────────────────────────
|
|
# Source column header for each canonical field, per schema variant.
_ROW_COLUMNS = {
    "A": {
        "q_num": "Q#",
        "topic": "Question",
        "pillar": "Pillar",
        "score": "Score",
        "label": "Score_Label",
        "rationale": "Scoring_Rationale",
        "gaps": "Gaps_Identified",
        "refs": "Evidence_References",
    },
    "B": {
        "q_num": "Question_Number",
        "topic": "Question_Topic",
        "pillar": "Pillar",
        "score": "Score_Numeric",
        "label": "Score_Label",
        "rationale": "Rationale",
        "gaps": "Gaps",
        "refs": "References_Used",
    },
}


def normalise_row(raw, schema):
    """Return the canonical row dict regardless of CSV schema variant.

    Args:
        raw: mapping of column header -> cell value for one source row.
        schema: "A" or "B"; selects which column headers are read.

    Returns:
        dict with keys q_num, topic, pillar, score, label, rationale, gaps
        and refs. Text fields are stripped, pillar is upper-cased, and score
        is the float produced by parse_score (None when unparseable).

    Raises:
        ValueError: if schema is neither "A" nor "B".
    """
    columns = _ROW_COLUMNS.get(schema)
    if columns is None:
        raise ValueError(f"Unknown schema: {schema}")

    def text(field):
        # csv.DictReader stores None for fields missing from a short row, so
        # the .get() default alone does not protect the .strip() call.
        value = raw.get(columns[field], "")
        return "" if value is None else str(value).strip()

    return {
        "q_num": text("q_num"),
        "topic": text("topic"),
        "pillar": text("pillar").upper(),
        # parse_score stringifies its input itself, so pass the raw cell.
        "score": parse_score(raw.get(columns["score"], "")),
        "label": text("label"),
        "rationale": text("rationale"),
        "gaps": text("gaps"),
        "refs": text("refs"),
    }
|
|
|
|
def normalise_xlsx_row(row_dict, schema="B"):
    """Normalise a spreadsheet row that is already keyed by column header.

    Thin adapter over normalise_row for rows read via openpyxl; schema
    defaults to "B".
    """
    canonical = normalise_row(row_dict, schema)
    return canonical
|
|
|
|
# ── ADEO source definitions ────────────────────────────────────────────────────
|
|
# Keys of every source record, in the order the values are listed below.
_SOURCE_FIELDS = ("id", "label", "short", "group", "file", "schema", "xlsx")

# One record per business unit: identifiers, display names, the source file
# under BOX_ROOT, the column schema, and whether the file is an XLSX workbook.
ADEO_SOURCES = [
    dict(zip(_SOURCE_FIELDS, spec))
    for spec in (
        (
            "BU_LM_FRANCE", "Leroy Merlin France", "LM France", "Leroy Merlin",
            BOX_ROOT / "BU_LM_FRANCE/05_MATURITY/01_OUTPUTS/CSV/BU_LM_FRANCE_Maturity_Scores_Consolidated_v3.xlsx",
            "B", True,
        ),
        (
            "BU_LM_ITALY", "Leroy Merlin Italy", "LM Italy", "Leroy Merlin",
            BOX_ROOT / "BU_LM_ITALY/06_MATURITY_GRID/02_OUTPUTS/V3/CSV/BU_LM_ITALY_Maturity_Scores_Consolidated_v1.csv",
            "B", False,
        ),
        (
            "BU_LM_POLAND", "Leroy Merlin Poland", "LM Poland", "Leroy Merlin",
            BOX_ROOT / "BU_LM_POLAND/05_MATURITY/01_OUTPUTS/CSV/BU_LM_POLAND_Maturity_Scores_Consolidated_v3.xlsx",
            "B", True,
        ),
        (
            "BU_LM_SPAIN", "Leroy Merlin Spain", "LM Spain", "Leroy Merlin",
            BOX_ROOT / "BU_LM_SPAIN/06_MATURITY/01_OUTPUT/CSV/V4/BU_LM_SPAIN_Maturity_Scores_Consolidated_v5.xlsx",
            "B", True,
        ),
        (
            "BU_LM_PORTUGAL", "Leroy Merlin Portugal", "LM Portugal", "Leroy Merlin",
            # NOTE(review): the folder name really ends in " copy" here — confirm it is intended.
            BOX_ROOT / "BU_LM_PORTUGAL copy/05_MATURITY/02_OUTPUTS/CSV/BU_LM_PORTUGAL_Maturity_Scores_Consolidated_v5.csv",
            "B", False,
        ),
        (
            "BU_M_SPAIN", "Obramat Spain", "Obramat Spain", "Obramat",
            BOX_ROOT / "BU_M_SPAIN/08_ASSESSMENT/02_Round_2/CSV/BU_M_SPAIN_Maturity_Scores_Consolidated_v1.csv",
            "B", False,
        ),
        (
            "BU_M_BRAZIL", "Obramax Brazil", "Obramax Brazil", "Obramax",
            BOX_ROOT / "BU_M_BRAZIL/06_ASSESSMENT/02_Round_2/CSV/BU_M_BRAZIL_Maturity_Scores_Consolidated_v1.csv",
            "B", False,
        ),
        (
            "BU_M_ITALY", "Tecnomat Italy", "Tecnomat Italy", "Tecnomat",
            BOX_ROOT / "BU_M_ITALY/06_ASSESSMENT/02_Round_2/CSV/BU_M_ITALY_Maturity_Scores_Consolidated_v1.csv",
            "B", False,
        ),
    )
]
|
|
|
|
# ── Parsers ────────────────────────────────────────────────────────────────────
|
|
def read_csv_rows(path, schema):
    """Load a CSV file and return its rows in canonical form.

    The file is opened as UTF-8 with an optional BOM. Every record is passed
    through normalise_row; records whose score could not be parsed are
    left out of the result.
    """
    with open(path, encoding="utf-8-sig", newline="") as handle:
        normalised = [
            normalise_row(record, schema) for record in csv.DictReader(handle)
        ]
    return [row for row in normalised if row["score"] is not None]
|
|
|
|
def detect_xlsx_schema(headers):
    """Choose the row schema from the question-number column that is present.

    Header matching is case-insensitive. "question_number" selects schema
    "B". Otherwise "q#" selects "XLSX_A" (the Spain V4 layout: Q#, Question
    Topic, Score, Level, Rationale, Gaps, References Used). With neither
    header present the result falls back to "B".
    """
    lowered = {name.lower() for name in headers}
    if "question_number" not in lowered and "q#" in lowered:
        return "XLSX_A"
    return "B"
|
|
|
|
def normalise_xlsx_a_row(raw):
    """Normalise a row of the Spain V4 XLSX layout into the canonical dict.

    Columns read: Q#, Question Topic, Pillar, Score, Level, Rationale, Gaps,
    References Used. Text values are stripped and the pillar is upper-cased.
    """
    def cell(header):
        return raw.get(header, "").strip()

    numeric_score = parse_score(raw.get("Score", ""))
    return {
        "q_num": str(raw.get("Q#", "")).strip(),
        "topic": cell("Question Topic"),
        "pillar": cell("Pillar").upper(),
        "score": numeric_score,
        "label": cell("Level"),
        "rationale": cell("Rationale"),
        "gaps": cell("Gaps"),
        "refs": cell("References Used"),
    }
|
|
|
|
def read_xlsx_rows(path, schema):
    """Read the active sheet of an XLSX workbook and return canonical rows.

    The first row is taken as the header row and the row schema is
    re-detected from it via detect_xlsx_schema, so the ``schema`` argument
    is only the initial value and is overridden once a header row is seen.
    Rows whose score cannot be parsed are skipped.

    Args:
        path: location of the .xlsx file.
        schema: schema assumed until the header row has been inspected.

    Returns:
        list of canonical row dicts.

    Exits the process with status 1 if openpyxl is not installed.
    """
    try:
        import openpyxl
    except ImportError:
        print("ERROR: openpyxl required for XLSX files. Run: pip install openpyxl")
        sys.exit(1)

    wb = openpyxl.load_workbook(path, read_only=True, data_only=True)
    # A read-only workbook keeps the file open until close() is called, so
    # release it even when reading or normalising a row raises.
    try:
        ws = wb.active
        headers = None
        detected_schema = schema
        rows = []
        for excel_row in ws.iter_rows(values_only=True):
            if headers is None:
                # Empty/falsy header cells become "" so they never match a column name.
                headers = [str(c).strip() if c else "" for c in excel_row]
                detected_schema = detect_xlsx_schema(headers)
                continue
            # Stringify every cell so the normalisers can call .strip() safely.
            raw = dict(zip(headers, [str(c).strip() if c is not None else "" for c in excel_row]))
            if detected_schema == "XLSX_A":
                row = normalise_xlsx_a_row(raw)
            else:
                row = normalise_xlsx_row(raw, detected_schema)
            if row["score"] is None:
                continue
            rows.append(row)
    finally:
        wb.close()
    return rows
|
|
|
|
def parse_entity(source, scoring, pillar_order):
    """Build the output record for one source entity.

    Reads the entity's file (XLSX or CSV, per source["xlsx"]), groups the
    rows by pillar, and produces per-pillar averages plus an overall average
    across all questions of the pillars listed in pillar_order. Pillars with
    no rows are omitted.

    Returns the entity dict, or None (after printing a warning) when the
    file is missing or no scored question belongs to a listed pillar.
    """
    path = source["file"]
    if not path.exists():
        print(f" WARNING: file not found — {path}")
        return None

    reader = read_xlsx_rows if source["xlsx"] else read_csv_rows
    rows = reader(path, source["schema"])

    grouped = {}
    for row in rows:
        grouped.setdefault(row["pillar"], []).append(row)

    def question_entry(q):
        value = q["score"]
        tier = score_to_level(value, scoring)
        return {
            "q_num": q["q_num"],
            "topic": q["topic"],
            # Whole-number scores are emitted as ints (3 rather than 3.0).
            "score": int(value) if value == int(value) else value,
            "level": tier,
            # Prefer the label from the source file; derive one otherwise.
            "label": q["label"] or level_label(tier, scoring),
            "rationale": q["rationale"],
            "gaps": q["gaps"],
            "refs": q["refs"],
        }

    pillars_out = []
    every_score = []
    for pillar_name in pillar_order:
        questions = grouped.get(pillar_name)
        if not questions:
            continue
        pillar_scores = [q["score"] for q in questions]
        pillar_avg = round(sum(pillar_scores) / len(pillar_scores), 2)
        every_score += pillar_scores
        pillar_level = score_to_level(pillar_avg, scoring)
        pillars_out.append({
            "name": pillar_name,
            "avg": pillar_avg,
            "level": pillar_level,
            "label": level_label(pillar_level, scoring),
            "questions": [question_entry(q) for q in questions],
        })

    if not every_score:
        print(f" WARNING: no valid scores found for {source['id']}")
        return None

    overall = round(sum(every_score) / len(every_score), 2)
    overall_level = score_to_level(overall, scoring)
    return {
        "id": source["id"],
        "label": source["label"],
        "short": source["short"],
        "group": source["group"],
        "overall_score": overall,
        "overall_level": overall_level,
        "overall_label": level_label(overall_level, scoring),
        "synced": date.today().isoformat(),
        "pillars": pillars_out,
    }
|
|
|
|
# ── Client handlers ────────────────────────────────────────────────────────────
|
|
def build_adeo(config, entity_filter=None):
    """Parse the ADEO sources and return their entity records, best score first.

    Args:
        config: client config; its "scoring" and "pillars" entries are used.
        entity_filter: when given, only the source with this id is parsed.
            An unknown id prints an error and exits with status 1.

    Sources that yield no entity (parse_entity returned nothing) are skipped.
    """
    scoring = config["scoring"]
    pillar_order = config["pillars"]

    if entity_filter:
        sources = [src for src in ADEO_SOURCES if src["id"] == entity_filter]
        if not sources:
            print(f"ERROR: entity '{entity_filter}' not found in ADEO sources")
            sys.exit(1)
    else:
        sources = ADEO_SOURCES

    entities = []
    for source in sources:
        print(f" Reading {source['short']}…")
        entity = parse_entity(source, scoring, pillar_order)
        if not entity:
            continue
        entities.append(entity)
        print(f" → {entity['overall_score']} ({entity['overall_label']})")

    return sorted(entities, key=lambda e: e["overall_score"], reverse=True)
|
|
|
|
# Registry of client id -> builder. Each builder is called as
# builder(config, entity_filter=...) and returns the list of entity records.
CLIENT_BUILDERS = dict(adeo=build_adeo)
|
|
|
|
# ── Main ───────────────────────────────────────────────────────────────────────
|
|
def main():
    """Command-line entry point: build and write clients/<client_id>/data.json.

    Positional argument client_id defaults to "adeo". With --entity only that
    entity is rebuilt and merged into the entities already stored in the
    output file; without it the file is rewritten from all sources.
    Exits with status 1 when the client config or builder is missing.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("client_id", nargs="?", default="adeo")
    parser.add_argument("--entity", default=None, help="Sync only this entity ID")
    args = parser.parse_args()
    client_id, entity_filter = args.client_id, args.entity

    client_dir = SCRIPT_DIR / "clients" / client_id
    config_path = client_dir / "config.json"
    if not config_path.exists():
        print(f"ERROR: config not found at {config_path}")
        sys.exit(1)

    with open(config_path, encoding="utf-8") as fh:
        config = json.load(fh)

    builder = CLIENT_BUILDERS.get(client_id)
    if builder is None:
        print(f"ERROR: no builder registered for client '{client_id}'")
        print(f" Available: {list(CLIENT_BUILDERS.keys())}")
        sys.exit(1)

    suffix = f" [entity: {entity_filter}]" if entity_filter else ""
    print(f"Building data for client: {config['name']}" + suffix)
    entities = builder(config, entity_filter=entity_filter)

    out_path = client_dir / "data.json"

    if entity_filter:
        # Replace the rebuilt entity inside the stored list and keep the rest.
        previous = []
        if out_path.exists():
            with open(out_path, encoding="utf-8") as fh:
                previous = json.load(fh).get("entities", [])
        rebuilt_by_id = {e["id"]: e for e in entities}
        known_ids = {e["id"] for e in previous}
        final_entities = [rebuilt_by_id.get(e["id"], e) for e in previous]
        # Entities that were not stored before are appended.
        final_entities.extend(e for e in entities if e["id"] not in known_ids)
        final_entities.sort(key=lambda e: e["overall_score"], reverse=True)
    else:
        final_entities = entities

    output = {
        "generated": date.today().isoformat(),
        "client_id": client_id,
        "entities": final_entities,
    }

    with open(out_path, "w", encoding="utf-8") as fh:
        json.dump(output, fh, indent=2, ensure_ascii=False)

    print(f"\n✓ Written {len(output['entities'])} entities → {out_path}")
    scores_str = " · ".join(
        f"{e['short']} {e['overall_score']}" for e in output["entities"]
    )
    print(f" Scores: {scores_str}")


if __name__ == "__main__":
    main()
|