#!/usr/bin/env python3
"""
Maturity Tool — Data Converter

Usage:
    python3 convert_data.py adeo
    python3 convert_data.py adeo --entity BU_LM_FRANCE

Reads source CSVs/XLSXs for the given client and writes:
    clients/{client}/data.json
"""

import argparse
import csv
import json
import os
import re
import sys
from collections import defaultdict
from datetime import date
from pathlib import Path

# ── Paths ─────────────────────────────────────────────────────────────────────

SCRIPT_DIR = Path(__file__).parent
# Root of the shared data folder; override with the ADEO_DATA_ROOT env variable.
BOX_ROOT = Path(os.environ.get("ADEO_DATA_ROOT", "/Users/phildore/Library/CloudStorage/Box-Box/ADEO_EXTERNAL_SHARE"))


# ── Score helpers ─────────────────────────────────────────────────────────────

# First run of digits, optionally followed by a decimal part.
_NUMBER_RE = re.compile(r"\d+(?:\.\d+)?")


def _clean(value):
    """Return *value* as a stripped string; ``None`` becomes ``""``.

    csv.DictReader fills missing trailing fields with ``None``, so cell values
    cannot be assumed to be strings.
    """
    return "" if value is None else str(value).strip()


def parse_score(s):
    """Extract numeric score from strings like '3', '3.5', '3 (Master)'.

    Returns the first number found as a float, or None if unparseable.
    """
    match = _NUMBER_RE.search(str(s))
    return float(match.group()) if match else None


def score_to_level(score, scoring):
    """Map a numeric score to a 1-based tier index within the configured scale.

    ``scoring`` must provide ``min``, ``max`` and ``labels``; the scale is split
    into ``len(labels)`` equal-width buckets. Returns None when ``score`` is
    None. Scores above the last bucket boundary map to the top tier.
    """
    if score is None:
        return None
    mn, mx = scoring["min"], scoring["max"]
    num_levels = len(scoring["labels"])
    # Evenly divide the scale into num_levels buckets
    bucket = (mx - mn) / num_levels
    for i in range(1, num_levels + 1):
        if score <= mn + bucket * i:
            return i
    return num_levels


def level_label(level, scoring):
    """Return the configured label for a tier index, or "" if none/unknown."""
    if level is None:
        return ""
    return scoring["labels"].get(str(level), "")


# ── Column normalisation ──────────────────────────────────────────────────────

# Source column name for each canonical field, per CSV schema variant.
# Both variants read the pillar from a column named "Pillar".
_SCHEMA_COLUMNS = {
    "A": {
        "q_num": "Q#",
        "topic": "Question",
        "score": "Score",
        "label": "Score_Label",
        "rationale": "Scoring_Rationale",
        "gaps": "Gaps_Identified",
        "refs": "Evidence_References",
    },
    "B": {
        "q_num": "Question_Number",
        "topic": "Question_Topic",
        "score": "Score_Numeric",
        "label": "Score_Label",
        "rationale": "Rationale",
        "gaps": "Gaps",
        "refs": "References_Used",
    },
}


def normalise_row(raw, schema):
    """Return canonical dict regardless of CSV schema variant.

    ``raw`` maps column headers to cell values; ``schema`` is "A" or "B".
    Missing or ``None`` cells become empty strings; "score" is a float or None.
    Raises ValueError for an unknown schema.
    """
    try:
        columns = _SCHEMA_COLUMNS[schema]
    except KeyError:
        raise ValueError(f"Unknown schema: {schema}") from None

    return {
        "q_num": _clean(raw.get(columns["q_num"])),
        "topic": _clean(raw.get(columns["topic"])),
        "pillar": _clean(raw.get("Pillar")).upper(),
        "score": parse_score(raw.get(columns["score"], "")),
        "label": _clean(raw.get(columns["label"])),
        "rationale": _clean(raw.get(columns["rationale"])),
        "gaps": _clean(raw.get(columns["gaps"])),
        "refs": _clean(raw.get(columns["refs"])),
    }


def normalise_xlsx_row(row_dict, schema="B"):
    """Same as normalise_row but input already keyed by column header (from openpyxl)."""
    return normalise_row(row_dict, schema)


# ── ADEO source definitions ───────────────────────────────────────────────────

