fix: run Whisper transcription inline instead of as subtask
Celery does not allow calling result.get() within a task as it causes deadlocks. Changed the implementation to run Whisper transcription directly using asyncio.to_thread() instead of dispatching to a separate Celery queue. The Whisper transcript is still cached in MongoDB for reuse across language variants. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
3ca70a7c03
commit
7b0ebb357c
1 changed files with 7 additions and 17 deletions
|
|
@ -19,7 +19,6 @@ from ..services.vtt_retimer import vtt_retimer_service
|
|||
from ..services.whisper_service import WordTimestamp, whisper_service
|
||||
from . import celery_app
|
||||
from .translate_and_synthesize import broadcast_status_update, retry_with_backoff
|
||||
from .whisper_transcribe import transcribe_video_audio_task
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
|
@ -395,32 +394,23 @@ async def _refine_pause_points_with_whisper(
|
|||
audio_path = os.path.join(temp_dir, "source_audio.mp3")
|
||||
await _extract_audio_for_whisper(video_path, audio_path)
|
||||
|
||||
# Dispatch to Whisper queue and wait for result
|
||||
logger.info(f"Dispatching Whisper transcription task for job {job_id}")
|
||||
result = transcribe_video_audio_task.apply_async(
|
||||
args=[job_id, audio_path],
|
||||
queue="whisper"
|
||||
)
|
||||
|
||||
# Wait for transcription to complete (up to 30 minutes)
|
||||
# Run Whisper transcription directly (inline)
|
||||
# Note: We run this inline instead of via Celery subtask because
|
||||
# calling result.get() within a task causes deadlocks
|
||||
logger.info(f"Running Whisper transcription inline for job {job_id}")
|
||||
try:
|
||||
transcription_result = result.get(timeout=1800)
|
||||
# Run Whisper in a thread pool to avoid blocking async loop
|
||||
words = await asyncio.to_thread(whisper_service.transcribe_audio, audio_path)
|
||||
except Exception as e:
|
||||
logger.error(f"Whisper transcription failed for job {job_id}: {e}")
|
||||
return analysis, [f"Whisper transcription failed: {str(e)} - using original Gemini timestamps"]
|
||||
|
||||
# Convert to WordTimestamp objects
|
||||
words = [
|
||||
WordTimestamp(word=w["word"], start=w["start"], end=w["end"])
|
||||
for w in transcription_result.get("words", [])
|
||||
]
|
||||
|
||||
if words:
|
||||
# Cache the transcript
|
||||
cache_data = CachedWhisperTranscript(
|
||||
words=[CachedWordTimestamp(word=w.word, start=w.start, end=w.end) for w in words],
|
||||
model_name=settings.whisper_model,
|
||||
audio_duration=transcription_result.get("audio_duration", 0),
|
||||
audio_duration=0, # We don't track duration when running inline
|
||||
created_at=datetime.utcnow().isoformat()
|
||||
)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue