- Activity log: import_log DB table, GET /activity endpoint, collapsible panel in Update Data tab showing who changed what score and when - Sync logging: Box syncs now also write to import_log with before/after scores - Multilingual: ES, PT, IT, PL UI strings in i18n/ui.js; content translations (pillars, scoring labels, About tab) in clients/adeo/config.json; language switcher upgraded from EN/FR toggle to 6-language dropdown - cfg() and pillarDisplayName() generalised to support any language (not just FR) - import_file.py: reads 'Final QA ...' column first (falls back to Score) so QA-updated XLSX files produce the correct overall score on import - convert_data.py/docker-compose: use ADEO_DATA_ROOT env var for Box path Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
240 lines
8.3 KiB
Python
240 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
import_file.py <client_id> <entity_id> <filepath>
|
|
|
|
Parses a single CSV or XLSX maturity file and outputs JSON:
|
|
{ overall_score, overall_level, overall_label, pillars: [...] }
|
|
|
|
Called by the server's POST /api/clients/:id/import/file endpoint.
|
|
Supports Schema A (France/Poland), Schema B (Italy/Portugal/M-markets),
|
|
and XLSX_A (Spain V4).
|
|
"""
|
|
|
|
import csv
|
|
import json
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
ROOT = Path(__file__).parent
|
|
|
|
|
|
# ── Score helpers ─────────────────────────────────────────────────────────────
|
|
|
|
def parse_score(s):
|
|
if s is None:
|
|
return None
|
|
m = re.search(r"\d+(?:\.\d+)?", str(s))
|
|
return float(m.group()) if m else None
|
|
|
|
|
|
def score_to_level(score, scoring):
|
|
if score is None:
|
|
return None
|
|
mn, mx = scoring["min"], scoring["max"]
|
|
num_levels = len(scoring["labels"])
|
|
bucket = (mx - mn) / num_levels
|
|
for i in range(1, num_levels + 1):
|
|
if score <= mn + bucket * i:
|
|
return i
|
|
return num_levels
|
|
|
|
|
|
def level_label(level, scoring):
|
|
return scoring["labels"].get(str(level), "") if level is not None else ""
|
|
|
|
|
|
# ── Schema detection ──────────────────────────────────────────────────────────
|
|
|
|
def detect_schema(headers):
|
|
h = [x.lower().strip() for x in headers]
|
|
if "question_number" in h:
|
|
return "B"
|
|
if "q#" in h:
|
|
# Could be Schema A (CSV) or XLSX_A — disambiguate by looking for 'question topic'
|
|
if "question topic" in h:
|
|
return "XLSX_A"
|
|
return "A"
|
|
# Fallback: try B
|
|
return "B"
|
|
|
|
|
|
# ── Row normalisation ─────────────────────────────────────────────────────────
|
|
|
|
def find_qa_col(headers):
|
|
"""Return the header name of the first 'Final QA ...' column, or None."""
|
|
for h in headers:
|
|
if h.strip().lower().startswith("final qa"):
|
|
return h
|
|
return None
|
|
|
|
|
|
def pick_score(raw, qa_col, *fallback_keys):
|
|
"""Use QA column if present and non-empty, otherwise try fallback keys in order."""
|
|
if qa_col:
|
|
v = parse_score(raw.get(qa_col, ""))
|
|
if v is not None:
|
|
return v
|
|
for key in fallback_keys:
|
|
v = parse_score(raw.get(key, ""))
|
|
if v is not None:
|
|
return v
|
|
return None
|
|
|
|
|
|
def normalise_row(raw, schema, qa_col=None):
|
|
if schema == "A":
|
|
return {
|
|
"q_num": str(raw.get("Q#", raw.get("Q #", ""))).strip(),
|
|
"topic": str(raw.get("Question", "")).strip(),
|
|
"pillar": str(raw.get("Pillar", "")).strip().upper(),
|
|
"score": pick_score(raw, qa_col, "Score"),
|
|
"label": str(raw.get("Score_Label", "")).strip(),
|
|
"rationale": str(raw.get("Scoring_Rationale", "")).strip(),
|
|
"gaps": str(raw.get("Gaps_Identified", "")).strip(),
|
|
"refs": str(raw.get("Evidence_References", "")).strip(),
|
|
}
|
|
if schema == "XLSX_A":
|
|
return {
|
|
"q_num": str(raw.get("Q#", "")).strip(),
|
|
"topic": str(raw.get("Question Topic", "")).strip(),
|
|
"pillar": str(raw.get("Pillar", "")).strip().upper(),
|
|
"score": pick_score(raw, qa_col, "Score"),
|
|
"label": str(raw.get("Level", "")).strip(),
|
|
"rationale": str(raw.get("Rationale", "")).strip(),
|
|
"gaps": str(raw.get("Gaps", "")).strip(),
|
|
"refs": str(raw.get("References Used", "")).strip(),
|
|
}
|
|
# Schema B (default)
|
|
return {
|
|
"q_num": str(raw.get("Question_Number", "")).strip(),
|
|
"topic": str(raw.get("Question_Topic", "")).strip(),
|
|
"pillar": str(raw.get("Pillar", "")).strip().upper(),
|
|
"score": pick_score(raw, qa_col, "Score_Numeric", "Score"),
|
|
"label": str(raw.get("Score_Label", "")).strip(),
|
|
"rationale": str(raw.get("Rationale", "")).strip(),
|
|
"gaps": str(raw.get("Gaps", "")).strip(),
|
|
"refs": str(raw.get("References_Used", raw.get("References", ""))).strip(),
|
|
}
|
|
|
|
|
|
# ── File readers ──────────────────────────────────────────────────────────────
|
|
|
|
def read_csv(filepath):
|
|
with open(filepath, encoding="utf-8-sig", newline="") as f:
|
|
reader = csv.DictReader(f)
|
|
headers = reader.fieldnames or []
|
|
rows = list(reader)
|
|
return headers, rows
|
|
|
|
|
|
def read_xlsx(filepath):
|
|
try:
|
|
import openpyxl
|
|
except ImportError:
|
|
raise RuntimeError("openpyxl required for XLSX files. Run: pip install openpyxl")
|
|
wb = openpyxl.load_workbook(filepath, read_only=True, data_only=True)
|
|
ws = wb.active
|
|
headers = None
|
|
rows = []
|
|
for excel_row in ws.iter_rows(values_only=True):
|
|
if headers is None:
|
|
headers = [str(c).strip() if c is not None else "" for c in excel_row]
|
|
continue
|
|
rows.append(dict(zip(headers, [str(v).strip() if v is not None else "" for v in excel_row])))
|
|
wb.close()
|
|
return headers or [], rows
|
|
|
|
|
|
def read_file(filepath):
|
|
ext = Path(filepath).suffix.lower()
|
|
if ext in (".xlsx", ".xls"):
|
|
return read_xlsx(filepath)
|
|
return read_csv(filepath)
|
|
|
|
|
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
|
|
|
def build_result(client_id, entity_id, filepath):
|
|
cfg_path = ROOT / "clients" / client_id / "config.json"
|
|
with open(cfg_path) as f:
|
|
config = json.load(f)
|
|
|
|
scoring = config["scoring"]
|
|
pillar_order = config["pillars"]
|
|
|
|
headers, raw_rows = read_file(filepath)
|
|
schema = detect_schema(headers)
|
|
qa_col = find_qa_col(headers)
|
|
|
|
# Normalise and filter invalid scores
|
|
questions = []
|
|
for raw in raw_rows:
|
|
row = normalise_row(raw, schema, qa_col)
|
|
if row["score"] is None:
|
|
continue
|
|
score = int(round(row["score"]))
|
|
level = score_to_level(score, scoring)
|
|
questions.append({
|
|
"q_num": row["q_num"],
|
|
"topic": row["topic"],
|
|
"pillar": row["pillar"],
|
|
"score": score,
|
|
"level": level,
|
|
"label": level_label(level, scoring),
|
|
"rationale": row["rationale"],
|
|
"gaps": row["gaps"],
|
|
"refs": row["refs"],
|
|
})
|
|
|
|
# Group by pillar in config order
|
|
by_pillar = {}
|
|
for q in questions:
|
|
by_pillar.setdefault(q["pillar"], []).append(q)
|
|
|
|
pillars_out = []
|
|
all_scores = []
|
|
for pname in pillar_order:
|
|
qs = by_pillar.get(pname, [])
|
|
if not qs:
|
|
continue
|
|
scores = [q["score"] for q in qs]
|
|
avg = round(sum(scores) / len(scores), 2)
|
|
level = score_to_level(avg, scoring)
|
|
all_scores.extend(scores)
|
|
pillars_out.append({
|
|
"name": pname,
|
|
"avg": avg,
|
|
"level": level,
|
|
"label": level_label(level, scoring),
|
|
"questions": [{k: v for k, v in q.items() if k != "pillar"} for q in qs],
|
|
})
|
|
|
|
if not all_scores:
|
|
raise ValueError("No valid scores found in the uploaded file. Check the file format and column names.")
|
|
|
|
overall = round(sum(all_scores) / len(all_scores), 2)
|
|
overall_level = score_to_level(overall, scoring)
|
|
return {
|
|
"overall_score": overall,
|
|
"overall_level": overall_level,
|
|
"overall_label": level_label(overall_level, scoring),
|
|
"pillars": pillars_out,
|
|
}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
if len(sys.argv) < 4:
|
|
print(json.dumps({"error": "Usage: import_file.py <client_id> <entity_id> <filepath>"}))
|
|
sys.exit(1)
|
|
|
|
client_id = sys.argv[1]
|
|
entity_id = sys.argv[2]
|
|
filepath = sys.argv[3]
|
|
|
|
try:
|
|
result = build_result(client_id, entity_id, filepath)
|
|
print(json.dumps(result))
|
|
except Exception as e:
|
|
print(json.dumps({"error": str(e)}))
|
|
sys.exit(1)
|