adeo-maturity-tool/convert_data.py
Phil Dore b97889a9d9 Add activity log, multilingual support (ES/PT/IT/PL), and QA score column fix
- Activity log: import_log DB table, GET /activity endpoint, collapsible
  panel in Update Data tab showing who changed what score and when
- Sync logging: Box syncs now also write to import_log with before/after scores
- Multilingual: ES, PT, IT, PL UI strings in i18n/ui.js; content translations
  (pillars, scoring labels, About tab) in clients/adeo/config.json; language
  switcher upgraded from EN/FR toggle to 6-language dropdown
- cfg() and pillarDisplayName() generalised to support any language (not just FR)
- import_file.py: reads 'Final QA ...' column first (falls back to Score) so
  QA-updated XLSX files produce the correct overall score on import
- convert_data.py/docker-compose: use ADEO_DATA_ROOT env var for Box path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 13:23:22 +01:00

374 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Maturity Tool — Data Converter
Usage: python3 convert_data.py adeo
Reads source CSVs/XLSXs for the given client and writes:
clients/{client}/data.json
"""
import csv
import json
import os
import re
import sys
from collections import defaultdict
from datetime import date
from pathlib import Path
# ── Paths ─────────────────────────────────────────────────────────────────────
# Directory that contains this script.
SCRIPT_DIR = Path(__file__).parent

# Root of the source-data tree. The ADEO_DATA_ROOT environment variable, when
# set, takes precedence over the built-in default location.
_DEFAULT_BOX_ROOT = "/Users/phildore/Library/CloudStorage/Box-Box/ADEO_EXTERNAL_SHARE"
BOX_ROOT = Path(os.environ.get("ADEO_DATA_ROOT", _DEFAULT_BOX_ROOT))
# ── Score helpers ──────────────────────────────────────────────────────────────
def parse_score(s):
    """Return the first unsigned decimal number found in ``s`` as a float.

    ``s`` is converted with ``str()`` first, so numbers, strings such as
    '3', '3.5' or '3 (Master)', and None are all accepted. Returns None when
    the text contains no digits.
    """
    found = re.search(r"\d+(?:\.\d+)?", str(s))
    if found is None:
        return None
    return float(found.group())
def score_to_level(score, scoring):
    """Return the 1-based tier that ``score`` falls into, or None for a None score.

    The range ``scoring["min"]``..``scoring["max"]`` is split into as many
    equal-width buckets as there are entries in ``scoring["labels"]``. A score
    belongs to the first bucket whose upper bound it does not exceed; scores
    above every bound are assigned the top tier.
    """
    if score is None:
        return None

    low = scoring["min"]
    num_levels = len(scoring["labels"])
    width = (scoring["max"] - low) / num_levels

    tiers = range(1, num_levels + 1)
    return next((tier for tier in tiers if score <= low + width * tier), num_levels)
def level_label(level, scoring):
    """Return the label configured for ``level`` in ``scoring["labels"]``.

    Labels are keyed by the level's string form. An empty string is returned
    for a None level or a level with no configured label.
    """
    if level is not None:
        return scoring["labels"].get(str(level), "")
    return ""
# ── Column normalisation ───────────────────────────────────────────────────────
# Source column header feeding each canonical field, keyed by schema variant.
_SCHEMA_COLUMNS = {
    "A": {
        "q_num": "Q#",
        "topic": "Question",
        "pillar": "Pillar",
        "score": "Score",
        "label": "Score_Label",
        "rationale": "Scoring_Rationale",
        "gaps": "Gaps_Identified",
        "refs": "Evidence_References",
    },
    "B": {
        "q_num": "Question_Number",
        "topic": "Question_Topic",
        "pillar": "Pillar",
        "score": "Score_Numeric",
        "label": "Score_Label",
        "rationale": "Rationale",
        "gaps": "Gaps",
        "refs": "References_Used",
    },
}


def normalise_row(raw, schema):
    """Return the canonical row dict for ``raw``, whichever schema variant it uses.

    Args:
        raw: Mapping of source column header to cell value. Missing columns
            and None values (csv.DictReader uses None to pad rows that have
            fewer fields than the header) are treated as empty strings;
            other values are converted with ``str()``.
        schema: "A" or "B", selecting which source column names are read.

    Returns:
        Dict with keys q_num, topic, pillar, score, label, rationale, gaps and
        refs. Text fields are stripped, ``pillar`` is upper-cased, and
        ``score`` is the result of ``parse_score`` (None when unparseable).

    Raises:
        ValueError: If ``schema`` is not a known variant.
    """
    try:
        columns = _SCHEMA_COLUMNS[schema]
    except (KeyError, TypeError):
        raise ValueError(f"Unknown schema: {schema}") from None

    def cell(field):
        # A key that is present but mapped to None would defeat a plain
        # ``raw.get(key, "")`` default, so None is normalised explicitly.
        value = raw.get(columns[field])
        return "" if value is None else str(value).strip()

    return {
        "q_num": cell("q_num"),
        "topic": cell("topic"),
        "pillar": cell("pillar").upper(),
        "score": parse_score(cell("score")),
        "label": cell("label"),
        "rationale": cell("rationale"),
        "gaps": cell("gaps"),
        "refs": cell("refs"),
    }
def normalise_xlsx_row(row_dict, schema="B"):
    """Normalise a spreadsheet row that is already keyed by column header.

    Thin wrapper that delegates to ``normalise_row``; ``schema`` defaults to "B".
    """
    return normalise_row(raw=row_dict, schema=schema)
# ── ADEO source definitions ────────────────────────────────────────────────────
def _adeo_source(entity_id, label, short, group, relative_path, *, xlsx, schema="B"):
    """Build one source record; ``relative_path`` is joined onto BOX_ROOT."""
    return {
        "id": entity_id,
        "label": label,
        "short": short,
        "group": group,
        "file": BOX_ROOT / relative_path,
        "schema": schema,
        "xlsx": xlsx,
    }


# One record per business unit: display names, grouping, and the location and
# format of its consolidated maturity-score file.
ADEO_SOURCES = [
    _adeo_source(
        "BU_LM_FRANCE", "Leroy Merlin France", "LM France", "Leroy Merlin",
        "BU_LM_FRANCE/05_MATURITY/01_OUTPUTS/CSV/BU_LM_FRANCE_Maturity_Scores_Consolidated_v3.xlsx",
        xlsx=True,
    ),
    _adeo_source(
        "BU_LM_ITALY", "Leroy Merlin Italy", "LM Italy", "Leroy Merlin",
        "BU_LM_ITALY/06_MATURITY_GRID/02_OUTPUTS/V3/CSV/BU_LM_ITALY_Maturity_Scores_Consolidated_v1.csv",
        xlsx=False,
    ),
    _adeo_source(
        "BU_LM_POLAND", "Leroy Merlin Poland", "LM Poland", "Leroy Merlin",
        "BU_LM_POLAND/05_MATURITY/01_OUTPUTS/CSV/BU_LM_POLAND_Maturity_Scores_Consolidated_v3.xlsx",
        xlsx=True,
    ),
    _adeo_source(
        "BU_LM_SPAIN", "Leroy Merlin Spain", "LM Spain", "Leroy Merlin",
        "BU_LM_SPAIN/06_MATURITY/01_OUTPUT/CSV/V4/BU_LM_SPAIN_Maturity_Scores_Consolidated_v5.xlsx",
        xlsx=True,
    ),
    _adeo_source(
        "BU_LM_PORTUGAL", "Leroy Merlin Portugal", "LM Portugal", "Leroy Merlin",
        "BU_LM_PORTUGAL copy/05_MATURITY/02_OUTPUTS/CSV/BU_LM_PORTUGAL_Maturity_Scores_Consolidated_v5.csv",
        xlsx=False,
    ),
    _adeo_source(
        "BU_M_SPAIN", "Obramat Spain", "Obramat Spain", "Obramat",
        "BU_M_SPAIN/08_ASSESSMENT/02_Round_2/CSV/BU_M_SPAIN_Maturity_Scores_Consolidated_v1.csv",
        xlsx=False,
    ),
    _adeo_source(
        "BU_M_BRAZIL", "Obramax Brazil", "Obramax Brazil", "Obramax",
        "BU_M_BRAZIL/06_ASSESSMENT/02_Round_2/CSV/BU_M_BRAZIL_Maturity_Scores_Consolidated_v1.csv",
        xlsx=False,
    ),
    _adeo_source(
        "BU_M_ITALY", "Tecnomat Italy", "Tecnomat Italy", "Tecnomat",
        "BU_M_ITALY/06_ASSESSMENT/02_Round_2/CSV/BU_M_ITALY_Maturity_Scores_Consolidated_v1.csv",
        xlsx=False,
    ),
]
# ── Parsers ────────────────────────────────────────────────────────────────────
def read_csv_rows(path, schema):
    """Read a CSV file and return its normalised rows.

    The file is opened as UTF-8 (a leading BOM is tolerated) and each record
    is passed through ``normalise_row`` with ``schema``. Rows whose score could
    not be parsed are left out.
    """
    with open(path, encoding="utf-8-sig", newline="") as handle:
        normalised = (normalise_row(record, schema) for record in csv.DictReader(handle))
        # Materialise inside the ``with`` block: the generator reads lazily from the file.
        return [row for row in normalised if row["score"] is not None]
def detect_xlsx_schema(headers):
    """Choose the row schema for a sheet from its header names (case-insensitive).

    Returns "XLSX_A" only when a "Q#" column is present and "Question_Number"
    is not; every other combination, including no match at all, yields "B".
    """
    lowered = {name.lower() for name in headers}
    if "question_number" not in lowered and "q#" in lowered:
        # Spain V4 XLSX — Q#, Question Topic, Score, Level, Rationale, Gaps, References Used
        return "XLSX_A"
    return "B"
def normalise_xlsx_a_row(raw):
    """Normalise a row from the Spain V4 XLSX layout into the canonical dict.

    Columns read: Q#, Question Topic, Pillar, Score, Level, Rationale, Gaps and
    References Used. Apart from Q# (converted with ``str()``) and Score (handed
    to ``parse_score``), values must be strings because they are stripped directly.
    """

    def text(header):
        return raw.get(header, "").strip()

    return {
        "q_num": str(raw.get("Q#", "")).strip(),
        "topic": text("Question Topic"),
        "pillar": text("Pillar").upper(),
        "score": parse_score(raw.get("Score", "")),
        "label": text("Level"),
        "rationale": text("Rationale"),
        "gaps": text("Gaps"),
        "refs": text("References Used"),
    }
def read_xlsx_rows(path, schema):
    """Read the active sheet of an XLSX workbook and return its normalised rows.

    The first row is taken as the header. The row layout is then chosen by
    ``detect_xlsx_schema`` from those headers, which replaces the ``schema``
    argument as soon as a header row has been read. Every cell is converted to
    a stripped string (None becomes "") before normalisation, and rows whose
    score cannot be parsed are skipped.

    Args:
        path: Location of the .xlsx file.
        schema: Initial schema value; superseded by header detection.

    Returns:
        List of canonical row dicts.

    Exits the process with status 1 if openpyxl is not installed.
    """
    try:
        import openpyxl
    except ImportError:
        print("ERROR: openpyxl required for XLSX files. Run: pip install openpyxl")
        sys.exit(1)

    wb = openpyxl.load_workbook(path, read_only=True, data_only=True)
    try:
        ws = wb.active
        headers = None
        detected_schema = schema
        rows = []
        for excel_row in ws.iter_rows(values_only=True):
            if headers is None:
                headers = [str(c).strip() if c else "" for c in excel_row]
                detected_schema = detect_xlsx_schema(headers)
                continue
            cells = [str(c).strip() if c is not None else "" for c in excel_row]
            raw = dict(zip(headers, cells))
            if detected_schema == "XLSX_A":
                row = normalise_xlsx_a_row(raw)
            else:
                row = normalise_xlsx_row(raw, detected_schema)
            if row["score"] is None:
                continue
            rows.append(row)
    finally:
        # A read-only workbook keeps the file open until closed, so release it
        # even when reading or normalising a row raises.
        wb.close()
    return rows
def parse_entity(source, scoring, pillar_order):
    """Build the output record for one source entity.

    Reads the entity's rows (XLSX or CSV, per ``source["xlsx"]``), groups them
    by pillar and, for each pillar in ``pillar_order`` that has rows, computes
    the average score, its level and label, plus the per-question details.
    Rows whose pillar is not listed in ``pillar_order`` are ignored. The overall
    score is the mean of all question scores that were kept.

    Returns None, after printing a warning, when the source file does not exist
    or when no scores were kept.
    """
    path = source["file"]
    if not path.exists():
        print(f" WARNING: file not found — {path}")
        return None

    reader = read_xlsx_rows if source["xlsx"] else read_csv_rows
    grouped = defaultdict(list)
    for row in reader(path, source["schema"]):
        grouped[row["pillar"]].append(row)

    pillars_out = []
    all_scores = []
    for pname in pillar_order:
        questions = grouped.get(pname, [])
        if not questions:
            continue

        scores = [q["score"] for q in questions]
        all_scores.extend(scores)
        avg = round(sum(scores) / len(scores), 2)
        level = score_to_level(avg, scoring)

        question_items = []
        for q in questions:
            value = q["score"]
            q_level = score_to_level(value, scoring)
            question_items.append({
                "q_num": q["q_num"],
                "topic": q["topic"],
                # Whole-number scores are emitted as ints (3 rather than 3.0).
                "score": int(value) if value == int(value) else value,
                "level": q_level,
                # Prefer the label from the source file; fall back to the configured one.
                "label": q["label"] or level_label(q_level, scoring),
                "rationale": q["rationale"],
                "gaps": q["gaps"],
                "refs": q["refs"],
            })

        pillars_out.append({
            "name": pname,
            "avg": avg,
            "level": level,
            "label": level_label(level, scoring),
            "questions": question_items,
        })

    if not all_scores:
        print(f" WARNING: no valid scores found for {source['id']}")
        return None

    overall = round(sum(all_scores) / len(all_scores), 2)
    overall_level = score_to_level(overall, scoring)
    return {
        "id": source["id"],
        "label": source["label"],
        "short": source["short"],
        "group": source["group"],
        "overall_score": overall,
        "overall_level": overall_level,
        "overall_label": level_label(overall_level, scoring),
        "synced": date.today().isoformat(),
        "pillars": pillars_out,
    }
# ── Client handlers ────────────────────────────────────────────────────────────
def build_adeo(config, entity_filter=None):
    """Parse the ADEO sources and return their entity records, best score first.

    ``config`` supplies the "scoring" and "pillars" settings. When
    ``entity_filter`` is given, only the source with that ID is processed, and
    the process exits with status 1 if no such source exists. Sources that
    ``parse_entity`` cannot turn into an entity are left out of the result.
    """
    scoring = config["scoring"]
    pillar_order = config["pillars"]

    if entity_filter:
        sources = [src for src in ADEO_SOURCES if src["id"] == entity_filter]
        if not sources:
            print(f"ERROR: entity '{entity_filter}' not found in ADEO sources")
            sys.exit(1)
    else:
        sources = ADEO_SOURCES

    entities = []
    for source in sources:
        print(f" Reading {source['short']}")
        entity = parse_entity(source, scoring, pillar_order)
        if not entity:
            continue
        entities.append(entity)
        print(f"{entity['overall_score']} ({entity['overall_label']})")

    return sorted(entities, key=lambda e: e["overall_score"], reverse=True)
# Registry of supported clients: client ID -> function that builds its entity list.
CLIENT_BUILDERS = {"adeo": build_adeo}
# ── Main ───────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: build ``clients/<client_id>/data.json``.

    Arguments (parsed from ``sys.argv``):
        client_id: Optional positional, defaults to "adeo".
        --entity:  Optional entity ID. When given, only that entity is rebuilt
                   and merged into the entities already stored in data.json;
                   otherwise the file is rebuilt from all sources.

    Exits with status 1 when the client's config.json is missing, when no
    builder is registered for the client, or when a full rebuild produced no
    entities while a data.json already exists (the existing file is left
    untouched rather than being replaced by an empty list).
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("client_id", nargs="?", default="adeo")
    parser.add_argument("--entity", default=None, help="Sync only this entity ID")
    args = parser.parse_args()
    client_id = args.client_id
    entity_filter = args.entity

    config_path = SCRIPT_DIR / "clients" / client_id / "config.json"
    if not config_path.exists():
        print(f"ERROR: config not found at {config_path}")
        sys.exit(1)
    with open(config_path, encoding="utf-8") as f:
        config = json.load(f)

    if client_id not in CLIENT_BUILDERS:
        print(f"ERROR: no builder registered for client '{client_id}'")
        print(f" Available: {list(CLIENT_BUILDERS.keys())}")
        sys.exit(1)

    print(f"Building data for client: {config['name']}" + (f" [entity: {entity_filter}]" if entity_filter else ""))
    entities = CLIENT_BUILDERS[client_id](config, entity_filter=entity_filter)

    out_path = SCRIPT_DIR / "clients" / client_id / "data.json"
    if entity_filter:
        # Merge updated entity into existing data, preserve all others
        existing_entities = []
        if out_path.exists():
            with open(out_path, encoding="utf-8") as f:
                existing = json.load(f)
            existing_entities = existing.get("entities", [])
        updated = {e["id"]: e for e in entities}
        output_entities = [updated.get(e["id"], e) for e in existing_entities]
        # Add if not already present
        existing_ids = {e["id"] for e in existing_entities}
        output_entities.extend(e for e in entities if e["id"] not in existing_ids)
        output_entities.sort(key=lambda e: e["overall_score"], reverse=True)
    else:
        if not entities and out_path.exists():
            # Every source failed (for example the data root is unavailable):
            # keep the previously generated file instead of wiping it.
            print(f"ERROR: no entities could be built; leaving existing {out_path} unchanged")
            sys.exit(1)
        output_entities = entities

    output = {
        "generated": date.today().isoformat(),
        "client_id": client_id,
        "entities": output_entities,
    }
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False)

    print(f"\n✓ Written {len(output['entities'])} entities → {out_path}")
    scores_str = " · ".join(f"{e['short']} {e['overall_score']}" for e in output["entities"])
    print(f" Scores: {scores_str}")
# Run the converter only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()