adeo-maturity-tool/import_file.py
Phil Dore b97889a9d9 Add activity log, multilingual support (ES/PT/IT/PL), and QA score column fix
- Activity log: import_log DB table, GET /activity endpoint, collapsible
  panel in Update Data tab showing who changed what score and when
- Sync logging: Box syncs now also write to import_log with before/after scores
- Multilingual: ES, PT, IT, PL UI strings in i18n/ui.js; content translations
  (pillars, scoring labels, About tab) in clients/adeo/config.json; language
  switcher upgraded from EN/FR toggle to 6-language dropdown
- cfg() and pillarDisplayName() generalised to support any language (not just FR)
- import_file.py: reads 'Final QA ...' column first (falls back to Score) so
  QA-updated XLSX files produce the correct overall score on import
- convert_data.py/docker-compose: use ADEO_DATA_ROOT env var for Box path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 13:23:22 +01:00

240 lines
8.3 KiB
Python

#!/usr/bin/env python3
"""
import_file.py <client_id> <entity_id> <filepath>
Parses a single CSV or XLSX maturity file and outputs JSON:
{ overall_score, overall_level, overall_label, pillars: [...] }
Called by the server's POST /api/clients/:id/import/file endpoint.
Supports Schema A (France/Poland), Schema B (Italy/Portugal/M-markets),
and XLSX_A (Spain V4).
"""
import csv
import json
import re
import sys
from pathlib import Path
# Directory containing this script; build_result() resolves
# clients/<client_id>/config.json relative to it.
ROOT = Path(__file__).parent
# ── Score helpers ─────────────────────────────────────────────────────────────
def parse_score(s):
    """Return the first unsigned decimal number found in *s* as a float.

    *s* is converted with ``str()`` before searching, so numeric inputs work
    too. Returns ``None`` when *s* is ``None`` or contains no digits. A leading
    minus sign and comma decimal separators are not interpreted.
    """
    if s is None:
        return None
    text = str(s)
    number = re.search(r"\d+(?:\.\d+)?", text)
    if number is None:
        return None
    return float(number.group(0))
def score_to_level(score, scoring):
    """Map a numeric score onto a 1-based level using equal-width buckets.

    The range ``scoring["min"]``..``scoring["max"]`` is split into as many
    buckets as there are entries in ``scoring["labels"]``. The result is the
    first bucket whose upper bound is >= *score*; scores above the range get
    the highest level. Returns ``None`` when *score* is ``None``.
    """
    if score is None:
        return None
    floor = scoring["min"]
    ceiling = scoring["max"]
    level_count = len(scoring["labels"])
    step = (ceiling - floor) / level_count
    candidates = range(1, level_count + 1)
    # Fall back to the top level when the score exceeds every bucket bound.
    return next(
        (lvl for lvl in candidates if score <= floor + step * lvl),
        level_count,
    )
def level_label(level, scoring):
    """Return the label configured for *level*, or "" if none applies.

    Labels are looked up in ``scoring["labels"]`` by the string form of the
    level. A ``None`` level yields "" without touching *scoring*.
    """
    if level is None:
        return ""
    labels = scoring["labels"]
    return labels.get(str(level), "")
# ── Schema detection ──────────────────────────────────────────────────────────
def detect_schema(headers):
    """Identify the file schema ("A", "B" or "XLSX_A") from its column headers.

    Matching is case-insensitive and ignores surrounding whitespace:

    * a ``question_number`` column          -> "B"
    * a ``q#`` / ``q #`` column plus ``question topic`` -> "XLSX_A"
    * a ``q#`` / ``q #`` column alone       -> "A"
    * anything else                         -> "B" (fallback)
    """
    seen = {x.lower().strip() for x in headers}
    if "question_number" in seen:
        return "B"
    # normalise_row() reads the question number from either "Q#" or "Q #",
    # so both spellings have to be recognised here; otherwise a "Q #" file
    # would be mis-detected as Schema B and lose its topic and number.
    if "q#" in seen or "q #" in seen:
        # Could be Schema A (CSV) or XLSX_A; only XLSX_A has 'question topic'.
        return "XLSX_A" if "question topic" in seen else "A"
    # Fallback: try B
    return "B"
# ── Row normalisation ─────────────────────────────────────────────────────────
def find_qa_col(headers):
    """Return the first header that begins with 'final qa', or None.

    The comparison ignores case and leading whitespace; the header is returned
    exactly as it appears in *headers*.
    """
    matches = (
        name for name in headers
        if name.strip().lower().startswith("final qa")
    )
    return next(matches, None)
