loreal-utilisation-dept/backend/app/services/forecast.py
DJP d4c6576a95 parity v3: two-bar charts, airtable link fallbacks, filter split, weekly comparison, project-type detail
Round 3 of parity fixes after the user shared side-by-side screen
recordings of our build vs. the original SPA. 72/72 tests, frontend
typecheck/lint/build clean, main entry 16.95 KB gz.

Airtable booking → person linkage (root cause of empty Resource
Availability + Daily Breakdown):
- normalise_booking now tries Resource / Booking Resource / Booked
  Resource / Resource (from Booking) / Resource Name (from Resource)
  as fallbacks for resourceRecordIds — first non-empty wins. Only
  values matching `rec` + 17 chars are kept.
- One-shot LOG_AIRTABLE_SCHEMA_ONCE info log on the first booking
  response so we can see what fields the live base actually returns.
  Flip to False once we've confirmed the field name.
- Name-based fallback in department.py already in place.

Charts:
- DeptWeeklyChart: two bars per entity. Bar 1 stacks Soft Booked + Active
  Booked. Bar 2 stacks Allocated + per-billing-category. Red Avg %
  utilisation line crosses both. Legend gains Active/Soft Booked +
  Avg %.
- DailyBreakdownModal: two bars per weekday (allocated + billing),
  Active Booked + Soft Booked pills at the top, full two-row legend
  matching frame f020.

Filters:
- New GlobalFilterBar (Division/Brand/Hub/Role/From/To/Reset) lives
  above the tab nav in ProtectedShell, visible on every page.
- New DepartmentFilterBar (Name/Division/Brand/Employment) lives inside
  Department.tsx only.
- Resourcing / Bookings lose their redundant inline FilterBar — global
  one covers them.

Forecast:
- Pipeline chart bars now stacked by project type (PIPELINE_TYPES
  palette). Legend below the chart includes the type colours +
  Exiting + Forecast avg.
- New WeeklyComparisonTable below the pipeline: This Week / Next Week
  / Week +2 / Week +3 × project type, Active / Exit counts per cell,
  totals row.
- "Last Week" subtitle now reads "Full week actual hours (Mon–Fri)" —
  matches the original SPA's semantic.
- Backend: ForecastWeek gains activeAssetsByType + exitingByType maps.

Project Type Summary:
- Selected-type detail panel below the table: avg h/asset + avg
  duration tiles (with min–max range), totals line, dept hours
  segment bar with colour legend, Insights & Recommended Actions
  panel, Panel 1 chart (avg h/asset by completion month, stacked by
  division).
- Backend: ProjectTypeStatExtended gains deptHoursBreakdown,
  monthlyAvgHoursPerAsset (with byDivision), minDurationDays.

Adhoc People:
- Department page now surfaces a small warning card next to the dept
  pills listing the top 6 unmatched Zoho submitter emails + a "+N
  more" count.

Header subtitle:
- Reads "<filename> · last updated <localised dd/mm/yyyy hh:mm:ss>"
  when parsed_at is present in the parse response. Backend's
  /api/timelog/parse now emits parsed_at (ISO 8601). Falls back to
  row count if missing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 08:50:21 -04:00

408 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Forecast pipeline — 4 weeks ahead capacity planning.
Decisions:
- Mirrors the original SPA's `buildForecast` math but trims the output
surface to the brief's contract: weekly rows with
activeAssets / exitingAssets / exitRate / weeklyThroughput /
deptCapacityAssetsPerWeek / canTakeOn, plus a single capacity decision
string. The TS port had a full multi-type breakdown — out of scope.
- We avoid pulling the original's 8-week look-back / weighted-mix logic
in full; the brief just needs the weekly assets/throughput pair.
- Pure function (no FastAPI imports here) so it can be unit-tested.
- Inputs are the parsed-timelog rows (camelCase from zoho_parse) plus
optional projectSummary rows and deliverables. When PS is supplied we
use it as the authoritative active-projects list; otherwise we infer
from the time log (last-seen projectStartDate / projectEndDate).
"""
from __future__ import annotations
from datetime import date, datetime, timedelta
from typing import Any, Iterable
def _iso(d: date) -> str:
return d.isoformat()
def _iso_week_label(d: date) -> str:
iso = d.isocalendar()
return f"W{iso.week:02d}"
def _monday_of(d: date) -> date:
return d - timedelta(days=d.weekday())
def _coerce_date(v: Any) -> date | None:
if v is None or v == "":
return None
if isinstance(v, date) and not isinstance(v, datetime):
return v
if isinstance(v, datetime):
return v.date()
try:
return date.fromisoformat(str(v)[:10])
except (ValueError, TypeError):
return None
def _weekday_overlap(a_start: date, a_end: date, b_start: date, b_end: date) -> int:
"""Mon-Fri overlap count between [a] and [b] (inclusive)."""
if not a_start or not a_end or not b_start or not b_end:
return 0
start = max(a_start, b_start)
end = min(a_end, b_end)
if end < start:
return 0
days = 0
d = start
while d <= end:
if d.weekday() < 5:
days += 1
d += timedelta(days=1)
return days
def _weekdays_in(start: date, end: date) -> int:
return _weekday_overlap(start, end, start, end)
def _historic_weekly_throughput(rows: Iterable[dict[str, Any]]) -> float:
"""Average ISO-week asset throughput from completed projects in the
time log. We approximate "throughput" as the average distinct
project count per ISO week over the last 4 full weeks of data.
"""
by_week: dict[str, set[str]] = {}
for r in rows:
d = _coerce_date(r.get("date"))
if not d:
continue
title = (r.get("projectTitle") or r.get("projectNumber") or "").strip()
if not title:
continue
iso = d.isocalendar()
key = f"{iso.year:04d}-W{iso.week:02d}"
by_week.setdefault(key, set()).add(title)
if not by_week:
return 0.0
keys = sorted(by_week.keys())
# Drop the most recent ("partial") week and average over the next 4.
full = keys[:-1] if len(keys) > 1 else keys
baseline = full[-4:]
if not baseline:
return 0.0
return sum(len(by_week[k]) for k in baseline) / len(baseline)
def _avg_hours_per_asset(rows: Iterable[dict[str, Any]]) -> float:
"""Weighted avg hours-per-asset across the time log."""
project_hours: dict[str, float] = {}
project_assets: dict[str, float] = {}
for r in rows:
title = (r.get("projectTitle") or r.get("projectNumber") or "").strip()
if not title:
continue
project_hours[title] = project_hours.get(title, 0.0) + float(r.get("hoursLogged") or r.get("hours") or 0)
ac = r.get("assetCount")
if ac and ac > 0:
project_assets[title] = float(ac)
total_h = 0.0
total_a = 0.0
for p, a in project_assets.items():
total_h += project_hours.get(p, 0.0)
total_a += a
return (total_h / total_a) if total_a > 0 else 0.0
def _is_active_status(s: str | None) -> bool:
if not s:
return True
u = s.upper()
return u not in {"COMPLETE", "CANCELLED", "REJECTED", "DECLINED", "APPROVED"}
def _project_universe(
logs: Iterable[dict[str, Any]],
project_summary: Iterable[dict[str, Any]] | None,
deliverables: Iterable[dict[str, Any]] | None,
) -> list[dict[str, Any]]:
"""Project records with {projectNumber, projectTitle, projectType,
projectStartDate, projectEndDate, assetCount, status}. PS wins; else
fall back to time-log-derived rows enriched with deliverable dates.
"""
if project_summary:
ps = [dict(p) for p in project_summary]
return ps
del_dates: dict[str, tuple[str | None, str | None, str | None]] = {}
del_assets: dict[str, float] = {}
for d in (deliverables or []):
pn = d.get("projectNumber")
if not pn:
continue
if pn not in del_dates:
del_dates[pn] = (d.get("projectStartDate"), d.get("projectEndDate"), d.get("projectStatus"))
del_assets[pn] = del_assets.get(pn, 0.0) + 1.0
by_proj: dict[str, dict[str, Any]] = {}
for r in logs:
pn = r.get("projectNumber") or r.get("projectTitle")
if not pn:
continue
p = by_proj.setdefault(pn, {
"projectNumber": r.get("projectNumber"),
"projectTitle": r.get("projectTitle") or pn,
"projectType": r.get("projectType"),
"projectStartDate": r.get("projectStartDate"),
"projectEndDate": r.get("projectEndDate"),
"projectStatus": r.get("projectStatus"),
"assetCount": r.get("assetCount"),
})
if not p.get("projectStartDate") and r.get("projectStartDate"):
p["projectStartDate"] = r["projectStartDate"]
if not p.get("projectEndDate") and r.get("projectEndDate"):
p["projectEndDate"] = r["projectEndDate"]
if p.get("assetCount") in (None, 0) and r.get("assetCount"):
p["assetCount"] = r["assetCount"]
# Enrich from deliverables when fields are missing.
for pn, p in by_proj.items():
if pn in del_dates:
ds, de, st = del_dates[pn]
if not p.get("projectStartDate"):
p["projectStartDate"] = ds
if not p.get("projectEndDate"):
p["projectEndDate"] = de
if not p.get("projectStatus") and st:
p["projectStatus"] = st
if not p.get("assetCount") and pn in del_assets:
p["assetCount"] = del_assets[pn]
return list(by_proj.values())
def build_forecast(
*,
logs: list[dict[str, Any]],
project_summary: list[dict[str, Any]] | None = None,
deliverables: list[dict[str, Any]] | None = None,
weeks_ahead: int = 4,
headcount: int | None = None,
workday_hours_per_week: float = 40.0,
from_date: date | None = None,
) -> dict[str, Any]:
"""Compute the 4-week capacity outlook.
Returns the contract documented in the brief: a list of week dicts +
totals + decision string. Pure function, no I/O.
"""
today = from_date or date.today()
week_start = _monday_of(today)
projects = _project_universe(logs, project_summary, deliverables)
# Active project filter: status not complete AND start <= week-end AND
# (no end OR end >= window-start). We materialise the per-project
# date window once.
active_projects: list[dict[str, Any]] = []
for p in projects:
if not _is_active_status(p.get("projectStatus")):
continue
ps = _coerce_date(p.get("projectStartDate"))
pe = _coerce_date(p.get("projectEndDate")) or (ps and ps + timedelta(days=90))
if not ps or not pe:
continue
active_projects.append({
**p,
"_start": ps,
"_end": pe,
"_assets": float(p.get("assetCount") or 0.0),
"_total_weekdays": max(_weekdays_in(ps, pe), 1),
})
# Historical baselines.
weekly_throughput = _historic_weekly_throughput(logs)
avg_hpa = _avg_hours_per_asset(logs)
if headcount is None:
headcount = _baseline_headcount(logs)
weekly_team_hours = headcount * workday_hours_per_week
dept_capacity = (weekly_team_hours / avg_hpa) if avg_hpa > 0 else 0.0
weeks: list[dict[str, Any]] = []
for i in range(weeks_ahead):
w_start = week_start + timedelta(days=7 * i)
w_end = w_start + timedelta(days=6)
active_assets = 0.0
exiting_assets = 0.0
# Per-project-type breakdowns — populates the stacked pipeline chart
# and the weekly comparison table on the Forecast page.
active_by_type: dict[str, float] = {}
exiting_by_type: dict[str, float] = {}
for p in active_projects:
overlap = _weekday_overlap(p["_start"], p["_end"], w_start, w_end)
if overlap <= 0:
continue
share = overlap / p["_total_weekdays"]
contribution = p["_assets"] * share
active_assets += contribution
ptype = (p.get("projectType") or "Other") or "Other"
active_by_type[ptype] = active_by_type.get(ptype, 0.0) + contribution
if w_start <= p["_end"] <= w_end:
exiting_assets += p["_assets"]
exiting_by_type[ptype] = (
exiting_by_type.get(ptype, 0.0) + p["_assets"]
)
exit_rate = (exiting_assets / active_assets * 100.0) if active_assets > 0 else 0.0
can_take_on = max(dept_capacity - active_assets, 0.0)
weeks.append({
"weekStart": _iso(w_start),
"weekEnd": _iso(w_end),
"weekLabel": _iso_week_label(w_start),
"activeAssets": round(active_assets, 1),
"exitingAssets": round(exiting_assets, 1),
"exitRatePct": round(exit_rate, 1),
"weeklyThroughput": round(weekly_throughput, 1),
"deptCapacityAssetsPerWeek": round(dept_capacity, 1),
"canTakeOn": round(can_take_on, 1),
"activeAssetsByType": {k: round(v, 1) for k, v in active_by_type.items()},
"exitingByType": {k: round(v, 1) for k, v in exiting_by_type.items()},
})
decision = _capacity_decision(weeks, dept_capacity)
# ── Current-week + next-week snapshots ─────────────────────────────
# The rebuilt Forecast page wants a two-row KPI strip describing the
# current week's status. Compute these from `weeks[0]` plus the logs
# already in scope. Keeps backward compat — existing consumers of the
# `weeks` / `totals` / `decision` fields still see them unchanged.
this_week = weeks[0] if weeks else None
next_week = weeks[1] if len(weeks) > 1 else None
capacity_hours = headcount * workday_hours_per_week
this_week_logged = 0.0
if this_week:
wf = date.fromisoformat(this_week["weekStart"])
wt = date.fromisoformat(this_week["weekEnd"])
for r in logs:
d = _coerce_date(r.get("date"))
if d and wf <= d <= wt:
this_week_logged += float(r.get("hoursLogged") or r.get("hours") or 0.0)
this_week_forecast_h = (
this_week["activeAssets"] * avg_hpa if this_week and avg_hpa > 0 else 0.0
)
# Per-type forecast subbreakdown for the "Forecast Hours Needed" tile.
type_breakdown: list[dict[str, Any]] = []
if this_week:
wf = date.fromisoformat(this_week["weekStart"])
wt = date.fromisoformat(this_week["weekEnd"])
by_type: dict[str, dict[str, float]] = {}
for p in active_projects:
ov = _weekday_overlap(p["_start"], p["_end"], wf, wt)
if ov <= 0:
continue
share = ov / p["_total_weekdays"]
assets = p["_assets"] * share
ptype = (p.get("projectType") or "Other") or "Other"
entry = by_type.setdefault(ptype, {"assets": 0.0, "hours": 0.0})
entry["assets"] += assets
entry["hours"] += assets * avg_hpa
for ptype, v in sorted(by_type.items(), key=lambda kv: -kv[1]["hours"]):
type_breakdown.append({
"projectType": ptype,
"assets": round(v["assets"], 1),
"hours": round(v["hours"], 1),
})
# Available next week = capacity forecast hours.
available_this_week = max(capacity_hours - this_week_forecast_h, 0.0)
return {
"weeks": weeks,
"totals": {
"weeklyThroughput": round(weekly_throughput, 1),
"deptCapacityAssetsPerWeek": round(dept_capacity, 1),
"avgHoursPerAsset": round(avg_hpa, 2),
"baselineHeadcount": headcount,
"weeksAhead": weeks_ahead,
"from": _iso(week_start),
},
"decision": decision,
"thisWeek": (
{
"weekStart": this_week["weekStart"],
"weekEnd": this_week["weekEnd"],
"weekLabel": this_week["weekLabel"],
"capacityHours": round(capacity_hours, 1),
"forecastHours": round(this_week_forecast_h, 1),
"forecastByType": type_breakdown,
"loggedHours": round(this_week_logged, 1),
"availableHours": round(available_this_week, 1),
"utilForecastPct": round((this_week_forecast_h / capacity_hours * 100.0) if capacity_hours > 0 else 0.0, 1),
"utilActualPct": round((this_week_logged / capacity_hours * 100.0) if capacity_hours > 0 else 0.0, 1),
"activeAssets": this_week["activeAssets"],
"exitingAssets": this_week["exitingAssets"],
"exitRatePct": this_week["exitRatePct"],
"canTakeOn": this_week["canTakeOn"],
"assetForecastPerWeek": round(weekly_throughput, 1),
}
if this_week
else None
),
"nextWeek": (
{
"weekStart": next_week["weekStart"],
"weekEnd": next_week["weekEnd"],
"weekLabel": next_week["weekLabel"],
"activeAssets": next_week["activeAssets"],
}
if next_week
else None
),
}
def _baseline_headcount(logs: Iterable[dict[str, Any]]) -> int:
"""Distinct people active in the last 4 full ISO weeks of data."""
by_week: dict[str, set[str]] = {}
for r in logs:
d = _coerce_date(r.get("date"))
if not d:
continue
ident = (r.get("submitterEmail") or r.get("submitter") or "").strip()
if not ident:
continue
iso = d.isocalendar()
key = f"{iso.year:04d}-W{iso.week:02d}"
by_week.setdefault(key, set()).add(ident)
if not by_week:
return 0
keys = sorted(by_week.keys())
full = keys[:-1] if len(keys) > 1 else keys
baseline = full[-4:]
if not baseline:
return 0
avg = sum(len(by_week[k]) for k in baseline) / len(baseline)
return max(round(avg), 0)
def _capacity_decision(weeks: list[dict[str, Any]], dept_capacity: float) -> str:
"""Pick a decision label from the first week's active vs capacity ratio.
Mirrors the original SPA's "Capacity Decision" wording.
"""
if not weeks:
return "No data — upload a time log to compute capacity"
head = weeks[0]
active = float(head["activeAssets"])
cap = float(dept_capacity)
if cap <= 0:
return "No capacity baseline — needs hours-per-asset history"
ratio = active / cap
if ratio < 0.85:
return "OK to take on small briefs"
if ratio < 1.1:
return "At capacity"
return "Overloaded — push back"