loreal-utilisation-dept/backend/app/services/airtable_fetch.py
DJP d4c6576a95 parity v3: two-bar charts, airtable link fallbacks, filter split, weekly comparison, project-type detail
Round 3 of parity fixes after the user shared side-by-side screen
recordings of our build vs. the original SPA. 72/72 tests, frontend
typecheck/lint/build clean, main entry 16.95 KB gz.

Airtable booking → person linkage (root cause of empty Resource
Availability + Daily Breakdown):
- normalise_booking now tries Resource / Booking Resource / Booked
  Resource / Resource (from Booking) / Resource Name (from Resource)
  as fallbacks for resourceRecordIds — first non-empty wins. Only
  values matching `rec` + 17 chars are kept.
- One-shot LOG_AIRTABLE_SCHEMA_ONCE info log on the first booking
  response so we can see what fields the live base actually returns.
  Flip to False once we've confirmed the field name.
- Name-based fallback in department.py already in place.

Charts:
- DeptWeeklyChart: two bars per entity. Bar 1 stacks Soft Booked + Active
  Booked. Bar 2 stacks Allocated + per-billing-category. Red Avg %
  utilisation line crosses both. Legend gains Active/Soft Booked +
  Avg %.
- DailyBreakdownModal: two bars per weekday (allocated + billing),
  Active Booked + Soft Booked pills at the top, full two-row legend
  matching frame f020.

Filters:
- New GlobalFilterBar (Division/Brand/Hub/Role/From/To/Reset) lives
  above the tab nav in ProtectedShell, visible on every page.
- New DepartmentFilterBar (Name/Division/Brand/Employment) lives inside
  Department.tsx only.
- Resourcing / Bookings lose their redundant inline FilterBar — global
  one covers them.

Forecast:
- Pipeline chart bars now stacked by project type (PIPELINE_TYPES
  palette). Legend below the chart includes the type colours +
  Exiting + Forecast avg.
- New WeeklyComparisonTable below the pipeline: This Week / Next Week
  / Week +2 / Week +3 × project type, Active / Exit counts per cell,
  totals row.
- "Last Week" subtitle now reads "Full week actual hours (Mon–Fri)" —
  matches the original SPA's semantic.
- Backend: ForecastWeek gains activeAssetsByType + exitingByType maps.

Project Type Summary:
- Selected-type detail panel below the table: avg h/asset + avg
  duration tiles (with min–max range), totals line, dept hours
  segment bar with colour legend, Insights & Recommended Actions
  panel, Panel 1 chart (avg h/asset by completion month, stacked by
  division).
- Backend: ProjectTypeStatExtended gains deptHoursBreakdown,
  monthlyAvgHoursPerAsset (with byDivision), minDurationDays.

Adhoc People:
- Department page now surfaces a small warning card next to the dept
  pills listing the top 6 unmatched Zoho submitter emails + a "+N
  more" count.

Header subtitle:
- Reads "<filename> · last updated <localised dd/mm/yyyy hh:mm:ss>"
  when parsed_at is present in the parse response. Backend's
  /api/timelog/parse now emits parsed_at (ISO 8601). Falls back to
  row count if missing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 08:50:21 -04:00

334 lines
12 KiB
Python

