feat(cost): add Whisper transcription cost tracking
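Records audio_duration (truncated to an integer and passed in the cost tracker's `chars` field) and latency_ms to the cost tracker after each successful transcription that reports a non-zero audio_duration. The job document is looked up in MongoDB to attach the user and project IDs. The whole tracking step is wrapped in try/except and logs a warning on failure, so it never fails the task.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>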
This commit is contained in:
parent
cff62c51ff
commit
922cb9318e
1 changed file with 36 additions and 2 deletions
|
|
@ -1,6 +1,8 @@
|
|||
"""Celery task for Whisper transcription with Cloud Run fallback."""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import google.auth.transport.requests
|
||||
|
|
@ -8,9 +10,11 @@ import httpx
|
|||
from google.auth import default
|
||||
from google.cloud import storage
|
||||
from google.oauth2 import id_token
|
||||
from motor.motor_asyncio import AsyncIOMotorClient
|
||||
|
||||
from ..core.config import settings
|
||||
from ..core.logging import get_logger
|
||||
from ..services import cost_tracker
|
||||
from ..services.whisper_service import whisper_service
|
||||
from . import celery_app
|
||||
|
||||
|
|
@ -180,14 +184,15 @@ def transcribe_video_audio_task(self, job_id: str, audio_path: str) -> dict:
|
|||
"""
|
||||
logger.info(f"Starting Whisper transcription task for job {job_id}")
|
||||
|
||||
t_start = time.monotonic()
|
||||
try:
|
||||
# Use Cloud Run if configured, otherwise local
|
||||
if settings.whisper_service_url:
|
||||
logger.info(f"Using Cloud Run Whisper service: {settings.whisper_service_url}")
|
||||
return _transcribe_via_cloud_run(job_id, audio_path)
|
||||
result = _transcribe_via_cloud_run(job_id, audio_path)
|
||||
else:
|
||||
logger.info("Using local Whisper service")
|
||||
return _transcribe_locally(job_id, audio_path)
|
||||
result = _transcribe_locally(job_id, audio_path)
|
||||
|
||||
except httpx.HTTPStatusError as e:
|
||||
logger.error(f"Cloud Run transcription failed for job {job_id}: {e.response.status_code} - {e.response.text}")
|
||||
|
|
@ -195,3 +200,32 @@ def transcribe_video_audio_task(self, job_id: str, audio_path: str) -> dict:
|
|||
except Exception as e:
|
||||
logger.error(f"Whisper transcription failed for job {job_id}: {e}")
|
||||
raise
|
||||
|
||||
latency_ms = int((time.monotonic() - t_start) * 1000)
|
||||
audio_duration = result.get("audio_duration", 0.0)
|
||||
if audio_duration:
|
||||
try:
|
||||
async def _fetch_job():
|
||||
client = AsyncIOMotorClient(settings.mongodb_uri)
|
||||
try:
|
||||
return await client[settings.mongodb_db].jobs.find_one({"_id": job_id})
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
job_doc = asyncio.run(_fetch_job())
|
||||
user_id = str(job_doc.get("created_by", "")) if job_doc else ""
|
||||
project_id = str(job_doc.get("cost_tracker_project_id", "")) if job_doc else ""
|
||||
cost_tracker.record(
|
||||
model="whisper-1",
|
||||
provider="openai",
|
||||
user_external_id=user_id,
|
||||
project_id=project_id or None,
|
||||
job_external_id=job_id,
|
||||
chars=int(audio_duration),
|
||||
latency_ms=latency_ms,
|
||||
status="success",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Cost tracking failed for job {job_id} (non-fatal): {e}")
|
||||
|
||||
return result
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue