diff --git a/.env.prod.example b/.env.prod.example index ebf52d9..5012d0f 100644 --- a/.env.prod.example +++ b/.env.prod.example @@ -31,4 +31,11 @@ CORS_ORIGINS=https://your-domain.com,https://www.your-domain.com # Frontend Build Configuration (for reference) VITE_API_URL=https://your-api-domain.com:8000 VITE_SENTRY_DSN=your-frontend-sentry-dsn -VITE_ENVIRONMENT=production \ No newline at end of file +VITE_ENVIRONMENT=production + +# AI Cost Tracker (oliver-cost-tracker SDK) +COST_TRACKER_BASE_URL=https://cost.oliver.agency +COST_TRACKER_API_KEY=ct_live_your-api-key-here +COST_TRACKER_SOURCE_APP=video-accessibility +COST_TRACKER_OUTBOX_PATH=/tmp/cost_outbox.sqlite +COST_TRACKER_ENABLED=true \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md index 969f4c9..9fb2153 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -148,4 +148,10 @@ gs://accessible-video/{jobId}/ - Timestamp drift: Preserve cue timings in translations - TTS alignment: Per-cue synthesis with crossfades - Queue backlog: Autoscaling workers with monitoring -- Security: Secret Manager, least-privilege IAM, no client secrets \ No newline at end of file +- Security: Secret Manager, least-privilege IAM, no client secrets + +## Knowledge Wiki +A cross-project knowledge base is maintained automatically from all Claude Code sessions. +- **Index:** `/Users/aimpress/Library/Mobile Documents/iCloud~md~obsidian/Documents/VadymSamoilenko/wiki/index.md` +- **Query:** `cd ~/.claude/memory-compiler && uv run python scripts/query.py "your question"` +- Every session in this project automatically feeds the knowledge base. 
@lru_cache(maxsize=1)
def get_cost_tracker():
    """Return the process-wide cost tracker client, or ``None`` when unavailable.

    The result is memoized by ``lru_cache``, so the client is built and
    started at most once per process and every later call gets the same
    object (or the same ``None``).

    Returns:
        A started ``CostTracker`` instance, or ``None`` when tracking is
        disabled in settings, no base URL is configured, the
        ``oliver_cost_tracker`` package is not installed, or the client
        could not be constructed/started.
    """
    if not settings.cost_tracker_enabled or not settings.cost_tracker_base_url:
        return None

    try:
        # Imported lazily: the SDK is an optional dependency.
        from oliver_cost_tracker import CostTracker
    except ImportError:
        return None

    try:
        ct = CostTracker(
            base_url=settings.cost_tracker_base_url,
            api_key=settings.cost_tracker_api_key,
            source_app=settings.cost_tracker_source_app,
            outbox_path=settings.cost_tracker_outbox_path,
            graceful_degradation=True,
        )
        # NOTE(review): start() is invoked synchronously, exactly as before;
        # confirm against the SDK that it is not a coroutine.
        ct.start()
    except Exception as exc:
        # Cost tracking is best-effort and must never break a caller.
        # Returning None here (instead of raising) also lets lru_cache
        # memoize the outcome; a raised exception is not cached, so a broken
        # configuration would otherwise be re-initialised on every call.
        # Trade-off: tracking stays off until the process restarts.
        import logging

        logging.getLogger(__name__).warning(
            "Cost tracker initialisation failed; tracking disabled for this process: %s",
            exc,
        )
        return None
    return ct
async def _record_gemini_usage(
    response,
    model: str,
    user_id: str,
    job_id: str,
    project_id: Optional[str],
    elapsed_ms: int,
) -> None:
    """Fire-and-forget: record Gemini usage to cost tracker after a generate_content call.

    Does nothing when no cost tracker is available or when ``response``
    carries no ``usage_metadata``. Any failure is logged as a warning and
    swallowed so that cost tracking can never fail the calling request.

    Args:
        response: Object returned by ``generate_content``; its
            ``usage_metadata`` attribute is read if present.
        model: Model name reported to the tracker.
        user_id: External user identifier for attribution.
        job_id: External job identifier for attribution.
        project_id: Optional external project identifier.
        elapsed_ms: Latency of the Gemini call in milliseconds.
    """
    try:
        # Imported lazily, as in the rest of the cost-tracking code paths.
        from ..core.dependencies import get_cost_tracker

        ct = get_cost_tracker()
        if not ct:
            return
        usage = getattr(response, "usage_metadata", None)
        if usage is None:
            return

        # Each counter may be missing or None on the metadata object.
        input_tokens = getattr(usage, "prompt_token_count", 0) or 0
        candidate_tokens = getattr(usage, "candidates_token_count", 0) or 0
        # Thinking models report reasoning tokens separately from the
        # candidate tokens, yet they are billed as output. Count them so the
        # recorded cost is not under-reported.
        thought_tokens = getattr(usage, "thoughts_token_count", 0) or 0

        await ct.record(
            user_external_id=user_id,
            model=model,
            provider="google",
            input_tokens=input_tokens,
            output_tokens=candidate_tokens + thought_tokens,
            job_external_id=job_id,
            project_external_id=project_id,
            latency_ms=elapsed_ms,
            status="success",
        )
    except Exception as e:
        # Deliberately broad: this runs as a detached task and must not raise.
        logger.warning(f"Cost tracker record failed (non-fatal): {e}")
