diff --git a/backend/app/tasks/_websocket_bridge.py b/backend/app/tasks/_websocket_bridge.py new file mode 100644 index 0000000..7c11daf --- /dev/null +++ b/backend/app/tasks/_websocket_bridge.py @@ -0,0 +1,57 @@ +"""Synchronous WebSocket broadcast helper for Celery workers. + +Celery tasks run outside the FastAPI event loop. This module provides a +sync Redis publish so tasks can notify connected clients without asyncio. +""" +import traceback +from datetime import datetime +from typing import Optional + +import redis as sync_redis + +from ..core.config import settings +from ..core.logging import get_logger +from ..services.websocket import JobStatusUpdate + +logger = get_logger(__name__) + + +def broadcast_status_update( + job_id: str, + status: str, + job_title: Optional[str] = None, + message: Optional[str] = None, + progress: Optional[int] = None, +) -> None: + """Publish a job-status event to Redis so WebSocket subscribers receive it. + + Swallows all exceptions — a broadcast failure must never abort a task. + """ + try: + update = JobStatusUpdate( + job_id=job_id, + status=status, + updated_at=datetime.utcnow(), + job_title=job_title, + message=message, + progress=progress, + ) + payload = update.model_dump_json() + + redis_client = sync_redis.Redis.from_url( + settings.redis_url, + encoding="utf-8", + decode_responses=True, + ) + try: + redis_client.publish("job_status_updates", payload) + redis_client.publish(f"job_status_updates:{job_id}", payload) + finally: + redis_client.close() + + logger.info("Broadcast status update job=%s status=%s", job_id, status) + + except Exception: + logger.error( + "Failed to broadcast status update job=%s\n%s", job_id, traceback.format_exc() + ) diff --git a/backend/app/tasks/ingest_and_ai.py b/backend/app/tasks/ingest_and_ai.py index 1d473a8..d121fcf 100644 --- a/backend/app/tasks/ingest_and_ai.py +++ b/backend/app/tasks/ingest_and_ai.py @@ -16,63 +16,11 @@ from ..services.gcs import gcs_service, upload_vtt_to_gcs from ..services.gemini import gemini_service from ..services.websocket import connection_manager from . import celery_app +from ._websocket_bridge import broadcast_status_update logger = get_logger(__name__) -def broadcast_status_update(job_id: str, status: str, job_title: str = None, message: str = None, progress: int = None): - """ - Helper function to broadcast job status updates via WebSocket - Uses sync Redis client for Celery worker context - """ - logger.info(f"🔊 ATTEMPTING TO BROADCAST: job_id={job_id}, status={status}, job_title={job_title}, message={message}") - try: - import redis as sync_redis - from ..core.config import settings - from ..services.websocket import JobStatusUpdate - from datetime import datetime - - logger.info(f"🔊 About to create JobStatusUpdate for job {job_id}") - - # Create status update - update = JobStatusUpdate( - job_id=job_id, - status=status, - updated_at=datetime.utcnow(), - job_title=job_title, - message=message, - progress=progress - ) - - logger.info(f"🔊 Created update object, now connecting to Redis: {settings.redis_url}") - - # Create synchronous Redis client - redis_client = sync_redis.Redis.from_url( - settings.redis_url, - encoding="utf-8", - decode_responses=True - ) - - logger.info(f"🔊 Redis client created, now publishing to channels") - - # Publish to channels - result1 = redis_client.publish("job_status_updates", update.model_dump_json()) - result2 = redis_client.publish(f"job_status_updates:{job_id}", update.model_dump_json()) - - logger.info(f"🔊 Published to channels - general: {result1} subscribers, job-specific: {result2} subscribers") - - # Close connection - redis_client.close() - - logger.info(f"🔊 ✅ Successfully broadcasted status update for job {job_id}: {status}") - - except Exception as e: - logger.error(f"🔊 ❌ Failed to broadcast status update for job {job_id}: {e}") - import traceback - logger.error(f"🔊 ❌ Full traceback: {traceback.format_exc()}") - # Don't let WebSocket failures break the worker task - - class AsyncTask(Task): """Base task class that supports async execution""" def __call__(self, *args, **kwargs): diff --git a/backend/app/tasks/render_accessible_video.py b/backend/app/tasks/render_accessible_video.py index 4c4b903..b7cd395 100644 --- a/backend/app/tasks/render_accessible_video.py +++ b/backend/app/tasks/render_accessible_video.py @@ -18,7 +18,7 @@ from ..services.video_renderer import video_renderer_service from ..services.vtt_retimer import vtt_retimer_service from ..services.whisper_service import WordTimestamp, whisper_service from . import celery_app -from .translate_and_synthesize import broadcast_status_update +from ._websocket_bridge import broadcast_status_update from .tts_synthesis import parse_cue_index_from_blob_name from .whisper_transcribe import transcribe_video_audio_task diff --git a/backend/app/tasks/rerender_accessible_video.py b/backend/app/tasks/rerender_accessible_video.py index 0203d4e..3ea4cad 100644 --- a/backend/app/tasks/rerender_accessible_video.py +++ b/backend/app/tasks/rerender_accessible_video.py @@ -20,7 +20,7 @@ from ..services.vtt_retimer import vtt_retimer_service from ..services.whisper_service import WordTimestamp, whisper_service from . import celery_app from .render_accessible_video import _extract_audio_for_whisper, _dispatch_whisper_transcription -from .translate_and_synthesize import broadcast_status_update +from ._websocket_bridge import broadcast_status_update from .tts_synthesis import dispatch_language_tts, parse_ad_cues, parse_cue_index_from_blob_name, synthesize_cue_task logger = get_logger(__name__) diff --git a/backend/app/tasks/translate_and_synthesize.py b/backend/app/tasks/translate_and_synthesize.py index 4a84a32..12251e6 100644 --- a/backend/app/tasks/translate_and_synthesize.py +++ b/backend/app/tasks/translate_and_synthesize.py @@ -19,6 +19,7 @@ from ..services.gemini import gemini_service from ..services.gemini_tts import TTSSynthesisError from ..services.websocket import connection_manager from . import celery_app +from ._websocket_bridge import broadcast_status_update logger = get_logger(__name__) @@ -26,58 +27,6 @@ logger = get_logger(__name__) MAX_CONCURRENT_VIDEO_NATIVE = 3 -def broadcast_status_update(job_id: str, status: str, job_title: str = None, message: str = None, progress: int = None): - """ - Helper function to broadcast job status updates via WebSocket - Uses sync Redis client for Celery worker context - """ - logger.info(f"🔊 ATTEMPTING TO BROADCAST: job_id={job_id}, status={status}, job_title={job_title}, message={message}") - try: - import redis as sync_redis - from ..core.config import settings - from ..services.websocket import JobStatusUpdate - from datetime import datetime - - logger.info(f"🔊 About to create JobStatusUpdate for job {job_id}") - - # Create status update - update = JobStatusUpdate( - job_id=job_id, - status=status, - updated_at=datetime.utcnow(), - job_title=job_title, - message=message, - progress=progress - ) - - logger.info(f"🔊 Created update object, now connecting to Redis: {settings.redis_url}") - - # Create synchronous Redis client - redis_client = sync_redis.Redis.from_url( - settings.redis_url, - encoding="utf-8", - decode_responses=True - ) - - logger.info(f"🔊 Redis client created, now publishing to channels") - - # Publish to channels - result1 = redis_client.publish("job_status_updates", update.model_dump_json()) - result2 = redis_client.publish(f"job_status_updates:{job_id}", update.model_dump_json()) - - logger.info(f"🔊 Published to channels - general: {result1} subscribers, job-specific: {result2} subscribers") - - # Close connection - redis_client.close() - - logger.info(f"🔊 ✅ Successfully broadcasted status update for job {job_id}: {status}") - - except Exception as e: - logger.error(f"🔊 ❌ Failed to broadcast status update for job {job_id}: {e}") - import traceback - logger.error(f"🔊 ❌ Full traceback: {traceback.format_exc()}") - - async def retry_with_backoff(func, max_retries=3, base_delay=1): """Retry a function with exponential backoff""" last_exception = None