rackham-meeting-analyzer/backend/app/services/queue.py
2025-11-03 08:15:51 -06:00

212 lines
6.7 KiB
Python

"""
Job queue service for single-concurrency video processing.
Implements FIFO queue with single worker.
"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional
from motor.motor_asyncio import AsyncIOMotorDatabase
from app.models.job import JobStatus
from app.services.gemini import analyze_video_singlepass
from app.services.storage import storage_service
from app.core.config import settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class JobQueue:
    """Single-concurrency FIFO job queue.

    One background asyncio task polls the ``jobs`` collection for the oldest
    document whose status is ``JobStatus.UPLOADED`` and runs it to completion
    before looking for the next one.
    """

    def __init__(self, db: AsyncIOMotorDatabase):
        """Bind the queue to ``db``; the worker is not started here."""
        self.db = db
        # True only while _process_job is running for a picked-up job.
        self.is_processing = False
        # Handle of the background polling task; None until the first start.
        self.worker_task: Optional[asyncio.Task] = None

    async def start_worker(self):
        """Start the background worker unless one is already running."""
        if self.worker_task is None or self.worker_task.done():
            self.worker_task = asyncio.create_task(self._worker_loop())
            logger.info("Job queue worker started")

    async def stop_worker(self):
        """Cancel the background worker and wait for it to finish."""
        if self.worker_task and not self.worker_task.done():
            self.worker_task.cancel()
            try:
                await self.worker_task
            except asyncio.CancelledError:
                pass
            logger.info("Job queue worker stopped")

    async def _worker_loop(self):
        """Main worker loop - poll for uploaded jobs and process them one at a time.

        Runs until the task is cancelled. Any other exception is logged and
        the loop resumes after a 10 second pause.
        """
        while True:
            try:
                # Check if already processing
                if self.is_processing:
                    await asyncio.sleep(5)
                    continue

                # Find next pending job (oldest first => FIFO)
                job = await self.db.jobs.find_one(
                    {"status": JobStatus.UPLOADED},
                    sort=[("created_at", 1)],
                )
                if job is None:
                    # No jobs to process, wait and check again
                    await asyncio.sleep(5)
                    continue

                # Process the job. The flag is reset in ``finally`` so that a
                # cancellation in the middle of a job cannot leave it stuck at
                # True, which would make a restarted worker idle forever.
                self.is_processing = True
                try:
                    await self._process_job(job["_id"])
                finally:
                    self.is_processing = False
            except asyncio.CancelledError:
                logger.info("Worker loop cancelled")
                break
            except Exception as e:
                logger.error(f"Worker loop error: {str(e)}", exc_info=True)
                await asyncio.sleep(10)  # Wait before retrying

    async def _update_job(self, job_id: str, fields: dict) -> None:
        """Apply ``fields`` to the job document via ``$set``, stamping ``updated_at``."""
        await self.db.jobs.update_one(
            {"_id": job_id},
            {"$set": {**fields, "updated_at": datetime.utcnow()}},
        )

    async def _process_job(self, job_id: str):
        """Process a single job from start to finish.

        Marks the job as processing, resolves its video path, runs the Gemini
        analysis, stores the result in ``analyses`` and marks the job as
        completed. An analysis error, or any ``Exception`` raised along the
        way, is recorded on the job as ``JobStatus.FAILED`` together with an
        ``error_message``.

        NOTE(review): ``asyncio.CancelledError`` is not handled here, so a job
        interrupted by ``stop_worker`` keeps the ``PROCESSING`` status, and the
        worker loop only picks up ``UPLOADED`` jobs. Confirm whether such jobs
        should be re-queued or failed on shutdown/startup.

        Args:
            job_id: ``_id`` of the job document to process.

        Raises:
            Exception: if recording the failure on the job itself fails; the
                worker loop logs it and continues.
        """
        logger.info(f"Processing job {job_id}")
        try:
            # Update status to processing
            await self._update_job(
                job_id, {"status": JobStatus.PROCESSING, "progress": 10.0}
            )

            # Get video path
            video_path = storage_service.get_video_path(job_id)
            if video_path is None:
                raise Exception("Video file not found")
            await self._update_job(job_id, {"progress": 20.0})

            # Call Gemini for analysis
            logger.info(f"Starting Gemini analysis for job {job_id}")
            analysis_data, error = await analyze_video_singlepass(str(video_path))
            if error or analysis_data is None:
                # Analysis failed. Store and log the same message so the log
                # never shows a bare "None" when no error text was returned.
                message = error or "Unknown error"
                await self._update_job(
                    job_id,
                    {"status": JobStatus.FAILED, "error_message": message},
                )
                logger.error(f"Job {job_id} failed: {message}")
                return
            await self._update_job(job_id, {"progress": 90.0})

            # Get job info for user_id
            job = await self.db.jobs.find_one({"_id": job_id})
            if job is None:
                raise Exception("Job not found")

            # Save analysis to database
            expires_at = datetime.utcnow() + timedelta(days=settings.DATA_RETENTION_DAYS)
            await self.db.analyses.insert_one({
                "_id": job_id,  # Use same ID as job
                "job_id": job_id,
                "user_id": job["user_id"],
                "data": analysis_data,
                "created_at": datetime.utcnow(),
                "expires_at": expires_at,
            })

            # Mark job as completed
            await self._update_job(
                job_id,
                {
                    "status": JobStatus.COMPLETED,
                    "progress": 100.0,
                    "completed_at": datetime.utcnow(),
                },
            )
            logger.info(f"Job {job_id} completed successfully")
        except Exception as e:
            # Job failed
            logger.error(f"Job {job_id} processing error: {str(e)}", exc_info=True)
            await self._update_job(
                job_id, {"status": JobStatus.FAILED, "error_message": str(e)}
            )
# Global queue instance shared by the helpers below. Stays None until
# init_queue() is called (will be initialized with db in main.py).
_job_queue: Optional[JobQueue] = None
def init_queue(db: AsyncIOMotorDatabase):
    """Create the module-wide JobQueue bound to ``db``.

    Any previously stored instance is replaced; the worker is not started.
    """
    global _job_queue
    queue = JobQueue(db)
    _job_queue = queue
async def start_queue():
    """Start the worker of the module-wide queue; do nothing if it is unset."""
    queue = _job_queue
    if not queue:
        return
    await queue.start_worker()
async def stop_queue():
    """Stop the worker of the module-wide queue; do nothing if it is unset."""
    queue = _job_queue
    if not queue:
        return
    await queue.stop_worker()
def get_queue() -> JobQueue:
    """Return the module-wide job queue.

    Raises:
        RuntimeError: if the queue has not been initialized yet.
    """
    queue = _job_queue
    if queue is not None:
        return queue
    raise RuntimeError("Job queue not initialized")