- New services/cost_tracker.py: sync httpx preflight()/record() + async wrappers; BudgetExceeded exception; no-op when COST_TRACKER_BASE_URL is empty
- Preflight budget check added before ingestion (Gemini), per-language translation (video-native + traditional), and per-language TTS dispatch
- _record_gemini_usage and _record_tts_cost now call cost_tracker directly; removes broken asyncio.get_event_loop() hack from sync Celery worker
- Fix: _cost_ctx now threaded into extract_accessibility_targeted (video-native path)
- Fix: user_id/cost_project_id now propagated through dispatch_language_tts → synthesize_cue_task.s() and the rerender_accessible_video.py re-render path
- Remove oliver-cost-tracker SDK dependency (was commented-out/never installed)
- Drop cost_tracker_outbox_path setting and get_cost_tracker() factory
- Update COST_TRACKER_BASE_URL default to optical-dev.oliver.solutions in .env.prod.example, docker-compose.yml, and all Cloud Run service yamls
- Cloud Run yamls use Secret Manager ref (cost-tracker-api-key) for the API key

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
379 lines
15 KiB
Python
379 lines
15 KiB
Python
import asyncio
|
|
import os
|
|
import tempfile
|
|
from datetime import datetime
|
|
|
|
import ffmpeg
|
|
from celery import Task
|
|
from motor.motor_asyncio import AsyncIOMotorClient
|
|
|
|
from ..core.config import settings
|
|
from ..core.logging import get_logger
|
|
from ..models.job import JobStatus
|
|
from ..services import cost_tracker
|
|
from ..services.cost_tracker import BudgetExceeded
|
|
from ..services.gcs import gcs_service, upload_vtt_to_gcs
|
|
from ..services.gemini import gemini_service
|
|
from ..services.websocket import connection_manager
|
|
from . import celery_app
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
def broadcast_status_update(job_id: str, status: str, job_title: str = None, message: str = None, progress: int = None):
    """
    Helper function to broadcast job status updates via WebSocket

    Builds a ``JobStatusUpdate`` and publishes its JSON form to two Redis
    pub/sub channels: the shared ``job_status_updates`` channel and the
    job-specific ``job_status_updates:{job_id}`` channel. Uses a sync Redis
    client for Celery worker context. (Presumably a subscriber elsewhere
    relays these messages to WebSocket clients -- not visible in this file.)

    Args:
        job_id: Identifier of the job the update refers to.
        status: New status value to announce.
        job_title: Optional job title included in the update.
        message: Optional free-text message included in the update.
        progress: Optional progress value included in the update.

    Returns:
        None. Every failure is logged and swallowed so that a broadcast
        problem never breaks the calling worker task.
    """
    logger.info(f"🔊 ATTEMPTING TO BROADCAST: job_id={job_id}, status={status}, job_title={job_title}, message={message}")
    try:
        # Imported lazily so a missing/broken dependency is caught by the
        # handler below instead of failing at module import time.
        import redis as sync_redis
        from ..services.websocket import JobStatusUpdate

        logger.info(f"🔊 About to create JobStatusUpdate for job {job_id}")

        # Create status update
        update = JobStatusUpdate(
            job_id=job_id,
            status=status,
            updated_at=datetime.utcnow(),
            job_title=job_title,
            message=message,
            progress=progress,
        )

        logger.info(f"🔊 Created update object, now connecting to Redis: {settings.redis_url}")

        # Create synchronous Redis client
        redis_client = sync_redis.Redis.from_url(
            settings.redis_url,
            encoding="utf-8",
            decode_responses=True,
        )

        try:
            logger.info("🔊 Redis client created, now publishing to channels")

            # Serialize once; the same payload goes to both channels.
            payload = update.model_dump_json()

            # Publish to channels
            result1 = redis_client.publish("job_status_updates", payload)
            result2 = redis_client.publish(f"job_status_updates:{job_id}", payload)

            logger.info(f"🔊 Published to channels - general: {result1} subscribers, job-specific: {result2} subscribers")
        finally:
            # Always release the connection, even when publishing fails.
            redis_client.close()

        logger.info(f"🔊 ✅ Successfully broadcasted status update for job {job_id}: {status}")

    except Exception as e:
        logger.error(f"🔊 ❌ Failed to broadcast status update for job {job_id}: {e}")
        import traceback
        logger.error(f"🔊 ❌ Full traceback: {traceback.format_exc()}")
        # Don't let WebSocket failures break the worker task
