ai-cost-tracker/backend/app/tasks/pricing_sync.py
Vadym Samoilenko 2f070ce503 feat: initial implementation of Oliver AI Cost Tracker
Complete Phase 1 implementation:

Backend (FastAPI + MongoDB + Celery):
- Core: config, DB with indexes, JWT security, API key auth middleware
- Models: org hierarchy (workspace/team/project), user mirror, pricing,
  usage events/rollups, budgets, alert log, audit log
- Services: pricing engine (LiteLLM/YAML/override priority), budget check
  with preflight, email alerts at 50/80/100%, analytics aggregations,
  audit logger
- API routes: public (preflight/record/upsert), admin CRUD, pricing
  management, budget management, analytics (summary/timeseries/breakdown/pivot),
  Microsoft SSO auth
- Celery tasks: daily LiteLLM price sync with change notifications,
  daily rollup aggregation, 5-minute alert evaluator
- Pricing catalogue: ElevenLabs + Google Cloud TTS in models.yaml

SDK (oliver-cost-tracker Python package):
- CostTracker client with httpx + exponential backoff (3 retries)
- SQLite outbox with 30s background flusher (never blocks AI pipeline)
- Estimators: token/char estimation per provider
- BudgetExceeded / CostTrackerUnavailable exceptions

Frontend (React 18 + Vite + TypeScript):
- Dashboard with KPI cards, daily cost timeseries, top-model/top-user charts
- Pivot Explorer with multi-dim row/col selection + stacked bar chart + table
- Admin pages: Workspaces, Pricing (with LiteLLM sync + override), Budgets
  (with live spend bar), API Keys (show-once), Users (mirror), Audit Log
- Microsoft SSO login flow

Infra: docker-compose.yml (mongo + redis + api + celery worker + beat + frontend)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:26:08 +01:00

209 lines
6.7 KiB
Python

"""Daily Celery task: sync model prices from LiteLLM JSON."""
import asyncio
from datetime import date, datetime, timezone
from typing import Any, Optional
import httpx
from motor.motor_asyncio import AsyncIOMotorDatabase
from ..core.config import settings
from ..core.logging import get_logger
logger = get_logger(__name__)
# LiteLLM billing_mode → our billing_unit
_MODE_MAP = {
"chat": None, # will use token_input + token_output
"completion": None,
"embedding": "token_input",
"image_generation": "request",
"audio_transcription": "second",
"audio_speech": "char",
"vision": None,
}
# LiteLLM provider → our provider
_PROVIDER_MAP = {
"google": "google",
"vertex_ai": "google",
"openai": "openai",
"anthropic": "anthropic",
"cohere": "cohere",
"mistral": "mistral",
"together_ai": "together",
"groq": "groq",
"bedrock": "bedrock",
}
async def sync_litellm_prices(db: AsyncIOMotorDatabase) -> dict[str, Any]:
"""Fetch LiteLLM price JSON and upsert into model_prices. Returns summary dict."""
url = settings.litellm_prices_url
logger.info(f"Starting LiteLLM price sync from {url}")
try:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url)
resp.raise_for_status()
raw: dict = resp.json()
except Exception as e:
logger.error(f"Failed to fetch LiteLLM prices: {e}")
return {"status": "error", "error": str(e)}
today = date.today().isoformat()
now = datetime.now(timezone.utc)
new_count = changed_count = skipped = 0
notifications = []
for litellm_key, entry in raw.items():
if not isinstance(entry, dict):
continue
litellm_provider = entry.get("litellm_provider", "")
provider = _PROVIDER_MAP.get(litellm_provider, litellm_provider)
model = litellm_key
input_ppu = entry.get("input_cost_per_token")
output_ppu = entry.get("output_cost_per_token")
if input_ppu is None and output_ppu is None:
skipped += 1
continue
pairs: list[tuple[str, float]] = []
if input_ppu is not None:
pairs.append(("token_input", float(input_ppu)))
if output_ppu is not None:
pairs.append(("token_output", float(output_ppu)))
for billing_unit, ppu in pairs:
result = await _upsert_litellm_price(
db=db,
provider=provider,
model=model,
billing_unit=billing_unit,
ppu=ppu,
today=today,
now=now,
litellm_commit=settings.litellm_commit_hash,
)
if result == "new":
new_count += 1
elif result == "changed":
changed_count += 1
notifications.append(
f"{provider}/{model}/{billing_unit}: price changed → ${ppu:.8f}/unit"
)
logger.info(
f"LiteLLM sync complete: {new_count} new, {changed_count} changed, {skipped} skipped"
)
if notifications:
await _send_price_change_notification(db, notifications)
return {
"status": "ok",
"new": new_count,
"changed": changed_count,
"skipped": skipped,
"notifications_sent": len(notifications),
}
async def _upsert_litellm_price(
db: AsyncIOMotorDatabase,
provider: str,
model: str,
billing_unit: str,
ppu: float,
today: str,
now: datetime,
litellm_commit: str,
) -> str:
"""Return 'new' | 'changed' | 'unchanged'."""
# Find latest litellm record for this provider+model+billing_unit
latest = await db.model_prices.find_one(
{"provider": provider, "model": model, "billing_unit": billing_unit, "source": "litellm"},
sort=[("effective_from", -1)],
)
if not latest:
await db.model_prices.insert_one({
"provider": provider,
"model": model,
"billing_unit": billing_unit,
"price_per_unit_usd": ppu,
"currency": "USD",
"effective_from": today,
"effective_to": None,
"source": "litellm",
"litellm_commit_hash": litellm_commit,
"created_at": now,
})
return "new"
existing_ppu = latest.get("price_per_unit_usd", 0)
if abs(existing_ppu - ppu) < 1e-10:
# Same price — update commit hash only
await db.model_prices.update_one(
{"_id": latest["_id"]},
{"$set": {"litellm_commit_hash": litellm_commit}},
)
return "unchanged"
# Price changed — close old record, create new
await db.model_prices.update_one(
{"_id": latest["_id"]},
{"$set": {"effective_to": today}},
)
await db.model_prices.insert_one({
"provider": provider,
"model": model,
"billing_unit": billing_unit,
"price_per_unit_usd": ppu,
"currency": "USD",
"effective_from": today,
"effective_to": None,
"source": "litellm",
"litellm_commit_hash": litellm_commit,
"created_at": now,
"previous_price": existing_ppu,
})
return "changed"
async def _send_price_change_notification(db: AsyncIOMotorDatabase, notifications: list[str]) -> None:
"""Email admins about LiteLLM price changes."""
try:
from ..core.config import settings
if not settings.sendgrid_api_key:
logger.warning("SENDGRID_API_KEY not set — skipping price change notification")
return
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
lines_html = "".join(f"<li>{n}</li>" for n in notifications)
body = (
f"<p>LiteLLM daily sync detected price changes for {len(notifications)} model(s):</p>"
f"<ul>{lines_html}</ul>"
f"<p>Review in Admin UI → Pricing. If you have active overrides, they take priority.</p>"
)
recipients = [e.strip() for e in settings.alert_email_to.split(",") if e.strip()]
if not recipients:
logger.warning("No alert_email_to configured — skipping price change notification")
return
sg = SendGridAPIClient(settings.sendgrid_api_key)
for to_email in recipients:
msg = Mail(
from_email=settings.email_from,
to_emails=to_email,
subject="[Oliver Cost Tracker] LiteLLM price changes detected",
html_content=body,
)
sg.send(msg)
logger.info(f"Price change notification sent to {recipients}")
except Exception as e:
logger.error(f"Failed to send price change notification: {e}")