def pick_score(raw, qa_col, *fallback_keys):
    """Return the first parseable score, preferring the QA column.

    *qa_col* (when truthy) is tried first, then each of *fallback_keys* in
    order. A column counts only if parse_score() finds a number in it.
    Returns None when no candidate column yields a number.
    """
    qa_first = (qa_col,) if qa_col else ()
    for column in qa_first + fallback_keys:
        parsed = parse_score(raw.get(column, ""))
        if parsed is not None:
            return parsed
    return None
def _cell_text(raw, *keys):
    """Return the stripped text of the first of *keys* present in *raw*.

    A missing key or a ``None`` value yields "". csv.DictReader stores None
    for cells missing from a short row, and ``str(None)`` would otherwise leak
    the literal text "None" into the output.
    """
    for key in keys:
        if key in raw:
            value = raw[key]
            return "" if value is None else str(value).strip()
    return ""


def normalise_row(raw, schema, qa_col=None):
    """Convert one raw row dict into the common question shape.

    Args:
        raw: Mapping of column header -> cell value for a single row.
        schema: "A", "XLSX_A", or anything else for Schema B (the default).
        qa_col: Header of the 'Final QA' column, if the file has one; its
            value takes precedence over the schema's regular score column.

    Returns:
        A dict with keys ``q_num``, ``topic``, ``pillar`` (upper-cased),
        ``score`` (float or None), ``label``, ``rationale``, ``gaps``, ``refs``.
    """
    if schema == "A":
        return {
            "q_num": _cell_text(raw, "Q#", "Q #"),
            "topic": _cell_text(raw, "Question"),
            "pillar": _cell_text(raw, "Pillar").upper(),
            "score": pick_score(raw, qa_col, "Score"),
            "label": _cell_text(raw, "Score_Label"),
            "rationale": _cell_text(raw, "Scoring_Rationale"),
            "gaps": _cell_text(raw, "Gaps_Identified"),
            "refs": _cell_text(raw, "Evidence_References"),
        }
    if schema == "XLSX_A":
        return {
            # Accept the spaced "Q #" spelling here too, as Schema A does.
            "q_num": _cell_text(raw, "Q#", "Q #"),
            "topic": _cell_text(raw, "Question Topic"),
            "pillar": _cell_text(raw, "Pillar").upper(),
            "score": pick_score(raw, qa_col, "Score"),
            "label": _cell_text(raw, "Level"),
            "rationale": _cell_text(raw, "Rationale"),
            "gaps": _cell_text(raw, "Gaps"),
            "refs": _cell_text(raw, "References Used"),
        }
    # Schema B (default)
    return {
        "q_num": _cell_text(raw, "Question_Number"),
        "topic": _cell_text(raw, "Question_Topic"),
        "pillar": _cell_text(raw, "Pillar").upper(),
        "score": pick_score(raw, qa_col, "Score_Numeric", "Score"),
        "label": _cell_text(raw, "Score_Label"),
        "rationale": _cell_text(raw, "Rationale"),
        "gaps": _cell_text(raw, "Gaps"),
        "refs": _cell_text(raw, "References_Used", "References"),
    }
# ── File readers ──────────────────────────────────────────────────────────────
def read_csv(filepath):
    """Read a CSV file and return ``(headers, rows)``.

    The file is decoded as UTF-8 with an optional BOM. *headers* is the list
    of column names from the first line ([] for an empty file) and *rows* is
    a list of dicts keyed by those names, as produced by csv.DictReader.
    """
    with open(filepath, encoding="utf-8-sig", newline="") as handle:
        dict_reader = csv.DictReader(handle)
        column_names = dict_reader.fieldnames
        records = [record for record in dict_reader]
    if not column_names:
        column_names = []
    return column_names, records
def read_xlsx(filepath):
    """Read the active sheet of an XLSX workbook and return ``(headers, rows)``.

    The first row supplies the headers; every later row becomes a dict keyed
    by those headers. All cell values are converted to stripped strings, with
    empty cells becoming "".

    Raises:
        RuntimeError: If the optional ``openpyxl`` dependency is not installed.
    """
    try:
        import openpyxl
    except ImportError as err:
        raise RuntimeError("openpyxl required for XLSX files. Run: pip install openpyxl") from err
    wb = openpyxl.load_workbook(filepath, read_only=True, data_only=True)
    # The workbook is opened read-only and closed explicitly, so make sure
    # close() still runs if iterating the sheet raises.
    try:
        ws = wb.active
        headers = None
        rows = []
        for excel_row in ws.iter_rows(values_only=True):
            cells = ["" if c is None else str(c).strip() for c in excel_row]
            if headers is None:
                headers = cells
                continue
            rows.append(dict(zip(headers, cells)))
    finally:
        wb.close()
    return headers or [], rows
