feat: replace SDK with direct HTTP integration to centralized cost tracker

- New services/cost_tracker.py: sync httpx preflight()/record() + async wrappers;
  BudgetExceeded exception; no-op when COST_TRACKER_BASE_URL is empty
- Preflight budget check added before ingestion (Gemini), per-language translation
  (video-native + traditional), and per-language TTS dispatch
- _record_gemini_usage and _record_tts_cost now call cost_tracker directly;
  removes broken asyncio.get_event_loop() hack from sync Celery worker
- Fix: _cost_ctx now threaded into extract_accessibility_targeted (video-native path)
- Fix: user_id/cost_project_id now propagated through dispatch_language_tts →
  synthesize_cue_task.s() and the rerender_accessible_video.py re-render path
- Remove oliver-cost-tracker SDK dependency (was commented-out/never installed)
- Drop cost_tracker_outbox_path setting and get_cost_tracker() factory
- Update COST_TRACKER_BASE_URL default to optical-dev.oliver.solutions in
  .env.prod.example, docker-compose.yml, and all Cloud Run service yamls
- Cloud Run yamls use Secret Manager ref (cost-tracker-api-key) for the API key

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-04-27 13:36:15 +01:00
parent 26bfedd7c7
commit ea21cace96
15 changed files with 264 additions and 65 deletions

View file

@ -33,9 +33,8 @@ VITE_API_URL=https://your-api-domain.com:8000
VITE_SENTRY_DSN=your-frontend-sentry-dsn
VITE_ENVIRONMENT=production
# AI Cost Tracker (oliver-cost-tracker SDK)
COST_TRACKER_BASE_URL=https://cost.oliver.agency
# AI Cost Tracker (direct HTTP — see backend/app/services/cost_tracker.py)
COST_TRACKER_BASE_URL=https://optical-dev.oliver.solutions/cost-tracker/v1
COST_TRACKER_API_KEY=ct_live_your-api-key-here
COST_TRACKER_SOURCE_APP=video-accessibility
COST_TRACKER_OUTBOX_PATH=/tmp/cost_outbox.sqlite
COST_TRACKER_ENABLED=true

View file

@ -228,11 +228,10 @@ class Settings(BaseSettings):
sentry_dsn: str = ""
otel_exporter_otlp_endpoint: str = ""
# AI Cost Tracker (oliver-cost-tracker SDK)
# AI Cost Tracker (direct HTTP — see services/cost_tracker.py)
cost_tracker_base_url: str = ""
cost_tracker_api_key: str = ""
cost_tracker_source_app: str = "video-accessibility"
cost_tracker_outbox_path: str = "/tmp/cost_outbox.sqlite"
cost_tracker_enabled: bool = True
# CORS - comma-separated list of allowed origins

View file

@ -1,4 +1,3 @@
from functools import lru_cache
from typing import Optional
from fastapi import Depends, HTTPException, Request, status
@ -13,25 +12,6 @@ from .security import decode_token
security = HTTPBearer()
@lru_cache(maxsize=1)
def get_cost_tracker():
if not settings.cost_tracker_enabled or not settings.cost_tracker_base_url:
return None
try:
from oliver_cost_tracker import CostTracker
ct = CostTracker(
base_url=settings.cost_tracker_base_url,
api_key=settings.cost_tracker_api_key,
source_app=settings.cost_tracker_source_app,
outbox_path=settings.cost_tracker_outbox_path,
graceful_degradation=True,
)
ct.start()
return ct
except ImportError:
return None
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncIOMotorDatabase = Depends(get_database),

View file

