"""Thin HTTP client for the centralized Oliver AI Cost Tracker."""

import asyncio
from typing import Optional

import httpx

from ..core.config import settings
from ..core.logging import get_logger

logger = get_logger(__name__)


class BudgetExceeded(Exception):
    """Raised by preflight() when the cost tracker rejects a call due to budget limits."""


def _tracker_configured() -> bool:
    """Return True when a base URL is set and the tracker is enabled in settings."""
    return bool(settings.cost_tracker_base_url and settings.cost_tracker_enabled)


def _endpoint(path: str) -> str:
    """Join *path* onto the configured base URL without doubling the slash."""
    # str() keeps this working if the setting is a URL object rather than a
    # plain string; rstrip avoids "//" when the base URL ends with "/".
    base = str(settings.cost_tracker_base_url).rstrip("/")
    return f"{base}/{path}"


def _auth_headers() -> dict:
    """Return the API-key header sent with every cost tracker request."""
    return {"X-API-Key": settings.cost_tracker_api_key}


def preflight(
    *,
    model: str,
    user_external_id: str,
    project_id: Optional[str] = None,
) -> None:
    """Ask the cost tracker whether a call to *model* is allowed.

    Does nothing when the tracker is not configured or is disabled. Any
    failure talking to the tracker (network error, HTTP error status,
    malformed response) is logged as a warning and otherwise ignored, so the
    caller proceeds as if the call were allowed.

    Args:
        model: Model identifier sent to the tracker.
        user_external_id: Identifier of the user making the call.
        project_id: Optional project identifier; sent as
            ``project_external_id`` only when truthy.

    Raises:
        BudgetExceeded: The tracker answered with a falsy ``allow`` value.
    """
    if not _tracker_configured():
        return

    try:
        payload = {
            "source_app": settings.cost_tracker_source_app,
            "model": model,
            "user_external_id": user_external_id,
        }
        if project_id:
            payload["project_external_id"] = project_id

        resp = httpx.post(
            _endpoint("preflight"),
            headers=_auth_headers(),
            json=payload,
            timeout=5.0,
        )
        resp.raise_for_status()
        data = resp.json()

        # A response without an "allow" key is treated as allowed.
        if not data.get("allow", True):
            raise BudgetExceeded(data.get("deny_reason") or "Budget exceeded")
    except BudgetExceeded:
        # The one outcome that must reach the caller; everything else is
        # downgraded to a warning below.
        raise
    except Exception as e:
        logger.warning(f"Cost tracker preflight failed (non-fatal): {e}")


async def aio_preflight(
    *,
    model: str,
    user_external_id: str,
    project_id: Optional[str] = None,
) -> None:
    """Run :func:`preflight` in a worker thread so the event loop is not blocked.

    Raises:
        BudgetExceeded: Propagated from :func:`preflight`.
    """
    await asyncio.to_thread(
        preflight,
        model=model,
        user_external_id=user_external_id,
        project_id=project_id,
    )


def record(
    *,
    model: str,
    provider: str,
    user_external_id: str,
    project_id: Optional[str] = None,
    job_external_id: str = "",
    input_tokens: int = 0,
    output_tokens: int = 0,
    chars: Optional[int] = None,
    latency_ms: int = 0,
    status: str = "success",
) -> None:
    """Report usage for one model call to the cost tracker.

    Does nothing when the tracker is not configured or is disabled. Never
    raises: every failure, including an HTTP error status returned by the
    tracker, is logged as a warning.

    Args:
        model: Model identifier.
        provider: Provider name for the model.
        user_external_id: Identifier of the user who made the call.
        project_id: Optional project identifier; sent as
            ``project_external_id`` only when truthy.
        job_external_id: Optional job identifier; sent only when non-empty.
        input_tokens: Input token count; omitted from the payload when zero.
        output_tokens: Output token count; omitted from the payload when zero.
        chars: Character count. When not ``None`` it is reported as the
            ``char`` unit and the token counts are not sent.
        latency_ms: Call latency in milliseconds.
        status: Outcome label for the call.
    """
    if not _tracker_configured():
        return

    try:
        units: dict = {}
        if chars is not None:
            # Character-based usage replaces token-based usage entirely.
            units["char"] = chars
        else:
            if input_tokens:
                units["token_input"] = input_tokens
            if output_tokens:
                units["token_output"] = output_tokens

        payload: dict = {
            "source_app": settings.cost_tracker_source_app,
            "model": model,
            "provider": provider,
            "user_external_id": user_external_id,
            "units": units,
            "latency_ms": latency_ms,
            "status": status,
        }
        if project_id:
            payload["project_external_id"] = project_id
        if job_external_id:
            payload["job_external_id"] = job_external_id

        resp = httpx.post(
            _endpoint("usage/record"),
            headers=_auth_headers(),
            json=payload,
            timeout=3.0,
        )
        # Without this, a 4xx/5xx rejection would be dropped silently and the
        # usage lost with no trace in the logs.
        resp.raise_for_status()
    except Exception as e:
        logger.warning(f"Cost tracker record failed (non-fatal): {e}")


async def aio_record(**kwargs) -> None:
    """Run :func:`record` in a worker thread; accepts the same keyword arguments."""
    await asyncio.to_thread(record, **kwargs)