# ==============================================================================
# Accessible Video QC Editing Endpoints
# ==============================================================================


def _to_iso(value):
    """Return ISO-8601 text for datetime-like values; pass str/None through unchanged.

    The edit state is persisted with ``model_dump()``, so timestamps may come
    back from MongoDB as ``datetime`` objects, while the response schemas
    declare them as ``str``.
    """
    return value.isoformat() if hasattr(value, "isoformat") else value


async def _load_qc_edit_state(
    db: AsyncIOMotorDatabase,
    job_id: str,
    language: str,
    *,
    allowed_statuses: tuple = (),
    purpose: str = "",
    edit_state_label: str = "edit state",
) -> tuple[dict, dict, dict]:
    """Fetch a job and its per-language accessible-video edit state.

    Args:
        db: Database handle.
        job_id: ``_id`` of the job document.
        language: Key under ``outputs`` to read.
        allowed_statuses: Status values the job must be in; empty means no
            status restriction.
        purpose: Suffix inserted after "status" in the 400 message
            (e.g. ``" for editing"``).
        edit_state_label: Noun used in the "not found" message.

    Returns:
        ``(job_doc, lang_output, edit_state)`` as raw dicts.

    Raises:
        HTTPException: 404 if the job, language output, or edit state is
            missing; 400 if the job is not in an allowed status.
    """
    job_doc = await db.jobs.find_one({"_id": job_id})
    if not job_doc:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Job not found"
        )

    current_status = job_doc.get("status")
    if allowed_statuses and current_status not in allowed_statuses:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                f"Job must be in {' or '.join(allowed_statuses)} status{purpose} "
                f"(current: {current_status})"
            )
        )

    # ``outputs`` may be absent or stored as null; treat both as "no outputs".
    lang_output = (job_doc.get("outputs") or {}).get(language)
    if not lang_output:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"No outputs found for language {language}"
        )

    edit_state = lang_output.get("accessible_video_edit_state")
    if not edit_state:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"No {edit_state_label} found for language {language}"
        )

    return job_doc, lang_output, edit_state


@router.get("/{job_id}/accessible-video/{language}/edit-state", response_model=AccessibleVideoEditStateResponse)
async def get_accessible_video_edit_state(
    job_id: str,
    language: str,
    current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN)),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Get current pause points, segment metadata, and TTS regeneration queue for QC editing.

    Readable while the job is in ``pending_qc`` or ``rendering_qc``.

    Raises:
        HTTPException: 404 if the job/language/edit state is missing, 400 if
            the job is in any other status.
    """
    from datetime import timedelta

    _, lang_output, edit_state = await _load_qc_edit_state(
        db,
        job_id,
        language,
        allowed_statuses=(JobStatus.PENDING_QC.value, JobStatus.RENDERING_QC.value),
        edit_state_label="accessible video edit state",
    )

    # Total duration is the sum of the persisted segment durations.
    segments = edit_state.get("video_segments") or []
    total_duration_ms = sum(s.get("duration_ms", 0) for s in segments)

    # Signed URL for previewing the rendered accessible video (if any).
    accessible_video_url = None
    accessible_video_gcs = lang_output.get("accessible_video_gcs")
    if accessible_video_gcs:
        blob_path = accessible_video_gcs.removeprefix(f"gs://{settings.gcs_bucket}/")
        blob = gcs_service.bucket.blob(blob_path)
        # Use a timedelta: it is relative to "now" under both v2 and v4
        # signing, whereas a bare int is read as an absolute epoch timestamp
        # by the default (v2) signer and would yield an already-expired URL.
        accessible_video_url = blob.generate_signed_url(expiration=timedelta(hours=1))

    return AccessibleVideoEditStateResponse(
        pause_points=[
            PausePointResponse(
                cue_index=pp.get("cue_index"),
                original_ms=pp.get("original_ms"),
                adjusted_ms=pp.get("adjusted_ms"),
                min_bound_ms=pp.get("min_bound_ms"),
                max_bound_ms=pp.get("max_bound_ms")
            )
            for pp in edit_state.get("pause_points") or []
        ],
        video_segments=[
            VideoSegmentResponse(
                segment_index=seg.get("segment_index"),
                start_ms=seg.get("start_ms"),
                end_ms=seg.get("end_ms"),
                gcs_uri=seg.get("gcs_uri"),
                duration_ms=seg.get("duration_ms"),
                is_freeze_frame=seg.get("is_freeze_frame", False),
                cue_index=seg.get("cue_index")
            )
            for seg in segments
        ],
        tts_regeneration_queue=[
            TTSRegenerationItem(
                cue_index=req.get("cue_index"),
                # May be a datetime if written via a model dump; schema wants str.
                requested_at=_to_iso(req.get("requested_at")),
                new_text=req.get("new_text"),
                status=req.get("status", "pending"),
                error_message=req.get("error_message")
            )
            for req in edit_state.get("tts_regeneration_queue") or []
        ],
        # Stored as a datetime by the render task; the response field is a string.
        last_render_at=_to_iso(edit_state.get("last_render_at")),
        total_duration_ms=total_duration_ms,
        accessible_video_url=accessible_video_url
    )


@router.patch("/{job_id}/accessible-video/{language}/pause-points/{cue_index}", response_model=PausePointResponse)
async def update_pause_point(
    job_id: str,
    language: str,
    cue_index: int,
    request: PausePointUpdateRequest,
    current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN)),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Update a single pause point timing with millisecond precision.

    Only the matching array element is written, so concurrent edits to other
    pause points are not overwritten by a stale copy of the array.

    Raises:
        HTTPException: 404 if the job/language/edit state/pause point is
            missing; 400 if the job is not in ``pending_qc`` or the value is
            non-finite or outside the stored bounds; 409 if the job changed
            between the read and the write.
    """
    import math

    _, _, edit_state = await _load_qc_edit_state(
        db,
        job_id,
        language,
        allowed_statuses=(JobStatus.PENDING_QC.value,),
        purpose=" for editing",
    )

    pause_points = edit_state.get("pause_points") or []
    pause_point = next((pp for pp in pause_points if pp.get("cue_index") == cue_index), None)
    if pause_point is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Pause point for cue {cue_index} not found"
        )

    adjusted_ms = request.adjusted_ms

    # NaN compares False against everything, so it would slip through both
    # bound checks below; reject non-finite values explicitly.
    if not math.isfinite(adjusted_ms):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Adjusted value must be a finite number of milliseconds"
        )

    min_bound = pause_point.get("min_bound_ms", 0)
    max_bound = pause_point.get("max_bound_ms", float("inf"))

    if adjusted_ms < min_bound:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Adjusted value {adjusted_ms}ms is below minimum bound {min_bound}ms"
        )
    if adjusted_ms > max_bound:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Adjusted value {adjusted_ms}ms is above maximum bound {max_bound}ms"
        )

    # Positional update of just this element, guarded by status so an edit
    # cannot land after a re-render has started.
    points_path = f"outputs.{language}.accessible_video_edit_state.pause_points"
    result = await db.jobs.update_one(
        {
            "_id": job_id,
            "status": JobStatus.PENDING_QC.value,
            points_path: {"$elemMatch": {"cue_index": cue_index}},
        },
        {
            "$set": {
                f"{points_path}.$.adjusted_ms": adjusted_ms,
                "updated_at": datetime.utcnow()
            }
        }
    )
    if result.matched_count == 0:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Job changed while updating the pause point; reload and retry"
        )

    logger.info(f"Updated pause point for cue {cue_index} in job {job_id}/{language}: {adjusted_ms}ms")

    return PausePointResponse(
        cue_index=pause_point["cue_index"],
        original_ms=pause_point["original_ms"],
        adjusted_ms=adjusted_ms,
        min_bound_ms=pause_point["min_bound_ms"],
        max_bound_ms=pause_point["max_bound_ms"]
    )