ADEO_SOURCES = [
    {
        "id": "BU_LM_FRANCE",
        "label": "Leroy Merlin France",
        "short": "LM France",
        "group": "Leroy Merlin",
        "file": BOX_ROOT / "BU_LM_FRANCE/05_MATURITY/01_OUTPUTS/CSV/BU_LM_FRANCE_Maturity_Scores_Consolidated_v3.xlsx",
        "schema": "B",
        "xlsx": True,
    },
    {
        "id": "BU_LM_ITALY",
        "label": "Leroy Merlin Italy",
        "short": "LM Italy",
        "group": "Leroy Merlin",
        "file": BOX_ROOT / "BU_LM_ITALY/06_MATURITY_GRID/02_OUTPUTS/V3/CSV/BU_LM_ITALY_Maturity_Scores_Consolidated_v1.csv",
        "schema": "B",
        "xlsx": False,
    },
    {
        "id": "BU_LM_POLAND",
        "label": "Leroy Merlin Poland",
        "short": "LM Poland",
        "group": "Leroy Merlin",
        "file": BOX_ROOT / "BU_LM_POLAND/05_MATURITY/01_OUTPUTS/CSV/BU_LM_POLAND_Maturity_Scores_Consolidated_v3.xlsx",
        "schema": "B",
        "xlsx": True,
    },
    {
        "id": "BU_LM_SPAIN",
        "label": "Leroy Merlin Spain",
        "short": "LM Spain",
        "group": "Leroy Merlin",
        "file": BOX_ROOT / "BU_LM_SPAIN/06_MATURITY/01_OUTPUT/CSV/V4/BU_LM_SPAIN_Maturity_Scores_Consolidated_v5.xlsx",
        "schema": "B",
        "xlsx": True,
    },
    {
        "id": "BU_LM_PORTUGAL",
        "label": "Leroy Merlin Portugal",
        "short": "LM Portugal",
        "group": "Leroy Merlin",
        "file": BOX_ROOT / "BU_LM_PORTUGAL copy/05_MATURITY/02_OUTPUTS/CSV/BU_LM_PORTUGAL_Maturity_Scores_Consolidated_v5.csv",
        "schema": "B",
        "xlsx": False,
    },
    {
        "id": "BU_M_SPAIN",
        "label": "Obramat Spain",
        "short": "Obramat Spain",
        "group": "Obramat",
        "file": BOX_ROOT / "BU_M_SPAIN/08_ASSESSMENT/02_Round_2/CSV/BU_M_SPAIN_Maturity_Scores_Consolidated_v1.csv",
        "schema": "B",
        "xlsx": False,
    },
    {
        "id": "BU_M_BRAZIL",
        "label": "Obramax Brazil",
        "short": "Obramax Brazil",
        "group": "Obramax",
        "file": BOX_ROOT / "BU_M_BRAZIL/06_ASSESSMENT/02_Round_2/CSV/BU_M_BRAZIL_Maturity_Scores_Consolidated_v1.csv",
        "schema": "B",
        "xlsx": False,
    },
    {
        "id": "BU_M_ITALY",
        "label": "Tecnomat Italy",
        "short": "Tecnomat Italy",
        "group": "Tecnomat",
        "file": BOX_ROOT / "BU_M_ITALY/06_ASSESSMENT/02_Round_2/CSV/BU_M_ITALY_Maturity_Scores_Consolidated_v1.csv",
        "schema": "B",
        "xlsx": False,
    },
]


# ── Parsers ───────────────────────────────────────────────────────────────────

def read_csv_rows(path, schema):
    """Read a CSV file and return its canonical rows, skipping unscored rows.

    The file is opened as "utf-8-sig", so a leading BOM is tolerated.
    """
    with open(path, encoding="utf-8-sig", newline="") as f:
        normalised = (normalise_row(raw, schema) for raw in csv.DictReader(f))
        return [row for row in normalised if row["score"] is not None]


def detect_xlsx_schema(headers, default="B"):
    """Pick the right schema based on whichever Q# column name is present.

    Header matching is case-insensitive. Returns ``default`` when neither
    known question-number column is found.
    """
    lowered = {_clean(h).lower() for h in headers}
    if "question_number" in lowered:
        return "B"
    if "q#" in lowered:
        # Spain V4 XLSX — Q#, Question Topic, Score, Level, Rationale, Gaps, References Used
        return "XLSX_A"
    return default