sdh_captions_vtt using the same cue timings as captions_vtt, enriched w ) return "No specific brand names have been provided for this video." - async def extract_accessibility(self, video_file_path: str, brand_context: Optional[str] = None, sdh_requested: bool = False) -> dict[str, Any]: + async def extract_accessibility(self, video_file_path: str, brand_context: Optional[str] = None, sdh_requested: bool = False, _cost_ctx: Optional[dict] = None) -> dict[str, Any]: """ Extract captions and audio descriptions from video using Gemini 2.0 Returns structured JSON with transcript, captions VTT, and audio description VTT @@ -126,6 +160,7 @@ Generate sdh_captions_vtt using the same cue timings as captions_vtt, enriched w # Generate content using new API - use asyncio.to_thread to avoid blocking logger.info("Generating content with Gemini model...") + _t0 = time.monotonic() response = await asyncio.to_thread( client.models.generate_content, model=self.model_name, @@ -142,6 +177,8 @@ Generate sdh_captions_vtt using the same cue timings as captions_vtt, enriched w top_k=40, ), ) + if _cost_ctx: + asyncio.create_task(_record_gemini_usage(response, self.model_name, _cost_ctx.get("user_id", "system"), _cost_ctx.get("job_id", ""), _cost_ctx.get("project_id"), int((time.monotonic() - _t0) * 1000))) # Parse JSON response response_text = response.text.strip() @@ -287,7 +324,8 @@ Fix the JSON and return it: video_file_path: str, target_language: str, brand_context: Optional[str] = None, - sdh_requested: bool = False + sdh_requested: bool = False, + _cost_ctx: Optional[dict] = None, ) -> dict[str, Any]: """ Extract captions and audio descriptions from video using Gemini, @@ -340,6 +378,7 @@ Fix the JSON and return it: # Generate content using new API logger.info(f"Generating content with Gemini model for {target_language}...") + _t0 = time.monotonic() response = await asyncio.to_thread( client.models.generate_content, model=self.model_name, @@ -351,6 +390,8 @@ Fix the JSON and return 
it: ) ] ) + if _cost_ctx: + asyncio.create_task(_record_gemini_usage(response, self.model_name, _cost_ctx.get("user_id", "system"), _cost_ctx.get("job_id", ""), _cost_ctx.get("project_id"), int((time.monotonic() - _t0) * 1000))) # Parse JSON response response_text = response.text.strip() @@ -719,7 +760,8 @@ Fix the JSON and return it: captions_vtt: str, ad_vtt: str, target_language: str, - brief: Optional[str] = None + brief: Optional[str] = None, + _cost_ctx: Optional[dict] = None, ) -> dict[str, str]: """ Transcreate English VTT content to target language with cultural adaptation @@ -742,6 +784,7 @@ JSON: """ try: + _t0 = time.monotonic() response = await asyncio.to_thread( client.models.generate_content, model=self.model_name, @@ -749,6 +792,8 @@ JSON: genai.types.Part.from_text(text=prompt + "\n\n" + user_prompt) ] ) + if _cost_ctx: + asyncio.create_task(_record_gemini_usage(response, self.model_name, _cost_ctx.get("user_id", "system"), _cost_ctx.get("job_id", ""), _cost_ctx.get("project_id"), int((time.monotonic() - _t0) * 1000))) response_text = response.text.strip() @@ -776,7 +821,8 @@ JSON: self, vtt_content: str, target_language: str, - source_language: str = "en" + source_language: str = "en", + _cost_ctx: Optional[dict] = None, ) -> str: """ Translate VTT content using Gemini, preserving timing programmatically. 
@@ -813,11 +859,14 @@ REQUIREMENTS: Segments to translate: {numbered_texts}""" + _t0 = time.monotonic() response = await asyncio.to_thread( client.models.generate_content, model=self.model_name, contents=[genai.types.Part.from_text(text=prompt)] ) + if _cost_ctx: + asyncio.create_task(_record_gemini_usage(response, self.model_name, _cost_ctx.get("user_id", "system"), _cost_ctx.get("job_id", ""), _cost_ctx.get("project_id"), int((time.monotonic() - _t0) * 1000))) return self._parse_numbered_translation(response.text.strip(), cue_count) try: @@ -877,7 +926,8 @@ Segments to translate: async def rewrite_tts_cue( self, original_text: str, - language: str = "en" + language: str = "en", + _cost_ctx: Optional[dict] = None, ) -> str: """ Rewrite an audio description cue to be TTS-friendly. @@ -902,11 +952,14 @@ Segments to translate: try: logger.info(f"Rewriting TTS cue for safety: '{original_text[:50]}...'") + _t0 = time.monotonic() response = await asyncio.to_thread( client.models.generate_content, model=self.model_name, contents=[genai.types.Part.from_text(text=prompt)] ) + if _cost_ctx: + asyncio.create_task(_record_gemini_usage(response, self.model_name, _cost_ctx.get("user_id", "system"), _cost_ctx.get("job_id", ""), _cost_ctx.get("project_id"), int((time.monotonic() - _t0) * 1000))) result = response.text.strip() diff --git a/backend/app/tasks/ingest_and_ai.py b/backend/app/tasks/ingest_and_ai.py index a96fc6a..39727fc 100644 --- a/backend/app/tasks/ingest_and_ai.py +++ b/backend/app/tasks/ingest_and_ai.py @@ -205,10 +205,16 @@ async def ingest_and_ai_task_impl(job_id: str): # Process with Gemini brand_context = job_doc.get("brand_context") sdh_requested = job_doc.get("requested_outputs", {}).get("sdh_vtt", False) + _cost_ctx = { + "user_id": job_doc.get("client_id", "system"), + "job_id": job_id, + "project_id": job_doc.get("cost_tracker_project_id"), + } ai_result = await gemini_service.extract_accessibility( temp_path, brand_context=brand_context, - 
sdh_requested=sdh_requested + sdh_requested=sdh_requested, + _cost_ctx=_cost_ctx, ) # Final safety check for required fields diff --git a/backend/app/tasks/translate_and_synthesize.py b/backend/app/tasks/translate_and_synthesize.py index 09ce1ea..a0f4537 100644 --- a/backend/app/tasks/translate_and_synthesize.py +++ b/backend/app/tasks/translate_and_synthesize.py @@ -176,6 +176,11 @@ async def _async_translate_and_synthesize(job_id: str): job_title = job_doc.get("title", "Untitled Job") logger.info(f"✅ Found job document for {job_id} ({job_title}), status: {job_doc.get('status', 'UNKNOWN')}") + _cost_ctx = { + "user_id": job_doc.get("client_id", "system"), + "job_id": job_id, + "project_id": job_doc.get("cost_tracker_project_id"), + } # Check for valid status to process translation # Valid statuses: approved_english, approved_source (legacy), or translating (new workflow) @@ -381,7 +386,8 @@ async def _async_translate_and_synthesize(job_id: str): source_captions_vtt, source_ad_vtt, language, - brief="Standard accessibility content" + brief="Standard accessibility content", + _cost_ctx=_cost_ctx, ) result = await retry_with_backoff(transcreate, max_retries=3) @@ -393,12 +399,14 @@ async def _async_translate_and_synthesize(job_id: str): # TRADITIONAL MODE: Use Gemini translation (6-36x cheaper than Google Translate API) async def translate_captions(): return await gemini_service.translate_vtt( - source_captions_vtt, language, source_language=source_language + source_captions_vtt, language, source_language=source_language, + _cost_ctx=_cost_ctx, ) async def translate_ad(): return await gemini_service.translate_vtt( - source_ad_vtt, language, source_language=source_language + source_ad_vtt, language, source_language=source_language, + _cost_ctx=_cost_ctx, ) translated_captions = await retry_with_backoff(translate_captions, max_retries=3) @@ -425,7 +433,8 @@ async def _async_translate_and_synthesize(job_id: str): if sdh_requested and source_sdh_vtt: async def 
# Maps this app's TTS provider name to the provider string reported to the
# cost tracker. Providers not listed here are passed through unchanged.
_TTS_PROVIDER_MODEL_MAP = {
    "gemini": "google",
    "google": "google_tts",
    "elevenlabs": "elevenlabs",
}

