Merges ac-helper (PHP Activation Calendar) and brief-extractor (Python AI) into a single Docker app with React/TypeScript frontend. Features: - Brief upload → AI extraction → review → Activation Calendar import - Handsontable v17 spreadsheet with dependent dropdowns (148 categories) - AI natural language commands via Gemini (YOLO mode, voice input) - Azure AD MSAL SPA PKCE authentication, user roles (user/admin) - CSV Activation Calendar export - Real-time WebSocket job progress - Admin: user management, dropdown Excel upload - Multi-stage Dockerfile, docker-compose, nginx proxy instructions Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
338 lines
No EOL
10 KiB
Python
Executable file
338 lines
No EOL
10 KiB
Python
Executable file
"""
|
|
Job manager for handling job queue, registry, and lifecycle
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional
|
|
from threading import RLock
|
|
|
|
from .models import Job, JobPhase, ModelConfiguration, ModelInfo
|
|
from .storage import StorageManager
|
|
from ..config_runtime import server_config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class JobManager:
    """
    Manages job lifecycle, queue, and in-memory registry.

    Process-wide singleton: every ``JobManager()`` call returns the same
    object. Construction of that object is guarded by the class-level
    ``threading.RLock``; access to the ``jobs`` registry is guarded by an
    instance-level ``asyncio.Lock``. Note that an ``asyncio.Lock`` serialises
    coroutines running on one event loop — it does not protect against
    concurrent access from other OS threads.
    """

    _instance: Optional['JobManager'] = None
    # Guards singleton construction in __new__. Distinct from the per-instance
    # asyncio.Lock assigned to ``self._lock`` in __init__: ``cls._lock`` always
    # resolves to this class attribute, not the instance one.
    _lock = RLock()

    def __new__(cls):
        # Double-checked locking: once the instance exists, the lock is skipped.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # Python calls __init__ after every JobManager() call, even when
        # __new__ returned the existing instance, so only the first call may
        # initialise state.
        if hasattr(self, '_initialized'):
            return

        self._initialized = True
        # In-memory registry: job id -> Job.
        self.jobs: Dict[str, Job] = {}
        # FIFO of job ids enqueued by create_job; consumed elsewhere.
        self.queue: asyncio.Queue = asyncio.Queue()
        # Concurrency cap for processing; presumably acquired by the queue
        # consumer (not visible in this module).
        self.processing_semaphore = asyncio.Semaphore(server_config.MAX_CONCURRENT_JOBS)
        self.storage = StorageManager()
        # Guards ``self.jobs``.
        self._lock = asyncio.Lock()

        logger.info(f"JobManager initialized with concurrency limit: {server_config.MAX_CONCURRENT_JOBS}")

    @classmethod
    def get_instance(cls) -> 'JobManager':
        """Get the singleton instance"""
        return cls()

    async def create_job(
        self,
        file_name: str,
        file_size: int,
        file_data: bytes,
        user_id: str,
        model_config: Optional[ModelConfiguration] = None
    ) -> Job:
        """
        Create a new job from an uploaded file.

        Validates the file, saves it through the storage manager, registers
        the job and enqueues its id.

        Args:
            file_name: Original filename
            file_size: Size in bytes (passed to validation)
            file_data: Binary file content
            user_id: User identifier
            model_config: Model configuration for processing

        Returns:
            Created job object

        Raises:
            ValueError: If ``storage.validate_file`` rejects the file.
            Exception: Any error raised while saving, registering or enqueueing
                is logged and re-raised unchanged. Before re-raising, a
                best-effort removal of the saved file is attempted.
        """
        is_valid, error_msg = self.storage.validate_file(file_name, file_size)
        if not is_valid:
            raise ValueError(f"File validation failed: {error_msg}")

        job = Job.create(
            file_name=file_name,
            file_size=file_size,
            user_id=user_id,
            upload_path="",  # Set once the file has been saved
            model_config=model_config
        )

        try:
            job.upload_path = await self.storage.save_uploaded_file(
                file_data=file_data,
                filename=file_name,
                job_id=job.id
            )

            async with self._lock:
                self.jobs[job.id] = job

            await self.queue.put(job.id)

            logger.info(f"Created job {job.id} for file {file_name} (user: {user_id})")
            return job

        except Exception as e:
            logger.error(f"Failed to create job for {file_name}: {e}")
            if job.upload_path:
                # Best-effort cleanup: an error here must not replace the
                # original exception, which is re-raised below.
                try:
                    await self.storage.cleanup_job_files(job.upload_path, None)
                except Exception:
                    logger.exception(f"Cleanup after failed job creation for {file_name} also failed")
            raise

    async def get_job(self, job_id: str) -> Optional[Job]:
        """Get job by ID, or None if it is not registered."""
        async with self._lock:
            return self.jobs.get(job_id)

    async def update_job(self, job_id: str, **updates) -> bool:
        """
        Update job attributes and bump ``updated_at``.

        Keys in ``updates`` that are not existing attributes of the job are
        silently ignored.

        Args:
            job_id: Job identifier
            **updates: Attributes to update

        Returns:
            True if job was found and updated
        """
        async with self._lock:
            job = self.jobs.get(job_id)
            if not job:
                return False

            for attr, value in updates.items():
                if hasattr(job, attr):
                    setattr(job, attr, value)

            # Naive UTC timestamp. cleanup_expired_jobs compares against a
            # naive utcnow() cutoff, so the two must stay consistent.
            job.updated_at = datetime.utcnow()
            return True

    @staticmethod
    def _newest_first_page(jobs: List[Job], limit: int, offset: int) -> List[Job]:
        """
        Sort ``jobs`` in place newest first (by ``created_at``) and return one page.

        Negative ``limit``/``offset`` are clamped to 0. This keeps Python's
        negative-index slicing from returning items from the tail of the list.
        """
        offset = max(offset, 0)
        limit = max(limit, 0)
        jobs.sort(key=lambda j: j.created_at, reverse=True)
        return jobs[offset:offset + limit]

    async def get_user_jobs(
        self,
        user_id: str,
        limit: int = 100,
        offset: int = 0
    ) -> List[Job]:
        """
        Get jobs for a specific user.

        Args:
            user_id: User identifier
            limit: Maximum number of jobs to return
            offset: Number of jobs to skip

        Returns:
            List of user's jobs, newest first
        """
        async with self._lock:
            user_jobs = [job for job in self.jobs.values() if job.user_id == user_id]

        return self._newest_first_page(user_jobs, limit, offset)

    async def get_all_jobs(self, limit: int = 100, offset: int = 0) -> List[Job]:
        """
        Get all jobs (admin function).

        Args:
            limit: Maximum number of jobs to return
            offset: Number of jobs to skip

        Returns:
            List of all jobs, newest first
        """
        async with self._lock:
            all_jobs = list(self.jobs.values())

        return self._newest_first_page(all_jobs, limit, offset)

    async def delete_job(self, job_id: str) -> bool:
        """
        Delete a job and clean up its files.

        The registry lock is held while the files are removed. The job is
        unregistered only after ``cleanup_job_files`` returns. If cleanup
        raises, the job stays registered and the exception propagates.

        Args:
            job_id: Job identifier

        Returns:
            True if job was found and deleted
        """
        async with self._lock:
            job = self.jobs.get(job_id)
            if not job:
                return False

            await self.storage.cleanup_job_files(job.upload_path, job.output_path)

            del self.jobs[job_id]

        logger.info(f"Deleted job {job_id}")
        return True

    async def get_queue_size(self) -> int:
        """Get current queue size"""
        return self.queue.qsize()

    async def get_active_jobs_count(self) -> int:
        """Get number of jobs currently in a processing phase."""
        active_phases = (
            JobPhase.EXTRACT_CONTENT,
            JobPhase.LLM_ANALYSIS,
            JobPhase.CONSOLIDATION,
            JobPhase.CSV_GENERATION,
        )
        async with self._lock:
            return sum(1 for job in self.jobs.values() if job.phase in active_phases)

    def serialize_all(self) -> List[Dict]:
        """
        Serialize all jobs for WebSocket broadcast.

        Synchronous and lock-free. Because it never awaits, no other coroutine
        on the same event loop can mutate ``jobs`` while it runs.
        """
        return [job.to_dict() for job in self.jobs.values()]

    async def cleanup_expired_jobs(self) -> int:
        """
        Clean up expired jobs and their files.

        A job is expired when it is COMPLETED or FAILED and its ``updated_at``
        is older than ``FILE_RETENTION_HOURS``. After the job pass, the storage
        manager's expired/orphaned-file cleanup runs.

        Returns:
            Number of jobs plus orphaned files cleaned up
        """
        cutoff_time = datetime.utcnow() - timedelta(hours=server_config.FILE_RETENTION_HOURS)

        # Snapshot candidate ids under the lock; delete_job re-acquires it per job.
        async with self._lock:
            jobs_to_cleanup = [
                job_id for job_id, job in self.jobs.items()
                if job.phase in (JobPhase.COMPLETED, JobPhase.FAILED)
                and job.updated_at < cutoff_time
            ]

        cleanup_count = 0
        for job_id in jobs_to_cleanup:
            # A single job whose files cannot be removed must not abort the
            # sweep or skip the orphaned-file pass. delete_job leaves such a
            # job registered, so a later call will pick it up again.
            try:
                if await self.delete_job(job_id):
                    cleanup_count += 1
            except Exception:
                logger.exception(f"Failed to clean up expired job {job_id}")

        orphaned_count = await self.storage.cleanup_expired_files()

        total_cleaned = cleanup_count + orphaned_count
        if total_cleaned > 0:
            logger.info(f"Cleaned up {cleanup_count} expired jobs and {orphaned_count} orphaned files")

        return total_cleaned

    @staticmethod
    def get_available_models() -> List[ModelInfo]:
        """
        Get the models that are both described here and present in
        ``core_config.MODEL_MAPPINGS``.

        Returns:
            ModelInfo entries, in the iteration order of ``MODEL_MAPPINGS``
        """
        # Import here to avoid circular imports
        from core.config import config as core_config

        model_info_map = {
            'openai-gpt51': ModelInfo(
                key='openai-gpt51',
                name='GPT-5.1',
                provider='OpenAI',
                description='Latest OpenAI model with advanced reasoning capabilities',
                cost_per_1m_input=1.25,
                cost_per_1m_output=10.00,
                can_be_primary=True,
                can_be_consolidation=True
            ),
            'anthropic-opus45': ModelInfo(
                key='anthropic-opus45',
                name='Claude Opus 4.5',
                provider='Anthropic',
                description='Highest quality model for complex analysis',
                cost_per_1m_input=5.00,
                cost_per_1m_output=25.00,
                can_be_primary=True,
                can_be_consolidation=True
            ),
            'anthropic-sonnet45': ModelInfo(
                key='anthropic-sonnet45',
                name='Claude Sonnet 4.5',
                provider='Anthropic',
                description='Balanced performance and cost',
                cost_per_1m_input=3.00,
                cost_per_1m_output=15.00,
                can_be_primary=True,
                can_be_consolidation=True
            ),
            'google-gemini31': ModelInfo(
                key='google-gemini31',
                name='Gemini 3.1 Pro',
                provider='Google',
                description='Cost-effective model with high context limit',
                cost_per_1m_input=1.25,
                cost_per_1m_output=5.00,
                can_be_primary=True,
                can_be_consolidation=True
            )
        }

        # Keys configured but not described above are skipped.
        return [
            model_info_map[model_key]
            for model_key in core_config.MODEL_MAPPINGS
            if model_key in model_info_map
        ]
|
|
|
|
def get_default_model_config() -> ModelConfiguration:
    """
    Build a ModelConfiguration from the defaults in ``core.config``.

    The default primary models, consolidation model and minimum success
    threshold are read from the core configuration object on every call.
    """
    # Imported lazily; presumably to avoid an import cycle with core.config.
    from core.config import config as cfg

    primary = cfg.get_default_primary_models()
    consolidation = cfg.DEFAULT_CONSOLIDATION_MODEL
    threshold = cfg.MINIMUM_SUCCESS_THRESHOLD

    return ModelConfiguration(
        primary_models=primary,
        consolidation_model=consolidation,
        minimum_success_threshold=threshold,
    )
|
|
|
|
# Module-level shared instance, created when this module is imported.
# JobManager() always yields the same singleton (see JobManager.__new__),
# which is exactly what JobManager.get_instance() returns.
job_manager = JobManager()