feat: integrate oliver-cost-tracker SDK into video-accessibility

Add AI cost tracking to all Gemini and TTS call sites:

- config.py: add COST_TRACKER_* env vars (base_url, api_key, source_app,
  outbox_path, enabled)
- dependencies.py: add get_cost_tracker() factory (lru_cache, graceful
  degradation if SDK not installed)
- models/job.py: add cost_tracker_project_id field for cost attribution
- services/gemini.py:
  - add import time, _record_gemini_usage() helper (reads usage_metadata)
  - add _cost_ctx kwarg to extract_accessibility, extract_accessibility_targeted,
    transcreate_content, translate_vtt, rewrite_tts_cue
  - record usage after every generate_content call via asyncio.create_task()
- tasks/ingest_and_ai.py: pass _cost_ctx (user_id, job_id, project_id) to
  extract_accessibility
- tasks/translate_and_synthesize.py: build _cost_ctx from job_doc and pass
  to transcreate_content + translate_vtt calls
- tasks/tts_synthesis.py: add user_id + cost_project_id kwargs, add
  _record_tts_cost() helper (records len(text) chars to cost tracker)
- pyproject.toml: document SDK install instructions (comment)
- .env.prod.example: add COST_TRACKER_* vars

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-04-27 11:30:46 +01:00
parent b5e5ad7e42
commit ae2c474061
10 changed files with 179 additions and 12 deletions

View file

@ -31,4 +31,11 @@ CORS_ORIGINS=https://your-domain.com,https://www.your-domain.com
# Frontend Build Configuration (for reference)
VITE_API_URL=https://your-api-domain.com:8000
VITE_SENTRY_DSN=your-frontend-sentry-dsn
VITE_ENVIRONMENT=production
VITE_ENVIRONMENT=production
# AI Cost Tracker (oliver-cost-tracker SDK)
COST_TRACKER_BASE_URL=https://cost.oliver.agency
COST_TRACKER_API_KEY=ct_live_your-api-key-here
COST_TRACKER_SOURCE_APP=video-accessibility
COST_TRACKER_OUTBOX_PATH=/tmp/cost_outbox.sqlite
COST_TRACKER_ENABLED=true

View file

@ -148,4 +148,10 @@ gs://accessible-video/{jobId}/
- Timestamp drift: Preserve cue timings in translations
- TTS alignment: Per-cue synthesis with crossfades
- Queue backlog: Autoscaling workers with monitoring
- Security: Secret Manager, least-privilege IAM, no client secrets
- Security: Secret Manager, least-privilege IAM, no client secrets
## Knowledge Wiki
A cross-project knowledge base is maintained automatically from all Claude Code sessions.
- **Index:** `/Users/aimpress/Library/Mobile Documents/iCloud~md~obsidian/Documents/VadymSamoilenko/wiki/index.md`
- **Query:** `cd ~/.claude/memory-compiler && uv run python scripts/query.py "your question"`
- Every session in this project automatically feeds the knowledge base.

View file

@ -228,6 +228,13 @@ class Settings(BaseSettings):
sentry_dsn: str = ""
otel_exporter_otlp_endpoint: str = ""
# AI Cost Tracker (oliver-cost-tracker SDK)
cost_tracker_base_url: str = ""
cost_tracker_api_key: str = ""
cost_tracker_source_app: str = "video-accessibility"
cost_tracker_outbox_path: str = "/tmp/cost_outbox.sqlite"
cost_tracker_enabled: bool = True
# CORS - comma-separated list of allowed origins
cors_origins: str = "http://localhost:5173,http://localhost:5174,http://localhost:3000,http://localhost:6001"

View file

@ -1,3 +1,4 @@
from functools import lru_cache
from typing import Optional
from fastapi import Depends, HTTPException, Request, status
@ -5,12 +6,32 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from motor.motor_asyncio import AsyncIOMotorDatabase
from ..models.user import User, UserRole
from .config import settings
from .database import get_database
from .security import decode_token
security = HTTPBearer()
@lru_cache(maxsize=1)
def get_cost_tracker():
    """Build and start the shared cost-tracker client, or return ``None``.

    ``None`` is returned when tracking is disabled in settings, when no base
    URL is configured, or when an ``ImportError`` is raised while importing,
    constructing, or starting the SDK client (for example because
    ``oliver_cost_tracker`` is not installed).

    The return value (including ``None``) is memoized by ``lru_cache``, so
    after the first call that returns normally the same object is handed back
    on every later call in this process. Any exception other than
    ``ImportError`` propagates to the caller and is not cached.
    """
    tracking_configured = (
        settings.cost_tracker_enabled and settings.cost_tracker_base_url
    )
    if not tracking_configured:
        return None

    try:
        # Imported here rather than at module top so the module still loads
        # when the SDK package is absent.
        from oliver_cost_tracker import CostTracker

        tracker = CostTracker(
            base_url=settings.cost_tracker_base_url,
            api_key=settings.cost_tracker_api_key,
            source_app=settings.cost_tracker_source_app,
            outbox_path=settings.cost_tracker_outbox_path,
            graceful_degradation=True,
        )
        tracker.start()
    except ImportError:
        return None
    return tracker
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncIOMotorDatabase = Depends(get_database),

View file

@ -168,6 +168,7 @@ class Job(BaseModel):
error: Optional[dict[str, Any]] = None
tts_rewrites: Optional[list[dict[str, Any]]] = None # Track auto-rewritten TTS cues
brand_context: Optional[str] = None # Brand names present in the video for accurate product identification
cost_tracker_project_id: Optional[str] = None # External project ID for AI cost attribution
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None

View file

@ -1,5 +1,6 @@
import json
import asyncio
import time
from pathlib import Path
from typing import Any, Optional
@ -13,6 +14,39 @@ logger = get_logger(__name__)
# Configure Gemini client
client = genai.Client(api_key=settings.gemini_api_key)
async def _record_gemini_usage(
    response,
    model: str,
    user_id: str,
    job_id: str,
    project_id: Optional[str],
    elapsed_ms: int,
) -> None:
    """Report the token usage of one Gemini ``generate_content`` call.

    Intended to be scheduled fire-and-forget: every failure is logged as a
    warning and swallowed so cost tracking can never break the AI pipeline.

    Args:
        response: Object returned by ``generate_content``; only its
            ``usage_metadata`` attribute is read.
        model: Model name to attribute the usage to.
        user_id: External user identifier for attribution.
        job_id: External job identifier for attribution.
        project_id: Optional external project identifier.
        elapsed_ms: Wall-clock latency of the call in milliseconds.

    Returns:
        None. Nothing is recorded when the tracker is unavailable or the
        response carries no ``usage_metadata``.
    """
    try:
        # Function-scope import, as in the original; presumably to avoid an
        # import cycle with core.dependencies -- TODO confirm.
        from ..core.dependencies import get_cost_tracker

        ct = get_cost_tracker()
        if not ct:
            return

        usage = getattr(response, "usage_metadata", None)
        if usage is None:
            return

        def _count(field: str) -> int:
            # Usage fields may be missing or None; treat both as zero.
            return getattr(usage, field, 0) or 0

        # Thinking models report reasoning tokens separately from the
        # candidate tokens, and they are billed as output. Include them so
        # the recorded output volume matches what is actually charged.
        output_tokens = _count("candidates_token_count") + _count(
            "thoughts_token_count"
        )

        await ct.record(
            user_external_id=user_id,
            model=model,
            provider="google",
            input_tokens=_count("prompt_token_count"),
            output_tokens=output_tokens,
            job_external_id=job_id,
            project_external_id=project_id,
            latency_ms=elapsed_ms,
            status="success",
        )
    except Exception as e:
        logger.warning(f"Cost tracker record failed (non-fatal): {e}")
class GeminiService:
def __init__(self):
self.model_name = 'gemini-3-pro-preview' # Gemini 3 Pro preview model
@ -89,7 +123,7 @@ Generate sdh_captions_vtt using the same cue timings as captions_vtt, enriched w
)
return "No specific brand names have been provided for this video."
async def extract_accessibility(self, video_file_path: str, brand_context: Optional[str] = None, sdh_requested: bool = False) -> dict[str, Any]:
async def extract_accessibility(self, video_file_path: str, brand_context: Optional[str] = None, sdh_requested: bool = False, _cost_ctx: Optional[dict] = None) -> dict[str, Any]:
"""
Extract captions and audio descriptions from video using Gemini 2.0
Returns structured JSON with transcript, captions VTT, and audio description VTT
@ -126,6 +160,7 @@ Generate sdh_captions_vtt using the same cue timings as captions_vtt, enriched w
# Generate content using new API - use asyncio.to_thread to avoid blocking
logger.info("Generating content with Gemini model...")
_t0 = time.monotonic()
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
@ -142,6 +177,8 @@ Generate sdh_captions_vtt using the same cue timings as captions_vtt, enriched w
top_k=40,
),
)
if _cost_ctx:
asyncio.create_task(_record_gemini_usage(response, self.model_name, _cost_ctx.get("user_id", "system"), _cost_ctx.get("job_id", ""), _cost_ctx.get("project_id"), int((time.monotonic() - _t0) * 1000)))
# Parse JSON response
response_text = response.text.strip()
@ -287,7 +324,8 @@ Fix the JSON and return it:
video_file_path: str,
target_language: str,
brand_context: Optional[str] = None,
sdh_requested: bool = False
sdh_requested: bool = False,
_cost_ctx: Optional[dict] = None,
) -> dict[str, Any]:
"""
Extract captions and audio descriptions from video using Gemini,
@ -340,6 +378,7 @@ Fix the JSON and return it:
# Generate content using new API
logger.info(f"Generating content with Gemini model for {target_language}...")
_t0 = time.monotonic()
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
@ -351,6 +390,8 @@ Fix the JSON and return it:
)
]
)
if _cost_ctx:
asyncio.create_task(_record_gemini_usage(response, self.model_name, _cost_ctx.get("user_id", "system"), _cost_ctx.get("job_id", ""), _cost_ctx.get("project_id"), int((time.monotonic() - _t0) * 1000)))
# Parse JSON response
response_text = response.text.strip()
@ -719,7 +760,8 @@ Fix the JSON and return it:
captions_vtt: str,
ad_vtt: str,
target_language: str,
brief: Optional[str] = None
brief: Optional[str] = None,
_cost_ctx: Optional[dict] = None,
) -> dict[str, str]:
"""
Transcreate English VTT content to target language with cultural adaptation
@ -742,6 +784,7 @@ JSON:
"""
try:
_t0 = time.monotonic()
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
@ -749,6 +792,8 @@ JSON:
genai.types.Part.from_text(text=prompt + "\n\n" + user_prompt)
]
)
if _cost_ctx:
asyncio.create_task(_record_gemini_usage(response, self.model_name, _cost_ctx.get("user_id", "system"), _cost_ctx.get("job_id", ""), _cost_ctx.get("project_id"), int((time.monotonic() - _t0) * 1000)))
response_text = response.text.strip()
@ -776,7 +821,8 @@ JSON:
self,
vtt_content: str,
target_language: str,
source_language: str = "en"
source_language: str = "en",
_cost_ctx: Optional[dict] = None,
) -> str:
"""
Translate VTT content using Gemini, preserving timing programmatically.
@ -813,11 +859,14 @@ REQUIREMENTS:
Segments to translate:
{numbered_texts}"""
_t0 = time.monotonic()
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
contents=[genai.types.Part.from_text(text=prompt)]
)
if _cost_ctx:
asyncio.create_task(_record_gemini_usage(response, self.model_name, _cost_ctx.get("user_id", "system"), _cost_ctx.get("job_id", ""), _cost_ctx.get("project_id"), int((time.monotonic() - _t0) * 1000)))
return self._parse_numbered_translation(response.text.strip(), cue_count)
try:
@ -877,7 +926,8 @@ Segments to translate:
async def rewrite_tts_cue(
self,
original_text: str,
language: str = "en"
language: str = "en",
_cost_ctx: Optional[dict] = None,
) -> str:
"""
Rewrite an audio description cue to be TTS-friendly.
@ -902,11 +952,14 @@ Segments to translate:
try:
logger.info(f"Rewriting TTS cue for safety: '{original_text[:50]}...'")
_t0 = time.monotonic()
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
contents=[genai.types.Part.from_text(text=prompt)]
)
if _cost_ctx:
asyncio.create_task(_record_gemini_usage(response, self.model_name, _cost_ctx.get("user_id", "system"), _cost_ctx.get("job_id", ""), _cost_ctx.get("project_id"), int((time.monotonic() - _t0) * 1000)))
result = response.text.strip()

View file

@ -205,10 +205,16 @@ async def ingest_and_ai_task_impl(job_id: str):
# Process with Gemini
brand_context = job_doc.get("brand_context")
sdh_requested = job_doc.get("requested_outputs", {}).get("sdh_vtt", False)
_cost_ctx = {
"user_id": job_doc.get("client_id", "system"),
"job_id": job_id,
"project_id": job_doc.get("cost_tracker_project_id"),
}
ai_result = await gemini_service.extract_accessibility(
temp_path,
brand_context=brand_context,
sdh_requested=sdh_requested
sdh_requested=sdh_requested,
_cost_ctx=_cost_ctx,
)
# Final safety check for required fields

View file

@ -176,6 +176,11 @@ async def _async_translate_and_synthesize(job_id: str):
job_title = job_doc.get("title", "Untitled Job")
logger.info(f"✅ Found job document for {job_id} ({job_title}), status: {job_doc.get('status', 'UNKNOWN')}")
_cost_ctx = {
"user_id": job_doc.get("client_id", "system"),
"job_id": job_id,
"project_id": job_doc.get("cost_tracker_project_id"),
}
# Check for valid status to process translation
# Valid statuses: approved_english, approved_source (legacy), or translating (new workflow)
@ -381,7 +386,8 @@ async def _async_translate_and_synthesize(job_id: str):
source_captions_vtt,
source_ad_vtt,
language,
brief="Standard accessibility content"
brief="Standard accessibility content",
_cost_ctx=_cost_ctx,
)
result = await retry_with_backoff(transcreate, max_retries=3)
@ -393,12 +399,14 @@ async def _async_translate_and_synthesize(job_id: str):
# TRADITIONAL MODE: Use Gemini translation (6-36x cheaper than Google Translate API)
async def translate_captions():
return await gemini_service.translate_vtt(
source_captions_vtt, language, source_language=source_language
source_captions_vtt, language, source_language=source_language,
_cost_ctx=_cost_ctx,
)
async def translate_ad():
return await gemini_service.translate_vtt(
source_ad_vtt, language, source_language=source_language
source_ad_vtt, language, source_language=source_language,
_cost_ctx=_cost_ctx,
)
translated_captions = await retry_with_backoff(translate_captions, max_retries=3)
@ -425,7 +433,8 @@ async def _async_translate_and_synthesize(job_id: str):
if sdh_requested and source_sdh_vtt:
async def translate_sdh():
return await gemini_service.translate_vtt(
source_sdh_vtt, language, source_language=source_language
source_sdh_vtt, language, source_language=source_language,
_cost_ctx=_cost_ctx,
)
translated_sdh = await retry_with_backoff(translate_sdh, max_retries=3)
sdh_gcs_uri = await upload_vtt_to_gcs(

View file

@ -24,6 +24,55 @@ from . import celery_app
logger = get_logger(__name__)
# Translates a TTS provider name (the dict key) into the provider string
# reported to the cost tracker. Keyed by provider name only, despite the
# "MODEL" in the constant's name. _record_tts_cost looks names up with
# .get(provider, provider), so a provider missing here is reported unchanged.
_TTS_PROVIDER_MODEL_MAP = {
# provider name → cost-tracker provider string
"gemini": "google",
"google": "google_tts",
"elevenlabs": "elevenlabs",
}
# Translates a TTS model name (the dict key) into the model string reported
# to the cost tracker. _record_tts_cost falls back to the raw model name when
# the key is absent.
# NOTE(review): the "elevenlabs" key is looked up by *model*, not provider --
# confirm callers really pass "elevenlabs" as the model value.
_TTS_MODEL_STRINGS = {
"flash": "gemini-2.5-flash-preview-tts",
"pro": "gemini-2.5-pro-preview-tts",
"standard": "standard",
"wavenet": "wavenet",
"neural2": "neural2",
"elevenlabs": "eleven_multilingual_v2",
}
# Strong references to in-flight cost-tracker tasks. The event loop only
# keeps weak references to tasks, so an otherwise unreferenced task can be
# garbage-collected before it finishes.
_PENDING_TTS_COST_TASKS: set = set()


def _record_tts_cost(
    provider: str,
    model: str,
    text: str,
    user_id: str,
    job_id: str,
    project_id: Optional[str],
    latency_ms: int,
) -> None:
    """Report the character count of one TTS synthesis to the cost tracker.

    Safe to call from both synchronous code (such as a Celery task body) and
    from code running inside an event loop. Every failure is logged as a
    warning and swallowed; this function never raises.

    Args:
        provider: TTS provider name; translated via ``_TTS_PROVIDER_MODEL_MAP``
            and passed through unchanged when not listed there.
        model: TTS model name; translated via ``_TTS_MODEL_STRINGS`` and
            passed through unchanged when not listed there.
        text: The synthesized text; ``len(text)`` is reported as ``chars``.
        user_id: External user identifier for attribution.
        job_id: External job identifier for attribution.
        project_id: Optional external project identifier.
        latency_ms: Synthesis latency in milliseconds.
    """
    try:
        # Local import so this helper does not depend on a module-level
        # ``import asyncio`` that is outside this block.
        import asyncio

        from ..core.dependencies import get_cost_tracker

        ct = get_cost_tracker()
        if not ct:
            return

        ct_provider = _TTS_PROVIDER_MODEL_MAP.get(provider, provider)
        ct_model = _TTS_MODEL_STRINGS.get(model, model)
        chars = len(text)

        async def _send() -> None:
            # Errors are handled here because, when scheduled as a task,
            # they would otherwise surface only as an unretrieved-exception
            # message from the event loop.
            try:
                await ct.record(
                    user_external_id=user_id,
                    model=ct_model,
                    provider=ct_provider,
                    chars=chars,
                    job_external_id=job_id,
                    project_external_id=project_id,
                    latency_ms=latency_ms,
                    status="success",
                )
            except Exception as e:
                logger.warning(f"Cost tracker TTS record failed (non-fatal): {e}")

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop is not None:
            # Inside a running loop: schedule without blocking the caller and
            # keep a reference until the task completes.
            task = loop.create_task(_send())
            _PENDING_TTS_COST_TASKS.add(task)
            task.add_done_callback(_PENDING_TTS_COST_TASKS.discard)
        else:
            # No running loop (synchronous caller): a task created on a loop
            # that is never run would never execute, so drive the coroutine
            # to completion here.
            # NOTE(review): assumes ct.record() is usable from a short-lived
            # loop -- confirm against the SDK.
            asyncio.run(_send())
    except Exception as e:
        logger.warning(f"Cost tracker TTS record failed (non-fatal): {e}")
@celery_app.task(
bind=True,
@ -48,6 +97,8 @@ def synthesize_cue_task(
style_prompt: str,
stability: float = 0.5,
similarity_boost: float = 0.5,
user_id: Optional[str] = None,
cost_project_id: Optional[str] = None,
) -> dict:
"""
Synthesize a single AD cue and upload to GCS immediately.
@ -102,6 +153,9 @@ def synthesize_cue_task(
f"cue={cue_index}, duration={duration:.2f}s, elapsed={elapsed_ms:.0f}ms"
)
# Record TTS cost (fire-and-forget)
_record_tts_cost(provider, model, text, user_id or "system", job_id, cost_project_id, int(elapsed_ms))
return {
"cue_index": cue_index,
"job_id": job_id,

View file

@ -42,6 +42,9 @@ python-magic = "^0.4.27"
aiohttp = "^3.12.15"
jinja2 = "^3.1.6"
audioop-lts = {version = "^0.2.2", python = ">=3.13"}
# AI Cost Tracker SDK — install from Bitbucket once published:
# oliver-cost-tracker = {git = "ssh://git@bitbucket.org/zlalani/ai-cost-tracker.git", subdirectory = "sdk"}
# Or temporarily: pip install -e /path/to/ai-cost-tracker/sdk/
[tool.poetry.group.dev.dependencies]
pytest = "^7.4.3"