ai-cost-tracker/backend/celery_worker.py
Vadym Samoilenko 2f070ce503 feat: initial implementation of Oliver AI Cost Tracker
Complete Phase 1 implementation:

Backend (FastAPI + MongoDB + Celery):
- Core: config, DB with indexes, JWT security, API key auth middleware
- Models: org hierarchy (workspace/team/project), user mirror, pricing,
  usage events/rollups, budgets, alert log, audit log
- Services: pricing engine (LiteLLM/YAML/override priority), budget check
  with preflight, email alerts at 50/80/100%, analytics aggregations,
  audit logger
- API routes: public (preflight/record/upsert), admin CRUD, pricing
  management, budget management, analytics (summary/timeseries/breakdown/pivot),
  Microsoft SSO auth
- Celery tasks: daily LiteLLM price sync with change notifications,
  daily rollup aggregation, 5-minute alert evaluator
- Pricing catalogue: ElevenLabs + Google Cloud TTS in models.yaml

SDK (oliver-cost-tracker Python package):
- CostTracker client with httpx + exponential backoff (3 retries)
- SQLite outbox with 30s background flusher (never blocks AI pipeline)
- Estimators: token/char estimation per provider
- BudgetExceeded / CostTrackerUnavailable exceptions

Frontend (React 18 + Vite + TypeScript):
- Dashboard with KPI cards, daily cost timeseries, top-model/top-user charts
- Pivot Explorer with multi-dim row/col selection + stacked bar chart + table
- Admin pages: Workspaces, Pricing (with LiteLLM sync + override), Budgets
  (with live spend bar), API Keys (show-once), Users (mirror), Audit Log
- Microsoft SSO login flow

Infra: docker-compose.yml (mongo + redis + api + celery worker + beat + frontend)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:26:08 +01:00

85 lines
2.4 KiB
Python

"""Celery worker with beat schedule."""
import asyncio
from celery import Celery
from celery.schedules import crontab
from app.core.config import settings
celery_app = Celery(
"cost_tracker",
broker=settings.redis_url,
backend=settings.redis_url,
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
beat_schedule={
# Sync LiteLLM prices daily at 02:00 UTC
"litellm-price-sync": {
"task": "celery_worker.task_sync_litellm",
"schedule": crontab(hour=2, minute=0),
},
# Build daily rollup at 01:00 UTC (previous day now complete)
"daily-rollup": {
"task": "celery_worker.task_daily_rollup",
"schedule": crontab(hour=1, minute=0),
},
# Evaluate budget alerts every 5 minutes
"alert-evaluator": {
"task": "celery_worker.task_alert_evaluator",
"schedule": crontab(minute="*/5"),
},
},
)
def _run(coro):
"""Run an async coroutine from a sync Celery task."""
return asyncio.get_event_loop().run_until_complete(coro)
@celery_app.task(name="celery_worker.task_sync_litellm")
def task_sync_litellm():
from app.core.database import get_db_instance
from app.tasks.pricing_sync import sync_litellm_prices
async def _inner():
from app.core.database import connect_db
await connect_db()
db = get_db_instance()
return await sync_litellm_prices(db)
return _run(_inner())
@celery_app.task(name="celery_worker.task_daily_rollup")
def task_daily_rollup(target_date: str | None = None):
from app.core.database import get_db_instance
from app.tasks.rollup_daily import build_daily_rollup
async def _inner():
from app.core.database import connect_db
await connect_db()
db = get_db_instance()
return await build_daily_rollup(db, target_date)
return _run(_inner())
@celery_app.task(name="celery_worker.task_alert_evaluator")
def task_alert_evaluator():
from app.core.database import get_db_instance
from app.tasks.alert_evaluator import evaluate_all_budgets
async def _inner():
from app.core.database import connect_db
await connect_db()
db = get_db_instance()
return await evaluate_all_budgets(db)
return _run(_inner())