Replaces a static SPA that shipped an Airtable PAT in the JS bundle.
The new architecture holds all secrets server-side, fronts the app
behind Apache on optical-dev with the shared-vhost split-build pattern,
and is designed for a later Azure AD/MSAL swap-in.
- backend/ FastAPI + uvicorn, local auth (Azure AD stub), Airtable
proxy with TTL cache, Zoho .xlsx/.csv parser, merge
service for utilisation summaries. 28 pytest tests.
- frontend/ React + Vite + TS + Tailwind + Recharts SPA. Login entry
chunk 12.83 KB gzipped; Recharts lazy-loaded. No tokens
or Airtable URLs in the built bundle.
- deploy/ Idempotent deploy.sh (port auto-pick 8200-8299,
.env-persisted) + split-build Apache include template.
- docker-compose.yml pins name: utilisation-dept and binds 127.0.0.1.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
203 lines
6.7 KiB
Python
203 lines
6.7 KiB
Python
"""Airtable fetch helpers.
|
|
|
|
Decisions:
|
|
- Paginated with pageSize=100; we follow the `offset` cursor.
|
|
- 429 → sleep 30s then retry once per page request (Airtable docs). A second 429 on the same request raises.
|
|
- Field normalisation lives here so routers/handlers stay schema-pure.
|
|
- Date filtering for bookings uses filterByFormula on Start/End Date — we
|
|
fetch bookings that overlap the requested window (start <= to AND end >= from).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import date
|
|
from typing import Any, AsyncIterator
|
|
from urllib.parse import urlencode
|
|
|
|
import httpx
|
|
|
|
from app.config import settings
|
|
from app.deps.airtable import airtable_client
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Low-level pagination
|
|
# ----------------------------------------------------------------------
|
|
|
|
async def _paginate(
    table: str,
    params: dict[str, Any] | None = None,
    *,
    max_retries_429: int = 1,
) -> AsyncIterator[dict[str, Any]]:
    """Yield the records of an Airtable table one by one, page after page.

    Args:
        table: Path segment requested on the shared client (``/<table>``).
        params: Extra query parameters; merged over the default
            ``pageSize=100`` so a caller-supplied key wins.
        max_retries_429: How many times a single page request is re-sent
            after an HTTP 429 before the error is allowed to propagate.

    Yields:
        Each entry of the ``records`` list found in every page payload.

    Raises:
        Whatever ``raise_for_status()`` raises for a non-success response,
        including a 429 once the retry budget for that page is used up.
    """
    http = airtable_client.client
    query: dict[str, Any] = {"pageSize": 100, **(params or {})}

    cursor: str | None = None
    while True:
        page_query = {**query, "offset": cursor} if cursor else dict(query)

        # Encode by hand with doseq=True so list-valued parameters
        # (e.g. fields[]) are expanded into repeated keys consistently.
        url = f"/{table}?{urlencode(page_query, doseq=True)}"

        # The retry budget is per page request, not per pagination run.
        retries_used = 0
        resp = await http.get(url)
        while resp.status_code == 429 and retries_used < max_retries_429:
            logger.warning("Airtable 429 on %s — sleeping 30s before retry", table)
            await asyncio.sleep(30)
            retries_used += 1
            resp = await http.get(url)
        resp.raise_for_status()

        payload = resp.json()
        for record in payload.get("records", []):
            yield record

        # A missing/empty offset means this was the final page.
        cursor = payload.get("offset")
        if not cursor:
            return
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Field normalisation
|
|
# ----------------------------------------------------------------------
|
|
|
|
def _to_bool(v: Any) -> bool:
    """Coerce a loosely-typed field value to a bool.

    Strings count as true only when, after trimming and lower-casing, they
    are one of ``true``, ``yes``, ``1`` or ``checked``. Every other value
    (including ``None`` and real booleans) follows Python truthiness.
    """
    if isinstance(v, str):
        token = v.strip().lower()
        return token in ("true", "yes", "1", "checked")
    # bool() is the identity on True/False and maps None to False.
    return bool(v)
|
|
|
|
|
|
def _to_float(v: Any, default: float = 0.0) -> float:
    """Convert a field value to ``float``, falling back to ``default``.

    Args:
        v: Raw value; ``None`` and the empty string are treated as missing.
        default: Value returned when ``v`` is missing or cannot be converted.

    Returns:
        ``float(v)`` when the conversion succeeds, otherwise ``default``.
    """
    if v is None or v == "":
        return default
    try:
        return float(v)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: an int too large for a float must not crash the
        # normalisation of an otherwise valid record.
        return default
|
|
|
|
|
|
def _to_date(v: Any) -> date | None:
    """Convert a field value to a plain ``date``.

    Args:
        v: A ``date``/``datetime`` instance, or a value whose string form
            starts with an ISO ``YYYY-MM-DD`` date.

    Returns:
        The calendar date, or ``None`` when ``v`` is falsy or its first ten
        characters are not a valid ISO date.
    """
    if not v:
        return None
    if isinstance(v, date):
        # datetime is a subclass of date; rebuild so the caller always gets
        # a plain date (date and datetime do not compare with each other).
        return date(v.year, v.month, v.day)
    try:
        # Airtable returns ISO date strings; keep only the date portion.
        return date.fromisoformat(str(v)[:10])
    except ValueError:
        return None
|
|
|
|
|
|
def _as_list(v: Any) -> list[str]:
    """Return ``v`` as a list of strings.

    ``None`` becomes an empty list, a list has each element stringified,
    and any other value is wrapped into a one-element list.
    """
    if v is None:
        return []
    items = v if isinstance(v, list) else [v]
    return [str(item) for item in items]
|
|
|
|
|
|
def normalise_resource(rec: dict[str, Any]) -> dict[str, Any]:
    """Flatten a raw resource record into the dict shape used by callers.

    Reads ``rec["id"]`` and ``rec["fields"]``; several output keys accept
    alternative Airtable column names and take the first non-empty one.
    """
    fields = rec.get("fields", {})

    def first(*names: str) -> Any:
        """Return the first truthy value among the named fields, else None."""
        for name in names:
            value = fields.get(name)
            if value:
                return value
        return None

    # Roles may be a multi-select array, a single linked record name, or a string.
    roles = _as_list(first("Roles", "Role") or [])
    weekly_hours = _to_float(
        first(
            "Availability Hour (per week)",
            "Availability Hours (per week)",
            "Available Hours",
        )
        or 0
    )

    return {
        "recordId": rec.get("id"),
        "name": first("Name", "Resource Name") or "",
        "email": first("Email"),
        "department": first("Department"),
        "roles": roles,
        "inactive": _to_bool(fields.get("Inactive")),
        "availHoursPerWeek": weekly_hours,
        "startDate": _to_date(fields.get("Start Date")),
        "endDate": _to_date(fields.get("End Date")),
        "employmentType": first("Employment Type", "FTE / Freelancer"),
        "country": first("Country"),
    }
|
|
|
|
|
|
def normalise_booking(rec: dict[str, Any]) -> dict[str, Any]:
    """Flatten a raw booking record into the dict shape used by callers.

    Reads ``rec["id"]`` and ``rec["fields"]``; several output keys accept
    alternative Airtable column names and take the first non-empty one.
    """
    fields = rec.get("fields", {})

    def first(*names: str) -> Any:
        """Return the first truthy value among the named fields, else None."""
        for name in names:
            value = fields.get(name)
            if value:
                return value
        return None

    # When the lookup column is a list its first entry is used (None for an
    # empty list); the plain-name columns are consulted only when it is not
    # a list at all.
    lookup = fields.get("Resource Name (from Resource)")
    if isinstance(lookup, list):
        resource_name = lookup[0] if lookup else None
    else:
        resource_name = first("Resource Name", "Resource")

    return {
        "id": rec.get("id"),
        "task": first("Task", "Task Description"),
        "startDate": _to_date(fields.get("Start Date")),
        "endDate": _to_date(fields.get("End Date")),
        "resourceName": resource_name,
        "projectNumber": first("Project Number", "Project No."),
        "projectName": first("Project Name", "Project Title"),
        "department": first("Department"),
        "division": first("Division"),
        "hoursSelection": _as_list(first("Hours Selection", "Days") or []),
        "totalHoursBooked": _to_float(first("Total Hours Booked", "Total Hours") or 0),
        "bookingStatus": first("Booking Status", "Status"),
        "placeholder": _to_bool(fields.get("Placeholder")),
    }
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Public fetchers
|
|
# ----------------------------------------------------------------------
|
|
|
|
async def fetch_resources(*, include_inactive: bool = False) -> list[dict[str, Any]]:
    """Fetch every record of the resources table in normalised form.

    Args:
        include_inactive: When false (the default) the request carries an
            Airtable formula that keeps only resources not marked inactive.

    Returns:
        One ``normalise_resource`` dict per record, in the order yielded.
    """
    params: dict[str, Any] = (
        {} if include_inactive else {"filterByFormula": "NOT({Inactive})"}
    )
    return [
        normalise_resource(rec)
        async for rec in _paginate(settings.AIRTABLE_TABLE_RESOURCES, params)
    ]
|
|
|
|
|
|
def _date_filter(from_: date | None, to: date | None) -> str | None:
    """Build a filterByFormula that picks bookings overlapping [from_, to].

    Both bounds are inclusive: a booking matches when
    ``Start Date <= to`` and ``End Date >= from_``.

    Args:
        from_: First day of the window, or ``None`` for no lower bound.
        to: Last day of the window, or ``None`` for no upper bound.

    Returns:
        The Airtable formula string, or ``None`` when no bound applies.
    """
    from datetime import timedelta

    one_day = timedelta(days=1)
    clauses: list[str] = []

    # IS_BEFORE / IS_AFTER are strict comparisons, so each bound is widened
    # by one day to make the window inclusive. A bound sitting on date.max /
    # date.min cannot be widened and excludes nothing, so it is skipped.
    # NOTE(review): exact for date-only fields; if these columns carry a
    # time component the lower bound may also admit bookings that end
    # during the day before from_ — confirm against the base schema.
    if to is not None and to < date.max:
        upper = (to + one_day).isoformat()
        clauses.append(f"IS_BEFORE({{Start Date}}, '{upper}')")
    if from_ is not None and from_ > date.min:
        lower = (from_ - one_day).isoformat()
        clauses.append(f"IS_AFTER({{End Date}}, '{lower}')")

    if not clauses:
        return None
    if len(clauses) == 1:
        return clauses[0]
    return f"AND({', '.join(clauses)})"
|
|
|
|
|
|
async def fetch_bookings(*, from_: date | None = None, to: date | None = None) -> list[dict[str, Any]]:
    """Fetch bookings table records in normalised form.

    Args:
        from_: Optional window start, forwarded to ``_date_filter``.
        to: Optional window end, forwarded to ``_date_filter``.

    Returns:
        One ``normalise_booking`` dict per record, in the order yielded.
        When ``_date_filter`` produces a formula it is sent as
        ``filterByFormula``; otherwise the table is fetched unfiltered.
    """
    formula = _date_filter(from_, to)
    params: dict[str, Any] = {"filterByFormula": formula} if formula else {}
    return [
        normalise_booking(rec)
        async for rec in _paginate(settings.AIRTABLE_TABLE_BOOKINGS, params)
    ]
|