@ -0,0 +1,116 @@
"""Thin HTTP client for the centralized Oliver AI Cost Tracker.
Both functions no-op gracefully when COST_TRACKER_BASE_URL is empty or
COST_TRACKER_ENABLED is false, so local development needs no extra config.
"""
import asyncio
from typing import Optional
import httpx
from ..core.config import settings
from ..core.logging import get_logger
logger = get_logger(__name__)
class BudgetExceeded(Exception):
"""Raised by preflight() when the cost tracker rejects a call due to budget limits."""
def preflight(
*,
model: str,
user_external_id: str,
project_id: Optional[str] = None,
) -> None:
"""Check budget before an AI call. Raises BudgetExceeded if not allowed.
Fails open on all transport/server errors so a cost-tracker outage never
blocks AI work.
"""
if not settings.cost_tracker_base_url or not settings.cost_tracker_enabled:
return
try:
resp = httpx.post(
f"{settings.cost_tracker_base_url}/preflight",
headers={"X-API-Key": settings.cost_tracker_api_key},
json={
"source_app": settings.cost_tracker_source_app,
"model": model,
"user_external_id": user_external_id,
"project_id": project_id,
},
timeout=5.0,
)
resp.raise_for_status()
data = resp.json()
if not data.get("allowed", True):
raise BudgetExceeded(data.get("reason") or "Budget exceeded")
except BudgetExceeded:
raise
except Exception as e:
logger.warning(f"Cost tracker preflight failed (non-fatal): {e}")
async def aio_preflight(
*,
model: str,
user_external_id: str,
project_id: Optional[str] = None,
) -> None:
"""Async wrapper for preflight(). Raises BudgetExceeded if not allowed."""
await asyncio.to_thread(
preflight,
model=model,
user_external_id=user_external_id,
project_id=project_id,
)
def record(
*,
model: str,
provider: str,
user_external_id: str,
project_id: Optional[str] = None,
job_external_id: str = "",
input_tokens: int = 0,
output_tokens: int = 0,
chars: Optional[int] = None,
latency_ms: int = 0,
status: str = "success",
) -> None:
"""Record AI usage. Never raises — fire-and-forget."""
if not settings.cost_tracker_base_url or not settings.cost_tracker_enabled:
return
try:
payload: dict = {
"source_app": settings.cost_tracker_source_app,
"model": model,
"provider": provider,
"user_external_id": user_external_id,
"project_id": project_id,
"job_external_id": job_external_id,
"latency_ms": latency_ms,
"status": status,
}
if chars is not None:
payload["chars"] = chars
else:
payload["input_tokens"] = input_tokens
payload["output_tokens"] = output_tokens
httpx.post(
f"{settings.cost_tracker_base_url}/record",
headers={"X-API-Key": settings.cost_tracker_api_key},
json=payload,
timeout=3.0,
)
except Exception as e:
logger.warning(f"Cost tracker record failed (non-fatal): {e}")
async def aio_record(**kwargs) -> None:
"""Async wrapper for record(); use in async call sites."""
await asyncio.to_thread(record, **kwargs)

View file

@ -23,25 +23,20 @@ async def _record_gemini_usage(
project_id: Optional[str],
elapsed_ms: int,
) -> None:
"""Fire-and-forget: record Gemini usage to cost tracker after a generate_content call."""
try:
from ..core.dependencies import get_cost_tracker
ct = get_cost_tracker()
if not ct:
return
from ..services import cost_tracker
usage = getattr(response, "usage_metadata", None)
if usage is None:
return
await ct.record(
user_external_id=user_id,
await cost_tracker.aio_record(
model=model,
provider="google",
user_external_id=user_id,
project_id=project_id,
job_external_id=job_id,
input_tokens=getattr(usage, "prompt_token_count", 0) or 0,
output_tokens=getattr(usage, "candidates_token_count", 0) or 0,
job_external_id=job_id,
project_external_id=project_id,
latency_ms=elapsed_ms,
status="success",
)
except Exception as e:
logger.warning(f"Cost tracker record failed (non-fatal): {e}")

View file

@ -10,6 +10,8 @@ from motor.motor_asyncio import AsyncIOMotorClient
from ..core.config import settings
from ..core.logging import get_logger
from ..models.job import JobStatus
from ..services import cost_tracker
from ..services.cost_tracker import BudgetExceeded
from ..services.gcs import gcs_service, upload_vtt_to_gcs
from ..services.gemini import gemini_service
from ..services.websocket import connection_manager
@ -210,6 +212,11 @@ async def ingest_and_ai_task_impl(job_id: str):
"job_id": job_id,
"project_id": job_doc.get("cost_tracker_project_id"),
}
await cost_tracker.aio_preflight(
model=gemini_service.model_name,
user_external_id=_cost_ctx["user_id"],
project_id=_cost_ctx["project_id"],
)
ai_result = await gemini_service.extract_accessibility(
temp_path,
brand_context=brand_context,

View file

@ -458,7 +458,9 @@ async def _regenerate_tts_cues(
"provider": provider,
"model": model,
"speed": speed,
"style_prompt": style_prompt
"style_prompt": style_prompt,
"user_id": job_doc.get("client_id", "system"),
"cost_project_id": job_doc.get("cost_tracker_project_id"),
},
queue="tts"
)

