Complete Phase 1 implementation: Backend (FastAPI + MongoDB + Celery): - Core: config, DB with indexes, JWT security, API key auth middleware - Models: org hierarchy (workspace/team/project), user mirror, pricing, usage events/rollups, budgets, alert log, audit log - Services: pricing engine (LiteLLM/YAML/override priority), budget check with preflight, email alerts at 50/80/100%, analytics aggregations, audit logger - API routes: public (preflight/record/upsert), admin CRUD, pricing management, budget management, analytics (summary/timeseries/breakdown/pivot), Microsoft SSO auth - Celery tasks: daily LiteLLM price sync with change notifications, daily rollup aggregation, 5-minute alert evaluator - Pricing catalogue: ElevenLabs + Google Cloud TTS in models.yaml SDK (oliver-cost-tracker Python package): - CostTracker client with httpx + exponential backoff (3 retries) - SQLite outbox with 30s background flusher (never blocks AI pipeline) - Estimators: token/char estimation per provider - BudgetExceeded / CostTrackerUnavailable exceptions Frontend (React 18 + Vite + TypeScript): - Dashboard with KPI cards, daily cost timeseries, top-model/top-user charts - Pivot Explorer with multi-dim row/col selection + stacked bar chart + table - Admin pages: Workspaces, Pricing (with LiteLLM sync + override), Budgets (with live spend bar), API Keys (show-once), Users (mirror), Audit Log - Microsoft SSO login flow Infra: docker-compose.yml (mongo + redis + api + celery worker + beat + frontend) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
85 lines
2.4 KiB
Python
85 lines
2.4 KiB
Python
"""Celery worker with beat schedule."""
|
|
import asyncio
|
|
|
|
from celery import Celery
|
|
from celery.schedules import crontab
|
|
|
|
from app.core.config import settings
|
|
|
|
celery_app = Celery(
|
|
"cost_tracker",
|
|
broker=settings.redis_url,
|
|
backend=settings.redis_url,
|
|
)
|
|
|
|
celery_app.conf.update(
|
|
task_serializer="json",
|
|
accept_content=["json"],
|
|
result_serializer="json",
|
|
timezone="UTC",
|
|
enable_utc=True,
|
|
beat_schedule={
|
|
# Sync LiteLLM prices daily at 02:00 UTC
|
|
"litellm-price-sync": {
|
|
"task": "celery_worker.task_sync_litellm",
|
|
"schedule": crontab(hour=2, minute=0),
|
|
},
|
|
# Build daily rollup at 01:00 UTC (previous day now complete)
|
|
"daily-rollup": {
|
|
"task": "celery_worker.task_daily_rollup",
|
|
"schedule": crontab(hour=1, minute=0),
|
|
},
|
|
# Evaluate budget alerts every 5 minutes
|
|
"alert-evaluator": {
|
|
"task": "celery_worker.task_alert_evaluator",
|
|
"schedule": crontab(minute="*/5"),
|
|
},
|
|
},
|
|
)
|
|
|
|
|
|
def _run(coro):
|
|
"""Run an async coroutine from a sync Celery task."""
|
|
return asyncio.get_event_loop().run_until_complete(coro)
|
|
|
|
|
|
@celery_app.task(name="celery_worker.task_sync_litellm")
|
|
def task_sync_litellm():
|
|
from app.core.database import get_db_instance
|
|
from app.tasks.pricing_sync import sync_litellm_prices
|
|
|
|
async def _inner():
|
|
from app.core.database import connect_db
|
|
await connect_db()
|
|
db = get_db_instance()
|
|
return await sync_litellm_prices(db)
|
|
|
|
return _run(_inner())
|
|
|
|
|
|
@celery_app.task(name="celery_worker.task_daily_rollup")
|
|
def task_daily_rollup(target_date: str | None = None):
|
|
from app.core.database import get_db_instance
|
|
from app.tasks.rollup_daily import build_daily_rollup
|
|
|
|
async def _inner():
|
|
from app.core.database import connect_db
|
|
await connect_db()
|
|
db = get_db_instance()
|
|
return await build_daily_rollup(db, target_date)
|
|
|
|
return _run(_inner())
|
|
|
|
|
|
@celery_app.task(name="celery_worker.task_alert_evaluator")
|
|
def task_alert_evaluator():
|
|
from app.core.database import get_db_instance
|
|
from app.tasks.alert_evaluator import evaluate_all_budgets
|
|
|
|
async def _inner():
|
|
from app.core.database import connect_db
|
|
await connect_db()
|
|
db = get_db_instance()
|
|
return await evaluate_all_budgets(db)
|
|
|
|
return _run(_inner())
|