loreal-utilisation-dept/backend/app/services/zoho_parse.py
DJP 993e370cea feat: Forecast, Project Type Summary, Time Log Detail, AI Chat, filters v2, stats bar, RBAC
Brings the new app to full parity with the original L'Oréal SPA and
beyond. Backend 59/59 tests (was 40, +19). Frontend typecheck/lint/build
clean. Main entry chunk 15.76 KB gz (budget 30 KB).

Backend — new endpoints + services:
- POST /api/deliverable/parse        — parse Deliverable Summary CSV/XLSX
- POST /api/projectsummary/parse     — parse Project Summary CSV/XLSX
- GET  /api/timelog/rows             — paginated, searchable, sortable view
                                        over the parsed Zoho upload
- GET  /api/forecast                 — 4-week pipeline + capacity decision
- GET  /api/project-types            — hours/asset, duration, concentration
                                        per project type + auto-insights
- POST /api/chat                     — Claude API proxy. 503s gracefully
                                        when ANTHROPIC_API_KEY is unset.
                                        Prompt-cached system prompt;
                                        rate-limited 20/min/IP.
- GET  /api/auth/me now returns role.

Backend — services:
- zoho_parse.py: extracts ~20 fields (brand, division, hub, userRole,
  projectType, assetCount, projectStatus, project start/end dates,
  userAgency, employingCompany, sageJobProfile, …) with back-compat
  aliases so existing callers keep working.
- parse_store.py: in-process TTL-cached registry of parsed uploads keyed
  by content hash. Lets endpoints reference an upload without re-sending it.
- forecast.py: working-day overlap math, exit-rate, weekly throughput
  baseline, capacity decision string mirroring the original wording.
- project_types.py: per-type aggregation + concentration-risk insights.
- timelog_filters.py: server-side filter by brands/divisions/hubs/roles.
- ai_context.py: builds the dashboard context block fed to Claude.

Frontend — new pages + components:
- pages/Forecast.tsx                 — ComposedChart (stacked bars + line)
                                        + capacity-decision banner + table
- pages/ProjectTypeSummary.tsx       — sortable table + small trend chart
- pages/TimeLogDetail.tsx            — virtualised, searchable, sortable
                                        view over all parsed timelog rows
- components/ChatView.tsx            — floating side panel with Claude.
                                        6 preset prompts mirroring the
                                        original. Visible only for roles
                                        with chat access.
- components/ChatToggle.tsx          — bottom-right FAB.
- components/StatsBar.tsx            — always-visible: Time Entries /
                                        People / Projects / Total Hours /
                                        Date Range.
- hooks/useDataContext.tsx           — single source of truth for filter
                                        state + parsed upload + filter
                                        dimensions (brands/divs/hubs/
                                        roles derived from uploads).

Frontend — modified:
- App.tsx, Navbar.tsx                — 7 tabs + role gating per the
                                        original TAB_ACCESS matrix.
- hooks/useAuth.tsx                  — role + canAccess(tab).
- lib/filters.ts, FilterBar.tsx      — Brand / Division / Hub / Role
                                        multiselects added (additive — keep
                                        Department / Name / Billing).
- pages/Department, Resourcing,
  Bookings, Tutorial.tsx             — wired into DataContext; tutorial
                                        is now a single 9-step global tour
                                        mirroring the original's narrative.

Config:
- backend/.env.example: ADMIN_ROLE, ANTHROPIC_API_KEY, ANTHROPIC_MODEL.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 21:40:03 -04:00

423 lines
15 KiB
Python

"""Zoho timelog parser.
Decisions:
- Header matching is case-insensitive and trim-stripped. Unknown headers
are surfaced in `unrecognised_columns` so the operator notices when
Zoho silently renames a column.
- Billable detection: we keep TWO canonical fields. `billable` accepts
literal "Billable" / "Is Billable" columns (boolean-ish). `billingType`
accepts a "Billing Type" column whose values look like
"Client Related" / "Fee Related" / "Idle Time" / "Leave Hours".
When only one of the two is present we cross-fill the other: a
billingType of client/fee implies billable=True; leave implies False.
- Date parsing tries ISO first, then dateutil for the messy formats Zoho
occasionally emits ("01/05/2026", "1-May-2026", etc.). The real Zoho
CSV uses DD/MM/YYYY UK format, so dayfirst=True is the right default.
- For .xlsx we use openpyxl read-only mode — keeps memory low on big files.
v2 (parity with original SPA):
- Extracts ~20 fields rather than the original 6.
- "Time Submitter" header carries "Name (email)" — we split it into
`submitter` + `submitterEmail`. Same field is also exposed as `employee`
(back-compat alias for existing merge code).
- Date preference: "Month & Year (Log Date)" first, then "Time Log Start".
The Month & Year column is monthly-bucketed which is what the
utilisation views want; Time Log Start is the actual day the user
picked. We expose Time Log Start as `timeLogStartDisplay` so the
TimeLogDetail view can show the original date.
- Header keys with duplicates: the real CSV repeats "Project Number"
later in the file (col index 56) for project-rollup metadata. We honour
the FIRST occurrence, matching the original.
"""
from __future__ import annotations
import csv
import hashlib
import io
import logging
from datetime import date, datetime
from typing import Any, Iterable
from dateutil import parser as dateparser
from openpyxl import load_workbook
logger = logging.getLogger(__name__)
# Canonical name → set of accepted aliases (compared after .strip().lower()).
#
# Order matters for "date": we prefer "Month & Year (Log Date)" over
# "Time Log Start" because the original SPA does the same. Both produce a
# `date` field; "Time Log Start" populates `timeLogStartDisplay` separately
# so the row carries both pieces.
HEADER_ALIASES: dict[str, set[str]] = {
"date": {
"month & year (log date)",
"month and year (log date)",
"time log start",
"log date",
"start date",
"date",
},
"timeLogStartDisplay": {"time log start", "time_log_start"},
"submitter": {"time submitter", "submitter", "resource name", "resource", "employee", "user", "name"},
"hoursLogged": {"time logged", "hours logged", "total hours", "hours", "actual logged"},
"userRole": {"user role", "role"},
"brand": {"brand", "project brand"},
"division": {"business division", "division"},
"hub": {"market", "hub", "business area - lv 2", "business area"},
"projectTitle": {"project title", "project name", "project"},
"projectType": {"project type (from omg)", "project type"},
"projectNumber": {"project number", "project no"},
"assetCount": {"no. of assets", "no of assets", "number of assets", "asset count", "assets"},
"userAgency": {"user agency"},
"employingCompany": {"user employing company", "employing company"},
"sageJobProfile": {"sage job profile", "job profile"},
"projectBillingType": {"project billing type"},
"taskDescription": {"task description", "time log task description", "task name", "task", "activity", "description"},
"projectStatus": {"project status", "status"},
"projectStartDate": {"project start date"},
"projectEndDate": {"project end date"},
"billable": {"billable", "is billable"},
"billingType": {"billing type"},
}
# Generic truthy strings for a literal "Billable" column.
BILLABLE_TRUE_VALUES = {"true", "yes", "1", "billable"}
# Billing-type values (lower-cased) that imply billable=True.
BILLING_TYPE_BILLABLE = {"client related", "fee related"}
# Billing-type values that imply billable=False (and are leave-coded).
BILLING_TYPE_LEAVE = {"leave hours", "leave"}
def _canonicalise_header(raw: str) -> str | None:
if raw is None:
return None
key = str(raw).strip().lower()
if not key:
return None
for canonical, aliases in HEADER_ALIASES.items():
if key in aliases:
return canonical
return None
def _parse_date(v: Any) -> date | None:
"""Parse a date cell. Handles ISO, DD/MM/YYYY (UK/Zoho default), Excel
serials passed through as numbers, and "Month, YYYY" buckets from the
Salesforce "Month & Year (Log Date)" column."""
if v is None or v == "":
return None
if isinstance(v, date) and not isinstance(v, datetime):
return v
if isinstance(v, datetime):
return v.date()
# Excel serial — pass through openpyxl as a number, but the CSV path
# may also see a stringified serial (rare).
if isinstance(v, (int, float)):
# Excel serial → Python date. Skip implausible values to avoid
# treating a real integer like an asset count as a date.
try:
n = float(v)
if 30000 < n < 80000:
# 1900-based serial origin (with the well-known leap-year bug).
base = date(1899, 12, 30)
return date.fromordinal(base.toordinal() + int(n))
except (ValueError, OverflowError):
pass
return None
s = str(v).strip()
if not s:
return None
try:
# ISO short-circuit
return date.fromisoformat(s[:10])
except ValueError:
pass
# "Month YYYY" / "Month, YYYY" — produces the first of the month.
months_short = {
"jan": 1, "feb": 2, "mar": 3, "apr": 4, "may": 5, "jun": 6,
"jul": 7, "aug": 8, "sep": 9, "oct": 10, "nov": 11, "dec": 12,
}
months_full = {
"january": 1, "february": 2, "march": 3, "april": 4,
"may": 5, "june": 6, "july": 7, "august": 8,
"september": 9, "october": 10, "november": 11, "december": 12,
}
parts = s.replace(",", " ").split()
if len(parts) == 2:
m = (months_full.get(parts[0].lower())
or months_short.get(parts[0].lower()[:3]))
if m:
try:
yr = int(parts[1])
if 2000 <= yr <= 2099:
return date(yr, m, 1)
except ValueError:
pass
try:
# dayfirst=True because Zoho regional defaults are commonly DD/MM.
return dateparser.parse(s, dayfirst=True).date()
except (ValueError, TypeError, OverflowError):
return None
def _parse_hours(v: Any) -> float:
if v is None or v == "":
return 0.0
if isinstance(v, (int, float)):
return float(v)
s = str(v).strip()
# Zoho sometimes outputs "7:30" (HH:MM). Convert.
if ":" in s and all(p.isdigit() for p in s.split(":") if p):
parts = s.split(":")
try:
h = int(parts[0])
m = int(parts[1]) if len(parts) > 1 else 0
return h + m / 60.0
except ValueError:
pass
try:
return float(s.replace(",", ""))
except ValueError:
return 0.0
def _parse_billable(v: Any) -> bool:
"""Parse a literal Billable / Is Billable column value."""
if v is None:
return False
if isinstance(v, bool):
return v
if isinstance(v, (int, float)):
return bool(v)
s = str(v).strip().lower()
if not s:
return False
return s in BILLABLE_TRUE_VALUES
def _parse_billing_type(v: Any) -> str | None:
"""Parse a Billing Type column value to a lowercase canonical string."""
if v is None:
return None
s = str(v).strip().lower()
return s or None
def _parse_asset_count(v: Any) -> float | None:
if v is None or v == "":
return None
if isinstance(v, (int, float)):
return float(v) if v > 0 else None
s = str(v).strip().replace(",", "")
if not s:
return None
try:
n = float(s)
return n if n > 0 else None
except ValueError:
return None
def _parse_str(v: Any) -> str | None:
if v is None:
return None
s = str(v).strip()
return s or None
def _split_submitter(raw: Any) -> tuple[str | None, str | None]:
"""Zoho's "Time Submitter" is "Name (email)" — split into the two parts.
When called on aliased columns ("Resource Name" etc.) the value is just
a plain name with no parens; we return (name, None) in that case.
"""
if raw is None:
return None, None
s = str(raw).strip()
if not s:
return None, None
if "(" in s and s.endswith(")"):
try:
name, rest = s.split("(", 1)
email = rest[:-1].strip()
return name.strip() or None, email or None
except ValueError:
pass
return s, None
# ----------------------------------------------------------------------
# Public API
# ----------------------------------------------------------------------
def parse(filename: str, content: bytes) -> dict[str, Any]:
"""Parse uploaded file. Returns dict with rows, unrecognised_columns, content_hash."""
fn = (filename or "").lower()
if fn.endswith(".xlsx") or fn.endswith(".xlsm"):
rows, unknown = _parse_xlsx(content)
elif fn.endswith(".csv") or fn.endswith(".txt"):
rows, unknown = _parse_csv(content)
else:
# Best-effort sniff: try CSV first, fall back to xlsx.
try:
rows, unknown = _parse_csv(content)
except Exception:
rows, unknown = _parse_xlsx(content)
digest = hashlib.sha256(content).hexdigest()
return {
"rows": rows,
"unrecognised_columns": unknown,
"content_hash": f"sha256:{digest}",
}
# Canonical → default value when the column is missing entirely.
_DEFAULT_ROW: dict[str, Any] = {
"date": None,
"timeLogStartDisplay": None,
"submitter": None,
"submitterEmail": None,
"hoursLogged": 0.0,
"userRole": None,
"brand": None,
"division": None,
"hub": None,
"projectTitle": None,
"projectType": None,
"projectNumber": None,
"assetCount": None,
"userAgency": None,
"employingCompany": None,
"sageJobProfile": None,
"projectBillingType": None,
"taskDescription": None,
"projectStatus": None,
"projectStartDate": None,
"projectEndDate": None,
"billable": False,
"billingType": None,
}
def _build_rows(
raw_rows: Iterable[list[Any]],
headers: list[Any],
) -> tuple[list[dict[str, Any]], list[str]]:
# Map column index → canonical key. Track unknown ones.
# FIRST occurrence of a header wins — the real Zoho CSV repeats
# "Project Number" later in the row, and only the first column has
# reliable per-time-entry data.
canonical_by_idx: dict[int, str] = {}
canonical_seen: set[str] = set()
unrecognised: list[str] = []
unrecognised_seen: set[str] = set()
for idx, raw in enumerate(headers):
if raw is None or str(raw).strip() == "":
continue
canon = _canonicalise_header(raw)
if canon:
if canon in canonical_seen:
continue
canonical_by_idx[idx] = canon
canonical_seen.add(canon)
else:
name = str(raw).strip()
if name and name not in unrecognised_seen:
unrecognised.append(name)
unrecognised_seen.add(name)
present_canonicals = set(canonical_seen)
out: list[dict[str, Any]] = []
for raw_row in raw_rows:
if not raw_row or all(c in (None, "") for c in raw_row):
continue
row = dict(_DEFAULT_ROW)
for idx, canon in canonical_by_idx.items():
if idx >= len(raw_row):
continue
v = raw_row[idx]
if v is None or (isinstance(v, str) and v.strip() == ""):
continue
if canon == "date":
row["date"] = _parse_date(v)
elif canon == "timeLogStartDisplay":
d = _parse_date(v)
row["timeLogStartDisplay"] = d.isoformat() if d else None
elif canon == "hoursLogged":
row["hoursLogged"] = _parse_hours(v)
elif canon == "billable":
row["billable"] = _parse_billable(v)
elif canon == "billingType":
row["billingType"] = _parse_billing_type(v)
elif canon == "assetCount":
row["assetCount"] = _parse_asset_count(v)
elif canon == "submitter":
name, email = _split_submitter(v)
row["submitter"] = name
if email:
row["submitterEmail"] = email
elif canon == "projectStatus":
s = _parse_str(v)
row["projectStatus"] = s.upper() if s else None
elif canon in {"projectStartDate", "projectEndDate"}:
d = _parse_date(v)
row[canon] = d.isoformat() if d else None
else:
row[canon] = _parse_str(v)
# Cross-fill: when only billingType is present, derive billable.
bt = row.get("billingType")
if "billingType" in present_canonicals and bt is not None:
if bt in BILLING_TYPE_BILLABLE:
row["billable"] = True
elif bt in BILLING_TYPE_LEAVE:
row["billable"] = False
# Fall-back: project title defaults to project number when blank.
if not row.get("projectTitle") and row.get("projectNumber"):
row["projectTitle"] = row["projectNumber"]
# Back-compat aliases consumed by services.merge (existing summarise).
# These mirror the v1 field names so downstream code keeps working
# without each call-site needing to be updated.
row["employee"] = row["submitter"]
row["project"] = row["projectTitle"]
row["task"] = row["taskDescription"]
row["hours"] = row["hoursLogged"]
out.append(row)
return out, unrecognised
def _parse_csv(content: bytes) -> tuple[list[dict[str, Any]], list[str]]:
# Decode permissively; Zoho exports are usually utf-8 or utf-8-sig.
text = content.decode("utf-8-sig", errors="replace")
reader = csv.reader(io.StringIO(text))
rows = list(reader)
if not rows:
return [], []
headers = rows[0]
data = rows[1:]
return _build_rows(data, headers)
def _parse_xlsx(content: bytes) -> tuple[list[dict[str, Any]], list[str]]:
wb = load_workbook(io.BytesIO(content), read_only=True, data_only=True)
ws = wb.active
if ws is None:
return [], []
rows_iter = ws.iter_rows(values_only=True)
try:
headers = list(next(rows_iter))
except StopIteration:
return [], []
data = (list(r) for r in rows_iter)
return _build_rows(data, headers)