def normalise_xlsx_a_row(raw):
    """Spain V4 XLSX variant: Q#, Question Topic, Score, Level, Rationale, Gaps, References Used.

    Returns the same canonical dict shape as normalise_row. Missing or
    ``None`` cells become empty strings.
    """
    return {
        "q_num": _clean(raw.get("Q#")),
        "topic": _clean(raw.get("Question Topic")),
        "pillar": _clean(raw.get("Pillar")).upper(),
        "score": parse_score(raw.get("Score", "")),
        "label": _clean(raw.get("Level")),
        "rationale": _clean(raw.get("Rationale")),
        "gaps": _clean(raw.get("Gaps")),
        "refs": _clean(raw.get("References Used")),
    }


def read_xlsx_rows(path, schema):
    """Read the active sheet of an XLSX file and return its canonical rows.

    The first row is treated as the header and drives schema detection;
    ``schema`` is used only when the header matches no known variant.
    Unscored rows are skipped. Exits the process with status 1 if openpyxl
    is not installed.
    """
    try:
        import openpyxl
    except ImportError:
        print("ERROR: openpyxl required for XLSX files. Run: pip install openpyxl")
        sys.exit(1)

    wb = openpyxl.load_workbook(path, read_only=True, data_only=True)
    try:
        ws = wb.active
        headers = None
        detected_schema = schema
        rows = []
        for excel_row in ws.iter_rows(values_only=True):
            if headers is None:
                headers = [_clean(c) if c else "" for c in excel_row]
                detected_schema = detect_xlsx_schema(headers, default=schema)
                continue
            raw = dict(zip(headers, (_clean(c) for c in excel_row)))
            if detected_schema == "XLSX_A":
                row = normalise_xlsx_a_row(raw)
            else:
                row = normalise_xlsx_row(raw, detected_schema)
            if row["score"] is None:
                continue
            rows.append(row)
        return rows
    finally:
        # Close the workbook even when a row fails to parse.
        wb.close()


def _question_entry(q, scoring):
    """Build the output dict for one canonical question row."""
    score = q["score"]
    level = score_to_level(score, scoring)
    return {
        "q_num": q["q_num"],
        "topic": q["topic"],
        # Emit whole-number scores as ints (3 rather than 3.0).
        "score": int(score) if score.is_integer() else score,
        "level": level,
        # Prefer the label from the source file; fall back to the configured one.
        "label": q["label"] or level_label(level, scoring),
        "rationale": q["rationale"],
        "gaps": q["gaps"],
        "refs": q["refs"],
    }


def parse_entity(source, scoring, pillar_order):
    """Read one source file and aggregate it into an entity summary.

    Rows are grouped by pillar; pillars are emitted in ``pillar_order`` and
    empty ones are omitted. Pillar and overall scores are means rounded to
    two decimals. Returns None (after printing a warning) when the file is
    missing or contains no scores for any configured pillar.
    """
    path = source["file"]
    if not path.exists():
        print(f" WARNING: file not found — {path}")
        return None

    reader = read_xlsx_rows if source["xlsx"] else read_csv_rows
    rows = reader(path, source["schema"])

    by_pillar = defaultdict(list)
    for row in rows:
        by_pillar[row["pillar"]].append(row)

    # Rows under a pillar missing from pillar_order never reach the output;
    # report them instead of dropping them silently.
    unknown = sorted(set(by_pillar) - set(pillar_order))
    if unknown:
        dropped = sum(len(by_pillar[name]) for name in unknown)
        print(f" WARNING: {dropped} scored row(s) in {source['id']} ignored — pillar not in config: {unknown}")

    pillars_out = []
    all_scores = []
    for pname in pillar_order:
        qs = by_pillar.get(pname, [])
        if not qs:
            continue
        scores = [q["score"] for q in qs]
        avg = round(sum(scores) / len(scores), 2)
        all_scores.extend(scores)
        level = score_to_level(avg, scoring)
        pillars_out.append({
            "name": pname,
            "avg": avg,
            "level": level,
            "label": level_label(level, scoring),
            "questions": [_question_entry(q, scoring) for q in qs],
        })

    if not all_scores:
        print(f" WARNING: no valid scores found for {source['id']}")
        return None

    overall = round(sum(all_scores) / len(all_scores), 2)
    overall_level = score_to_level(overall, scoring)
    return {
        "id": source["id"],
        "label": source["label"],
        "short": source["short"],
        "group": source["group"],
        "overall_score": overall,
        "overall_level": overall_level,
        "overall_label": level_label(overall_level, scoring),
        "synced": date.today().isoformat(),
        "pillars": pillars_out,
    }


