Complete Phase 1 implementation: Backend (FastAPI + MongoDB + Celery): - Core: config, DB with indexes, JWT security, API key auth middleware - Models: org hierarchy (workspace/team/project), user mirror, pricing, usage events/rollups, budgets, alert log, audit log - Services: pricing engine (LiteLLM/YAML/override priority), budget check with preflight, email alerts at 50/80/100%, analytics aggregations, audit logger - API routes: public (preflight/record/upsert), admin CRUD, pricing management, budget management, analytics (summary/timeseries/breakdown/pivot), Microsoft SSO auth - Celery tasks: daily LiteLLM price sync with change notifications, daily rollup aggregation, 5-minute alert evaluator - Pricing catalogue: ElevenLabs + Google Cloud TTS in models.yaml SDK (oliver-cost-tracker Python package): - CostTracker client with httpx + exponential backoff (3 retries) - SQLite outbox with 30s background flusher (never blocks AI pipeline) - Estimators: token/char estimation per provider - BudgetExceeded / CostTrackerUnavailable exceptions Frontend (React 18 + Vite + TypeScript): - Dashboard with KPI cards, daily cost timeseries, top-model/top-user charts - Pivot Explorer with multi-dim row/col selection + stacked bar chart + table - Admin pages: Workspaces, Pricing (with LiteLLM sync + override), Budgets (with live spend bar), API Keys (show-once), Users (mirror), Audit Log - Microsoft SSO login flow Infra: docker-compose.yml (mongo + redis + api + celery worker + beat + frontend) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
209 lines
6.7 KiB
Python
209 lines
6.7 KiB
Python
"""Daily Celery task: sync model prices from LiteLLM JSON."""
|
|
import asyncio
import html
import math
from datetime import date, datetime, timezone
from typing import Any, Optional

import httpx
from motor.motor_asyncio import AsyncIOMotorDatabase

from ..core.config import settings
from ..core.logging import get_logger
|
|
|
|
# Module logger, obtained from the project's logging helper.
logger = get_logger(__name__)
|
|
|
|
# LiteLLM billing_mode → our billing_unit
|
|
_MODE_MAP = {
|
|
"chat": None, # will use token_input + token_output
|
|
"completion": None,
|
|
"embedding": "token_input",
|
|
"image_generation": "request",
|
|
"audio_transcription": "second",
|
|
"audio_speech": "char",
|
|
"vision": None,
|
|
}
|
|
|
|
# LiteLLM provider → our provider
|
|
_PROVIDER_MAP = {
|
|
"google": "google",
|
|
"vertex_ai": "google",
|
|
"openai": "openai",
|
|
"anthropic": "anthropic",
|
|
"cohere": "cohere",
|
|
"mistral": "mistral",
|
|
"together_ai": "together",
|
|
"groq": "groq",
|
|
"bedrock": "bedrock",
|
|
}
|
|
|
|
|
|
# Top-level keys of the LiteLLM price JSON that document the schema rather
# than describe a model; ingesting them would create bogus price rows.
_NON_MODEL_KEYS = frozenset({"sample_spec"})


async def sync_litellm_prices(db: AsyncIOMotorDatabase) -> dict[str, Any]:
    """Fetch the LiteLLM price JSON and upsert per-token prices into ``model_prices``.

    Every entry carrying ``input_cost_per_token`` and/or ``output_cost_per_token``
    produces one ``token_input`` / ``token_output`` price via
    ``_upsert_litellm_price``. If any price changed, admins are emailed.

    Args:
        db: Motor database holding the ``model_prices`` collection.

    Returns:
        ``{"status": "error", "error": <message>}`` when the JSON could not be
        fetched or is not a JSON object. Otherwise ``{"status": "ok"}`` plus the
        counters ``new``, ``changed``, ``unchanged``, ``skipped`` (entries without
        usable per-token prices) and ``notifications_sent``. The last counter is
        the number of price-change lines reported, not the number of emails.
    """
    url = settings.litellm_prices_url
    logger.info(f"Starting LiteLLM price sync from {url}")

    try:
        raw = await _fetch_litellm_prices(url)
    except Exception as e:
        logger.error(f"Failed to fetch LiteLLM prices: {e}")
        return {"status": "error", "error": str(e)}

    now = datetime.now(timezone.utc)
    # Take the effective date from the same UTC instant as created_at.
    # date.today() follows the worker's local timezone and can land on a
    # different day near midnight.
    today = now.date().isoformat()
    new_count = changed_count = unchanged_count = skipped = 0
    notifications: list[str] = []

    for litellm_key, entry in raw.items():
        if litellm_key in _NON_MODEL_KEYS or not isinstance(entry, dict):
            continue

        pairs = _token_price_pairs(litellm_key, entry)
        if not pairs:
            skipped += 1
            continue

        # `or ""` also covers an explicit null provider.
        litellm_provider = entry.get("litellm_provider") or ""
        provider = _PROVIDER_MAP.get(litellm_provider, litellm_provider)
        model = litellm_key

        for billing_unit, ppu in pairs:
            result = await _upsert_litellm_price(
                db=db,
                provider=provider,
                model=model,
                billing_unit=billing_unit,
                ppu=ppu,
                today=today,
                now=now,
                litellm_commit=settings.litellm_commit_hash,
            )
            if result == "new":
                new_count += 1
            elif result == "changed":
                changed_count += 1
                notifications.append(
                    f"{provider}/{model}/{billing_unit}: price changed → ${ppu:.8f}/unit"
                )
            else:
                unchanged_count += 1

    logger.info(
        f"LiteLLM sync complete: {new_count} new, {changed_count} changed, {skipped} skipped"
    )

    if notifications:
        await _send_price_change_notification(db, notifications)

    return {
        "status": "ok",
        "new": new_count,
        "changed": changed_count,
        "unchanged": unchanged_count,
        "skipped": skipped,
        "notifications_sent": len(notifications),
    }


async def _fetch_litellm_prices(url: str) -> dict[str, Any]:
    """Download and decode the LiteLLM price JSON; raise ValueError unless it is a JSON object."""
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url)
        resp.raise_for_status()
        raw = resp.json()
    if not isinstance(raw, dict):
        raise ValueError(f"expected a JSON object, got {type(raw).__name__}")
    return raw


def _token_price_pairs(litellm_key: str, entry: dict[str, Any]) -> list[tuple[str, float]]:
    """Return ``(billing_unit, price)`` pairs for the per-token costs present in *entry*.

    A cost that cannot be converted to float is logged and ignored, so one
    malformed entry cannot abort the whole sync.
    """
    pairs: list[tuple[str, float]] = []
    for field, billing_unit in (
        ("input_cost_per_token", "token_input"),
        ("output_cost_per_token", "token_output"),
    ):
        value = entry.get(field)
        if value is None:
            continue
        try:
            pairs.append((billing_unit, float(value)))
        except (TypeError, ValueError):
            logger.warning(
                f"Ignoring non-numeric {field}={value!r} for LiteLLM entry {litellm_key!r}"
            )
    return pairs