@router.post("/{job_id}/accessible-video/{language}/tts-regeneration")
async def queue_tts_regeneration(
    job_id: str,
    language: str,
    request: TTSRegenerationQueueRequest,
    current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN)),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Queue TTS regeneration for specific cues (uses current AD VTT text).

    Cue indices already queued, or repeated within the same request, are
    ignored. New entries are appended with ``$push`` rather than rewriting the
    whole queue.

    Returns:
        Dict with a human-readable ``message`` and the ``queued_cues`` that
        were actually added.

    Raises:
        HTTPException: 404 if the job/language/edit state is missing; 400 if
            the job is not in ``pending_qc``; 409 if the job left
            ``pending_qc`` before the write.
    """
    _, _, edit_state = await _load_qc_edit_state(
        db,
        job_id,
        language,
        allowed_statuses=(JobStatus.PENDING_QC.value,),
        purpose=" for editing",
    )

    current_queue = edit_state.get("tts_regeneration_queue") or []
    seen = {req.get("cue_index") for req in current_queue}

    requested_at = datetime.utcnow().isoformat()
    new_entries = []
    for cue_idx in request.cue_indices:
        if cue_idx in seen:
            continue
        # Track additions too, so a request like [3, 3] queues cue 3 once.
        seen.add(cue_idx)
        new_entries.append({
            "cue_index": cue_idx,
            "requested_at": requested_at,
            "new_text": None,  # Will use current VTT text
            "status": "pending",
            "error_message": None
        })
    added = [entry["cue_index"] for entry in new_entries]

    if new_entries:
        queue_path = f"outputs.{language}.accessible_video_edit_state.tts_regeneration_queue"
        result = await db.jobs.update_one(
            {"_id": job_id, "status": JobStatus.PENDING_QC.value},
            {
                "$push": {queue_path: {"$each": new_entries}},
                "$set": {"updated_at": datetime.utcnow()}
            }
        )
        if result.matched_count == 0:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="Job changed while queueing TTS regeneration; reload and retry"
            )

    logger.info(f"Queued TTS regeneration for cues {added} in job {job_id}/{language}")

    return {"message": f"Queued {len(added)} cue(s) for regeneration", "queued_cues": added}


@router.delete("/{job_id}/accessible-video/{language}/tts-regeneration/{cue_index}")
async def remove_tts_regeneration(
    job_id: str,
    language: str,
    cue_index: int,
    current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN)),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Remove a cue from the TTS regeneration queue.

    Like the other mutating QC endpoints, this requires ``pending_qc`` so the
    queue cannot be changed once a re-render has been triggered.

    Raises:
        HTTPException: 404 if the job/language/edit state is missing or the
            cue is not queued; 400 if the job is not in ``pending_qc``; 409 if
            the job left ``pending_qc`` before the write.
    """
    _, _, edit_state = await _load_qc_edit_state(
        db,
        job_id,
        language,
        allowed_statuses=(JobStatus.PENDING_QC.value,),
        purpose=" for editing",
    )

    current_queue = edit_state.get("tts_regeneration_queue") or []
    if not any(req.get("cue_index") == cue_index for req in current_queue):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Cue {cue_index} not in regeneration queue"
        )

    # $pull removes only the matching entries instead of overwriting the
    # queue with a possibly stale copy.
    queue_path = f"outputs.{language}.accessible_video_edit_state.tts_regeneration_queue"
    result = await db.jobs.update_one(
        {"_id": job_id, "status": JobStatus.PENDING_QC.value},
        {
            "$pull": {queue_path: {"cue_index": cue_index}},
            "$set": {"updated_at": datetime.utcnow()}
        }
    )
    if result.matched_count == 0:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Job changed while updating the regeneration queue; reload and retry"
        )

    logger.info(f"Removed cue {cue_index} from TTS regeneration queue for job {job_id}/{language}")

    return {"message": f"Removed cue {cue_index} from regeneration queue"}


@router.post("/{job_id}/accessible-video/{language}/re-render", response_model=JobResponse)
async def trigger_accessible_video_rerender(
    job_id: str,
    language: str,
    request: RerenderAccessibleVideoRequest,
    current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN)),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """
    Trigger re-synthesis of accessible video with QC changes.

    - Regenerates only queued TTS segments (cue indices whose queue entry is
      still ``pending``)
    - Optionally runs Whisper pause point refinement

    The ``pending_qc`` -> ``rendering_qc`` transition is a single conditional
    update, so two simultaneous requests cannot both dispatch a render.

    Raises:
        HTTPException: 404 if the job/language/edit state is missing; 400 if
            the job is not in ``pending_qc``; 409 if another request moved the
            job out of ``pending_qc`` first; 503 if the task could not be
            dispatched (the job is returned to ``pending_qc``).
    """
    _, _, edit_state = await _load_qc_edit_state(
        db,
        job_id,
        language,
        allowed_statuses=(JobStatus.PENDING_QC.value,),
        purpose=" to re-render",
    )

    regenerate_cues = [
        req.get("cue_index")
        for req in edit_state.get("tts_regeneration_queue") or []
        if req.get("status") == "pending"
    ]

    # Claim the job atomically: only succeeds if it is still in pending_qc.
    now = datetime.utcnow()
    claim = await db.jobs.update_one(
        {"_id": job_id, "status": JobStatus.PENDING_QC.value},
        {
            "$set": {
                "status": JobStatus.RENDERING_QC.value,
                "updated_at": now
            },
            "$push": {
                "review.history": {
                    "at": now,
                    "status": JobStatus.RENDERING_QC.value,
                    "by": str(current_user.id),
                    "notes": f"Re-rendering {language} with {len(regenerate_cues)} TTS regeneration(s), whisper_refine={request.whisper_refine}"
                }
            }
        }
    )
    if claim.modified_count == 0:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Job is no longer in pending_qc status; a re-render may already be in progress"
        )

    # Imported here, as in the original, rather than at module import time.
    from ...tasks.rerender_accessible_video import rerender_accessible_video_task
    try:
        rerender_accessible_video_task.delay(
            job_id=job_id,
            language=language,
            regenerate_cue_indices=regenerate_cues,
            whisper_refine=request.whisper_refine
        )
    except Exception as exc:
        # Dispatch failed (e.g. broker unavailable): undo the status change so
        # the job is not stranded in rendering_qc with no task running.
        reverted_at = datetime.utcnow()
        await db.jobs.update_one(
            {"_id": job_id, "status": JobStatus.RENDERING_QC.value},
            {
                "$set": {
                    "status": JobStatus.PENDING_QC.value,
                    "updated_at": reverted_at
                },
                "$push": {
                    "review.history": {
                        "at": reverted_at,
                        "status": JobStatus.PENDING_QC.value,
                        "by": "system",
                        "notes": "Re-render could not be started; returned to QC"
                    }
                }
            }
        )
        logger.exception(f"Failed to dispatch accessible video re-render for job {job_id}/{language}")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Failed to start re-render; please try again"
        ) from exc

    logger.info(
        f"Triggered accessible video re-render for job {job_id}/{language}: "
        f"regenerate_cues={regenerate_cues}, whisper_refine={request.whisper_refine}"
    )

    # Return the job as it now stands in the database.
    result = await db.jobs.find_one({"_id": job_id})
    if not result:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Job not found"
        )

    return JobResponse(
        id=str(result["_id"]),
        title=result["title"],
        status=result["status"],
        source=result["source"],
        requested_outputs=RequestedOutputs(**result["requested_outputs"]),
        review=result.get("review", {"notes": "", "history": []}),
        outputs=result.get("outputs"),
        created_at=result["created_at"].isoformat(),
        updated_at=result["updated_at"].isoformat()
    )
class TTSRegenerationRequest(BaseModel):
    """One queued request to re-synthesize TTS audio for a single cue during QC."""

    # Index of the AD cue whose audio should be regenerated.
    cue_index: int
    # When the regeneration was requested.
    requested_at: datetime
    # Replacement text; when None the current VTT text is used.
    new_text: Optional[str] = None
    # Lifecycle of this request.
    status: Literal["pending", "processing", "completed", "failed"] = "pending"
    # Failure detail, if any.
    error_message: Optional[str] = None


class AccessibleVideoEditState(BaseModel):
    """State a reviewer can edit for an accessible video while it is in QC."""

    # Pause point timings, one per AD cue.
    pause_points: list[PausePointData] = []
    # Metadata for the persisted video segments.
    video_segments: list[VideoSegmentMetadata] = []
    # Cues waiting for TTS regeneration.
    tts_regeneration_queue: list[TTSRegenerationRequest] = []
    # Timestamp of the most recent render, if one has happened.
    last_render_at: Optional[datetime] = None
    # Off by default; the user enables it if cue positions changed.
    whisper_refine_enabled: bool = False
# === QC Review Accessible Video Editing Schemas ===


class PausePointResponse(BaseModel):
    """Pause point timing data for QC editing."""
    cue_index: int = Field(..., description="AD cue index this pause point belongs to")
    original_ms: float = Field(..., description="Original pause point timestamp (ms)")
    adjusted_ms: Optional[float] = Field(None, description="User-adjusted timestamp (ms)")
    min_bound_ms: float = Field(..., description="Minimum allowed value (ms)")
    max_bound_ms: float = Field(..., description="Maximum allowed value (ms)")


class VideoSegmentResponse(BaseModel):
    """Metadata for a video segment."""
    segment_index: int = Field(..., description="0-based segment index")
    start_ms: float = Field(..., description="Start timestamp in source video (ms)")
    end_ms: float = Field(..., description="End timestamp in source video (ms)")
    gcs_uri: str = Field(..., description="GCS path to segment MP4")
    duration_ms: float = Field(..., description="Actual segment duration (ms)")
    is_freeze_frame: bool = Field(False, description="True if freeze frame with AD audio")
    cue_index: Optional[int] = Field(None, description="AD cue index (freeze frames only)")


class TTSRegenerationItem(BaseModel):
    """A queued TTS regeneration request."""
    cue_index: int = Field(..., description="AD cue index to regenerate")
    requested_at: str = Field(..., description="ISO timestamp when requested")
    new_text: Optional[str] = Field(None, description="Override text (if provided)")
    status: str = Field("pending", description="pending | processing | completed | failed")
    error_message: Optional[str] = None


class AccessibleVideoEditStateResponse(BaseModel):
    """Current editable state for accessible video during QC review."""
    pause_points: list[PausePointResponse] = Field(
        default_factory=list,
        description="All pause points with original and adjusted values"
    )
    video_segments: list[VideoSegmentResponse] = Field(
        default_factory=list,
        description="Video segment metadata for timeline display"
    )
    tts_regeneration_queue: list[TTSRegenerationItem] = Field(
        default_factory=list,
        description="Queued TTS regeneration requests"
    )
    last_render_at: Optional[str] = Field(
        None,
        description="ISO timestamp of last accessible video render"
    )
    total_duration_ms: float = Field(..., description="Total accessible video duration (ms)")
    accessible_video_url: Optional[str] = Field(
        None,
        description="Signed URL for accessible video preview"
    )


class PausePointUpdateRequest(BaseModel):
    """Request to update a pause point's adjusted timing.

    The value must be a finite, non-negative number. NaN in particular has to
    be rejected here, because it compares False against any bound and would
    otherwise pass a ``value < min`` / ``value > max`` range check.
    """
    adjusted_ms: float = Field(
        ...,
        ge=0,
        allow_inf_nan=False,
        description="New pause point timestamp in milliseconds"
    )


class TTSRegenerationQueueRequest(BaseModel):
    """Request to queue TTS regeneration for specific cues."""
    cue_indices: list[int] = Field(
        ...,
        description="List of AD cue indices to regenerate"
    )


class TTSRegenerationRemoveRequest(BaseModel):
    """Request to remove a cue from the TTS regeneration queue."""
    cue_index: int = Field(..., description="AD cue index to remove from queue")


class RerenderAccessibleVideoRequest(BaseModel):
    """Request to re-render accessible video with QC changes."""
    whisper_refine: bool = Field(
        False,
        description="Run Whisper pause point refinement (enable if cue count/position changed)"
    )
def _upload_to_gcs_permanent(self, local_path: str, gcs_path: str) -> str:
    """
    Copy a local file into the configured bucket at a non-temporary path.

    Args:
        local_path: File on local disk to upload
        gcs_path: Object path inside the bucket (e.g., "job_id/en/segments/seg_0.mp4")

    Returns:
        The uploaded object's URI in gs://bucket/gcs_path form
    """
    bucket_name = settings.gcs_bucket
    target_blob = self._get_gcs_client().bucket(bucket_name).blob(gcs_path)
    target_blob.upload_from_filename(local_path)

    uploaded_uri = f"gs://{bucket_name}/{gcs_path}"
    logger.debug(f"Uploaded {local_path} to {uploaded_uri} (permanent)")
    return uploaded_uri
@@ -309,11 +333,15 @@ class VideoRendererService: ad_segments: List of (cue_index, mp3_path) tuples for each AD segment analysis: Gemini analysis dict with method and placements output_path: Where to save the output MP4 + persist_segments: If True, upload video segments to GCS for QC editing + gcs_segment_prefix: GCS path prefix for segments (e.g., "job_id/en/segments/") Returns: - Tuple of (output_path, updated_placements) + Tuple of (output_path, updated_placements, segment_metadata, pause_points) - output_path: Path to rendered accessible video - updated_placements: Placements with actual_freeze_duration added (pause-insert only) + - segment_metadata: List of VideoSegmentMetadata if persist_segments=True, else None + - pause_points: List of PausePointData if persist_segments=True, else None """ method = analysis.get("method", "pause_insert") @@ -328,10 +356,12 @@ class VideoRendererService: result_path = await self._render_overlay_method( source_video_path, ad_segments, analysis, output_path ) - return (result_path, None) + return (result_path, None, None, None) else: return await self._render_pause_insert_method( - source_video_path, ad_segments, analysis, output_path + source_video_path, ad_segments, analysis, output_path, + persist_segments=persist_segments, + gcs_segment_prefix=gcs_segment_prefix ) finally: # Clean up cached source video from GCS @@ -460,13 +490,23 @@ class VideoRendererService: ad_segments: list[tuple[int, str]], analysis: dict[str, Any], output_path: str, - ) -> tuple[str, list[dict]]: + persist_segments: bool = False, + gcs_segment_prefix: str | None = None, + ) -> tuple[str, list[dict], list[VideoSegmentMetadata] | None, list[PausePointData] | None]: """ Render with pause-insert method: 1. Split video at each pause point 2. Extract freeze frame at each pause point 3. Create freeze-frame segment with AD audio 4. Concatenate all segments + 5. 
Optionally persist segments to GCS for QC editing + + Args: + persist_segments: If True, upload segments to GCS and return metadata + gcs_segment_prefix: GCS path prefix (e.g., "job_id/en/segments/") + + Returns: + Tuple of (output_path, updated_placements, segment_metadata, pause_points) """ logger.info(f"Starting pause-insert render for {source_video_path}") placements = analysis.get("placements", []) @@ -489,7 +529,7 @@ class VideoRendererService: if not sorted_placements: logger.warning("No pause points found, copying source video") await self._copy_video(source_video_path, output_path) - return (output_path, []) + return (output_path, [], None, None) with tempfile.TemporaryDirectory() as temp_dir: temp_dir_path = Path(temp_dir) @@ -725,7 +765,106 @@ class VideoRendererService: updated["actual_freeze_duration"] = actual_durations[cue_index] updated_placements.append(updated) - return (output_path, updated_placements) + # ============================================================ + # PHASE 5: Persist segments to GCS for QC editing (optional) + # ============================================================ + segment_metadata_list: list[VideoSegmentMetadata] | None = None + pause_point_data_list: list[PausePointData] | None = None + + if persist_segments and gcs_segment_prefix: + logger.info(f"Persisting {len(segment_files)} segments to GCS at {gcs_segment_prefix}") + segment_metadata_list = [] + segment_idx = 0 + cumulative_time_ms = 0.0 + + for p in valid_placements: + i = p["index"] + # Upload video segment if it exists + if i in video_segment_paths: + local_path = video_segment_paths[i] + gcs_path = f"{gcs_segment_prefix}seg_{segment_idx}.mp4" + gcs_uri = self._upload_to_gcs_permanent(local_path, gcs_path) + segment_duration_ms = (p["pause_point"] - p["segment_start"]) * 1000 + + segment_metadata_list.append(VideoSegmentMetadata( + segment_index=segment_idx, + start_ms=cumulative_time_ms, + end_ms=cumulative_time_ms + segment_duration_ms, + gcs_uri=gcs_uri, 
+ duration_ms=segment_duration_ms, + is_freeze_frame=False, + cue_index=None + )) + cumulative_time_ms += segment_duration_ms + segment_idx += 1 + + # Upload freeze segment + freeze_local_path = freeze_segment_paths[i] + gcs_path = f"{gcs_segment_prefix}seg_{segment_idx}_freeze.mp4" + gcs_uri = self._upload_to_gcs_permanent(freeze_local_path, gcs_path) + freeze_duration_ms = p["actual_freeze_duration"] * 1000 + + segment_metadata_list.append(VideoSegmentMetadata( + segment_index=segment_idx, + start_ms=cumulative_time_ms, + end_ms=cumulative_time_ms + freeze_duration_ms, + gcs_uri=gcs_uri, + duration_ms=freeze_duration_ms, + is_freeze_frame=True, + cue_index=p["cue_index"] + )) + cumulative_time_ms += freeze_duration_ms + segment_idx += 1 + + # Upload final segment if exists + if final_segment_path: + gcs_path = f"{gcs_segment_prefix}seg_{segment_idx}.mp4" + gcs_uri = self._upload_to_gcs_permanent(str(final_segment_path), gcs_path) + final_duration_ms = (source_duration - final_segment_start) * 1000 + + segment_metadata_list.append(VideoSegmentMetadata( + segment_index=segment_idx, + start_ms=cumulative_time_ms, + end_ms=cumulative_time_ms + final_duration_ms, + gcs_uri=gcs_uri, + duration_ms=final_duration_ms, + is_freeze_frame=False, + cue_index=None + )) + + logger.info(f"Persisted {len(segment_metadata_list)} segments to GCS") + + # Build PausePointData list with bounds + pause_point_data_list = [] + for idx, p in enumerate(valid_placements): + pause_ms = p["pause_point"] * 1000 + + # Compute min bound: end of previous AD segment (or 0 for first) + if idx == 0: + min_bound_ms = 0.0 + else: + prev_p = valid_placements[idx - 1] + # End of previous freeze = pause_point + freeze_duration + min_bound_ms = (prev_p["pause_point"] + prev_p["actual_freeze_duration"]) * 1000 + + # Compute max bound: start of next pause point (or video end for last) + if idx == len(valid_placements) - 1: + max_bound_ms = source_duration * 1000 + else: + next_p = valid_placements[idx + 1] 
+ max_bound_ms = next_p["pause_point"] * 1000 + + pause_point_data_list.append(PausePointData( + cue_index=p["cue_index"], + original_ms=pause_ms, + adjusted_ms=None, + min_bound_ms=min_bound_ms, + max_bound_ms=max_bound_ms + )) + + logger.info(f"Built {len(pause_point_data_list)} pause point data entries") + + return (output_path, updated_placements, segment_metadata_list, pause_point_data_list) async def _get_video_duration(self, video_path: str) -> float: """Get video duration in seconds using ffprobe.""" diff --git a/backend/app/tasks/ingest_and_ai.py b/backend/app/tasks/ingest_and_ai.py index 8079fe7..f0c33dd 100644 --- a/backend/app/tasks/ingest_and_ai.py +++ b/backend/app/tasks/ingest_and_ai.py @@ -249,11 +249,12 @@ async def ingest_and_ai_task_impl(job_id: str): ) # Update job with AI results, detected language, and outputs + # Set status to TRANSLATING to trigger translation pipeline before QC await db.jobs.update_one( {"_id": job_id}, { "$set": { - "status": JobStatus.PENDING_QC.value, + "status": JobStatus.TRANSLATING.value, "source.language": source_language, # Update with detected language "source.detected_language": detected_language, "ai.ingestion_json": ai_result, @@ -267,22 +268,27 @@ async def ingest_and_ai_task_impl(job_id: str): "$push": { "review.history": { "at": datetime.utcnow(), - "status": JobStatus.PENDING_QC.value, + "status": JobStatus.TRANSLATING.value, "by": "system" } } } ) - + # Broadcast status update broadcast_status_update( - job_id, - JobStatus.PENDING_QC.value, + job_id, + JobStatus.TRANSLATING.value, job_title=job_title, - message=f"{job_title} has completed AI processing and is ready for QC review" + message=f"{job_title} AI processing complete, starting translation pipeline" ) - logger.info(f"Successfully completed ingestion and AI processing for job {job_id}") + logger.info(f"AI processing complete for job {job_id}, triggering translation pipeline") + + # Trigger translation and synthesis pipeline + # This will process all 
translations, TTS, and accessible video BEFORE QC review + from .translate_and_synthesize import translate_and_synthesize_task + translate_and_synthesize_task.delay(job_id) finally: # Clean up temp file diff --git a/backend/app/tasks/render_accessible_video.py b/backend/app/tasks/render_accessible_video.py index 1337fa7..b258bec 100644 --- a/backend/app/tasks/render_accessible_video.py +++ b/backend/app/tasks/render_accessible_video.py @@ -11,7 +11,7 @@ from motor.motor_asyncio import AsyncIOMotorClient from ..core.config import settings from ..core.logging import get_logger from ..lib.vtt import VTTParser -from ..models.job import JobStatus +from ..models.job import AccessibleVideoEditState, JobStatus, PausePointData, VideoSegmentMetadata from ..schemas.whisper import CachedWhisperTranscript, CachedWordTimestamp from ..services.gcs import gcs_service from ..services.video_renderer import video_renderer_service @@ -198,15 +198,18 @@ async def _async_render_accessible_video(job_id: str, language: str): analysis["warnings"] = existing_warnings + whisper_warnings logger.info(f"Whisper refinement complete with {len(whisper_warnings)} warnings") - # 6. Render accessible video + # 6. 
Render accessible video with segment persistence for QC editing output_video_path = os.path.join(temp_dir, "accessible_video.mp4") + gcs_segment_prefix = f"{job_id}/{language}/segments/" - logger.info(f"Rendering accessible video using {method} method...") - rendered_path, updated_placements = await video_renderer_service.render_accessible_video( + logger.info(f"Rendering accessible video using {method} method with segment persistence...") + rendered_path, updated_placements, segment_metadata, pause_points = await video_renderer_service.render_accessible_video( source_video_path, ad_segments, analysis, - output_video_path + output_video_path, + persist_segments=True, + gcs_segment_prefix=gcs_segment_prefix ) # Update analysis with actual freeze durations for VTT retiming @@ -214,6 +217,18 @@ async def _async_render_accessible_video(job_id: str, language: str): analysis["placements"] = updated_placements logger.info(f"Updated {len(updated_placements)} placements with actual freeze durations") + # Build edit state for QC review if segment metadata was returned + edit_state = None + if segment_metadata and pause_points: + edit_state = AccessibleVideoEditState( + pause_points=pause_points, + video_segments=segment_metadata, + tts_regeneration_queue=[], + last_render_at=datetime.utcnow(), + whisper_refine_enabled=False + ) + logger.info(f"Built edit state with {len(segment_metadata)} segments and {len(pause_points)} pause points") + # 7. Upload rendered video to GCS video_blob_path = f"{job_id}/{language}/accessible_video.mp4" video_blob = gcs_service.bucket.blob(video_blob_path) @@ -248,10 +263,11 @@ async def _async_render_accessible_video(job_id: str, language: str): retimed_captions_gcs_uri = f"gs://{settings.gcs_bucket}/{retimed_blob_path}" logger.info(f"Uploaded re-timed captions to {retimed_captions_gcs_uri}") - # 9. Update job document with results + # 9. 
Update job document with results (including edit state for QC review) update_fields = { f"outputs.{language}.accessible_video_gcs": video_gcs_uri, f"outputs.{language}.accessible_video_method": method, + f"outputs.{language}.video_segments_gcs_prefix": f"gs://{settings.gcs_bucket}/{gcs_segment_prefix}", f"accessible_video_progress.{language}": { "status": "completed", "method": method, @@ -264,6 +280,10 @@ async def _async_render_accessible_video(job_id: str, language: str): if retimed_captions_gcs_uri: update_fields[f"outputs.{language}.retimed_captions_vtt_gcs"] = retimed_captions_gcs_uri + # Store edit state for QC review accessible video editing + if edit_state: + update_fields[f"outputs.{language}.accessible_video_edit_state"] = edit_state.model_dump() + await db.jobs.update_one( {"_id": job_id}, {"$set": update_fields} @@ -423,6 +443,7 @@ async def _check_accessible_video_completion(job_id: str, db): ) else: # All videos completed successfully + # NEW WORKFLOW: Go to PENDING_QC for QC review (not PENDING_FINAL_REVIEW) logger.info(f"All accessible videos complete for job {job_id}") if job_doc["status"] in [JobStatus.TTS_GENERATING.value, JobStatus.RENDERING_VIDEO.value]: @@ -430,13 +451,13 @@ async def _check_accessible_video_completion(job_id: str, db): {"_id": job_id}, { "$set": { - "status": JobStatus.PENDING_FINAL_REVIEW.value, + "status": JobStatus.PENDING_QC.value, "updated_at": datetime.utcnow() }, "$push": { "review.history": { "at": datetime.utcnow(), - "status": JobStatus.PENDING_FINAL_REVIEW.value, + "status": JobStatus.PENDING_QC.value, "by": "system" } } @@ -445,9 +466,9 @@ async def _check_accessible_video_completion(job_id: str, db): broadcast_status_update( job_id, - JobStatus.PENDING_FINAL_REVIEW.value, + JobStatus.PENDING_QC.value, job_title=job_title, - message=f"{job_title} has all accessible videos complete - ready for Final Review" + message=f"{job_title} has all accessible videos complete - ready for QC Review" ) diff --git 
a/backend/app/tasks/rerender_accessible_video.py b/backend/app/tasks/rerender_accessible_video.py
new file mode 100644
index 0000000..1868d5b
--- /dev/null
+++ b/backend/app/tasks/rerender_accessible_video.py
@@ -0,0 +1,601 @@
+"""Celery task for re-rendering accessible video with QC changes."""
+
+import asyncio
+import io
+import os
+import tempfile
+import traceback
+from datetime import datetime
+
+from celery.result import allow_join_result
+from motor.motor_asyncio import AsyncIOMotorClient
+from pydub import AudioSegment
+
+from ..core.config import settings
+from ..core.logging import get_logger
+from ..lib.vtt import VTTParser
+from ..models.job import AccessibleVideoEditState, JobStatus, PausePointData, VideoSegmentMetadata
+from ..services.gcs import gcs_service
+from ..services.video_renderer import video_renderer_service
+from ..services.vtt_retimer import vtt_retimer_service
+from ..services.whisper_service import WordTimestamp, whisper_service
+from . import celery_app
+from .render_accessible_video import _extract_audio_for_whisper, _dispatch_whisper_transcription
+from .translate_and_synthesize import broadcast_status_update
+from .tts_synthesis import dispatch_language_tts, parse_ad_cues, synthesize_cue_task
+
+logger = get_logger(__name__)
+
+
+def _blob_path_from_gcs_uri(gcs_uri: str) -> str:
+    """Return gcs_uri without the gs:// prefix of the configured bucket."""
+    return gcs_uri.replace(f"gs://{settings.gcs_bucket}/", "")
+
+
+@celery_app.task(bind=True, time_limit=7200, soft_time_limit=7000)
+def rerender_accessible_video_task(
+    self,
+    job_id: str,
+    language: str,
+    regenerate_cue_indices: list[int],
+    whisper_refine: bool = False
+):
+    """
+    Re-render accessible video during QC review with selective TTS regeneration.
+
+    This task:
+    1. Downloads the source video
+    2. If regenerate_cue_indices not empty: synthesizes new TTS for those cues
+    3. Downloads the AD VTT and the per-cue MP3s
+    4. If whisper_refine: runs Whisper pause point refinement
+    5. Re-renders the video using the stored pause points and new/existing TTS
+    6. Updates job status back to PENDING_QC
+
+    Args:
+        job_id: Job ID
+        language: Language being re-rendered
+        regenerate_cue_indices: List of cue indices to regenerate TTS for
+        whisper_refine: Whether to run Whisper pause point refinement
+
+    Raises:
+        Exception: Any failure of the re-render is re-raised after the job has
+            been returned to PENDING_QC with the error recorded.
+    """
+    logger.info(
+        f"Starting accessible video re-render for job {job_id}/{language}: "
+        f"regenerate={regenerate_cue_indices}, whisper_refine={whisper_refine}"
+    )
+
+    try:
+        result = asyncio.run(_async_rerender_accessible_video(
+            job_id, language, regenerate_cue_indices, whisper_refine
+        ))
+        logger.info(f"Accessible video re-render completed for job {job_id}/{language}")
+        return result
+    except Exception as e:
+        logger.error(f"Accessible video re-render failed for job {job_id}/{language}: {e}")
+        logger.error(f"Full traceback: {traceback.format_exc()}")
+
+        # Update job status back to PENDING_QC with error. This is best-effort:
+        # a failure while recording the error must not replace the original
+        # exception, which is the one the task has to report.
+        try:
+            asyncio.run(_mark_rerender_failed(job_id, language, str(e)))
+        except Exception as mark_error:
+            logger.error(
+                f"Could not mark re-render as failed for job {job_id}/{language}: {mark_error}"
+            )
+        raise
+
+
+async def _mark_rerender_failed(job_id: str, language: str, error_message: str):
+    """
+    Mark re-render as failed and return to PENDING_QC.
+
+    Stores the full error message on the language's edit state, appends a
+    history entry (message truncated to 200 characters) and broadcasts the
+    status change (message truncated to 100 characters).
+    """
+    client = AsyncIOMotorClient(settings.mongodb_uri)
+    db = client[settings.mongodb_db]
+
+    try:
+        await db.jobs.update_one(
+            {"_id": job_id},
+            {
+                "$set": {
+                    "status": JobStatus.PENDING_QC.value,
+                    f"outputs.{language}.accessible_video_edit_state.last_render_error": error_message,
+                    "updated_at": datetime.utcnow()
+                },
+                "$push": {
+                    "review.history": {
+                        "at": datetime.utcnow(),
+                        "status": JobStatus.PENDING_QC.value,
+                        "by": "system",
+                        "notes": f"Re-render failed for {language}: {error_message[:200]}"
+                    }
+                }
+            }
+        )
+
+        job_doc = await db.jobs.find_one({"_id": job_id})
+        broadcast_status_update(
+            job_id,
+            JobStatus.PENDING_QC.value,
+            job_title=job_doc.get("title") if job_doc else None,
+            message=f"Re-render failed: {error_message[:100]}"
+        )
+    finally:
+        client.close()
+
+
+def _download_ad_cue_segments(
+    ad_cues_prefix: str,
+    temp_dir: str
+) -> tuple[list[tuple[int, str]], list[float]]:
+    """
+    Download the per-cue MP3s stored under a GCS prefix and measure them.
+
+    Args:
+        ad_cues_prefix: GCS URI prefix that holds the per-cue MP3 files
+        temp_dir: Local directory the MP3s are downloaded into
+
+    Returns:
+        Tuple of (ad_segments, cue_durations), both ordered by cue index:
+        ad_segments holds (cue_index, local_path) pairs and cue_durations the
+        matching audio lengths in seconds
+    """
+    prefix_path = _blob_path_from_gcs_uri(ad_cues_prefix)
+    blobs = gcs_service.bucket.list_blobs(prefix=prefix_path)
+
+    # The cue index is the number between the last "_" and the ".mp3" suffix
+    cue_blobs = [
+        (blob, int(blob.name.split("_")[-1].replace(".mp3", "")))
+        for blob in blobs
+        if blob.name.endswith(".mp3")
+    ]
+    cue_blobs.sort(key=lambda pair: pair[1])
+
+    ad_segments = []
+    cue_durations = []
+    for blob, cue_index in cue_blobs:
+        local_path = os.path.join(temp_dir, f"cue_{cue_index}.mp3")
+        blob.download_to_filename(local_path)
+        ad_segments.append((cue_index, local_path))
+
+        # len() of a pydub AudioSegment is its duration in milliseconds
+        audio = AudioSegment.from_mp3(local_path)
+        cue_durations.append(len(audio) / 1000.0)
+
+    # NOTE(review): cue_durations is positional, while the placement builder
+    # indexes it by VTT cue position - this assumes the stored cue indices are
+    # contiguous and start at the first cue; confirm against the TTS task.
+    return ad_segments, cue_durations
+
+
+async def _async_rerender_accessible_video(
+    job_id: str,
+    language: str,
+    regenerate_cue_indices: list[int],
+    whisper_refine: bool
+):
+    """
+    Async implementation of accessible video re-rendering.
+
+    Args:
+        job_id: Job ID
+        language: Language being re-rendered
+        regenerate_cue_indices: Cue indices to synthesize new TTS for
+        whisper_refine: Whether to run Whisper pause point refinement
+            (only applied for the pause_insert method)
+
+    Raises:
+        ValueError: If the job, its outputs for the language, the edit state,
+            the AD VTT or the AD cue prefix is missing
+    """
+    logger.info(f"Async re-render started for job {job_id}/{language}")
+
+    client = AsyncIOMotorClient(settings.mongodb_uri)
+    db = client[settings.mongodb_db]
+
+    try:
+        # Get job details
+        job_doc = await db.jobs.find_one({"_id": job_id})
+        if not job_doc:
+            raise ValueError(f"Job {job_id} not found")
+
+        job_title = job_doc.get("title", "Untitled Job")
+        lang_output = job_doc.get("outputs", {}).get(language)
+        if not lang_output:
+            raise ValueError(f"No outputs found for language {language}")
+
+        edit_state = lang_output.get("accessible_video_edit_state")
+        if not edit_state:
+            raise ValueError(f"No edit state found for language {language}")
+
+        # Validate the remaining inputs up front so a job that cannot be
+        # rendered fails before any download or TTS synthesis is started
+        ad_vtt_gcs = lang_output.get("ad_vtt_gcs")
+        if not ad_vtt_gcs:
+            raise ValueError(f"No AD VTT found for language {language}")
+
+        ad_cues_prefix = lang_output.get("ad_cues_gcs_prefix")
+        if not ad_cues_prefix:
+            raise ValueError(f"No AD cue segments found for language {language}")
+
+        # Use TMPDIR env var if set
+        temp_base = os.environ.get('TMPDIR', None)
+        with tempfile.TemporaryDirectory(dir=temp_base) as temp_dir:
+            # 1. Download source video
+            source_video_gcs = job_doc["source"]["gcs_uri"]
+            source_blob_path = _blob_path_from_gcs_uri(source_video_gcs)
+            source_video_path = os.path.join(temp_dir, "source.mp4")
+
+            logger.info(f"Downloading source video from {source_blob_path}")
+            source_blob = gcs_service.bucket.blob(source_blob_path)
+            source_blob.download_to_filename(source_video_path)
+
+            # 2. Regenerate TTS for queued cues (if any)
+            if regenerate_cue_indices:
+                logger.info(f"Regenerating TTS for cues: {regenerate_cue_indices}")
+                await _regenerate_tts_cues(
+                    job_id, language, regenerate_cue_indices, job_doc, db, temp_dir
+                )
+
+                # Clear regeneration queue after successful synthesis
+                await db.jobs.update_one(
+                    {"_id": job_id},
+                    {
+                        "$set": {
+                            f"outputs.{language}.accessible_video_edit_state.tts_regeneration_queue": [],
+                            "updated_at": datetime.utcnow()
+                        }
+                    }
+                )
+
+            # 3. Download AD VTT and per-cue MP3s
+            ad_blob = gcs_service.bucket.blob(_blob_path_from_gcs_uri(ad_vtt_gcs))
+            ad_vtt_content = ad_blob.download_as_text()
+
+            ad_segments, cue_durations = _download_ad_cue_segments(ad_cues_prefix, temp_dir)
+            logger.info(f"Downloaded {len(ad_segments)} AD cue segments")
+
+            # 4. Build placements with adjusted pause points
+            method = lang_output.get("accessible_video_method", "pause_insert")
+            pause_points = edit_state.get("pause_points", [])
+
+            placements = _build_placements_with_adjustments(
+                ad_vtt_content, cue_durations, pause_points
+            )
+            logger.info(f"Built {len(placements)} placements with adjusted pause points")
+
+            analysis = {
+                "method": method,
+                "method_rationale": "QC re-render with user adjustments",
+                "placements": placements,
+                "total_added_duration": sum(cue_durations) if method == "pause_insert" else 0,
+                "warnings": []
+            }
+
+            # 5. Optionally run Whisper refinement
+            if whisper_refine and method == "pause_insert":
+                logger.info("Running Whisper pause point refinement...")
+                analysis, whisper_warnings = await _refine_pause_points_for_rerender(
+                    job_id, source_video_path, analysis, db, temp_dir
+                )
+                if whisper_warnings:
+                    analysis["warnings"] = analysis.get("warnings", []) + whisper_warnings
+                logger.info(f"Whisper refinement complete with {len(whisper_warnings)} warnings")
+
+            # 6. Render accessible video (persist segments again for future edits)
+            output_video_path = os.path.join(temp_dir, "accessible_video.mp4")
+            gcs_segment_prefix = f"{job_id}/{language}/segments/"
+
+            logger.info(f"Re-rendering accessible video using {method} method...")
+            rendered_path, updated_placements, segment_metadata, new_pause_points = await video_renderer_service.render_accessible_video(
+                source_video_path,
+                ad_segments,
+                analysis,
+                output_video_path,
+                persist_segments=True,
+                gcs_segment_prefix=gcs_segment_prefix
+            )
+
+            if updated_placements:
+                analysis["placements"] = updated_placements
+
+            # 7. Upload rendered video
+            video_blob_path = f"{job_id}/{language}/accessible_video.mp4"
+            video_blob = gcs_service.bucket.blob(video_blob_path)
+            video_blob.content_type = "video/mp4"
+            video_blob.upload_from_filename(output_video_path)
+
+            video_gcs_uri = f"gs://{settings.gcs_bucket}/{video_blob_path}"
+            logger.info(f"Uploaded re-rendered accessible video to {video_gcs_uri}")
+
+            # 8. Generate re-timed captions if pause-insert
+            retimed_captions_gcs_uri = None
+            if method == "pause_insert":
+                captions_vtt_gcs = lang_output.get("captions_vtt_gcs")
+                if captions_vtt_gcs:
+                    captions_blob_path = _blob_path_from_gcs_uri(captions_vtt_gcs)
+                    captions_blob = gcs_service.bucket.blob(captions_blob_path)
+                    original_captions_vtt = captions_blob.download_as_text()
+
+                    retimed_captions = vtt_retimer_service.retime_for_pause_insert(
+                        original_captions_vtt, analysis
+                    )
+
+                    retimed_blob_path = f"{job_id}/{language}/accessible_captions.vtt"
+                    retimed_blob = gcs_service.bucket.blob(retimed_blob_path)
+                    retimed_blob.content_type = "text/vtt"
+                    retimed_blob.upload_from_string(retimed_captions, content_type="text/vtt")
+
+                    retimed_captions_gcs_uri = f"gs://{settings.gcs_bucket}/{retimed_blob_path}"
+                    logger.info(f"Uploaded re-timed captions to {retimed_captions_gcs_uri}")
+
+            # 9. Build new edit state
+            new_edit_state = None
+            if segment_metadata and new_pause_points:
+                new_edit_state = AccessibleVideoEditState(
+                    pause_points=new_pause_points,
+                    video_segments=segment_metadata,
+                    tts_regeneration_queue=[],
+                    last_render_at=datetime.utcnow(),
+                    whisper_refine_enabled=whisper_refine
+                )
+
+            # 10. Update job document
+            update_fields = {
+                f"outputs.{language}.accessible_video_gcs": video_gcs_uri,
+                f"outputs.{language}.video_segments_gcs_prefix": f"gs://{settings.gcs_bucket}/{gcs_segment_prefix}",
+                "status": JobStatus.PENDING_QC.value,
+                "updated_at": datetime.utcnow()
+            }
+
+            if retimed_captions_gcs_uri:
+                update_fields[f"outputs.{language}.retimed_captions_vtt_gcs"] = retimed_captions_gcs_uri
+
+            edit_state_key = f"outputs.{language}.accessible_video_edit_state"
+            if new_edit_state:
+                update_fields[edit_state_key] = new_edit_state.model_dump()
+            else:
+                # The stored edit state is kept, so clear the error a previous
+                # failed attempt may have left on it
+                update_fields[f"{edit_state_key}.last_render_error"] = None
+
+            await db.jobs.update_one(
+                {"_id": job_id},
+                {
+                    "$set": update_fields,
+                    "$push": {
+                        "review.history": {
+                            "at": datetime.utcnow(),
+                            "status": JobStatus.PENDING_QC.value,
+                            "by": "system",
+                            "notes": f"Re-render complete for {language}"
+                        }
+                    }
+                }
+            )
+
+            # Broadcast completion
+            broadcast_status_update(
+                job_id,
+                JobStatus.PENDING_QC.value,
+                job_title=job_title,
+                message=f"Accessible video re-render complete for {language.upper()}"
+            )
+
+            logger.info(f"Accessible video re-render complete for job {job_id}/{language}")
+
+    finally:
+        client.close()
+
+
+async def _regenerate_tts_cues(
+    job_id: str,
+    language: str,
+    cue_indices: list[int],
+    job_doc: dict,
+    db,
+    temp_dir: str
+):
+    """
+    Regenerate TTS for specific cues using current VTT text.
+
+    Cues are synthesized one at a time on the "tts" queue; indices that do not
+    address a cue of the AD VTT are skipped with a warning.
+
+    Args:
+        job_id: Job ID
+        language: Language whose AD cues are regenerated
+        cue_indices: Zero-based positions of the cues in the parsed AD VTT
+        job_doc: Job document (outputs and requested_outputs are read)
+        db: Database handle (currently unused)
+        temp_dir: Working directory (currently unused)
+
+    Raises:
+        ValueError: If the language has no AD VTT
+        RuntimeError: If a synthesis task reports failure
+    """
+    logger.info(f"Regenerating TTS for {len(cue_indices)} cues")
+
+    # Get AD VTT content
+    lang_output = job_doc.get("outputs", {}).get(language) or {}
+    ad_vtt_gcs = lang_output.get("ad_vtt_gcs")
+    if not ad_vtt_gcs:
+        raise ValueError(f"No AD VTT found for language {language}")
+
+    ad_blob = gcs_service.bucket.blob(_blob_path_from_gcs_uri(ad_vtt_gcs))
+    ad_vtt_content = ad_blob.download_as_text()
+
+    # Parse cues
+    cues = parse_ad_cues(ad_vtt_content)
+
+    # Get TTS preferences
+    tts_preferences = job_doc["requested_outputs"].get("tts_preferences", {})
+    voices_per_language = tts_preferences.get("voices_per_language", {})
+    voice_name = voices_per_language.get(language, tts_preferences.get("default_voice"))
+    provider = tts_preferences.get("provider", "gemini")
+    model = tts_preferences.get("model", "flash")
+    speed = tts_preferences.get("speed", 1.0)
+    style_preset = tts_preferences.get("style_preset", "neutral")
+    custom_style_prompt = tts_preferences.get("custom_style_prompt")
+
+    if style_preset == "custom" and custom_style_prompt:
+        style_prompt = custom_style_prompt
+    else:
+        style_prompt = settings.gemini_tts_style_prompts.get(style_preset, "")
+
+    # Synthesize each cue
+    for cue_idx in cue_indices:
+        # Reject negative values too: they would silently address a cue
+        # counted from the end of the list
+        if not 0 <= cue_idx < len(cues):
+            logger.warning(f"Cue index {cue_idx} out of range, skipping")
+            continue
+
+        cue = cues[cue_idx]
+
+        logger.info(f"Synthesizing TTS for cue {cue_idx}: '{cue['text'][:50]}...'")
+
+        # Dispatch synthesis task
+        task_result = synthesize_cue_task.apply_async(
+            kwargs={
+                "job_id": job_id,
+                "language": language,
+                "cue_index": cue_idx,
+                "text": cue["text"],
+                "start_time": cue["start_time"],
+                "end_time": cue["end_time"],
+                "voice_name": voice_name,
+                "provider": provider,
+                "model": model,
+                "speed": speed,
+                "style_prompt": style_prompt
+            },
+            queue="tts"
+        )
+
+        # Wait for completion. The loop itself has no deadline; when run from
+        # rerender_accessible_video_task it is bounded by that task's time limits
+        poll_count = 0
+        while not task_result.ready():
+            await asyncio.sleep(1.0)
+            poll_count += 1
+            if poll_count % 30 == 0:
+                logger.info(f"Still waiting for TTS cue {cue_idx}...")
+
+        with allow_join_result():
+            result = task_result.get(timeout=120)
+
+        if not result or not result.get("success"):
+            error_detail = result.get("error_message") if result else None
+            raise RuntimeError(f"TTS synthesis failed for cue {cue_idx}: {error_detail}")
+
+        logger.info(f"TTS synthesis complete for cue {cue_idx}")
+
+    logger.info(f"All {len(cue_indices)} TTS cues regenerated")
+
+
+def _build_placements_with_adjustments(
+    ad_vtt_content: str,
+    cue_durations: list[float],
+    pause_points: list[dict]
+) -> list[dict]:
+    """
+    Build placement instructions using adjusted pause points from QC edits.
+
+    Args:
+        ad_vtt_content: AD VTT content
+        cue_durations: TTS durations per cue
+        pause_points: Pause point data with original and adjusted values
+
+    Returns:
+        List of placement dicts, one per cue that has a duration
+    """
+    cues = VTTParser.parse(ad_vtt_content)
+
+    # Build lookup of pause points (in seconds) by cue index
+    adjusted_pause_by_cue = {}
+    for pp in pause_points:
+        cue_idx = pp.get("cue_index")
+        adjusted = pp.get("adjusted_ms")
+        # Use adjusted if set, otherwise original
+        pause_ms = adjusted if adjusted is not None else pp.get("original_ms")
+        if cue_idx is None or pause_ms is None:
+            # No usable value stored: the cue start time is used below
+            continue
+        adjusted_pause_by_cue[cue_idx] = pause_ms / 1000.0
+
+    placements = []
+    # zip() stops at the shorter input, so cues without a duration are dropped
+    for i, (cue, ad_duration) in enumerate(zip(cues, cue_durations)):
+        # Get pause point: use stored value if available
+        pause_point = adjusted_pause_by_cue.get(i, cue.start_time)
+
+        placements.append({
+            "ad_cue_index": i,
+            "original_start_time": cue.start_time,
+            "original_end_time": cue.end_time,
+            "target_start_time": cue.start_time,
+            "ad_duration": ad_duration,
+            "pause_point": pause_point,
+            "resume_from": pause_point,
+            "pause_point_rationale": "User-adjusted during QC" if i in adjusted_pause_by_cue else "Original from VTT"
+        })
+
+    return placements
+
+
+async def _refine_pause_points_for_rerender(
+    job_id: str,
+    video_path: str,
+    analysis: dict,
+    db,
+    temp_dir: str
+) -> tuple[dict, list[str]]:
+    """
+    Run Whisper pause point refinement for re-render.
+
+    Args:
+        job_id: Job ID
+        video_path: Local path of the source video
+        analysis: Analysis dict whose "placements" are refined
+        db: Database handle (currently unused)
+        temp_dir: Directory the extracted audio is written to
+
+    Returns:
+        Tuple of (analysis, warnings). If transcription fails or yields no
+        words, the input analysis is returned unchanged with one warning.
+    """
+    logger.info(f"Refining pause points with Whisper for re-render of job {job_id}")
+
+    audio_path = os.path.join(temp_dir, "source_audio.mp3")
+    await _extract_audio_for_whisper(video_path, audio_path)
+
+    try:
+        words = await _dispatch_whisper_transcription(job_id, audio_path)
+    except Exception as e:
+        logger.error(f"Whisper transcription failed: {e}")
+        return analysis, [f"Whisper failed: {str(e)} - using current timestamps"]
+
+    if not words:
+        return analysis, ["No speech detected - using current timestamps"]
+
+    gaps = whisper_service.identify_speech_gaps(words)
+
+    refined_placements, warnings = whisper_service.refine_all_pause_points(
+        analysis.get("placements", []),
+        words,
+        gaps
+    )
+
+    # Shallow copy: the caller's dict keeps its original placements list
+    refined_analysis = analysis.copy()
+    refined_analysis["placements"] = refined_placements
+    refined_analysis["whisper_refined"] = True
+
+    return refined_analysis, warnings
diff --git a/backend/app/tasks/translate_and_synthesize.py b/backend/app/tasks/translate_and_synthesize.py index 418107c..eada0d0 100644 --- a/backend/app/tasks/translate_and_synthesize.py +++ b/backend/app/tasks/translate_and_synthesize.py @@ -139,9 +139,16 @@ async def _async_translate_and_synthesize(job_id: str): job_title = job_doc.get("title", "Untitled Job") logger.info(f"✅ Found job document for {job_id} ({job_title}), status: {job_doc.get('status', 'UNKNOWN')}") - # Check for any approved status (English or non-English source) - if not JobStatus.is_approved(job_doc["status"]): - logger.warning(f"⚠️ Job {job_id} not in approved status (current: {job_doc['status']}), skipping") + # Check for valid status to process translation + # Valid statuses: approved_english, approved_source (legacy), or translating (new workflow) + current_status = job_doc["status"] + valid_statuses = [ + JobStatus.APPROVED_ENGLISH.value, + JobStatus.APPROVED_SOURCE.value, + JobStatus.TRANSLATING.value, + ] + if current_status not in valid_statuses: + logger.warning(f"⚠️ Job {job_id} not in valid status for translation (current: {current_status}), skipping") return # Get source language from job @@ -389,20 +396,21 @@ async def _async_translate_and_synthesize(job_id: str): ) # Update final status + # NEW WORKFLOW: Translation pipeline now ends at PENDING_QC for QC review # If accessible video is requested, the render task will handle the transition - # to PENDING_FINAL_REVIEW when all videos are complete + # to PENDING_QC when all videos are complete if not accessible_video_requested: await db.jobs.update_one( {"_id": job_id}, { "$set": { - 
"status": JobStatus.PENDING_FINAL_REVIEW.value, + "status": JobStatus.PENDING_QC.value, "updated_at": datetime.utcnow() }, "$push": { "review.history": { "at": datetime.utcnow(), - "status": JobStatus.PENDING_FINAL_REVIEW.value, + "status": JobStatus.PENDING_QC.value, "by": "system" } } @@ -412,13 +420,13 @@ async def _async_translate_and_synthesize(job_id: str): # Broadcast status update broadcast_status_update( job_id, - JobStatus.PENDING_FINAL_REVIEW.value, + JobStatus.PENDING_QC.value, job_title=job_title, - message=f"{job_title} has finished translation and audio generation - ready for Final Review" + message=f"{job_title} has finished translation and audio generation - ready for QC Review" ) else: # When accessible video is requested, stay in TTS_GENERATING - # The render_accessible_video task will transition to PENDING_FINAL_REVIEW + # The render_accessible_video task will transition to PENDING_QC when all videos complete logger.info( f"Accessible video rendering triggered for job {job_id}. " f"Staying in TTS_GENERATING until all videos are complete." diff --git a/frontend/src/components/RerenderControls.tsx b/frontend/src/components/RerenderControls.tsx new file mode 100644 index 0000000..15a3470 --- /dev/null +++ b/frontend/src/components/RerenderControls.tsx @@ -0,0 +1,112 @@ +import { useState } from 'react'; + +interface RerenderControlsProps { + pendingRegenerations: number[]; + pausePointsModified: boolean; + isRendering: boolean; + onRender: (options: { whisperRefine: boolean }) => void; + onClearQueue: () => void; +} + +export function RerenderControls({ + pendingRegenerations, + pausePointsModified, + isRendering, + onRender, + onClearQueue, +}: RerenderControlsProps) { + const [whisperRefine, setWhisperRefine] = useState(false); + + const hasChanges = pendingRegenerations.length > 0 || pausePointsModified; + + return ( +
+ Pause points have been adjusted +
+ )} + + {!hasChanges && ( ++ No changes to render. Adjust pause points or queue TTS regenerations. +
+ )} ++ Enable if you changed the number or position of AD cues. Skip if you only adjusted pause timing. +
+ + +{error}
+ )} +Video preview will be available once processing begins
-Video preview will be available once processing begins
+Loading accessible video...
+Accessible video not available for this language yet.
+Translation and TTS synthesis may still be in progress.
+