""" Synchronous WebSocket Publisher for Celery Workers This module provides a synchronous Redis publisher for broadcasting job status updates from Celery workers. It maintains a persistent Redis connection pool to avoid connection overhead per publish operation. """ import logging import threading from datetime import datetime from typing import Any, Optional import redis from pydantic import BaseModel from ..core.config import settings logger = logging.getLogger(__name__) # Channel name constants CHANNEL_GLOBAL = "job_status_updates" CHANNEL_JOB_FMT = "job_status_updates:{job_id}" class JobStatusUpdate(BaseModel): """Schema for job status update messages""" job_id: str status: str updated_at: datetime job_title: Optional[str] = None message: Optional[str] = None progress: Optional[int] = None metadata: Optional[dict[str, Any]] = None eligible_users: Optional[set[str]] = None # Pre-computed eligible users class WebSocketPublisher: """Synchronous Redis publisher for WebSocket updates from Celery workers""" def __init__(self): self._redis_client: Optional[redis.Redis] = None self._lock = threading.Lock() def _get_client(self) -> redis.Redis: """Get or create Redis client (thread-safe)""" if self._redis_client is None: with self._lock: if self._redis_client is None: self._redis_client = redis.Redis.from_url( settings.redis_url, encoding="utf-8", decode_responses=True ) return self._redis_client def publish_job_status_update( self, job_id: str, status: str, job_title: Optional[str] = None, message: Optional[str] = None, progress: Optional[int] = None, metadata: Optional[dict[str, Any]] = None, eligible_users: Optional[set[str]] = None ) -> bool: """ Publish job status update to Redis pub/sub channels Returns True if successful, False otherwise """ try: update = JobStatusUpdate( job_id=job_id, status=status, updated_at=datetime.utcnow(), job_title=job_title, message=message, progress=progress, metadata=metadata, eligible_users=eligible_users ) # Serialize once for efficiency payload = update.model_dump_json(separators=(",", ":")) # Use pipeline for atomic publish client = self._get_client() with client.pipeline() as pipe: # Publish to global channel pipe.publish(CHANNEL_GLOBAL, payload) # Publish to job-specific channel pipe.publish(CHANNEL_JOB_FMT.format(job_id=job_id), payload) pipe.execute() logger.debug(f"Published status update for job {job_id}: {status}") return True except Exception as e: logger.error(f"Failed to publish job status update for {job_id}: {e}") return False def close(self): """Close Redis connection""" if self._redis_client: with self._lock: if self._redis_client: self._redis_client.connection_pool.disconnect() self._redis_client = None logger.info("WebSocket publisher Redis connection closed") # Global publisher instance for Celery workers _publisher = WebSocketPublisher() def publish_job_status_update( job_id: str, status: str, job_title: Optional[str] = None, message: Optional[str] = None, progress: Optional[int] = None, metadata: Optional[dict[str, Any]] = None, eligible_users: Optional[set[str]] = None ) -> bool: """ Convenience function to publish job status update This is the function that Celery workers should use """ return _publisher.publish_job_status_update( job_id=job_id, status=status, job_title=job_title, message=message, progress=progress, metadata=metadata, eligible_users=eligible_users ) def close_publisher(): """Close the global publisher - call this on worker shutdown""" _publisher.close() def get_job_eligible_users(job_id: str) -> set[str]: """ Get eligible users for a job (synchronous version for Celery workers) This should be called at publish time to avoid DB lookups in the hot path """ eligible_users = set() try: # Import MongoDB client (synchronous) from pymongo import MongoClient # Use synchronous MongoDB client for Celery workers client = MongoClient(settings.mongodb_url) db = client[settings.database_name] # Get the job job = db["jobs"].find_one({"_id": job_id}) if not job: logger.warning(f"Job {job_id} not found for eligibility check") return eligible_users # Add job creator if job.get("client_id"): eligible_users.add(job["client_id"]) # Add reviewers from review history review = job.get("review", {}) if review.get("reviewer_id"): eligible_users.add(review["reviewer_id"]) # Add reviewers from history for history_item in review.get("history", []): if history_item.get("by"): eligible_users.add(history_item["by"]) # Add all admin users (they can see all jobs) for admin_user in db["users"].find({"role": "admin"}): user_id = str(admin_user["_id"]) eligible_users.add(user_id) client.close() logger.debug(f"Job {job_id} eligible users: {len(eligible_users)}") except Exception as e: logger.error(f"Error getting eligible users for job {job_id}: {e}") return eligible_users def publish_job_update_with_eligibility( job_id: str, status: str, job_title: Optional[str] = None, message: Optional[str] = None, progress: Optional[int] = None, metadata: Optional[dict[str, Any]] = None ) -> bool: """ Convenience function that computes eligible users and publishes This is the recommended function for Celery workers to use """ eligible_users = get_job_eligible_users(job_id) return publish_job_status_update( job_id=job_id, status=status, job_title=job_title, message=message, progress=progress, metadata=metadata, eligible_users=eligible_users )