136 lines
5.1 KiB
Python
136 lines
5.1 KiB
Python
import asyncio
|
|
|
|
from motor.motor_asyncio import AsyncIOMotorClient
|
|
|
|
from ..core.config import settings
|
|
from ..core.logging import get_logger
|
|
from ..models.job import JobStatus
|
|
from . import celery_app
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
@celery_app.task(
    bind=True,
    # Retry policy: any Exception triggers an automatic retry, with no cap on
    # the number of attempts.
    # NOTE(review): with retry_backoff=True Celery computes the countdown
    # itself, so the 60s value below is probably not the effective delay -
    # confirm against the Celery version in use.
    autoretry_for=(Exception,),
    retry_kwargs={"max_retries": None, "countdown": 60},
    retry_backoff=True,
    # Delivery policy: ack only once the task has finished, and have the
    # message re-delivered if the worker process is lost mid-run.
    acks_late=True,
    reject_on_worker_lost=True,
)
def start_change_stream_watcher(self):
    """Run the MongoDB change stream watcher inside a Celery task.

    Drives the ``_watch_job_changes`` coroutine on a fresh event loop via
    ``asyncio.run`` and blocks until it returns.

    Raises:
        Exception: anything raised by the coroutine is logged and re-raised
            so the task's auto-retry configuration can take over.
    """
    try:
        asyncio.run(_watch_job_changes())
    except Exception as exc:
        logger.error(f"Change stream watcher failed: {exc}")
        # Propagate so the autoretry_for policy on the decorator applies.
        raise
|
|
|
|
|
|
def _dispatch_status_change(job_doc):
    """Enqueue the follow-up Celery task that matches the job's status."""
    job_id = str(job_doc["_id"])
    status = job_doc["status"]

    logger.info(f"Job {job_id} status changed to {status}")

    if status == JobStatus.APPROVED_ENGLISH.value:
        # Trigger translation and synthesis.
        # Imported at call time, as the original code did (presumably to
        # avoid an import cycle between task modules - verify).
        from .translate_and_synthesize import translate_and_synthesize_task
        translate_and_synthesize_task.delay(job_id)
        logger.info(f"Enqueued translation task for job {job_id}")

    elif status == JobStatus.COMPLETED.value:
        # Trigger client notification.
        from .notify import notify_client_task
        notify_client_task.delay(job_id)
        logger.info(f"Enqueued notification task for job {job_id}")


async def _watch_job_changes():
    """Watch the ``jobs`` collection and enqueue follow-up tasks.

    Opens a change stream on ``db.jobs`` filtered to ``update`` events that
    wrote the ``status`` field and whose looked-up document currently has a
    status of ``APPROVED_ENGLISH`` or ``COMPLETED``. Each matching event is
    handed to ``_dispatch_status_change``.

    Error handling:
        * A failure while handling one event is logged and the loop moves on
          to the next event.
        * A failure of the connection or the stream itself is logged and
          swallowed (not re-raised), so the coroutine returns normally.
          NOTE(review): because nothing propagates, the calling task's
          auto-retry policy never fires for stream failures - confirm this
          is still wanted outside development.
        * The Mongo client is closed in every case.
    """
    watched_statuses = [
        JobStatus.APPROVED_ENGLISH.value,
        JobStatus.COMPLETED.value,
    ]

    client = AsyncIOMotorClient(settings.mongodb_uri)
    db = client[settings.mongodb_db]

    logger.info("Starting MongoDB change stream watcher")

    try:
        # One-off connectivity check before opening the stream.
        await client.admin.command('ping')
        logger.info("MongoDB connection verified")

        pipeline = [
            {
                "$match": {
                    "operationType": "update",
                    # React only when the update itself wrote `status`.
                    # Without this, any unrelated update to a job that is
                    # already in a watched status would re-enqueue the
                    # translation / notification task.
                    "updateDescription.updatedFields.status": {
                        "$exists": True
                    },
                    "fullDocument.status": {"$in": watched_statuses},
                }
            }
        ]

        async with db.jobs.watch(
            pipeline,
            full_document="updateLookup",
            max_await_time_ms=30000,  # 30 second timeout for getMore operations
            batch_size=10,  # Process changes in small batches
        ) as stream:
            logger.info("Change stream watcher active, waiting for job status changes...")

            async for change in stream:
                try:
                    # .get() so a missing key reaches the warning below
                    # instead of surfacing as a KeyError.
                    job_doc = change.get("fullDocument")
                    if not job_doc:
                        logger.warning("Received change event without fullDocument")
                        continue

                    _dispatch_status_change(job_doc)

                except Exception as e:
                    # Keep the stream alive; one bad event must not stop
                    # processing of later events.
                    logger.error(f"Error processing change stream event: {e}")

    except Exception as e:
        error_msg = str(e)
        if "replica sets" in error_msg:
            logger.warning("Change stream watcher not available - MongoDB not configured as replica set")
            logger.info("This is normal in development. Job progression works via immediate triggering in approval endpoint.")
        else:
            logger.error(f"Change stream watcher failed: {e}")
            # Deliberately not re-raised (original behaviour, kept as-is).

    finally:
        client.close()
|
|
|
|
|
|
# Auto-start the watcher when the worker starts
|
|
@celery_app.task(
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={'max_retries': 3, 'countdown': 30}
)
def ensure_watcher_running(self):
    """Start the change stream watcher unless one is already active.

    Asks the workers for their currently active tasks. If any of them is
    running ``start_change_stream_watcher``, logs that fact and returns.
    Otherwise (including when the inspection yields nothing) a new watcher
    task is enqueued.

    Raises:
        Exception: any failure is logged and re-raised so the task's
            auto-retry configuration applies.
    """
    # Compare against the task's registered name instead of a hard-coded
    # dotted path, so the duplicate check keeps working if this module is
    # moved or the task is registered under a different name.
    watcher_name = start_change_stream_watcher.name

    try:
        active_tasks = celery_app.control.inspect().active()

        if not active_tasks:
            logger.warning("Could not inspect active tasks - starting watcher anyway")
        else:
            for worker, tasks in active_tasks.items():
                # `tasks or ()` guards against a worker entry with no list.
                if any(task.get("name") == watcher_name for task in tasks or ()):
                    logger.info(f"Change stream watcher already running on worker {worker}")
                    return

        # No active watcher found (or inspection unavailable): start one.
        result = start_change_stream_watcher.delay()
        logger.info(f"Started change stream watcher with task ID: {result.id}")

    except Exception as e:
        logger.error(f"Failed to ensure watcher is running: {e}")
        raise  # Will trigger retry
|