|
|
|
|
|
|
async def _upsert_litellm_price(
    db: AsyncIOMotorDatabase,
    provider: str,
    model: str,
    billing_unit: str,
    ppu: float,
    today: str,
    now: datetime,
    litellm_commit: str,
) -> str:
    """Record *ppu* as the current LiteLLM price for provider/model/billing_unit.

    The new price is compared with the most recent ``source="litellm"`` row for
    the same key.

    Returns:
        ``"new"``: no LiteLLM row existed, so an open row was inserted.
        ``"unchanged"``: the latest row already has this price. Its
        ``litellm_commit_hash`` is refreshed only if it differs.
        ``"changed"``: the latest row was closed with ``effective_to=today``,
        and a new open row carrying ``previous_price`` was inserted.
    """
    latest = await db.model_prices.find_one(
        {"provider": provider, "model": model, "billing_unit": billing_unit, "source": "litellm"},
        # created_at breaks ties between rows sharing effective_from, e.g. when a
        # price changed twice on the same day. Without it MongoDB may return
        # either row, possibly the already-closed one.
        sort=[("effective_from", -1), ("created_at", -1)],
    )

    if not latest:
        await db.model_prices.insert_one(
            _litellm_price_doc(provider, model, billing_unit, ppu, today, now, litellm_commit)
        )
        return "new"

    # A missing or null stored price is treated as 0 rather than raising TypeError.
    existing_ppu = latest.get("price_per_unit_usd") or 0.0

    # Per-token prices are ~1e-8..1e-5, so a fixed absolute tolerance such as
    # 1e-10 hides real changes. Compare relatively instead, with a tiny floor
    # so that 0 vs. 0 still counts as equal.
    if math.isclose(existing_ppu, ppu, rel_tol=1e-9, abs_tol=1e-15):
        if latest.get("litellm_commit_hash") != litellm_commit:
            await db.model_prices.update_one(
                {"_id": latest["_id"]},
                {"$set": {"litellm_commit_hash": litellm_commit}},
            )
        return "unchanged"

    # Price changed: close the current row and open a new one from today.
    await db.model_prices.update_one(
        {"_id": latest["_id"]},
        {"$set": {"effective_to": today}},
    )
    doc = _litellm_price_doc(provider, model, billing_unit, ppu, today, now, litellm_commit)
    doc["previous_price"] = existing_ppu
    await db.model_prices.insert_one(doc)
    return "changed"


def _litellm_price_doc(
    provider: str,
    model: str,
    billing_unit: str,
    ppu: float,
    today: str,
    now: datetime,
    litellm_commit: str,
) -> dict[str, Any]:
    """Build an open-ended (``effective_to=None``) USD ``model_prices`` document sourced from LiteLLM."""
    return {
        "provider": provider,
        "model": model,
        "billing_unit": billing_unit,
        "price_per_unit_usd": ppu,
        "currency": "USD",
        "effective_from": today,
        "effective_to": None,
        "source": "litellm",
        "litellm_commit_hash": litellm_commit,
        "created_at": now,
    }
|
|
|
|
|
|
async def _send_price_change_notification(db: AsyncIOMotorDatabase, notifications: list[str]) -> None:
    """Email the configured alert recipients a list of LiteLLM price changes.

    This is best-effort: missing configuration and delivery failures are logged,
    never raised, so a notification problem cannot fail the price sync. Each
    recipient gets a separate email, and one failure does not stop the rest.

    Args:
        db: Not used here; kept for call-site compatibility.
        notifications: One human-readable line per changed price.
    """
    if not notifications:
        return

    try:
        if not settings.sendgrid_api_key:
            logger.warning("SENDGRID_API_KEY not set — skipping price change notification")
            return

        recipients = [e.strip() for e in (settings.alert_email_to or "").split(",") if e.strip()]
        if not recipients:
            logger.warning("No alert_email_to configured — skipping price change notification")
            return

        from sendgrid import SendGridAPIClient
        from sendgrid.helpers.mail import Mail

        # Model names come from the externally fetched LiteLLM JSON, so escape
        # them before embedding them in HTML.
        lines_html = "".join(f"<li>{html.escape(n)}</li>" for n in notifications)
        body = (
            f"<p>LiteLLM daily sync detected price changes for {len(notifications)} model(s):</p>"
            f"<ul>{lines_html}</ul>"
            "<p>Review in Admin UI → Pricing. If you have active overrides, they take priority.</p>"
        )
        sg = SendGridAPIClient(settings.sendgrid_api_key)
    except Exception as e:
        logger.exception(f"Failed to send price change notification: {e}")
        return

    sent: list[str] = []
    for to_email in recipients:
        try:
            msg = Mail(
                from_email=settings.email_from,
                to_emails=to_email,
                subject="[Oliver Cost Tracker] LiteLLM price changes detected",
                html_content=body,
            )
            # SendGridAPIClient.send is a blocking HTTP call. Run it in a worker
            # thread so the event loop running this coroutine is not stalled.
            await asyncio.to_thread(sg.send, msg)
        except Exception as e:
            logger.exception(f"Failed to send price change notification to {to_email}: {e}")
        else:
            sent.append(to_email)

    if sent:
        logger.info(f"Price change notification sent to {sent}")
|