"""Airtable fetch helpers.
Decisions:
- Paginated with pageSize=100; we follow the `offset` cursor.
- 429 → sleep 30s then retry once (Airtable docs). Any subsequent 429 raises.
- Field normalisation lives here so routers/handlers stay schema-pure.
- Date filtering for bookings uses filterByFormula on Start/End Date — we
fetch bookings that overlap the requested window (start <= to AND end >= from).
"""
from __future__ import annotations
import asyncio
import logging
from datetime import date
from typing import Any, AsyncIterator
from urllib.parse import urlencode
import httpx
from app.config import settings
from app.deps.airtable import airtable_client
logger = logging.getLogger(__name__)
# One-shot schema logger — set to True to log the first raw booking record's
# field keys so we can verify which link-field name Airtable actually uses.
# Flips to False after the first booking is normalised (see normalise_booking).
LOG_AIRTABLE_SCHEMA_ONCE: bool = True
# ----------------------------------------------------------------------
# Low-level pagination
# ----------------------------------------------------------------------
async def _paginate(
table: str,
params: dict[str, Any] | None = None,
*,
max_retries_429: int = 1,
) -> AsyncIterator[dict[str, Any]]:
"""Async iterator yielding individual Airtable records."""
client = airtable_client.client
base_params: dict[str, Any] = {"pageSize": 100}
if params:
base_params.update(params)
offset: str | None = None
while True:
q = dict(base_params)
if offset:
q["offset"] = offset
# urlencode here so list values (filterByFormula doesn't use lists,
# but fields[] would) are serialised consistently.
url = f"/{table}?{urlencode(q, doseq=True)}"
retries_left = max_retries_429
while True:
resp = await client.get(url)
if resp.status_code == 429 and retries_left > 0:
logger.warning("Airtable 429 on %s — sleeping 30s before retry", table)
await asyncio.sleep(30)
retries_left -= 1
continue
resp.raise_for_status()
break
payload = resp.json()
for rec in payload.get("records", []):
yield rec
offset = payload.get("offset")
if not offset:
return
# ----------------------------------------------------------------------
# Field normalisation
# ----------------------------------------------------------------------
def _to_bool(v: Any) -> bool:
if isinstance(v, bool):
return v
if v is None:
return False
if isinstance(v, str):
return v.strip().lower() in {"true", "yes", "1", "checked"}
return bool(v)
def _to_float(v: Any, default: float = 0.0) -> float:
if v is None or v == "":
return default
try:
return float(v)
except (TypeError, ValueError):
return default
def _to_date(v: Any) -> date | None:
if not v:
return None
if isinstance(v, date):
return v
try:
# Airtable returns ISO date strings.
return date.fromisoformat(str(v)[:10])
except ValueError:
return None
def _as_list(v: Any) -> list[str]:
if v is None:
return []
if isinstance(v, list):
return [str(x) for x in v]
return [str(v)]
def _flatten(v: Any) -> str | None:
# Airtable returns linked-record and lookup fields as lists, even when
# the cardinality is 1 — e.g. "Resource Name" on Booking Resource comes
# back as ["Aadesh Khale"]. Callers downstream (merge service, frontend
# types) expect scalars; flatten here at the boundary.
if v is None:
return None
if isinstance(v, list):
return str(v[0]).strip() if v and v[0] is not None else None
s = str(v).strip()
return s or None
def normalise_resource(rec: dict[str, Any]) -> dict[str, Any]:
f = rec.get("fields", {})
# Roles may be a multi-select array, a single linked record name, or a string.
roles_raw = f.get("Roles") or f.get("Role") or []
return {
"recordId": rec.get("id"),
"name": f.get("Name") or f.get("Resource Name") or "",
"email": f.get("Email") or None,
"department": f.get("Department") or None,
"roles": _as_list(roles_raw),
"inactive": _to_bool(f.get("Inactive")),
"availHoursPerWeek": _to_float(
f.get("Availability Hour (per week)")
or f.get("Availability Hours (per week)")
or f.get("Available Hours")
or 0
),
"startDate": _to_date(f.get("Start Date")),
"endDate": _to_date(f.get("End Date")),
"employmentType": f.get("Employment Type") or f.get("FTE / Freelancer") or None,
"country": f.get("Country") or None,
}
def normalise_booking(rec: dict[str, Any]) -> dict[str, Any]:
global LOG_AIRTABLE_SCHEMA_ONCE
f = rec.get("fields", {})
# One-shot schema log so deployments can see the exact field names the
# live Airtable base exposes. Flips to False after the first booking is
# processed so we don't spam logs.
if LOG_AIRTABLE_SCHEMA_ONCE:
try:
logger.info(
"Airtable booking schema (first record) — id=%s · field keys=%s",
rec.get("id"),
sorted(list(f.keys())),
)
finally:
LOG_AIRTABLE_SCHEMA_ONCE = False
# Linked-record fields can live under several different names depending
# on how the base was set up. Try each in priority order and take the
# FIRST non-empty list — falling back to flattened name lookup later.
rec_id_candidates = (
f.get("Resource"),
f.get("Booking Resource"),
f.get("Booked Resource"),
f.get("Resource (from Booking)"),
f.get("Resource Name (from Resource)"),
)
resource_record_ids: list[str] = []
for candidate in rec_id_candidates:
rids = _as_list(candidate or [])
# Only treat as record-ids if values look like Airtable recIDs
# (`rec` prefix, 17 chars). Otherwise this was probably a name lookup
# and we use the flattened name path instead.
rids = [r for r in rids if r.startswith("rec") and len(r) == 17]
if rids:
resource_record_ids = rids
break
# Every linked/lookup field on Booking Resource comes back as a list from
# Airtable — flatten at the boundary so downstream consumers get scalars.
return {
"id": rec.get("id"),
"task": _flatten(f.get("Task") or f.get("Task Description")),
"startDate": _to_date(f.get("Start Date")),
"endDate": _to_date(f.get("End Date")),
"resourceName": _flatten(
f.get("Resource Name (from Resource)")
or f.get("Resource Name")
or f.get("Resource")
or f.get("Booking Resource")
or f.get("Booked Resource")
),
# Linked-record ids — used by the Department / Daily Breakdown
# services to resolve bookings to people without relying on the
# flattened name lookup (which can mis-match for renamed people).
"resourceRecordIds": resource_record_ids,
"projectNumber": _flatten(
f.get("Project Number (from Master)")
or f.get("Project Number")
or f.get("Project No.")
),
"projectName": _flatten(
f.get("Project Name (from Master)")
or f.get("Project Name")
or f.get("Project Title")
),
"department": _flatten(
f.get("Department (from Resource Name)")
or f.get("Department")
),
"division": _flatten(f.get("Division")),
"hoursSelection": _as_list(f.get("Hours Selection") or f.get("Days") or []),
"totalHoursBooked": _to_float(
f.get("Total Hours Booked") or f.get("Total Hours") or 0
),
"bookingStatus": _flatten(f.get("Booking Status") or f.get("Status")),
"placeholder": _to_bool(f.get("Placeholder")),
}
# ----------------------------------------------------------------------
# Public fetchers
# ----------------------------------------------------------------------
async def fetch_resources(*, include_inactive: bool = False) -> list[dict[str, Any]]:
params: dict[str, Any] = {}
if not include_inactive:
# Airtable formula — only resources not marked inactive.
params["filterByFormula"] = "NOT({Inactive})"
out: list[dict[str, Any]] = []
async for rec in _paginate(settings.AIRTABLE_TABLE_RESOURCES, params):
out.append(normalise_resource(rec))
return out
def _date_filter(from_: date | None, to: date | None) -> str | None:
"""Build a filterByFormula that picks bookings overlapping [from_, to]."""
if not from_ and not to:
return None
clauses: list[str] = []
if to is not None:
# Start <= to → IS_BEFORE({Start Date}, to+1) for safety.
clauses.append(f"IS_BEFORE({{Start Date}}, '{to.isoformat()}')")
if from_ is not None:
clauses.append(f"IS_AFTER({{End Date}}, '{from_.isoformat()}')")
if len(clauses) == 1:
return clauses[0]
return f"AND({', '.join(clauses)})"
def _escape_formula_literal(s: str) -> str:
"""Escape a value for safe embedding inside a single-quoted Airtable
formula literal. Single quotes are the only escape we need; Airtable
treats backslashes literally inside string literals, so we use
`\\'` to terminate-and-resume the quoted string semantics expected
by formula engines."""
return s.replace("\\", "\\\\").replace("'", "\\'")
def _multi_value_or(field: str, raw: str) -> str | None:
"""Build an OR(...) clause matching `field` against each comma-
separated value in `raw`. Returns None if no non-empty values."""
parts = [v.strip() for v in raw.split(",")]
parts = [p for p in parts if p]
if not parts:
return None
pieces = [f"{{{field}}}='{_escape_formula_literal(v)}'" for v in parts]
if len(pieces) == 1:
return pieces[0]
return f"OR({', '.join(pieces)})"
def _bookings_filter(
from_: date | None,
to: date | None,
department: str | None,
name: str | None,
) -> str | None:
"""Combine date overlap + optional department/name filters into a
single AND(...) filterByFormula. Department matches against the
flattened lookup field `Department (from Resource Name)`; name
against `Resource Name (from Resource)`."""
clauses: list[str] = []
date_clause = _date_filter(from_, to)
if date_clause:
clauses.append(date_clause)
if department:
dep_clause = _multi_value_or("Department (from Resource Name)", department)
if dep_clause:
clauses.append(dep_clause)
if name:
name_clause = _multi_value_or("Resource Name (from Resource)", name)
if name_clause:
clauses.append(name_clause)
if not clauses:
return None
if len(clauses) == 1:
return clauses[0]
return f"AND({', '.join(clauses)})"
async def fetch_bookings(
*,
from_: date | None = None,
to: date | None = None,
department: str | None = None,
name: str | None = None,
) -> list[dict[str, Any]]:
params: dict[str, Any] = {}
formula = _bookings_filter(from_, to, department, name)
if formula:
params["filterByFormula"] = formula
out: list[dict[str, Any]] = []
async for rec in _paginate(settings.AIRTABLE_TABLE_BOOKINGS, params):
out.append(normalise_booking(rec))
return out