View file

@ -13,6 +13,7 @@ from motor.motor_asyncio import AsyncIOMotorClient
from ..core.config import settings
from ..core.logging import get_logger
from ..models.job import JobStatus
from ..services import cost_tracker
from ..services.gcs import gcs_service, upload_vtt_to_gcs
from ..services.gemini import gemini_service
from ..services.gemini_tts import TTSSynthesisError
@ -286,12 +287,19 @@ async def _async_translate_and_synthesize(job_id: str):
async with semaphore:
logger.info(f"Starting video-native translation for {lang} (from source: {source_language})")
try:
await cost_tracker.aio_preflight(
model=gemini_service.model_name,
user_external_id=_cost_ctx["user_id"],
project_id=_cost_ctx["project_id"],
)
async def extract_targeted():
return await gemini_service.extract_accessibility_targeted(
video_local_path,
lang,
brand_context=job_brand_context,
sdh_requested=sdh_requested
sdh_requested=sdh_requested,
_cost_ctx=_cost_ctx,
)
result = await retry_with_backoff(extract_targeted, max_retries=3)
@ -378,6 +386,12 @@ async def _async_translate_and_synthesize(job_id: str):
for language in target_languages:
logger.info(f"Processing language: {language} (from source: {source_language}, mode: {translation_mode})")
await cost_tracker.aio_preflight(
model=gemini_service.model_name,
user_external_id=_cost_ctx["user_id"],
project_id=_cost_ctx["project_id"],
)
try:
if language in transcreation_languages:
# TRADITIONAL MODE with transcreation: cultural adaptation
@ -504,7 +518,8 @@ async def _async_translate_and_synthesize(job_id: str):
# Get TTS preferences from job
tts_preferences = job_doc["requested_outputs"].get("tts_preferences", {})
await _generate_tts_for_languages(
job_id, updated_outputs, db, source_language, tts_preferences, accessible_video_requested
job_id, updated_outputs, db, source_language, tts_preferences, accessible_video_requested,
user_id=_cost_ctx["user_id"], cost_project_id=_cost_ctx["project_id"],
)
# Update final status
@ -614,7 +629,9 @@ async def _generate_tts_for_languages(
db,
source_language: str = "en",
tts_preferences: dict = None,
accessible_video_requested: bool = False
accessible_video_requested: bool = False,
user_id: str = "system",
cost_project_id: str = None,
):
"""Generate TTS audio for each language's audio description.
@ -627,14 +644,16 @@ async def _generate_tts_for_languages(
# Always generate source language MP3 first
if source_language in outputs and "ad_vtt_gcs" in outputs[source_language]:
await _generate_language_tts(
job_id, source_language, outputs[source_language], db, tts_preferences, accessible_video_requested
job_id, source_language, outputs[source_language], db, tts_preferences, accessible_video_requested,
user_id=user_id, cost_project_id=cost_project_id,
)
# Generate for other languages
for language, lang_output in outputs.items():
if language != source_language and "ad_vtt_gcs" in lang_output:
await _generate_language_tts(
job_id, language, lang_output, db, tts_preferences, accessible_video_requested
job_id, language, lang_output, db, tts_preferences, accessible_video_requested,
user_id=user_id, cost_project_id=cost_project_id,
)
except TTSSynthesisError as e:
@ -683,7 +702,7 @@ async def _generate_tts_for_languages(
raise
async def _generate_language_tts(job_id: str, language: str, lang_output: dict, db, tts_preferences: dict = None, accessible_video_requested: bool = False):
async def _generate_language_tts(job_id: str, language: str, lang_output: dict, db, tts_preferences: dict = None, accessible_video_requested: bool = False, user_id: str = "system", cost_project_id: str = None):
"""
Generate TTS for a specific language using parallel cue synthesis.
@ -716,12 +735,24 @@ async def _generate_language_tts(job_id: str, language: str, lang_output: dict,
f"accessible_video={accessible_video_requested}"
)
# Preflight budget check before dispatching TTS
tts_provider = tts_preferences.get("provider", "gemini")
from .tts_synthesis import _TTS_MODEL_STRINGS, _TTS_PROVIDER_MODEL_MAP
tts_model_key = tts_preferences.get("model", "flash")
await cost_tracker.aio_preflight(
model=_TTS_MODEL_STRINGS.get(tts_model_key, tts_model_key),
user_external_id=user_id,
project_id=cost_project_id,
)
# Dispatch parallel cue synthesis tasks to TTS worker
group_result = dispatch_language_tts(
job_id=job_id,
language=language,
cues=cues,
tts_preferences=tts_preferences
tts_preferences=tts_preferences,
user_id=user_id,
cost_project_id=cost_project_id,
)
if group_result is None:

View file

@ -51,24 +51,15 @@ def _record_tts_cost(
latency_ms: int,
) -> None:
try:
from ..core.dependencies import get_cost_tracker
ct = get_cost_tracker()
if not ct:
return
ct_provider = _TTS_PROVIDER_MODEL_MAP.get(provider, provider)
ct_model = _TTS_MODEL_STRINGS.get(model, model)
chars = len(text)
asyncio.get_event_loop().create_task(
ct.record(
user_external_id=user_id,
model=ct_model,
provider=ct_provider,
chars=chars,
job_external_id=job_id,
project_external_id=project_id,
latency_ms=latency_ms,
status="success",
)
from ..services.cost_tracker import record
record(
model=_TTS_MODEL_STRINGS.get(model, model),
provider=_TTS_PROVIDER_MODEL_MAP.get(provider, provider),
user_external_id=user_id,
project_id=project_id,
job_external_id=job_id,
chars=len(text),
latency_ms=latency_ms,
)
except Exception as e:
logger.warning(f"Cost tracker TTS record failed (non-fatal): {e}")
@ -345,7 +336,9 @@ def dispatch_language_tts(
job_id: str,
language: str,
cues: list[dict],
tts_preferences: dict
tts_preferences: dict,
user_id: Optional[str] = None,
cost_project_id: Optional[str] = None,
) -> AsyncResult:
"""
Dispatch a group of cue synthesis tasks for a language.
@ -400,6 +393,8 @@ def dispatch_language_tts(
style_prompt=style_prompt,
stability=stability,
similarity_boost=similarity_boost,
user_id=user_id,
cost_project_id=cost_project_id,
)
for i, cue in enumerate(cues)
if cue.get("text", "").strip() # Skip empty cues

View file

@ -42,9 +42,6 @@ python-magic = "^0.4.27"
aiohttp = "^3.12.15"
jinja2 = "^3.1.6"
audioop-lts = {version = "^0.2.2", python = ">=3.13"}
# AI Cost Tracker SDK — install from Bitbucket once published:
# oliver-cost-tracker = {git = "ssh://git@bitbucket.org/zlalani/ai-cost-tracker.git", subdirectory = "sdk"}
# Or temporarily: pip install -e /path/to/ai-cost-tracker/sdk/
[tool.poetry.group.dev.dependencies]
pytest = "^7.4.3"

View file

@ -133,6 +133,12 @@ services:
# Observability
SENTRY_DSN: ${SENTRY_DSN:-}
OTEL_EXPORTER_OTLP_ENDPOINT: ${OTEL_EXPORTER_OTLP_ENDPOINT:-}
# AI Cost Tracker
COST_TRACKER_BASE_URL: ${COST_TRACKER_BASE_URL:-}
COST_TRACKER_API_KEY: ${COST_TRACKER_API_KEY:-}
COST_TRACKER_SOURCE_APP: ${COST_TRACKER_SOURCE_APP:-video-accessibility}
COST_TRACKER_ENABLED: ${COST_TRACKER_ENABLED:-true}
volumes:
- ./secrets:/secrets:ro
- api-logs:/app/logs
@ -218,6 +224,12 @@ services:
# Observability
SENTRY_DSN: ${SENTRY_DSN:-}
# AI Cost Tracker
COST_TRACKER_BASE_URL: ${COST_TRACKER_BASE_URL:-}
COST_TRACKER_API_KEY: ${COST_TRACKER_API_KEY:-}
COST_TRACKER_SOURCE_APP: ${COST_TRACKER_SOURCE_APP:-video-accessibility}
COST_TRACKER_ENABLED: ${COST_TRACKER_ENABLED:-true}
volumes:
- ./secrets:/secrets:ro
- worker-logs:/app/logs
@ -299,6 +311,12 @@ services:
# Observability
SENTRY_DSN: ${SENTRY_DSN:-}
# AI Cost Tracker
COST_TRACKER_BASE_URL: ${COST_TRACKER_BASE_URL:-}
COST_TRACKER_API_KEY: ${COST_TRACKER_API_KEY:-}
COST_TRACKER_SOURCE_APP: ${COST_TRACKER_SOURCE_APP:-video-accessibility}
COST_TRACKER_ENABLED: ${COST_TRACKER_ENABLED:-true}
volumes:
- ./secrets:/secrets:ro
- tts-worker-logs:/app/logs
@ -386,6 +404,12 @@ services:
# Observability
SENTRY_DSN: ${SENTRY_DSN:-}
# AI Cost Tracker
COST_TRACKER_BASE_URL: ${COST_TRACKER_BASE_URL:-}
COST_TRACKER_API_KEY: ${COST_TRACKER_API_KEY:-}
COST_TRACKER_SOURCE_APP: ${COST_TRACKER_SOURCE_APP:-video-accessibility}
COST_TRACKER_ENABLED: ${COST_TRACKER_ENABLED:-true}
volumes:
- ./secrets:/secrets:ro
- ffmpeg-worker-logs:/app/logs
@ -477,6 +501,12 @@ services:
# Observability
SENTRY_DSN: ${SENTRY_DSN:-}
# AI Cost Tracker
COST_TRACKER_BASE_URL: ${COST_TRACKER_BASE_URL:-}
COST_TRACKER_API_KEY: ${COST_TRACKER_API_KEY:-}
COST_TRACKER_SOURCE_APP: ${COST_TRACKER_SOURCE_APP:-video-accessibility}
COST_TRACKER_ENABLED: ${COST_TRACKER_ENABLED:-true}
volumes:
- ./secrets:/secrets:ro
- whisper-worker-logs:/app/logs

View file

@ -93,6 +93,18 @@ spec:
key: latest
- name: SENTRY_ENVIRONMENT
value: "production"
# AI Cost Tracker
- name: COST_TRACKER_BASE_URL
value: "https://optical-dev.oliver.solutions/cost-tracker/v1"
- name: COST_TRACKER_API_KEY
valueFrom:
secretKeyRef:
name: cost-tracker-api-key
key: latest
- name: COST_TRACKER_SOURCE_APP
value: "video-accessibility"
- name: COST_TRACKER_ENABLED
value: "true"
resources:
limits:
memory: "2Gi"

View file

@ -88,6 +88,18 @@ spec:
secretKeyRef:
name: redis-url
key: latest
# AI Cost Tracker
- name: COST_TRACKER_BASE_URL
value: "https://optical-dev.oliver.solutions/cost-tracker/v1"
- name: COST_TRACKER_API_KEY
valueFrom:
secretKeyRef:
name: cost-tracker-api-key
key: latest
- name: COST_TRACKER_SOURCE_APP
value: "video-accessibility"
- name: COST_TRACKER_ENABLED
value: "true"
resources:
# TTS is memory-light compared to Whisper
limits:

View file

@ -90,6 +90,18 @@ spec:
secretKeyRef:
name: redis-url
key: latest
# AI Cost Tracker
- name: COST_TRACKER_BASE_URL
value: "https://optical-dev.oliver.solutions/cost-tracker/v1"
- name: COST_TRACKER_API_KEY
valueFrom:
secretKeyRef:
name: cost-tracker-api-key
key: latest
- name: COST_TRACKER_SOURCE_APP
value: "video-accessibility"
- name: COST_TRACKER_ENABLED
value: "true"
resources:
# Higher memory for Whisper large-v3 model (~4-6GB) + audio processing
limits:

View file

@ -86,6 +86,18 @@ spec:
secretKeyRef:
name: redis-url
key: latest
# AI Cost Tracker
- name: COST_TRACKER_BASE_URL
value: "https://optical-dev.oliver.solutions/cost-tracker/v1"
- name: COST_TRACKER_API_KEY
valueFrom:
secretKeyRef:
name: cost-tracker-api-key
key: latest
- name: COST_TRACKER_SOURCE_APP
value: "video-accessibility"
- name: COST_TRACKER_ENABLED
value: "true"
resources:
limits:
memory: "4Gi"