refactor: extract broadcast_status_update to shared _websocket_bridge (H-08)

The function was copy-pasted identically in ingest_and_ai.py and
translate_and_synthesize.py. Extracted to tasks/_websocket_bridge.py
as the single definition; all four task modules now import from there.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-04-29 14:14:57 +01:00
parent 87ae6571fe
commit 86ef5a86fb
5 changed files with 61 additions and 107 deletions

View file

@ -0,0 +1,57 @@
"""Synchronous WebSocket broadcast helper for Celery workers.
Celery tasks run outside the FastAPI event loop. This module provides a
sync Redis publish so tasks can notify connected clients without asyncio.
"""
import traceback
from datetime import datetime
from typing import Optional
import redis as sync_redis
from ..core.config import settings
from ..core.logging import get_logger
from ..services.websocket import JobStatusUpdate
logger = get_logger(__name__)
def broadcast_status_update(
job_id: str,
status: str,
job_title: Optional[str] = None,
message: Optional[str] = None,
progress: Optional[int] = None,
) -> None:
"""Publish a job-status event to Redis so WebSocket subscribers receive it.
Swallows all exceptions a broadcast failure must never abort a task.
"""
try:
update = JobStatusUpdate(
job_id=job_id,
status=status,
updated_at=datetime.utcnow(),
job_title=job_title,
message=message,
progress=progress,
)
payload = update.model_dump_json()
redis_client = sync_redis.Redis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=True,
)
try:
redis_client.publish("job_status_updates", payload)
redis_client.publish(f"job_status_updates:{job_id}", payload)
finally:
redis_client.close()
logger.info("Broadcast status update job=%s status=%s", job_id, status)
except Exception:
logger.error(
"Failed to broadcast status update job=%s\n%s", job_id, traceback.format_exc()
)

View file

@ -16,63 +16,11 @@ from ..services.gcs import gcs_service, upload_vtt_to_gcs
from ..services.gemini import gemini_service
from ..services.websocket import connection_manager
from . import celery_app
from ._websocket_bridge import broadcast_status_update
logger = get_logger(__name__)
def broadcast_status_update(job_id: str, status: str, job_title: str = None, message: str = None, progress: int = None):
"""
Helper function to broadcast job status updates via WebSocket
Uses sync Redis client for Celery worker context
"""
logger.info(f"🔊 ATTEMPTING TO BROADCAST: job_id={job_id}, status={status}, job_title={job_title}, message={message}")
try:
import redis as sync_redis
from ..core.config import settings
from ..services.websocket import JobStatusUpdate
from datetime import datetime
logger.info(f"🔊 About to create JobStatusUpdate for job {job_id}")
# Create status update
update = JobStatusUpdate(
job_id=job_id,
status=status,
updated_at=datetime.utcnow(),
job_title=job_title,
message=message,
progress=progress
)
logger.info(f"🔊 Created update object, now connecting to Redis: {settings.redis_url}")
# Create synchronous Redis client
redis_client = sync_redis.Redis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=True
)
logger.info(f"🔊 Redis client created, now publishing to channels")
# Publish to channels
result1 = redis_client.publish("job_status_updates", update.model_dump_json())
result2 = redis_client.publish(f"job_status_updates:{job_id}", update.model_dump_json())
logger.info(f"🔊 Published to channels - general: {result1} subscribers, job-specific: {result2} subscribers")
# Close connection
redis_client.close()
logger.info(f"🔊 ✅ Successfully broadcasted status update for job {job_id}: {status}")
except Exception as e:
logger.error(f"🔊 ❌ Failed to broadcast status update for job {job_id}: {e}")
import traceback
logger.error(f"🔊 ❌ Full traceback: {traceback.format_exc()}")
# Don't let WebSocket failures break the worker task
class AsyncTask(Task):
"""Base task class that supports async execution"""
def __call__(self, *args, **kwargs):

View file

@ -18,7 +18,7 @@ from ..services.video_renderer import video_renderer_service
from ..services.vtt_retimer import vtt_retimer_service
from ..services.whisper_service import WordTimestamp, whisper_service
from . import celery_app
from .translate_and_synthesize import broadcast_status_update
from ._websocket_bridge import broadcast_status_update
from .tts_synthesis import parse_cue_index_from_blob_name
from .whisper_transcribe import transcribe_video_audio_task

View file

@ -20,7 +20,7 @@ from ..services.vtt_retimer import vtt_retimer_service
from ..services.whisper_service import WordTimestamp, whisper_service
from . import celery_app
from .render_accessible_video import _extract_audio_for_whisper, _dispatch_whisper_transcription
from .translate_and_synthesize import broadcast_status_update
from ._websocket_bridge import broadcast_status_update
from .tts_synthesis import dispatch_language_tts, parse_ad_cues, parse_cue_index_from_blob_name, synthesize_cue_task
logger = get_logger(__name__)

View file

@ -19,6 +19,7 @@ from ..services.gemini import gemini_service
from ..services.gemini_tts import TTSSynthesisError
from ..services.websocket import connection_manager
from . import celery_app
from ._websocket_bridge import broadcast_status_update
logger = get_logger(__name__)
@ -26,58 +27,6 @@ logger = get_logger(__name__)
MAX_CONCURRENT_VIDEO_NATIVE = 3
def broadcast_status_update(job_id: str, status: str, job_title: str = None, message: str = None, progress: int = None):
"""
Helper function to broadcast job status updates via WebSocket
Uses sync Redis client for Celery worker context
"""
logger.info(f"🔊 ATTEMPTING TO BROADCAST: job_id={job_id}, status={status}, job_title={job_title}, message={message}")
try:
import redis as sync_redis
from ..core.config import settings
from ..services.websocket import JobStatusUpdate
from datetime import datetime
logger.info(f"🔊 About to create JobStatusUpdate for job {job_id}")
# Create status update
update = JobStatusUpdate(
job_id=job_id,
status=status,
updated_at=datetime.utcnow(),
job_title=job_title,
message=message,
progress=progress
)
logger.info(f"🔊 Created update object, now connecting to Redis: {settings.redis_url}")
# Create synchronous Redis client
redis_client = sync_redis.Redis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=True
)
logger.info(f"🔊 Redis client created, now publishing to channels")
# Publish to channels
result1 = redis_client.publish("job_status_updates", update.model_dump_json())
result2 = redis_client.publish(f"job_status_updates:{job_id}", update.model_dump_json())
logger.info(f"🔊 Published to channels - general: {result1} subscribers, job-specific: {result2} subscribers")
# Close connection
redis_client.close()
logger.info(f"🔊 ✅ Successfully broadcasted status update for job {job_id}: {status}")
except Exception as e:
logger.error(f"🔊 ❌ Failed to broadcast status update for job {job_id}: {e}")
import traceback
logger.error(f"🔊 ❌ Full traceback: {traceback.format_exc()}")
async def retry_with_backoff(func, max_retries=3, base_delay=1):
"""Retry a function with exponential backoff"""
last_exception = None