diff --git a/.env.prod.example b/.env.prod.example index 5012d0f..176dddb 100644 --- a/.env.prod.example +++ b/.env.prod.example @@ -33,9 +33,8 @@ VITE_API_URL=https://your-api-domain.com:8000 VITE_SENTRY_DSN=your-frontend-sentry-dsn VITE_ENVIRONMENT=production -# AI Cost Tracker (oliver-cost-tracker SDK) -COST_TRACKER_BASE_URL=https://cost.oliver.agency +# AI Cost Tracker (direct HTTP — see backend/app/services/cost_tracker.py) +COST_TRACKER_BASE_URL=https://optical-dev.oliver.solutions/cost-tracker/v1 COST_TRACKER_API_KEY=ct_live_your-api-key-here COST_TRACKER_SOURCE_APP=video-accessibility -COST_TRACKER_OUTBOX_PATH=/tmp/cost_outbox.sqlite COST_TRACKER_ENABLED=true \ No newline at end of file diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 80c33ed..1424d20 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -228,11 +228,10 @@ class Settings(BaseSettings): sentry_dsn: str = "" otel_exporter_otlp_endpoint: str = "" - # AI Cost Tracker (oliver-cost-tracker SDK) + # AI Cost Tracker (direct HTTP — see services/cost_tracker.py) cost_tracker_base_url: str = "" cost_tracker_api_key: str = "" cost_tracker_source_app: str = "video-accessibility" - cost_tracker_outbox_path: str = "/tmp/cost_outbox.sqlite" cost_tracker_enabled: bool = True # CORS - comma-separated list of allowed origins diff --git a/backend/app/core/dependencies.py b/backend/app/core/dependencies.py index bbca5b4..a20149b 100644 --- a/backend/app/core/dependencies.py +++ b/backend/app/core/dependencies.py @@ -1,4 +1,3 @@ -from functools import lru_cache from typing import Optional from fastapi import Depends, HTTPException, Request, status @@ -13,25 +12,6 @@ from .security import decode_token security = HTTPBearer() -@lru_cache(maxsize=1) -def get_cost_tracker(): - if not settings.cost_tracker_enabled or not settings.cost_tracker_base_url: - return None - try: - from oliver_cost_tracker import CostTracker - ct = CostTracker( - base_url=settings.cost_tracker_base_url, - api_key=settings.cost_tracker_api_key, - source_app=settings.cost_tracker_source_app, - outbox_path=settings.cost_tracker_outbox_path, - graceful_degradation=True, - ) - ct.start() - return ct - except ImportError: - return None - - async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: AsyncIOMotorDatabase = Depends(get_database), diff --git a/backend/app/services/cost_tracker.py b/backend/app/services/cost_tracker.py new file mode 100644 index 0000000..019dc49 --- /dev/null +++ b/backend/app/services/cost_tracker.py @@ -0,0 +1,116 @@ +"""Thin HTTP client for the centralized Oliver AI Cost Tracker. + +Both functions no-op gracefully when COST_TRACKER_BASE_URL is empty or +COST_TRACKER_ENABLED is false, so local development needs no extra config. +""" + +import asyncio +from typing import Optional + +import httpx + +from ..core.config import settings +from ..core.logging import get_logger + +logger = get_logger(__name__) + + +class BudgetExceeded(Exception): + """Raised by preflight() when the cost tracker rejects a call due to budget limits.""" + + +def preflight( + *, + model: str, + user_external_id: str, + project_id: Optional[str] = None, +) -> None: + """Check budget before an AI call. Raises BudgetExceeded if not allowed. + + Fails open on all transport/server errors so a cost-tracker outage never + blocks AI work. + """ + if not settings.cost_tracker_base_url or not settings.cost_tracker_enabled: + return + try: + resp = httpx.post( + f"{settings.cost_tracker_base_url}/preflight", + headers={"X-API-Key": settings.cost_tracker_api_key}, + json={ + "source_app": settings.cost_tracker_source_app, + "model": model, + "user_external_id": user_external_id, + "project_id": project_id, + }, + timeout=5.0, + ) + resp.raise_for_status() + data = resp.json() + if not data.get("allowed", True): + raise BudgetExceeded(data.get("reason") or "Budget exceeded") + except BudgetExceeded: + raise + except Exception as e: + logger.warning(f"Cost tracker preflight failed (non-fatal): {e}") + + +async def aio_preflight( + *, + model: str, + user_external_id: str, + project_id: Optional[str] = None, +) -> None: + """Async wrapper for preflight(). Raises BudgetExceeded if not allowed.""" + await asyncio.to_thread( + preflight, + model=model, + user_external_id=user_external_id, + project_id=project_id, + ) + + +def record( + *, + model: str, + provider: str, + user_external_id: str, + project_id: Optional[str] = None, + job_external_id: str = "", + input_tokens: int = 0, + output_tokens: int = 0, + chars: Optional[int] = None, + latency_ms: int = 0, + status: str = "success", +) -> None: + """Record AI usage. Never raises — fire-and-forget.""" + if not settings.cost_tracker_base_url or not settings.cost_tracker_enabled: + return + try: + payload: dict = { + "source_app": settings.cost_tracker_source_app, + "model": model, + "provider": provider, + "user_external_id": user_external_id, + "project_id": project_id, + "job_external_id": job_external_id, + "latency_ms": latency_ms, + "status": status, + } + if chars is not None: + payload["chars"] = chars + else: + payload["input_tokens"] = input_tokens + payload["output_tokens"] = output_tokens + httpx.post( + f"{settings.cost_tracker_base_url}/record", + headers={"X-API-Key": settings.cost_tracker_api_key}, + json=payload, + timeout=3.0, + ) + except Exception as e: + logger.warning(f"Cost tracker record failed (non-fatal): {e}") + + +async def aio_record(**kwargs) -> None: + """Async wrapper for record(); use in async call sites.""" + await asyncio.to_thread(record, **kwargs) diff --git a/backend/app/services/gemini.py b/backend/app/services/gemini.py index 45ad166..72b351e 100644 --- a/backend/app/services/gemini.py +++ b/backend/app/services/gemini.py @@ -23,25 +23,20 @@ async def _record_gemini_usage( project_id: Optional[str], elapsed_ms: int, ) -> None: - """Fire-and-forget: record Gemini usage to cost tracker after a generate_content call.""" try: - from ..core.dependencies import get_cost_tracker - ct = get_cost_tracker() - if not ct: - return + from ..services import cost_tracker usage = getattr(response, "usage_metadata", None) if usage is None: return - await ct.record( - user_external_id=user_id, + await cost_tracker.aio_record( model=model, provider="google", + user_external_id=user_id, + project_id=project_id, + job_external_id=job_id, input_tokens=getattr(usage, "prompt_token_count", 0) or 0, output_tokens=getattr(usage, "candidates_token_count", 0) or 0, - job_external_id=job_id, - project_external_id=project_id, latency_ms=elapsed_ms, - status="success", ) except Exception as e: logger.warning(f"Cost tracker record failed (non-fatal): {e}") diff --git a/backend/app/tasks/ingest_and_ai.py b/backend/app/tasks/ingest_and_ai.py index 39727fc..1d473a8 100644 --- a/backend/app/tasks/ingest_and_ai.py +++ b/backend/app/tasks/ingest_and_ai.py @@ -10,6 +10,8 @@ from motor.motor_asyncio import AsyncIOMotorClient from ..core.config import settings from ..core.logging import get_logger from ..models.job import JobStatus +from ..services import cost_tracker +from ..services.cost_tracker import BudgetExceeded from ..services.gcs import gcs_service, upload_vtt_to_gcs from ..services.gemini import gemini_service from ..services.websocket import connection_manager @@ -210,6 +212,11 @@ async def ingest_and_ai_task_impl(job_id: str): "job_id": job_id, "project_id": job_doc.get("cost_tracker_project_id"), } + await cost_tracker.aio_preflight( + model=gemini_service.model_name, + user_external_id=_cost_ctx["user_id"], + project_id=_cost_ctx["project_id"], + ) ai_result = await gemini_service.extract_accessibility( temp_path, brand_context=brand_context, diff --git a/backend/app/tasks/rerender_accessible_video.py b/backend/app/tasks/rerender_accessible_video.py index 0ff7077..0203d4e 100644 --- a/backend/app/tasks/rerender_accessible_video.py +++ b/backend/app/tasks/rerender_accessible_video.py @@ -458,7 +458,9 @@ async def _regenerate_tts_cues( "provider": provider, "model": model, "speed": speed, - "style_prompt": style_prompt + "style_prompt": style_prompt, + "user_id": job_doc.get("client_id", "system"), + "cost_project_id": job_doc.get("cost_tracker_project_id"), }, queue="tts" ) diff --git a/backend/app/tasks/translate_and_synthesize.py b/backend/app/tasks/translate_and_synthesize.py index a0f4537..674f45a 100644 --- a/backend/app/tasks/translate_and_synthesize.py +++ b/backend/app/tasks/translate_and_synthesize.py @@ -13,6 +13,7 @@ from motor.motor_asyncio import AsyncIOMotorClient from ..core.config import settings from ..core.logging import get_logger from ..models.job import JobStatus +from ..services import cost_tracker from ..services.gcs import gcs_service, upload_vtt_to_gcs from ..services.gemini import gemini_service from ..services.gemini_tts import TTSSynthesisError @@ -286,12 +287,19 @@ async def _async_translate_and_synthesize(job_id: str): async with semaphore: logger.info(f"Starting video-native translation for {lang} (from source: {source_language})") try: + await cost_tracker.aio_preflight( + model=gemini_service.model_name, + user_external_id=_cost_ctx["user_id"], + project_id=_cost_ctx["project_id"], + ) + async def extract_targeted(): return await gemini_service.extract_accessibility_targeted( video_local_path, lang, brand_context=job_brand_context, - sdh_requested=sdh_requested + sdh_requested=sdh_requested, + _cost_ctx=_cost_ctx, ) result = await retry_with_backoff(extract_targeted, max_retries=3) @@ -378,6 +386,12 @@ async def _async_translate_and_synthesize(job_id: str): for language in target_languages: logger.info(f"Processing language: {language} (from source: {source_language}, mode: {translation_mode})") + await cost_tracker.aio_preflight( + model=gemini_service.model_name, + user_external_id=_cost_ctx["user_id"], + project_id=_cost_ctx["project_id"], + ) + try: if language in transcreation_languages: # TRADITIONAL MODE with transcreation: cultural adaptation @@ -504,7 +518,8 @@ async def _async_translate_and_synthesize(job_id: str): # Get TTS preferences from job tts_preferences = job_doc["requested_outputs"].get("tts_preferences", {}) await _generate_tts_for_languages( - job_id, updated_outputs, db, source_language, tts_preferences, accessible_video_requested + job_id, updated_outputs, db, source_language, tts_preferences, accessible_video_requested, + user_id=_cost_ctx["user_id"], cost_project_id=_cost_ctx["project_id"], ) # Update final status @@ -614,7 +629,9 @@ async def _generate_tts_for_languages( db, source_language: str = "en", tts_preferences: dict = None, - accessible_video_requested: bool = False + accessible_video_requested: bool = False, + user_id: str = "system", + cost_project_id: str = None, ): """Generate TTS audio for each language's audio description. @@ -627,14 +644,16 @@ async def _generate_tts_for_languages( # Always generate source language MP3 first if source_language in outputs and "ad_vtt_gcs" in outputs[source_language]: await _generate_language_tts( - job_id, source_language, outputs[source_language], db, tts_preferences, accessible_video_requested + job_id, source_language, outputs[source_language], db, tts_preferences, accessible_video_requested, + user_id=user_id, cost_project_id=cost_project_id, ) # Generate for other languages for language, lang_output in outputs.items(): if language != source_language and "ad_vtt_gcs" in lang_output: await _generate_language_tts( - job_id, language, lang_output, db, tts_preferences, accessible_video_requested + job_id, language, lang_output, db, tts_preferences, accessible_video_requested, + user_id=user_id, cost_project_id=cost_project_id, ) except TTSSynthesisError as e: @@ -683,7 +702,7 @@ async def _generate_tts_for_languages( raise -async def _generate_language_tts(job_id: str, language: str, lang_output: dict, db, tts_preferences: dict = None, accessible_video_requested: bool = False): +async def _generate_language_tts(job_id: str, language: str, lang_output: dict, db, tts_preferences: dict = None, accessible_video_requested: bool = False, user_id: str = "system", cost_project_id: str = None): """ Generate TTS for a specific language using parallel cue synthesis. @@ -716,12 +735,24 @@ async def _generate_language_tts(job_id: str, language: str, lang_output: dict, f"accessible_video={accessible_video_requested}" ) + # Preflight budget check before dispatching TTS + tts_provider = tts_preferences.get("provider", "gemini") + from .tts_synthesis import _TTS_MODEL_STRINGS, _TTS_PROVIDER_MODEL_MAP + tts_model_key = tts_preferences.get("model", "flash") + await cost_tracker.aio_preflight( + model=_TTS_MODEL_STRINGS.get(tts_model_key, tts_model_key), + user_external_id=user_id, + project_id=cost_project_id, + ) + # Dispatch parallel cue synthesis tasks to TTS worker group_result = dispatch_language_tts( job_id=job_id, language=language, cues=cues, - tts_preferences=tts_preferences + tts_preferences=tts_preferences, + user_id=user_id, + cost_project_id=cost_project_id, ) if group_result is None: diff --git a/backend/app/tasks/tts_synthesis.py b/backend/app/tasks/tts_synthesis.py index 64a454c..1fb7370 100644 --- a/backend/app/tasks/tts_synthesis.py +++ b/backend/app/tasks/tts_synthesis.py @@ -51,24 +51,15 @@ def _record_tts_cost( latency_ms: int, ) -> None: try: - from ..core.dependencies import get_cost_tracker - ct = get_cost_tracker() - if not ct: - return - ct_provider = _TTS_PROVIDER_MODEL_MAP.get(provider, provider) - ct_model = _TTS_MODEL_STRINGS.get(model, model) - chars = len(text) - asyncio.get_event_loop().create_task( - ct.record( - user_external_id=user_id, - model=ct_model, - provider=ct_provider, - chars=chars, - job_external_id=job_id, - project_external_id=project_id, - latency_ms=latency_ms, - status="success", - ) + from ..services.cost_tracker import record + record( + model=_TTS_MODEL_STRINGS.get(model, model), + provider=_TTS_PROVIDER_MODEL_MAP.get(provider, provider), + user_external_id=user_id, + project_id=project_id, + job_external_id=job_id, + chars=len(text), + latency_ms=latency_ms, ) except Exception as e: logger.warning(f"Cost tracker TTS record failed (non-fatal): {e}") @@ -345,7 +336,9 @@ def dispatch_language_tts( job_id: str, language: str, cues: list[dict], - tts_preferences: dict + tts_preferences: dict, + user_id: Optional[str] = None, + cost_project_id: Optional[str] = None, ) -> AsyncResult: """ Dispatch a group of cue synthesis tasks for a language. @@ -400,6 +393,8 @@ def dispatch_language_tts( style_prompt=style_prompt, stability=stability, similarity_boost=similarity_boost, + user_id=user_id, + cost_project_id=cost_project_id, ) for i, cue in enumerate(cues) if cue.get("text", "").strip() # Skip empty cues diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 4392241..943a1b2 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -42,9 +42,6 @@ python-magic = "^0.4.27" aiohttp = "^3.12.15" jinja2 = "^3.1.6" audioop-lts = {version = "^0.2.2", python = ">=3.13"} -# AI Cost Tracker SDK — install from Bitbucket once published: -# oliver-cost-tracker = {git = "ssh://git@bitbucket.org/zlalani/ai-cost-tracker.git", subdirectory = "sdk"} -# Or temporarily: pip install -e /path/to/ai-cost-tracker/sdk/ [tool.poetry.group.dev.dependencies] pytest = "^7.4.3" diff --git a/docker-compose.yml b/docker-compose.yml index a82fda4..5f9cf2a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -133,6 +133,12 @@ services: # Observability SENTRY_DSN: ${SENTRY_DSN:-} OTEL_EXPORTER_OTLP_ENDPOINT: ${OTEL_EXPORTER_OTLP_ENDPOINT:-} + + # AI Cost Tracker + COST_TRACKER_BASE_URL: ${COST_TRACKER_BASE_URL:-} + COST_TRACKER_API_KEY: ${COST_TRACKER_API_KEY:-} + COST_TRACKER_SOURCE_APP: ${COST_TRACKER_SOURCE_APP:-video-accessibility} + COST_TRACKER_ENABLED: ${COST_TRACKER_ENABLED:-true} volumes: - ./secrets:/secrets:ro - api-logs:/app/logs @@ -218,6 +224,12 @@ services: # Observability SENTRY_DSN: ${SENTRY_DSN:-} + + # AI Cost Tracker + COST_TRACKER_BASE_URL: ${COST_TRACKER_BASE_URL:-} + COST_TRACKER_API_KEY: ${COST_TRACKER_API_KEY:-} + COST_TRACKER_SOURCE_APP: ${COST_TRACKER_SOURCE_APP:-video-accessibility} + COST_TRACKER_ENABLED: ${COST_TRACKER_ENABLED:-true} volumes: - ./secrets:/secrets:ro - worker-logs:/app/logs @@ -299,6 +311,12 @@ services: # Observability SENTRY_DSN: ${SENTRY_DSN:-} + + # AI Cost Tracker + COST_TRACKER_BASE_URL: ${COST_TRACKER_BASE_URL:-} + COST_TRACKER_API_KEY: ${COST_TRACKER_API_KEY:-} + COST_TRACKER_SOURCE_APP: ${COST_TRACKER_SOURCE_APP:-video-accessibility} + COST_TRACKER_ENABLED: ${COST_TRACKER_ENABLED:-true} volumes: - ./secrets:/secrets:ro - tts-worker-logs:/app/logs @@ -386,6 +404,12 @@ services: # Observability SENTRY_DSN: ${SENTRY_DSN:-} + + # AI Cost Tracker + COST_TRACKER_BASE_URL: ${COST_TRACKER_BASE_URL:-} + COST_TRACKER_API_KEY: ${COST_TRACKER_API_KEY:-} + COST_TRACKER_SOURCE_APP: ${COST_TRACKER_SOURCE_APP:-video-accessibility} + COST_TRACKER_ENABLED: ${COST_TRACKER_ENABLED:-true} volumes: - ./secrets:/secrets:ro - ffmpeg-worker-logs:/app/logs @@ -477,6 +501,12 @@ services: # Observability SENTRY_DSN: ${SENTRY_DSN:-} + + # AI Cost Tracker + COST_TRACKER_BASE_URL: ${COST_TRACKER_BASE_URL:-} + COST_TRACKER_API_KEY: ${COST_TRACKER_API_KEY:-} + COST_TRACKER_SOURCE_APP: ${COST_TRACKER_SOURCE_APP:-video-accessibility} + COST_TRACKER_ENABLED: ${COST_TRACKER_ENABLED:-true} volumes: - ./secrets:/secrets:ro - whisper-worker-logs:/app/logs diff --git a/infra/cloud-run/api-service.yaml b/infra/cloud-run/api-service.yaml index 8d218d5..c0c01d8 100644 --- a/infra/cloud-run/api-service.yaml +++ b/infra/cloud-run/api-service.yaml @@ -93,6 +93,18 @@ spec: key: latest - name: SENTRY_ENVIRONMENT value: "production" + # AI Cost Tracker + - name: COST_TRACKER_BASE_URL + value: "https://optical-dev.oliver.solutions/cost-tracker/v1" + - name: COST_TRACKER_API_KEY + valueFrom: + secretKeyRef: + name: cost-tracker-api-key + key: latest + - name: COST_TRACKER_SOURCE_APP + value: "video-accessibility" + - name: COST_TRACKER_ENABLED + value: "true" resources: limits: memory: "2Gi" diff --git a/infra/cloud-run/tts-worker-service.yaml b/infra/cloud-run/tts-worker-service.yaml index 62df921..675145a 100644 --- a/infra/cloud-run/tts-worker-service.yaml +++ b/infra/cloud-run/tts-worker-service.yaml @@ -88,6 +88,18 @@ spec: secretKeyRef: name: redis-url key: latest + # AI Cost Tracker + - name: COST_TRACKER_BASE_URL + value: "https://optical-dev.oliver.solutions/cost-tracker/v1" + - name: COST_TRACKER_API_KEY + valueFrom: + secretKeyRef: + name: cost-tracker-api-key + key: latest + - name: COST_TRACKER_SOURCE_APP + value: "video-accessibility" + - name: COST_TRACKER_ENABLED + value: "true" resources: # TTS is memory-light compared to Whisper limits: diff --git a/infra/cloud-run/whisper-worker-service.yaml b/infra/cloud-run/whisper-worker-service.yaml index 762374e..7db9a71 100644 --- a/infra/cloud-run/whisper-worker-service.yaml +++ b/infra/cloud-run/whisper-worker-service.yaml @@ -90,6 +90,18 @@ spec: secretKeyRef: name: redis-url key: latest + # AI Cost Tracker + - name: COST_TRACKER_BASE_URL + value: "https://optical-dev.oliver.solutions/cost-tracker/v1" + - name: COST_TRACKER_API_KEY + valueFrom: + secretKeyRef: + name: cost-tracker-api-key + key: latest + - name: COST_TRACKER_SOURCE_APP + value: "video-accessibility" + - name: COST_TRACKER_ENABLED + value: "true" resources: # Higher memory for Whisper large-v3 model (~4-6GB) + audio processing limits: diff --git a/infra/cloud-run/worker-service.yaml b/infra/cloud-run/worker-service.yaml index ffa8adc..96211b2 100644 --- a/infra/cloud-run/worker-service.yaml +++ b/infra/cloud-run/worker-service.yaml @@ -86,6 +86,18 @@ spec: secretKeyRef: name: redis-url key: latest + # AI Cost Tracker + - name: COST_TRACKER_BASE_URL + value: "https://optical-dev.oliver.solutions/cost-tracker/v1" + - name: COST_TRACKER_API_KEY + valueFrom: + secretKeyRef: + name: cost-tracker-api-key + key: latest + - name: COST_TRACKER_SOURCE_APP + value: "video-accessibility" + - name: COST_TRACKER_ENABLED + value: "true" resources: limits: memory: "4Gi"