def read_file(filepath):
    """Dispatch to the XLSX or CSV reader based on the file extension.

    ``.xlsx`` and ``.xls`` (any case) go to read_xlsx(); every other
    extension is treated as CSV. Returns that reader's ``(headers, rows)``.
    """
    suffix = Path(filepath).suffix.lower()
    reader = read_xlsx if suffix in (".xlsx", ".xls") else read_csv
    return reader(filepath)
# ── Main ──────────────────────────────────────────────────────────────────────
def build_result(client_id, entity_id, filepath):
    """Parse one maturity file and aggregate its scores per pillar and overall.

    Args:
        client_id: Directory name under ``clients/`` whose ``config.json``
            provides the ``scoring`` and ``pillars`` settings.
        entity_id: Accepted for interface compatibility with the CLI; not
            used by this function.
        filepath: Path of the CSV or XLSX file to import.

    Returns:
        A dict with ``overall_score``, ``overall_level``, ``overall_label`` and
        ``pillars``. Each pillar entry holds ``name``, ``avg``, ``level``,
        ``label`` and its ``questions`` (without the ``pillar`` key).

    Raises:
        ValueError: If no row yields a score for any configured pillar.
    """
    cfg_path = ROOT / "clients" / client_id / "config.json"
    # Read the config as UTF-8 explicitly: without an encoding, open() uses
    # the platform's locale encoding, which can mis-decode non-ASCII text.
    with open(cfg_path, encoding="utf-8") as f:
        config = json.load(f)
    scoring = config["scoring"]
    pillar_order = config["pillars"]

    headers, raw_rows = read_file(filepath)
    schema = detect_schema(headers)
    qa_col = find_qa_col(headers)

    # Normalise and filter invalid scores
    questions = []
    for raw in raw_rows:
        row = normalise_row(raw, schema, qa_col)
        if row["score"] is None:
            continue
        # Scores are stored as whole numbers; round() rounds halves to even.
        score = int(round(row["score"]))
        level = score_to_level(score, scoring)
        questions.append({
            "q_num": row["q_num"],
            "topic": row["topic"],
            "pillar": row["pillar"],
            "score": score,
            "level": level,
            "label": level_label(level, scoring),
            "rationale": row["rationale"],
            "gaps": row["gaps"],
            "refs": row["refs"],
        })

    # Group by pillar in config order
    by_pillar = {}
    for q in questions:
        by_pillar.setdefault(q["pillar"], []).append(q)

    pillars_out = []
    all_scores = []
    # Only pillars listed in the config are reported; rows whose pillar is not
    # listed are left out of both the pillar list and the overall average.
    for pname in pillar_order:
        qs = by_pillar.get(pname, [])
        if not qs:
            continue
        scores = [q["score"] for q in qs]
        avg = round(sum(scores) / len(scores), 2)
        level = score_to_level(avg, scoring)
        all_scores.extend(scores)
        pillars_out.append({
            "name": pname,
            "avg": avg,
            "level": level,
            "label": level_label(level, scoring),
            "questions": [{k: v for k, v in q.items() if k != "pillar"} for q in qs],
        })

    if not all_scores:
        raise ValueError("No valid scores found in the uploaded file. Check the file format and column names.")

    # The overall score averages individual question scores, not pillar averages.
    overall = round(sum(all_scores) / len(all_scores), 2)
    overall_level = score_to_level(overall, scoring)
    return {
        "overall_score": overall,
        "overall_level": overall_level,
        "overall_label": level_label(overall_level, scoring),
        "pillars": pillars_out,
    }
# CLI entry point: prints a single JSON document to stdout. Failures are
# reported as {"error": ...} with exit status 1.
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    if len(cli_args) < 3:
        usage = {"error": "Usage: import_file.py <client_id> <entity_id> <filepath>"}
        print(json.dumps(usage))
        sys.exit(1)
    # Extra trailing arguments are ignored.
    client_id, entity_id, filepath = cli_args[:3]
    try:
        result = build_result(client_id, entity_id, filepath)
        print(json.dumps(result))
    except Exception as exc:
        # Top-level boundary: report any failure to the caller as JSON.
        failure = {"error": str(exc)}
        print(json.dumps(failure))
        sys.exit(1)