# Maps this app's TTS model name to the model string reported to the cost
# tracker. Models not listed here are passed through unchanged.
_TTS_MODEL_STRINGS = {
    "flash": "gemini-2.5-flash-preview-tts",
    "pro": "gemini-2.5-pro-preview-tts",
    "standard": "standard",
    "wavenet": "wavenet",
    "neural2": "neural2",
    "elevenlabs": "eleven_multilingual_v2",
}

# Strong references to in-flight cost-record tasks. The event loop only keeps
# weak references to tasks, so an otherwise unreferenced task may be garbage
# collected before it finishes.
_PENDING_TTS_COST_TASKS: set = set()


def _finish_tts_cost_task(task) -> None:
    """Release a finished cost-record task and log its failure, if any."""
    _PENDING_TTS_COST_TASKS.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.warning(f"Cost tracker TTS record failed (non-fatal): {exc}")


def _record_tts_cost(
    provider: str,
    model: str,
    text: str,
    user_id: str,
    job_id: str,
    project_id: Optional[str],
    latency_ms: int,
) -> None:
    """Record the cost of one TTS synthesis call; never raises.

    Usage is reported as the character count of ``text``. When an event loop
    is already running in this thread the record is scheduled on it without
    waiting. When no loop is running (the synchronous Celery task path), the
    record coroutine is run to completion on a temporary loop; scheduling it
    on a loop that never runs would silently drop the record.

    Args:
        provider: App-level TTS provider name (translated via
            ``_TTS_PROVIDER_MODEL_MAP``).
        model: App-level TTS model name (translated via
            ``_TTS_MODEL_STRINGS``).
        text: The synthesized text; only its length is reported.
        user_id: External user identifier for attribution.
        job_id: External job identifier for attribution.
        project_id: Optional external project identifier.
        latency_ms: Synthesis latency in milliseconds.
    """
    try:
        import asyncio

        from ..core.dependencies import get_cost_tracker

        ct = get_cost_tracker()
        if not ct:
            return

        record_kwargs = {
            "user_external_id": user_id,
            "model": _TTS_MODEL_STRINGS.get(model, model),
            "provider": _TTS_PROVIDER_MODEL_MAP.get(provider, provider),
            "chars": len(text),
            "job_external_id": job_id,
            "project_external_id": project_id,
            "latency_ms": latency_ms,
            "status": "success",
        }

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread, so drive the coroutine here.
            # NOTE(review): assumes ct.record() can run on a fresh event loop;
            # if the SDK binds to the loop used by start(), confirm this path.
            asyncio.run(ct.record(**record_kwargs))
        else:
            task = loop.create_task(ct.record(**record_kwargs))
            _PENDING_TTS_COST_TASKS.add(task)
            task.add_done_callback(_finish_tts_cost_task)
    except Exception as e:
        # Deliberately broad: cost tracking must not fail a synthesis task.
        logger.warning(f"Cost tracker TTS record failed (non-fatal): {e}")
and upload to GCS immediately. @@ -102,6 +153,9 @@ def synthesize_cue_task( f"cue={cue_index}, duration={duration:.2f}s, elapsed={elapsed_ms:.0f}ms" ) + # Record TTS cost (fire-and-forget) + _record_tts_cost(provider, model, text, user_id or "system", job_id, cost_project_id, int(elapsed_ms)) + return { "cue_index": cue_index, "job_id": job_id, diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 943a1b2..4392241 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -42,6 +42,9 @@ python-magic = "^0.4.27" aiohttp = "^3.12.15" jinja2 = "^3.1.6" audioop-lts = {version = "^0.2.2", python = ">=3.13"} +# AI Cost Tracker SDK — install from Bitbucket once published: +# oliver-cost-tracker = {git = "ssh://git@bitbucket.org/zlalani/ai-cost-tracker.git", subdirectory = "sdk"} +# Or temporarily: pip install -e /path/to/ai-cost-tracker/sdk/ [tool.poetry.group.dev.dependencies] pytest = "^7.4.3"