|
|
|
|
|
|
class AsyncTask(Task):
    """Base task class that supports async execution

    Subclasses implement ``run_async``; ``__call__`` drives that coroutine
    to completion on a fresh event loop created for the invocation.
    """

    def __call__(self, *args, **kwargs):
        """Run ``run_async(*args, **kwargs)`` on a new event loop and return its result."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(self.run_async(*args, **kwargs))
        finally:
            try:
                # Finalize any async generators still open on this loop.
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                # Do not leave a closed loop installed as the current loop;
                # later code would otherwise pick it up and fail with
                # "Event loop is closed".
                asyncio.set_event_loop(None)
                loop.close()

    async def run_async(self, *args, **kwargs):
        """Async task body; must be overridden by subclasses."""
        raise NotImplementedError
|
|
|
|
|
|
class IngestAndAITask(AsyncTask):
    """Async task base that runs the ingestion & AI pipeline for one job."""

    async def run_async(self, job_id: str):
        """Await ``ingest_and_ai_task_impl`` for ``job_id`` and hand back its result."""
        outcome = await ingest_and_ai_task_impl(job_id)
        return outcome
|
|
|
|
|
|
@celery_app.task(bind=True, base=IngestAndAITask)
def ingest_and_ai_task(self, job_id: str):
    """
    Pipeline 1: Ingestion & AI Processing

    Celery registration stub. The task is declared with
    ``base=IngestAndAITask``, whose inherited ``AsyncTask.__call__`` runs
    ``run_async`` (the async implementation), so this body does nothing.
    """
    # Intentionally empty: the work happens in AsyncTask.__call__.
    return None
|
|
|
|
|
|
async def ingest_and_ai_task_impl(job_id: str):
    """
    Pipeline 1: Ingestion & AI Processing

    1. Update status to 'ingesting'
    2. Download the source video from GCS to a temporary file
    3. Update status to 'ai_processing' and probe the video duration
    4. Run the cost-tracker preflight, then process with Gemini
    5. Upload caption / audio-description (and optional SDH, descriptive
       transcript) files to GCS
    6. Update status to 'translating' and enqueue the translation pipeline

    Args:
        job_id: ``_id`` of the job document in the ``jobs`` collection.

    Returns:
        None. Returns early (after logging) if the job does not exist.

    Raises:
        Exception: Any failure after the job was found is recorded on the
            job document under ``error`` and then re-raised. This includes
            whatever ``cost_tracker.aio_preflight`` raises (presumably
            ``BudgetExceeded`` when the budget check fails -- confirm in
            services/cost_tracker.py).
        ValueError: If the job disappears between the two lookups, or if
            the AI result has no ``captions_vtt`` (there is no fallback).
    """
    logger.info(f"Starting ingestion and AI processing for job {job_id}")

    # Connect to MongoDB
    client = AsyncIOMotorClient(settings.mongodb_uri)
    db = client[settings.mongodb_db]

    try:
        # Get job document to retrieve title for notifications
        job_doc = await db.jobs.find_one({"_id": job_id})
        if not job_doc:
            logger.error(f"Job {job_id} not found in database")
            return

        job_title = job_doc.get("title", "Untitled Job")
        logger.info(f"Processing job: {job_title}")

        # Update status to ingesting
        await db.jobs.update_one(
            {"_id": job_id},
            {
                "$set": {
                    "status": JobStatus.INGESTING.value,
                    "updated_at": datetime.utcnow()
                },
                "$push": {
                    "review.history": {
                        "at": datetime.utcnow(),
                        "status": JobStatus.INGESTING.value,
                        "by": "system"
                    }
                }
            }
        )

        # Broadcast ingesting status update
        broadcast_status_update(
            job_id,
            JobStatus.INGESTING.value,
            job_title=job_title,
            message=f"{job_title} is being ingested"
        )

        # Get job details
        job_doc = await db.jobs.find_one({"_id": job_id})
        if not job_doc:
            raise ValueError(f"Job {job_id} not found")

        # Download video file temporarily for processing
        source_blob_path = job_doc["source"]["gcs_uri"].replace(f"gs://{settings.gcs_bucket}/", "")

        # delete=False: only the path is needed here; the file is removed
        # explicitly in the finally block below.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_file:
            temp_path = temp_file.name

        try:
            # Download from GCS. Done inside the try so the temp file is
            # removed even when the download itself fails.
            blob = gcs_service.bucket.blob(source_blob_path)
            blob.download_to_filename(temp_path)

            # Update status to AI processing
            await db.jobs.update_one(
                {"_id": job_id},
                {
                    "$set": {
                        "status": JobStatus.AI_PROCESSING.value,
                        "updated_at": datetime.utcnow()
                    },
                    "$push": {
                        "review.history": {
                            "at": datetime.utcnow(),
                            "status": JobStatus.AI_PROCESSING.value,
                            "by": "system"
                        }
                    }
                }
            )

            # Broadcast AI processing status update
            broadcast_status_update(
                job_id,
                JobStatus.AI_PROCESSING.value,
                job_title=job_title,
                message=f"{job_title} is being processed by AI"
            )

            # Probe video for metadata
            duration = await _get_video_duration(temp_path)

            # Update source with duration
            await db.jobs.update_one(
                {"_id": job_id},
                {"$set": {"source.duration_s": duration}}
            )

            # Process with Gemini
            brand_context = job_doc.get("brand_context")
            sdh_requested = job_doc.get("requested_outputs", {}).get("sdh_vtt", False)
            _cost_ctx = {
                "user_id": job_doc.get("client_id", "system"),
                "job_id": job_id,
                "project_id": job_doc.get("cost_tracker_project_id"),
            }
            # Budget preflight runs before the Gemini call.
            await cost_tracker.aio_preflight(
                model=gemini_service.model_name,
                user_external_id=_cost_ctx["user_id"],
                project_id=_cost_ctx["project_id"],
            )
            ai_result = await gemini_service.extract_accessibility(
                temp_path,
                brand_context=brand_context,
                sdh_requested=sdh_requested,
                _cost_ctx=_cost_ctx,
            )

            # Final safety check for required fields
            required_fields = ["captions_vtt", "audio_description_vtt"]
            missing_fields = [field for field in required_fields if field not in ai_result]

            if missing_fields:
                logger.error(f"Missing required fields after AI processing: {missing_fields}")
                # Create fallback content for missing fields
                if "audio_description_vtt" in missing_fields:
                    ai_result["audio_description_vtt"] = "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\nVideo content with visual elements."
                    logger.info("Created fallback audio_description_vtt")
                # Captions cannot be synthesized; fail with a clear message
                # instead of a bare KeyError at upload time.
                if "captions_vtt" in missing_fields:
                    raise ValueError(
                        f"AI result for job {job_id} is missing required field 'captions_vtt'"
                    )

            # Get detected language from Gemini response
            detected_language = ai_result.get("language", "en")
            language_hint = job_doc["source"].get("language_hint")
            initial_language = job_doc["source"].get("language", "en")

            # Log if there's a mismatch between hint/initial and detected language
            if language_hint and language_hint != detected_language:
                logger.warning(
                    f"Language mismatch for job {job_id}: "
                    f"hint={language_hint}, detected={detected_language}"
                )
            elif initial_language != "auto" and initial_language != detected_language:
                logger.info(
                    f"Language detection for job {job_id}: "
                    f"initial={initial_language}, detected={detected_language}"
                )

            # Use detected language for output storage
            source_language = detected_language
            logger.info(f"Using detected language '{source_language}' for job {job_id}")

            # Upload VTT files to GCS using detected language
            captions_gcs_uri = await upload_vtt_to_gcs(
                ai_result["captions_vtt"],
                f"{job_id}/{source_language}/captions.vtt"
            )

            ad_gcs_uri = await upload_vtt_to_gcs(
                ai_result["audio_description_vtt"],
                f"{job_id}/{source_language}/ad.vtt"
            )

            # Upload SDH VTT if generated
            sdh_gcs_uri = None
            if sdh_requested and ai_result.get("sdh_captions_vtt"):
                sdh_gcs_uri = await upload_vtt_to_gcs(
                    ai_result["sdh_captions_vtt"],
                    f"{job_id}/{source_language}/sdh_captions.vtt"
                )

            # Generate descriptive transcript (WCAG 2.1 1.2.1).
            # Best-effort: a failure here is logged and does not fail the job.
            transcript_gcs_uri = None
            try:
                from ..services.descriptive_transcript import generate_descriptive_transcript
                transcript_text = generate_descriptive_transcript(
                    ai_result["captions_vtt"],
                    ai_result["audio_description_vtt"]
                )
                if transcript_text:
                    transcript_gcs_uri = await upload_vtt_to_gcs(
                        transcript_text,
                        f"{job_id}/{source_language}/descriptive_transcript.txt"
                    )
                    logger.info(f"Generated descriptive transcript for job {job_id}, language {source_language}")
            except Exception as e:
                logger.warning(f"Failed to generate descriptive transcript for job {job_id}: {e}")

            source_lang_output = {
                "captions_vtt_gcs": captions_gcs_uri,
                "ad_vtt_gcs": ad_gcs_uri,
            }
            if sdh_gcs_uri:
                source_lang_output["sdh_captions_vtt_gcs"] = sdh_gcs_uri
            if transcript_gcs_uri:
                source_lang_output["descriptive_transcript_gcs"] = transcript_gcs_uri

            # Update job with AI results, detected language, and outputs
            # Set status to TRANSLATING to trigger translation pipeline before QC
            await db.jobs.update_one(
                {"_id": job_id},
                {
                    "$set": {
                        "status": JobStatus.TRANSLATING.value,
                        "source.language": source_language,  # Update with detected language
                        "source.detected_language": detected_language,
                        "ai.ingestion_json": ai_result,
                        "ai.confidence": ai_result["confidence"],
                        f"outputs.{source_language}": source_lang_output,
                        "updated_at": datetime.utcnow()
                    },
                    "$push": {
                        "review.history": {
                            "at": datetime.utcnow(),
                            "status": JobStatus.TRANSLATING.value,
                            "by": "system"
                        }
                    }
                }
            )

            # Broadcast status update
            broadcast_status_update(
                job_id,
                JobStatus.TRANSLATING.value,
                job_title=job_title,
                message=f"{job_title} AI processing complete, starting translation pipeline"
            )

            logger.info(f"AI processing complete for job {job_id}, triggering translation pipeline")

            # Trigger translation and synthesis pipeline
            # This will process all translations, TTS, and accessible video BEFORE QC review
            from .translate_and_synthesize import translate_and_synthesize_task
            translate_and_synthesize_task.delay(job_id)

        finally:
            # Clean up temp file
            os.unlink(temp_path)

    except Exception as e:
        logger.error(f"Ingestion and AI processing failed for job {job_id}: {e}")

        # Update job with error. If this bookkeeping write fails too, log it
        # and still surface the original exception rather than masking it.
        try:
            await db.jobs.update_one(
                {"_id": job_id},
                {
                    "$set": {
                        "error": {
                            "type": "ingestion_failure",
                            "message": str(e),
                            "timestamp": datetime.utcnow().isoformat()
                        },
                        "updated_at": datetime.utcnow()
                    }
                }
            )
        except Exception as update_error:
            logger.error(f"Failed to record error for job {job_id}: {update_error}")

        raise

    finally:
        client.close()
|
|
|
|
|
|
async def _get_video_duration(video_path: str) -> float:
    """Get video duration using ffprobe

    The first stream's ``duration`` is preferred. If it is absent or not
    numeric, the remaining streams and then the container-level
    ``format.duration`` are tried in order.

    Args:
        video_path: Path of the media file to probe.

    Returns:
        Duration as a float (ffprobe reports seconds), or ``0.0`` when
        probing fails or no usable duration is reported. Never raises.
    """
    try:
        probe = ffmpeg.probe(video_path)

        # Candidate values in priority order: each stream, then the container.
        candidates = [stream.get("duration") for stream in probe.get("streams", [])]
        candidates.append(probe.get("format", {}).get("duration"))

        for raw_duration in candidates:
            try:
                return float(raw_duration)
            except (TypeError, ValueError):
                # Missing (None) or non-numeric (e.g. "N/A"); try the next one.
                continue

        logger.warning("Could not determine video duration: no duration reported by ffprobe")
        return 0.0
    except Exception as e:
        logger.warning(f"Could not determine video duration: {e}")
        return 0.0
|