video-accessibility/backend/app/services/cost_tracker.py
Vadym Samoilenko ca312d48fa chore(lint): fix all ruff errors — 0 warnings remaining
- B904 (55): add `from err` / `from None` to raise-in-except across 13 files
- F821 (1): add missing HTTPException import in routes_language_qc.py
- F841 (7): remove unused variable assignments (current_user, job_title, tts_provider, etc.)
- W293 (13): strip trailing whitespace from blank lines
- C416 (4): rewrite unnecessary dict comprehensions as dict()
- C401 (1): rewrite unnecessary generator as set comprehension
- E701 (4): split multi-statement lines in cost_tracker.py
- E741 (1): rename ambiguous `l` to `lang` in cloud_run_dispatch.py
- B007 (4): prefix unused loop variables with _ in tts.py, video_renderer.py
- I001 (1): sort imports in tasks/__init__.py (move stdlib to top)
- E402 (3): move threading/time/signals imports to top of tasks/__init__.py
- UP042 (9): replace (str, Enum) with StrEnum in all model/schema enums

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 17:13:08 +01:00

108 lines
3 KiB
Python

"""Thin HTTP client for the centralized Oliver AI Cost Tracker."""
import asyncio
import httpx
from ..core.config import settings
from ..core.logging import get_logger
logger = get_logger(__name__)
class BudgetExceeded(Exception):
"""Raised by preflight() when the cost tracker rejects a call due to budget limits."""
def preflight(
*,
model: str,
user_external_id: str,
project_id: str | None = None,
) -> None:
if not settings.cost_tracker_base_url or not settings.cost_tracker_enabled:
return
try:
payload = {
"source_app": settings.cost_tracker_source_app,
"model": model,
"user_external_id": user_external_id,
}
if project_id:
payload["project_external_id"] = project_id
resp = httpx.post(
f"{settings.cost_tracker_base_url}/preflight",
headers={"X-API-Key": settings.cost_tracker_api_key},
json=payload,
timeout=5.0,
)
resp.raise_for_status()
data = resp.json()
if not data.get("allow", True):
raise BudgetExceeded(data.get("deny_reason") or "Budget exceeded")
except BudgetExceeded:
raise
except Exception as e:
logger.warning(f"Cost tracker preflight failed (non-fatal): {e}")
async def aio_preflight(
*,
model: str,
user_external_id: str,
project_id: str | None = None,
) -> None:
await asyncio.to_thread(preflight, model=model, user_external_id=user_external_id, project_id=project_id)
def record(
*,
model: str,
provider: str,
user_external_id: str,
project_id: str | None = None,
job_external_id: str = "",
input_tokens: int = 0,
output_tokens: int = 0,
chars: int | None = None,
latency_ms: int = 0,
status: str = "success",
) -> None:
if not settings.cost_tracker_base_url or not settings.cost_tracker_enabled:
return
try:
units: dict = {}
if chars is not None:
units["char"] = chars
else:
if input_tokens:
units["token_input"] = input_tokens
if output_tokens:
units["token_output"] = output_tokens
payload: dict = {
"source_app": settings.cost_tracker_source_app,
"model": model,
"provider": provider,
"user_external_id": user_external_id,
"units": units,
"latency_ms": latency_ms,
"status": status,
}
if project_id:
payload["project_external_id"] = project_id
if job_external_id:
payload["job_external_id"] = job_external_id
httpx.post(
f"{settings.cost_tracker_base_url}/usage/record",
headers={"X-API-Key": settings.cost_tracker_api_key},
json=payload,
timeout=3.0,
)
except Exception as e:
logger.warning(f"Cost tracker record failed (non-fatal): {e}")
async def aio_record(**kwargs) -> None:
await asyncio.to_thread(record, **kwargs)