gmal-scope-builder/backend/app/services/team_shape.py
DJP 2d44103603 Fix hours × volume bug: store per-1-asset hours, link directly to GMAL
Ratecard lines now store total_hours as per-1-asset hours (= base_hours,
linked to the GMAL row), with volume tracked separately. Aggregators
(team_shape, ratecard summary, Excel matrix, in-app ratecard tab) multiply
by volume themselves when computing total effort. Display behavior is
preserved; storage semantics are clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 12:11:04 -04:00

116 lines
4 KiB
Python

"""Calculate team shape (FTE headcount) from ratecard data."""
import logging
from collections import defaultdict
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.gmal import Role
from app.models.project import Project, RatecardLine
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Number of hours that count as one FTE; used as the divisor when converting
# aggregated role hours into FTE headcount.
# NOTE(review): presumably an annual figure - confirm with the ratecard owners.
HOURS_PER_FTE = 1800
# Upper bound, in percent, used by calculate_team_shape when capping
# efficiency rates.
MAX_EFFICIENCY = 90 # Cap at 90% - never fully eliminate a role
def _combined_discipline_rates(
    profile_rates: dict[str, float] | None,
    tool_rates: dict[str, float] | None,
) -> dict[str, float]:
    """Merge profile and tool rates into one capped percentage per discipline."""
    combined: dict[str, float] = dict(profile_rates or {})
    # Tool rates are additive on top of whatever the profile already grants.
    for disc, pct in (tool_rates or {}).items():
        combined[disc] = combined.get(disc, 0) + pct
    return {disc: min(pct, MAX_EFFICIENCY) for disc, pct in combined.items()}


async def calculate_team_shape(
    db: AsyncSession,
    project: Project,
    efficiency_pct: float = 0,
    profile_rates: dict[str, float] | None = None,
    tool_rates: dict[str, float] | None = None,
) -> list[dict]:
    """Calculate FTE headcount per role from the project's ratecard.

    Efficiency can be applied in three ways (in priority order):

    1. ``profile_rates`` + ``tool_rates``: per-discipline percentages from the
       profile, with tool percentages added on top.
    2. ``efficiency_pct``: legacy blanket percentage for all delivery roles.
    3. Neither: no efficiency applied.

    Programme roles are NEVER reduced regardless of method. Whichever method
    is used, the percentage applied to a role is capped at ``MAX_EFFICIENCY``
    so a role is never reduced to zero (or negative) hours.

    Args:
        db: Async session used to load ratecard lines and roles.
        project: Project whose ratecard lines are aggregated.
        efficiency_pct: Legacy blanket efficiency percentage (0-100).
        profile_rates: Efficiency percentage keyed by discipline.
        tool_rates: Additional efficiency percentage keyed by discipline.

    Returns:
        One dict per role with non-zero hours, sorted by discipline and then
        sort order. Each dict has the keys ``role_id``, ``role_title``,
        ``discipline``, ``is_programme_role``, ``sort_order``, ``total_hours``,
        ``fte``, ``efficiency_pct``, ``adjusted_hours``, ``adjusted_fte``,
        ``hours_saved`` and ``fte_saved``. An empty list is returned when the
        project has no ratecard lines.
    """
    lines_result = await db.execute(
        select(RatecardLine).where(RatecardLine.project_id == project.id)
    )
    lines = lines_result.scalars().all()
    if not lines:
        return []

    # Aggregate hours per role.
    # total_hours / manual_override are stored per-1-asset; multiply by volume here.
    role_hours: dict[int, float] = defaultdict(float)
    for line in lines:
        if line.manual_override is not None:
            per_asset = float(line.manual_override)
        else:
            per_asset = float(line.total_hours or 0)
        # A missing volume counts as a single asset.
        # NOTE(review): a volume of 0 is also treated as 1 here - confirm intended.
        role_hours[line.role_id] += per_asset * (line.volume or 1)

    # Load role details for every role that appears on the ratecard.
    roles_result = await db.execute(select(Role).where(Role.id.in_(list(role_hours))))
    roles = {r.id: r for r in roles_result.scalars().all()}

    discipline_efficiency = _combined_discipline_rates(profile_rates, tool_rates)

    team = []
    for role_id, role_total in role_hours.items():
        role = roles.get(role_id)
        # Skip roles that no longer exist and roles that carry no effort.
        if not role or role_total == 0:
            continue

        fte = round(role_total / HOURS_PER_FTE, 4)

        # Determine efficiency for this role.
        if role.is_programme_role:
            eff = 0
        elif discipline_efficiency:
            eff = discipline_efficiency.get(role.discipline, 0)
        elif efficiency_pct > 0:
            # The legacy blanket rate obeys the same cap as per-discipline
            # rates; otherwise 100% or more would zero out or negate a role.
            eff = min(efficiency_pct, MAX_EFFICIENCY)
        else:
            eff = 0

        if eff > 0:
            adjusted_hours = round(role_total * (1 - eff / 100), 2)
            adjusted_fte = round(adjusted_hours / HOURS_PER_FTE, 4)
        else:
            adjusted_hours = round(role_total, 2)
            adjusted_fte = fte

        team.append({
            "role_id": role_id,
            "role_title": role.role_title,
            "discipline": role.discipline,
            "is_programme_role": role.is_programme_role,
            "sort_order": role.sort_order or 0,
            "total_hours": round(role_total, 2),
            "fte": fte,
            "efficiency_pct": round(eff, 1),
            "adjusted_hours": adjusted_hours,
            "adjusted_fte": adjusted_fte,
            "hours_saved": round(role_total - adjusted_hours, 2),
            "fte_saved": round(fte - adjusted_fte, 4),
        })

    # A role without a discipline sorts first instead of raising TypeError
    # when None is compared against a string.
    team.sort(key=lambda t: (t["discipline"] or "", t["sort_order"]))

    grand_hours = sum(t["total_hours"] for t in team)
    grand_fte = sum(t["fte"] for t in team)
    logger.info(
        "Team shape: %d roles, %.0f hours, %.2f FTE",
        len(team),
        grand_hours,
        grand_fte,
    )
    return team