# ── Client handlers ───────────────────────────────────────────────────────────

def build_adeo(config, entity_filter=None):
    """Build the ADEO entity summaries, sorted by overall score (highest first).

    ``config`` must provide "scoring" and "pillars". When ``entity_filter`` is
    given, only the source with that id is processed; an unknown id exits the
    process with status 1. Sources that cannot be parsed are skipped.
    """
    scoring = config["scoring"]
    pillar_order = config["pillars"]

    sources = ADEO_SOURCES
    if entity_filter:
        sources = [s for s in ADEO_SOURCES if s["id"] == entity_filter]
        if not sources:
            print(f"ERROR: entity '{entity_filter}' not found in ADEO sources")
            sys.exit(1)

    entities = []
    for source in sources:
        print(f" Reading {source['short']}…")
        entity = parse_entity(source, scoring, pillar_order)
        if entity:
            entities.append(entity)
            print(f" → {entity['overall_score']} ({entity['overall_label']})")

    entities.sort(key=lambda e: e["overall_score"], reverse=True)
    return entities


CLIENT_BUILDERS = {
    "adeo": build_adeo,
}


# ── Main ──────────────────────────────────────────────────────────────────────

def _merge_entities(existing_entities, entities):
    """Replace matching ids in ``existing_entities``, append new ones, re-sort."""
    updated = {e["id"]: e for e in entities}
    merged = [updated.get(e["id"], e) for e in existing_entities]
    # Add if not already present
    existing_ids = {e["id"] for e in existing_entities}
    merged.extend(e for e in entities if e["id"] not in existing_ids)
    merged.sort(key=lambda e: e["overall_score"], reverse=True)
    return merged


def main():
    """Command-line entry point: build and write clients/{client}/data.json.

    With ``--entity`` the rebuilt entity is merged into the existing file and
    all other entities are preserved. Exits with status 1 on a missing config,
    an unregistered client, or when no entity could be built.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("client_id", nargs="?", default="adeo")
    parser.add_argument("--entity", default=None, help="Sync only this entity ID")
    args = parser.parse_args()

    client_id = args.client_id
    entity_filter = args.entity

    config_path = SCRIPT_DIR / "clients" / client_id / "config.json"
    if not config_path.exists():
        print(f"ERROR: config not found at {config_path}")
        sys.exit(1)

    with open(config_path, encoding="utf-8") as f:
        config = json.load(f)

    if client_id not in CLIENT_BUILDERS:
        print(f"ERROR: no builder registered for client '{client_id}'")
        print(f" Available: {list(CLIENT_BUILDERS.keys())}")
        sys.exit(1)

    print(f"Building data for client: {config['name']}" + (f" [entity: {entity_filter}]" if entity_filter else ""))
    entities = CLIENT_BUILDERS[client_id](config, entity_filter=entity_filter)

    out_path = SCRIPT_DIR / "clients" / client_id / "data.json"

    if not entities:
        # Nothing was built (e.g. the data root is not mounted). Writing now
        # would replace good data with an empty list, so stop here.
        print(f"ERROR: no entities could be built — {out_path} left untouched")
        sys.exit(1)

    if entity_filter:
        # Merge updated entity into existing data, preserve all others
        existing_entities = []
        if out_path.exists():
            with open(out_path, encoding="utf-8") as f:
                existing = json.load(f)
            existing_entities = existing.get("entities", [])
        entities = _merge_entities(existing_entities, entities)

    output = {
        "generated": date.today().isoformat(),
        "client_id": client_id,
        "entities": entities,
    }

    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False)

    print(f"\n✓ Written {len(output['entities'])} entities → {out_path}")
    scores_str = " · ".join(f"{e['short']} {e['overall_score']}" for e in output["entities"])
    print(f" Scores: {scores_str}")


if __name__ == "__main__":
    main()