ai-cost-tracker/backend/app/core/security.py
Vadym Samoilenko 2f070ce503 feat: initial implementation of Oliver AI Cost Tracker
Complete Phase 1 implementation:

Backend (FastAPI + MongoDB + Celery):
- Core: config, DB with indexes, JWT security, API key auth middleware
- Models: org hierarchy (workspace/team/project), user mirror, pricing,
  usage events/rollups, budgets, alert log, audit log
- Services: pricing engine (LiteLLM/YAML/override priority), budget check
  with preflight, email alerts at 50/80/100%, analytics aggregations,
  audit logger
- API routes: public (preflight/record/upsert), admin CRUD, pricing
  management, budget management, analytics (summary/timeseries/breakdown/pivot),
  Microsoft SSO auth
- Celery tasks: daily LiteLLM price sync with change notifications,
  daily rollup aggregation, 5-minute alert evaluator
- Pricing catalogue: ElevenLabs + Google Cloud TTS in models.yaml

SDK (oliver-cost-tracker Python package):
- CostTracker client with httpx + exponential backoff (3 retries)
- SQLite outbox with 30s background flusher (never blocks AI pipeline)
- Estimators: token/char estimation per provider
- BudgetExceeded / CostTrackerUnavailable exceptions

Frontend (React 18 + Vite + TypeScript):
- Dashboard with KPI cards, daily cost timeseries, top-model/top-user charts
- Pivot Explorer with multi-dim row/col selection + stacked bar chart + table
- Admin pages: Workspaces, Pricing (with LiteLLM sync + override), Budgets
  (with live spend bar), API Keys (show-once), Users (mirror), Audit Log
- Microsoft SSO login flow

Infra: docker-compose.yml (mongo + redis + api + celery worker + beat + frontend)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:26:08 +01:00


import hashlib
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt
from passlib.context import CryptContext

from .config import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# ── API key helpers ────────────────────────────────────────────────────────────

API_KEY_PREFIX = "ct_live_"
API_KEY_LENGTH = 40  # bytes of entropy


def generate_api_key() -> tuple[str, str]:
    """Return (raw_key, key_hash). raw_key is shown once; store key_hash."""
    raw = API_KEY_PREFIX + secrets.token_urlsafe(API_KEY_LENGTH)
    key_hash = _hash_api_key(raw)
    return raw, key_hash


def _hash_api_key(raw: str) -> str:
    """Return the hex-encoded SHA-256 digest of the raw key."""
    return hashlib.sha256(raw.encode()).hexdigest()


def verify_api_key(raw: str, stored_hash: str) -> bool:
    """Check a presented key against the stored hash."""
    # Constant-time comparison, so timing does not reveal how much of the hash matched.
    return secrets.compare_digest(_hash_api_key(raw), stored_hash)


# ── JWT helpers (admin UI) ─────────────────────────────────────────────────────


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Sign a copy of `data` with an `exp` claim; defaults to the configured TTL."""
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (
        expires_delta or timedelta(minutes=settings.jwt_access_ttl_min)
    )
    to_encode["exp"] = expire
    return jwt.encode(to_encode, settings.jwt_secret, algorithm=settings.jwt_alg)


def create_refresh_token(data: dict) -> str:
    """Sign a copy of `data` marked `type: refresh` with the refresh TTL."""
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(days=settings.jwt_refresh_ttl_days)
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(to_encode, settings.jwt_secret, algorithm=settings.jwt_alg)


def decode_token(token: str) -> Optional[dict]:
    """Return the token's claims, or None if it is invalid or expired."""
    try:
        return jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_alg])
    except JWTError:
        return None


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)
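
A minimal, standalone sketch of the show-once API key flow that the helpers in this file support: generate a key, persist only its SHA-256 hash, and later resolve a presented key by hashing it again. It reimplements the key helpers with the standard library only, so it runs without the app's `settings`. The in-memory `key_store` dict and the `issue_key` / `authenticate` functions are hypothetical stand-ins for the real database collection and auth middleware, which this file does not show. The use of `secrets.compare_digest` is a choice made for this sketch.

```python
import hashlib
import secrets
from typing import Optional

API_KEY_PREFIX = "ct_live_"
API_KEY_LENGTH = 40  # bytes of entropy passed to secrets.token_urlsafe


def _hash_api_key(raw: str) -> str:
    # Hex-encoded SHA-256 digest: always 64 ASCII characters.
    return hashlib.sha256(raw.encode()).hexdigest()


def generate_api_key() -> tuple[str, str]:
    # Returns (raw_key, key_hash); only key_hash should be persisted.
    raw = API_KEY_PREFIX + secrets.token_urlsafe(API_KEY_LENGTH)
    return raw, _hash_api_key(raw)


def verify_api_key(raw: str, stored_hash: str) -> bool:
    # Constant-time comparison of the recomputed hash with the stored one.
    return secrets.compare_digest(_hash_api_key(raw), stored_hash)


# Hypothetical in-memory store mapping key hash -> owner (assumption for
# illustration; the real project presumably keeps this in its database).
key_store: dict[str, str] = {}


def issue_key(owner: str) -> str:
    # Hypothetical helper: store the hash, hand the raw key back exactly once.
    raw, key_hash = generate_api_key()
    key_store[key_hash] = owner
    return raw


def authenticate(raw: str) -> Optional[str]:
    # Hypothetical helper: look the caller up by the hash of the presented key.
    return key_store.get(_hash_api_key(raw))


if __name__ == "__main__":
    raw_key = issue_key("team-a")
    print("show once:", raw_key)
    print("owner:", authenticate(raw_key))
    print("tampered:", authenticate(raw_key + "x"))
```

Because the key carries 40 bytes of random entropy, a fast unsalted hash such as SHA-256 is a reasonable fit here, and it allows lookup by hash. User passwords are a different case, which is why the file hashes them with bcrypt instead.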