**BUG-1 & BUG-2: Wrong audio plays after re-render / MP3 doesn't match text**
Root cause: audio files were named by index (cue_0.mp3, cue_1.mp3). When a cue was inserted or deleted, every following index shifted but the existing MP3 files kept their original names, so re-render played the wrong audio for the wrong cue. Fix: files are now named cue_N_CONTENTHASH.mp3, and an ad_cue_manifest stored in the job document maps each cue index to its correct GCS URI. Re-render now reads from the manifest instead of guessing by filename. Also: editing AD cue text in the VTT editor now automatically queues TTS regeneration for the changed cues, so text and audio no longer drift apart silently.

**BUG-3: App crash / state desync when uploading VTT or clearing TTS queue**
Fixed handleVttFileUpload to update local editor state only after the server confirms the save. Previously local state was updated first, so a network error left the UI showing content that wasn't actually saved. Fixed handleClearRegenerationQueue to remove items from local state only if the server removal succeeded. Previously all items were cleared regardless.

**BUG-4: AI generates different audio descriptions every time**
Added GenerateContentConfig(temperature=0.2, top_p=0.8, top_k=40) to the Gemini API call so output is more consistent across runs.

**BUG-5: On-screen text inconsistently described**
Strengthened the AI prompt rule from a vague suggestion to a mandatory requirement with an explicit format: "Text on screen reads: [exact text]". Applied to both gemini_ingestion.md and gemini_ingestion_targeted.md.

**BUG-6: No notification when re-render finishes**
Added a rendering_qc toast notification and a dismissible green banner that appears in QCDetail when re-render transitions to pending_qc. The banner auto-dismisses after 10 seconds. Also increased WebSocket reconnect attempts from 5 to 15 and capped backoff at 60s so the UI is less likely to fall back to manual refresh.
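The content-hash naming and manifest described under BUG-1 & BUG-2 can be sketched roughly as follows. This is a minimal illustration, not the project's actual code: the helper names `cue_blob_name` and `build_manifest`, the choice of a truncated SHA-256 digest, and the example bucket path are all assumptions. Only the `cue_N_CONTENTHASH.mp3` pattern and the `cue_index` / `gcs_uri` manifest keys come from the commit itself.

```python
import hashlib


def cue_blob_name(cue_index: int, cue_text: str) -> str:
    """Build a cue_N_CONTENTHASH.mp3 name (hypothetical helper)."""
    # Assumption: the hash is a truncated SHA-256 of the UTF-8 cue text,
    # so the name changes whenever the text changes.
    digest = hashlib.sha256(cue_text.encode("utf-8")).hexdigest()[:12]
    return f"cue_{cue_index}_{digest}.mp3"


def build_manifest(cue_texts: list[str], gcs_prefix: str) -> list[dict]:
    """Map each cue index to the URI of the MP3 made from its current text."""
    return [
        {"cue_index": i, "gcs_uri": f"{gcs_prefix}{cue_blob_name(i, text)}"}
        for i, text in enumerate(cue_texts)
    ]


# Hypothetical prefix, for illustration only.
PREFIX = "gs://example-bucket/job-1/en/cues/"

before = build_manifest(["A door opens.", "She smiles."], PREFIX)
# Inserting a cue at the front shifts every index, but each entry still
# points at audio synthesized from that cue's own text.
after = build_manifest(["Title card.", "A door opens.", "She smiles."], PREFIX)
```

With index-only names, `cue_0.mp3` would silently keep the old audio after the insertion. Here the entry for index 0 points at a different object, and a renderer that reads the manifest never has to infer the mapping from filenames.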
**BUG-7: Timeline preview looks accurate but isn't after edits**
Added isStale prop to TimelinePreview. The timeline now shows an amber tint and a "Preview may be outdated" label whenever there are unsaved pause point changes, pending TTS regenerations, or a newly uploaded VTT.

**BUG-8: ElevenLabs API errors break TTS with no fallback**
Added a try/except fallback chain in _synthesize_single_cue: if the configured provider fails, it automatically retries with google, then gemini.

**BUG-9: Concurrent re-render requests cause race conditions**
Made the PENDING_QC → RENDERING_QC status transition conditional (it only succeeds if the job is still in PENDING_QC). Returns HTTP 409 if a re-render is already in progress. The completion transition back to PENDING_QC is also conditional, so a cancelled or overridden render doesn't corrupt job state.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
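The conditional status transition described under BUG-9 amounts to a compare-and-set: the update filter includes the expected current status, so only one of two racing requests can succeed. The sketch below shows that idea with an in-memory dict standing in for the jobs collection. The function names, the 202 success code, and the in-memory store are assumptions for illustration; the real change presumably issues a MongoDB update with an equivalent filter.

```python
def conditional_transition_ops(job_id: str, expected: str, new: str) -> tuple[dict, dict]:
    """Return (filter, update) documents for a compare-and-set status change."""
    # The status condition in the filter is what makes the transition conditional.
    return {"_id": job_id, "status": expected}, {"$set": {"status": new}}


def try_transition(jobs: dict, job_id: str, expected: str, new: str) -> bool:
    """In-memory stand-in for checking that exactly one document was modified."""
    flt, update = conditional_transition_ops(job_id, expected, new)
    doc = jobs.get(flt["_id"])
    if doc is None or doc["status"] != flt["status"]:
        return False  # someone else already moved the job out of `expected`
    doc.update(update["$set"])
    return True


def start_rerender(jobs: dict, job_id: str) -> int:
    """Return an HTTP-style code: 202 if accepted (assumed), 409 if already rendering."""
    ok = try_transition(jobs, job_id, "pending_qc", "rendering_qc")
    return 202 if ok else 409


jobs = {"job-1": {"status": "pending_qc"}}
first = start_rerender(jobs, "job-1")   # wins the transition
second = start_rerender(jobs, "job-1")  # job is no longer in pending_qc
```

Applying the same pattern to the completion step (rendering_qc back to pending_qc) means a render that was cancelled or overridden cannot overwrite a status it no longer owns.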
662 lines
27 KiB
Python
"""Celery task for rendering accessible video with embedded audio descriptions."""

import asyncio
import os
import subprocess
import tempfile
from datetime import datetime

from motor.motor_asyncio import AsyncIOMotorClient

from ..core.config import settings
from ..core.logging import get_logger
from ..lib.vtt import VTTParser
from ..models.job import AccessibleVideoEditState, JobStatus, PausePointData, VideoSegmentMetadata
from ..schemas.whisper import CachedWhisperTranscript, CachedWordTimestamp
from ..services.gcs import gcs_service
from ..services.video_renderer import video_renderer_service
from ..services.vtt_retimer import vtt_retimer_service
from ..services.whisper_service import WordTimestamp, whisper_service
from . import celery_app
from .translate_and_synthesize import broadcast_status_update
from .tts_synthesis import parse_cue_index_from_blob_name
from .whisper_transcribe import transcribe_video_audio_task

logger = get_logger(__name__)


@celery_app.task(bind=True, time_limit=7200, soft_time_limit=7000)  # 2 hour limit for video rendering
def render_accessible_video_task(self, job_id: str, language: str):
    """
    Pipeline 3: Accessible Video Rendering
    Triggered after TTS generation completes for a language when accessible_video_mp4 is requested.

    Steps:
    1. Download source video and per-cue AD MP3s from GCS
    2. Get AD VTT content and calculate cue durations
    3. Build placements from the AD VTT cues (refined with Whisper for pause-insert)
    4. Render accessible video (overlay or pause-insert)
    5. If pause-insert: generate re-timed caption VTT
    6. Upload outputs to GCS
    7. Update job document
    """
    logger.info(f"Starting accessible video render for job {job_id}, language {language}")

    try:
        result = asyncio.run(_async_render_accessible_video(job_id, language))
        logger.info(f"Accessible video render completed for job {job_id}, language {language}")
        return result
    except Exception as e:
        logger.error(f"Accessible video render failed for job {job_id}/{language}: {e}")
        import traceback
        logger.error(f"Full traceback: {traceback.format_exc()}")
        raise


async def _async_render_accessible_video(job_id: str, language: str):
    """Async implementation of accessible video rendering."""
    logger.info(f"Async render started for job {job_id}, language {language}")

    client = AsyncIOMotorClient(settings.mongodb_uri)
    db = client[settings.mongodb_db]

    try:
        # Get job details
        job_doc = await db.jobs.find_one({"_id": job_id})
        if not job_doc:
            raise ValueError(f"Job {job_id} not found")

        job_title = job_doc.get("title", "Untitled Job")

        # Verify accessible video is requested
        if not job_doc["requested_outputs"].get("accessible_video_mp4"):
            logger.info(f"Accessible video not requested for job {job_id}")
            return

        # Update progress to rendering and transition job status if needed.
        # Keep the start time in a local so the completion update can reuse it;
        # job_doc was fetched before this write and does not contain it.
        render_started_at = datetime.utcnow()
        update_fields = {
            f"accessible_video_progress.{language}": {
                "status": "rendering",
                "started_at": render_started_at
            },
            "updated_at": datetime.utcnow()
        }

        # Transition job status to RENDERING_VIDEO if currently in TTS_GENERATING
        if job_doc["status"] == JobStatus.TTS_GENERATING.value:
            update_fields["status"] = JobStatus.RENDERING_VIDEO.value

        await db.jobs.update_one(
            {"_id": job_id},
            {"$set": update_fields}
        )

        # If we transitioned to RENDERING_VIDEO, also add to history
        if job_doc["status"] == JobStatus.TTS_GENERATING.value:
            await db.jobs.update_one(
                {"_id": job_id},
                {
                    "$push": {
                        "review.history": {
                            "at": datetime.utcnow(),
                            "status": JobStatus.RENDERING_VIDEO.value,
                            "by": "system"
                        }
                    }
                }
            )

        # Broadcast status update
        broadcast_status_update(
            job_id,
            JobStatus.RENDERING_VIDEO.value if job_doc["status"] == JobStatus.TTS_GENERATING.value else "rendering",
            job_title=job_title,
            message=f"Rendering accessible video for {language.upper()}"
        )

        # Use TMPDIR env var if set (for shared volume between workers)
        temp_base = os.environ.get('TMPDIR', None)
        with tempfile.TemporaryDirectory(dir=temp_base) as temp_dir:
            # 1. Download source video from GCS
            source_video_gcs = job_doc["source"]["gcs_uri"]
            source_blob_path = source_video_gcs.replace(f"gs://{settings.gcs_bucket}/", "")
            source_video_path = os.path.join(temp_dir, "source.mp4")

            logger.info(f"Downloading source video from {source_blob_path}")
            source_blob = gcs_service.bucket.blob(source_blob_path)
            source_blob.download_to_filename(source_video_path)

            # 2. Get language outputs
            lang_output = job_doc["outputs"].get(language)
            if not lang_output:
                raise ValueError(f"No outputs found for language {language}")

            # 3. Download AD VTT content
            ad_vtt_gcs = lang_output.get("ad_vtt_gcs")
            if not ad_vtt_gcs:
                raise ValueError(f"No AD VTT found for language {language}")

            ad_blob_path = ad_vtt_gcs.replace(f"gs://{settings.gcs_bucket}/", "")
            ad_blob = gcs_service.bucket.blob(ad_blob_path)
            ad_vtt_content = ad_blob.download_as_text()

            # 4. Download per-cue AD MP3 segments
            ad_cues_prefix = lang_output.get("ad_cues_gcs_prefix")
            if not ad_cues_prefix:
                raise ValueError(f"No AD cue segments found for language {language}")

            # Download per-cue MP3s: prefer manifest (stable across cue edits), fall back to blob listing
            ad_segments = []
            cue_durations = []
            from pydub import AudioSegment

            ad_cue_manifest = lang_output.get("ad_cue_manifest")
            if ad_cue_manifest:
                logger.info(f"Using ad_cue_manifest ({len(ad_cue_manifest)} entries) for MP3 download")
                for entry in sorted(ad_cue_manifest, key=lambda e: e["cue_index"]):
                    cue_index = entry["cue_index"]
                    gcs_uri = entry["gcs_uri"]
                    blob_path = gcs_uri.replace(f"gs://{settings.gcs_bucket}/", "")
                    local_path = os.path.join(temp_dir, f"cue_{cue_index}.mp3")
                    gcs_service.bucket.blob(blob_path).download_to_filename(local_path)
                    ad_segments.append((cue_index, local_path))
                    audio = AudioSegment.from_mp3(local_path)
                    cue_durations.append(len(audio) / 1000.0)
            else:
                logger.info("No ad_cue_manifest found, falling back to legacy index-based blob listing")
                prefix_path = ad_cues_prefix.replace(f"gs://{settings.gcs_bucket}/", "")
                blobs = list(gcs_service.bucket.list_blobs(prefix=prefix_path))
                cue_blobs = [
                    (b, parse_cue_index_from_blob_name(b.name))
                    for b in blobs if b.name.endswith(".mp3")
                ]
                cue_blobs = [(b, idx) for b, idx in cue_blobs if idx is not None]
                cue_blobs.sort(key=lambda x: x[1])
                for blob, cue_index in cue_blobs:
                    local_path = os.path.join(temp_dir, f"cue_{cue_index}.mp3")
                    blob.download_to_filename(local_path)
                    ad_segments.append((cue_index, local_path))
                    audio = AudioSegment.from_mp3(local_path)
                    cue_durations.append(len(audio) / 1000.0)

            logger.info(f"Downloaded {len(ad_segments)} AD cue segments")

            # 5. Get method from job's requested_outputs (user selected at QC approval)
            method = job_doc["requested_outputs"].get("accessible_video_method") or "pause_insert"
            logger.info(f"Using user-selected accessible video method: {method}")

            # 5a. Build placements from AD VTT cues (instead of Gemini video analysis)
            placements = _build_placements_from_ad_vtt(ad_vtt_content, cue_durations)
            logger.info(f"Built {len(placements)} placements from AD VTT cues")

            # Build analysis dict compatible with existing code
            analysis = {
                "method": method,
                "method_rationale": "User-selected at QC Review approval",
                "placements": placements,
                "total_added_duration": sum(cue_durations) if method == "pause_insert" else 0,
                "warnings": []
            }

            # 5b. If pause-insert method, refine pause points using Whisper
            if method == "pause_insert":
                logger.info("Refining pause points with Whisper speech analysis...")
                analysis, whisper_warnings = await _refine_pause_points_with_whisper(
                    job_id, source_video_path, analysis, db, temp_dir
                )

                if whisper_warnings:
                    # Add warnings to analysis for visibility
                    existing_warnings = analysis.get("warnings", [])
                    analysis["warnings"] = existing_warnings + whisper_warnings
                logger.info(f"Whisper refinement complete with {len(whisper_warnings)} warnings")

            # 6. Render accessible video with segment persistence for QC editing
            output_video_path = os.path.join(temp_dir, "accessible_video.mp4")
            gcs_segment_prefix = f"{job_id}/{language}/segments/"

            logger.info(f"Rendering accessible video using {method} method with segment persistence...")
            rendered_path, updated_placements, segment_metadata, pause_points = await video_renderer_service.render_accessible_video(
                source_video_path,
                ad_segments,
                analysis,
                output_video_path,
                persist_segments=True,
                gcs_segment_prefix=gcs_segment_prefix
            )

            # Update analysis with actual freeze durations for VTT retiming
            if updated_placements:
                analysis["placements"] = updated_placements
                logger.info(f"Updated {len(updated_placements)} placements with actual freeze durations")

            # Build edit state for QC review if segment metadata was returned
            edit_state = None
            if segment_metadata and pause_points:
                edit_state = AccessibleVideoEditState(
                    pause_points=pause_points,
                    video_segments=segment_metadata,
                    tts_regeneration_queue=[],
                    last_render_at=datetime.utcnow(),
                    whisper_refine_enabled=False
                )
                logger.info(f"Built edit state with {len(segment_metadata)} segments and {len(pause_points)} pause points")

            # 7. Upload rendered video to GCS
            video_blob_path = f"{job_id}/{language}/accessible_video.mp4"
            video_blob = gcs_service.bucket.blob(video_blob_path)
            video_blob.content_type = "video/mp4"
            video_blob.upload_from_filename(output_video_path)

            video_gcs_uri = f"gs://{settings.gcs_bucket}/{video_blob_path}"
            logger.info(f"Uploaded accessible video to {video_gcs_uri}")

            # 8. If pause-insert, generate re-timed captions VTT
            retimed_captions_gcs_uri = None
            if method == "pause_insert":
                # Download original captions VTT
                captions_vtt_gcs = lang_output.get("captions_vtt_gcs")
                if captions_vtt_gcs:
                    captions_blob_path = captions_vtt_gcs.replace(f"gs://{settings.gcs_bucket}/", "")
                    captions_blob = gcs_service.bucket.blob(captions_blob_path)
                    original_captions_vtt = captions_blob.download_as_text()

                    # Re-time captions
                    retimed_captions = vtt_retimer_service.retime_for_pause_insert(
                        original_captions_vtt,
                        analysis
                    )

                    # Upload re-timed captions
                    retimed_blob_path = f"{job_id}/{language}/accessible_captions.vtt"
                    retimed_blob = gcs_service.bucket.blob(retimed_blob_path)
                    retimed_blob.content_type = "text/vtt"
                    retimed_blob.upload_from_string(retimed_captions, content_type="text/vtt")

                    retimed_captions_gcs_uri = f"gs://{settings.gcs_bucket}/{retimed_blob_path}"
                    logger.info(f"Uploaded re-timed captions to {retimed_captions_gcs_uri}")

            # 9. Update job document with results (including edit state for QC review)
            update_fields = {
                f"outputs.{language}.accessible_video_gcs": video_gcs_uri,
                f"outputs.{language}.accessible_video_method": method,
                f"outputs.{language}.video_segments_gcs_prefix": f"gs://{settings.gcs_bucket}/{gcs_segment_prefix}",
                f"accessible_video_progress.{language}": {
                    "status": "completed",
                    "method": method,
                    "started_at": render_started_at,
                    "completed_at": datetime.utcnow()
                },
                "updated_at": datetime.utcnow()
            }

            if retimed_captions_gcs_uri:
                update_fields[f"outputs.{language}.retimed_captions_vtt_gcs"] = retimed_captions_gcs_uri

            # Store edit state for QC review accessible video editing
            if edit_state:
                update_fields[f"outputs.{language}.accessible_video_edit_state"] = edit_state.model_dump()

            await db.jobs.update_one(
                {"_id": job_id},
                {"$set": update_fields}
            )

            # Broadcast completion
            broadcast_status_update(
                job_id,
                "asset_ready",
                job_title=job_title,
                message=f"Accessible video ready for {language.upper()} ({method} method)"
            )

            # Check if all accessible videos are complete
            await _check_accessible_video_completion(job_id, db)

            logger.info(f"Accessible video render complete for job {job_id}/{language}")

    except Exception as e:
        logger.error(f"Accessible video render failed for job {job_id}/{language}: {e}")

        # Update progress to failed
        await db.jobs.update_one(
            {"_id": job_id},
            {
                "$set": {
                    f"accessible_video_progress.{language}": {
                        "status": "failed",
                        "error_message": str(e),
                        "completed_at": datetime.utcnow()
                    },
                    "updated_at": datetime.utcnow()
                }
            }
        )

        # Check if all videos are now finished (completed or failed) to update job status
        # This ensures the job transitions to RENDER_FAILED if all languages have finished
        await _check_accessible_video_completion(job_id, db)

        raise

    finally:
        client.close()


def _build_placements_from_ad_vtt(ad_vtt_content: str, cue_durations: list[float]) -> list[dict]:
    """
    Build placement instructions from AD VTT cues and TTS durations.

    Uses AD VTT start_time as initial pause_point for each cue.
    This replaces the Gemini analyze_accessible_video_placement call.

    Args:
        ad_vtt_content: The AD VTT content string
        cue_durations: List of TTS audio durations in seconds (same order as VTT cues)

    Returns:
        List of placement dicts compatible with Whisper refinement
    """
    cues = VTTParser.parse(ad_vtt_content)

    if len(cues) != len(cue_durations):
        logger.warning(
            f"Cue count mismatch: {len(cues)} VTT cues, {len(cue_durations)} durations. "
            f"Using minimum count."
        )

    placements = []
    for i, cue in enumerate(cues):
        if i >= len(cue_durations):
            break

        placements.append({
            "ad_cue_index": i,
            "original_start_time": cue.start_time,
            "original_end_time": cue.end_time,
            "target_start_time": cue.start_time,
            "ad_duration": cue_durations[i],
            "pause_point": cue.start_time,  # Use VTT start_time as initial pause point
            "resume_from": cue.start_time,  # Will be refined by Whisper
            "pause_point_rationale": "Derived from AD VTT cue start time"
        })

    return placements


async def _check_accessible_video_completion(job_id: str, db):
    """Check if all accessible videos are complete and update job status accordingly."""
    job_doc = await db.jobs.find_one({"_id": job_id})
    if not job_doc:
        return

    progress = job_doc.get("accessible_video_progress", {})
    requested_languages = job_doc["requested_outputs"]["languages"]

    # Check status of all requested languages
    all_finished = True  # All languages have either completed or failed
    any_failed = False
    failed_languages = []

    for language in requested_languages:
        lang_progress = progress.get(language, {})
        status = lang_progress.get("status", "pending")

        if status == "failed":
            any_failed = True
            failed_languages.append({
                "language": language,
                "error": lang_progress.get("error_message", "Unknown error")
            })
        elif status not in ["completed", "failed"]:
            # Still pending or rendering
            all_finished = False

    job_title = job_doc.get("title", "Untitled Job")

    # Only update job status if all languages have finished processing
    if all_finished:
        if any_failed:
            # Some or all videos failed - transition to RENDER_FAILED
            logger.error(f"Accessible video rendering failed for job {job_id}: {len(failed_languages)} language(s) failed")

            # Build error summary
            error_summary = "; ".join([f"{f['language']}: {f['error']}" for f in failed_languages])

            if job_doc["status"] in [JobStatus.TTS_GENERATING.value, JobStatus.RENDERING_VIDEO.value]:
                await db.jobs.update_one(
                    {"_id": job_id},
                    {
                        "$set": {
                            "status": JobStatus.RENDER_FAILED.value,
                            "error": {
                                "type": "render_failure",
                                "failed_languages": failed_languages,
                                "message": f"Video rendering failed for {len(failed_languages)} language(s): {error_summary}",
                                "timestamp": datetime.utcnow().isoformat()
                            },
                            "updated_at": datetime.utcnow()
                        },
                        "$push": {
                            "review.history": {
                                "at": datetime.utcnow(),
                                "status": JobStatus.RENDER_FAILED.value,
                                "by": "system",
                                "notes": f"Rendering failed for: {', '.join([f['language'] for f in failed_languages])}"
                            }
                        }
                    }
                )

                broadcast_status_update(
                    job_id,
                    JobStatus.RENDER_FAILED.value,
                    job_title=job_title,
                    message=f"{job_title} - Video rendering failed for {len(failed_languages)} language(s). Manual reprocessing required."
                )
        else:
            # All videos completed successfully
            # NEW WORKFLOW: Go to PENDING_QC for QC review (not PENDING_FINAL_REVIEW)
            logger.info(f"All accessible videos complete for job {job_id}")

            if job_doc["status"] in [JobStatus.TTS_GENERATING.value, JobStatus.RENDERING_VIDEO.value]:
                await db.jobs.update_one(
                    {"_id": job_id},
                    {
                        "$set": {
                            "status": JobStatus.PENDING_QC.value,
                            "updated_at": datetime.utcnow()
                        },
                        "$push": {
                            "review.history": {
                                "at": datetime.utcnow(),
                                "status": JobStatus.PENDING_QC.value,
                                "by": "system"
                            }
                        }
                    }
                )

                broadcast_status_update(
                    job_id,
                    JobStatus.PENDING_QC.value,
                    job_title=job_title,
                    message=f"{job_title} has all accessible videos complete - ready for QC Review"
                )


async def _refine_pause_points_with_whisper(
    job_id: str,
    video_path: str,
    analysis: dict,
    db,
    temp_dir: str
) -> tuple[dict, list[str]]:
    """
    Refine the initial pause points using Whisper speech gap detection.

    This function:
    1. Extracts audio and runs Whisper transcription via dedicated queue (always fresh, no caching)
    2. Saves transcript to job document for debugging/auditing
    3. Identifies speech gaps from word timestamps
    4. Snaps all pause points to nearest valid gaps AFTER the original point

    Args:
        job_id: Job ID, used for logging and for saving the transcript
        video_path: Path to source video
        analysis: Analysis dict with placements (built from the AD VTT cues)
        db: MongoDB database handle
        temp_dir: Temporary directory for extracted audio

    Returns:
        Tuple of (refined_analysis, warnings)
    """
    logger.info(f"Refining pause points with Whisper for job {job_id}")

    # Always generate a fresh Whisper transcript (no caching)
    # This ensures we get accurate word timestamps for the current video file
    audio_path = os.path.join(temp_dir, "source_audio.mp3")
    await _extract_audio_for_whisper(video_path, audio_path)

    # Dispatch Whisper transcription to dedicated whisper queue
    # Uses same pattern as FFmpeg: dispatch -> poll -> allow_join_result -> get
    logger.info(f"Dispatching Whisper transcription to dedicated queue for job {job_id}")
    try:
        words = await _dispatch_whisper_transcription(job_id, audio_path)
    except Exception as e:
        logger.error(f"Whisper transcription failed for job {job_id}: {e}")
        return analysis, [f"Whisper transcription failed: {str(e)} - using original Gemini timestamps"]

    if words:
        # Save transcript to job document (for debugging/auditing, not caching)
        transcript_data = CachedWhisperTranscript(
            words=[CachedWordTimestamp(word=w.word, start=w.start, end=w.end) for w in words],
            model_name=settings.whisper_model,
            audio_duration=words[-1].end if words else 0,
            created_at=datetime.utcnow().isoformat()
        )

        await db.jobs.update_one(
            {"_id": job_id},
            {"$set": {"whisper_transcript": transcript_data.model_dump()}}
        )
        logger.info(f"Saved Whisper transcript with {len(words)} words for job {job_id}")

    if not words:
        logger.warning(f"No speech detected in video for job {job_id}, using original Gemini timestamps")
        return analysis, ["No speech detected in video - using original Gemini timestamps"]

    # Identify speech gaps
    gaps = whisper_service.identify_speech_gaps(words)
    logger.info(f"Found {len(gaps)} speech gaps in video for job {job_id}")

    # Refine pause points (Phase 1: individual refinement, Phase 2: consolidation)
    refined_placements, warnings = whisper_service.refine_all_pause_points(
        analysis.get("placements", []),
        words,
        gaps
    )

    # Update analysis with refined placements
    refined_analysis = analysis.copy()
    refined_analysis["placements"] = refined_placements
    refined_analysis["whisper_refined"] = True

    logger.info(f"Pause point refinement complete for job {job_id}: {len(warnings)} warnings")

    return refined_analysis, warnings


async def _extract_audio_for_whisper(video_path: str, audio_path: str):
    """
    Extract audio track from video for Whisper transcription.

    Uses FFmpeg to extract audio at 16kHz mono MP3 (optimal for Whisper).
    """
    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-vn",  # No video
        "-acodec", "libmp3lame",
        "-ar", "16000",  # 16kHz is optimal for Whisper
        "-ac", "1",  # Mono
        "-q:a", "5",  # Medium quality
        audio_path
    ]

    logger.info(f"Extracting audio for Whisper: {video_path} -> {audio_path}")

    # Run FFmpeg synchronously in a thread pool
    def run_ffmpeg():
        process = subprocess.run(
            cmd,
            capture_output=True,
            text=True
        )
        if process.returncode != 0:
            raise RuntimeError(f"FFmpeg audio extraction failed: {process.stderr}")

    await asyncio.to_thread(run_ffmpeg)
    logger.info(f"Audio extraction complete: {audio_path}")


async def _dispatch_whisper_transcription(job_id: str, audio_path: str) -> list[WordTimestamp]:
    """
    Dispatch Whisper transcription to dedicated whisper queue and wait for result.

    Uses the same pattern as FFmpeg dispatch:
    1. apply_async() to dispatch to the whisper queue
    2. Poll with ready() using async sleep to avoid blocking
    3. Use allow_join_result() context manager
    4. Get result only after task is ready

    Args:
        job_id: Job ID for logging
        audio_path: Path to extracted audio file

    Returns:
        List of WordTimestamp objects from transcription

    Raises:
        Exception: If transcription fails
    """
    from celery.result import allow_join_result

    # Dispatch to whisper queue
    task_result = transcribe_video_audio_task.apply_async(
        args=[job_id, audio_path],
        queue='whisper'
    )

    logger.info(f"Whisper task dispatched for job {job_id}, waiting for completion...")

    # Poll once per second with async sleep to avoid blocking the event loop
    poll_count = 0
    while not task_result.ready():
        await asyncio.sleep(1.0)
        poll_count += 1
        if poll_count % 30 == 0:  # Log every 30 seconds
            logger.info(f"Still waiting for Whisper transcription for job {job_id}...")

    # Check for task failure before fetching the result: get() re-raises the
    # task's exception by default, so a check placed after it would never run
    if task_result.failed():
        raise Exception(f"Whisper task failed: {task_result.result}")

    # Get result - use allow_join_result since we're calling from within a task
    # This is safe because we've already confirmed the task is complete via ready()
    with allow_join_result():
        result = task_result.get(timeout=30)

    # Convert to WordTimestamp objects
    words = [
        WordTimestamp(word=w["word"], start=w["start"], end=w["end"])
        for w in result.get("words", [])
    ]

    logger.info(
        f"Whisper transcription complete for job {job_id}: "
        f"{len(words)} words detected"
    )

    return words