ac-tool/backend/server/jobs/manager.py
Vadym Samoilenko 72c50b2c92 Initial commit — AC Tool unified application
Merges ac-helper (PHP Activation Calendar) and brief-extractor (Python AI)
into a single Docker app with React/TypeScript frontend.

Features:
- Brief upload → AI extraction → review → Activation Calendar import
- Handsontable v17 spreadsheet with dependent dropdowns (148 categories)
- AI natural language commands via Gemini (YOLO mode, voice input)
- Azure AD MSAL SPA PKCE authentication, user roles (user/admin)
- CSV Activation Calendar export
- Real-time WebSocket job progress
- Admin: user management, dropdown Excel upload
- Multi-stage Dockerfile, docker-compose, nginx proxy instructions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 13:24:46 +00:00

338 lines
No EOL
10 KiB
Python
Executable file

"""
Job manager for handling job queue, registry, and lifecycle
"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from threading import RLock
from .models import Job, JobPhase, ModelConfiguration, ModelInfo
from .storage import StorageManager
from ..config_runtime import server_config
# Module-scoped logger; records carry this module's dotted import path as their name.
logger = logging.getLogger(name=__name__)
class JobManager:
    """
    Manages job lifecycle, queue, and in-memory registry.

    Process-wide singleton: every ``JobManager()`` call returns the same
    instance. Only singleton *construction* is guarded by a threading
    ``RLock``; registry reads and writes are serialized with an
    ``asyncio.Lock``. The registry is therefore safe to share between
    coroutines on one event loop, but is not protected against direct access
    from other threads.
    """

    _instance: Optional['JobManager'] = None
    # Guards singleton construction only. Deliberately distinct from the
    # per-instance asyncio lock (``self._lock``) that protects the registry.
    _instance_lock = RLock()

    def __new__(cls):
        # Double-checked locking: the lock is only taken until the instance exists.
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every JobManager() call; only the first call sets up state.
        if hasattr(self, '_initialized'):
            return
        self._initialized = True
        # job_id -> Job registry; always accessed under self._lock (except serialize_all).
        self.jobs: Dict[str, Job] = {}
        # FIFO of job ids; create_job enqueues, the consumer is outside this module.
        self.queue: asyncio.Queue = asyncio.Queue()
        # Not acquired in this module; presumably used by the worker draining self.queue
        # to cap concurrent processing — TODO confirm.
        self.processing_semaphore = asyncio.Semaphore(server_config.MAX_CONCURRENT_JOBS)
        self.storage = StorageManager()
        self._lock = asyncio.Lock()
        logger.info(f"JobManager initialized with concurrency limit: {server_config.MAX_CONCURRENT_JOBS}")

    @classmethod
    def get_instance(cls) -> 'JobManager':
        """Get the singleton instance"""
        return cls()

    async def create_job(
        self,
        file_name: str,
        file_size: int,
        file_data: bytes,
        user_id: str,
        model_config: Optional[ModelConfiguration] = None
    ) -> Job:
        """
        Create a new job from an uploaded file, register it and enqueue it.

        Args:
            file_name: Original filename
            file_size: Size in bytes (validated before anything is stored)
            file_data: Binary file content
            user_id: User identifier
            model_config: Model configuration for processing

        Returns:
            Created job object

        Raises:
            ValueError: If the storage layer rejects the file name/size.
            Exception: Any error while saving, registering or enqueueing is
                re-raised after the partially created job is rolled back.
        """
        is_valid, error_msg = self.storage.validate_file(file_name, file_size)
        if not is_valid:
            raise ValueError(f"File validation failed: {error_msg}")

        job = Job.create(
            file_name=file_name,
            file_size=file_size,
            user_id=user_id,
            upload_path="",  # Set once the upload has been saved
            model_config=model_config
        )
        try:
            job.upload_path = await self.storage.save_uploaded_file(
                file_data=file_data,
                filename=file_name,
                job_id=job.id
            )
            async with self._lock:
                self.jobs[job.id] = job
            await self.queue.put(job.id)
            logger.info(f"Created job {job.id} for file {file_name} (user: {user_id})")
            return job
        except Exception as e:
            logger.error(f"Failed to create job for {file_name}: {e}")
            await self._rollback_failed_job(job)
            raise

    async def _rollback_failed_job(self, job: Job) -> None:
        """Undo a partially created job: drop it from the registry and remove its saved upload."""
        async with self._lock:
            # The job may or may not have been registered before the failure.
            self.jobs.pop(job.id, None)
        if not job.upload_path:
            return
        try:
            await self.storage.cleanup_job_files(job.upload_path, None)
        except Exception as cleanup_error:
            # Log and continue so the caller sees the original failure, not this one.
            logger.warning(f"Cleanup for failed job {job.id} also failed: {cleanup_error}")

    async def get_job(self, job_id: str) -> Optional[Job]:
        """Get job by ID, or None if it is not registered."""
        async with self._lock:
            return self.jobs.get(job_id)

    async def update_job(self, job_id: str, **updates) -> bool:
        """
        Update job attributes.

        Only attributes the job already has are set; unknown names are skipped
        and logged rather than added to the job.

        Args:
            job_id: Job identifier
            **updates: Attributes to update

        Returns:
            True if job was found and updated
        """
        async with self._lock:
            job = self.jobs.get(job_id)
            if job is None:
                return False
            for attr, value in updates.items():
                if hasattr(job, attr):
                    setattr(job, attr, value)
                else:
                    logger.warning(f"Ignoring unknown attribute '{attr}' in update of job {job_id}")
            # Naive UTC timestamp, matching the utcnow()-based cutoff in cleanup_expired_jobs.
            job.updated_at = datetime.utcnow()
            return True

    @staticmethod
    def _newest_first_page(jobs: List[Job], limit: int, offset: int) -> List[Job]:
        """Sort jobs by created_at, newest first, and return one page; negative limit/offset count as 0."""
        offset = max(offset, 0)
        limit = max(limit, 0)
        ordered = sorted(jobs, key=lambda j: j.created_at, reverse=True)
        return ordered[offset:offset + limit]

    async def get_user_jobs(
        self,
        user_id: str,
        limit: int = 100,
        offset: int = 0
    ) -> List[Job]:
        """
        Get jobs for a specific user.

        Args:
            user_id: User identifier
            limit: Maximum number of jobs to return
            offset: Number of jobs to skip

        Returns:
            List of user's jobs, newest first
        """
        async with self._lock:
            user_jobs = [job for job in self.jobs.values() if job.user_id == user_id]
        return self._newest_first_page(user_jobs, limit, offset)

    async def get_all_jobs(self, limit: int = 100, offset: int = 0) -> List[Job]:
        """
        Get all jobs (admin function).

        Args:
            limit: Maximum number of jobs to return
            offset: Number of jobs to skip

        Returns:
            List of all jobs, newest first
        """
        async with self._lock:
            all_jobs = list(self.jobs.values())
        return self._newest_first_page(all_jobs, limit, offset)

    async def delete_job(self, job_id: str) -> bool:
        """
        Delete a job and clean up its files.

        File cleanup runs while the registry lock is held, so a concurrent
        delete of the same id waits and then returns False. If cleanup raises,
        the job stays registered and the exception propagates.

        Args:
            job_id: Job identifier

        Returns:
            True if job was found and deleted
        """
        async with self._lock:
            job = self.jobs.get(job_id)
            if job is None:
                return False
            await self.storage.cleanup_job_files(job.upload_path, job.output_path)
            del self.jobs[job_id]
        logger.info(f"Deleted job {job_id}")
        return True

    async def get_queue_size(self) -> int:
        """Get current queue size (job ids enqueued and not yet taken)."""
        return self.queue.qsize()

    async def get_active_jobs_count(self) -> int:
        """Get number of jobs currently in one of the processing phases."""
        active_phases = (JobPhase.EXTRACT_CONTENT, JobPhase.LLM_ANALYSIS,
                         JobPhase.CONSOLIDATION, JobPhase.CSV_GENERATION)
        async with self._lock:
            return sum(1 for job in self.jobs.values() if job.phase in active_phases)

    def serialize_all(self) -> List[Dict]:
        """
        Serialize all jobs for WebSocket broadcast.

        Synchronous, so it cannot take the asyncio registry lock; it has no
        await point while iterating the registry.
        """
        return [job.to_dict() for job in self.jobs.values()]

    async def cleanup_expired_jobs(self) -> int:
        """
        Clean up expired jobs and their files.

        A job is expired when it is COMPLETED or FAILED and was last updated
        more than ``FILE_RETENTION_HOURS`` ago. A failure deleting one job is
        logged and does not stop the rest of the sweep.

        Returns:
            Number of jobs plus orphaned files cleaned up
        """
        cutoff_time = datetime.utcnow() - timedelta(hours=server_config.FILE_RETENTION_HOURS)
        finished_phases = (JobPhase.COMPLETED, JobPhase.FAILED)
        async with self._lock:
            expired_ids = [
                job_id for job_id, job in self.jobs.items()
                if job.phase in finished_phases and job.updated_at < cutoff_time
            ]

        # Deletion happens after releasing the lock: delete_job acquires it
        # itself and asyncio.Lock is not reentrant.
        cleanup_count = 0
        for job_id in expired_ids:
            try:
                if await self.delete_job(job_id):
                    cleanup_count += 1
            except Exception:
                logger.exception(f"Failed to clean up expired job {job_id}")

        orphaned_count = await self.storage.cleanup_expired_files()
        total_cleaned = cleanup_count + orphaned_count
        if total_cleaned > 0:
            logger.info(f"Cleaned up {cleanup_count} expired jobs and {orphaned_count} orphaned files")
        return total_cleaned

    @staticmethod
    def get_available_models() -> List[ModelInfo]:
        """
        Get list of available models with their information.

        Only models whose key appears in ``core_config.MODEL_MAPPINGS`` are
        returned, in that mapping's iteration order.

        Returns:
            List of available model information
        """
        # Import here to avoid circular imports
        from core.config import config as core_config
        model_info_map = {
            'openai-gpt51': ModelInfo(
                key='openai-gpt51',
                name='GPT-5.1',
                provider='OpenAI',
                description='Latest OpenAI model with advanced reasoning capabilities',
                cost_per_1m_input=1.25,
                cost_per_1m_output=10.00,
                can_be_primary=True,
                can_be_consolidation=True
            ),
            'anthropic-opus45': ModelInfo(
                key='anthropic-opus45',
                name='Claude Opus 4.5',
                provider='Anthropic',
                description='Highest quality model for complex analysis',
                cost_per_1m_input=5.00,
                cost_per_1m_output=25.00,
                can_be_primary=True,
                can_be_consolidation=True
            ),
            'anthropic-sonnet45': ModelInfo(
                key='anthropic-sonnet45',
                name='Claude Sonnet 4.5',
                provider='Anthropic',
                description='Balanced performance and cost',
                cost_per_1m_input=3.00,
                cost_per_1m_output=15.00,
                can_be_primary=True,
                can_be_consolidation=True
            ),
            'google-gemini31': ModelInfo(
                key='google-gemini31',
                name='Gemini 3.1 Pro',
                provider='Google',
                description='Cost-effective model with high context limit',
                cost_per_1m_input=1.25,
                cost_per_1m_output=5.00,
                can_be_primary=True,
                can_be_consolidation=True
            )
        }
        return [
            model_info_map[model_key]
            for model_key in core_config.MODEL_MAPPINGS
            if model_key in model_info_map
        ]

    @staticmethod
    def get_default_model_config() -> ModelConfiguration:
        """Get default model configuration from the core config."""
        from core.config import config as core_config
        return ModelConfiguration(
            primary_models=core_config.get_default_primary_models(),
            consolidation_model=core_config.DEFAULT_CONSOLIDATION_MODEL,
            minimum_success_threshold=core_config.MINIMUM_SUCCESS_THRESHOLD
        )


# Also exposed at module scope so both get_default_model_config() and
# JobManager.get_default_model_config() call styles keep working.
get_default_model_config = JobManager.get_default_model_config
# Process-wide JobManager singleton; JobManager() always yields the same instance.
job_manager = JobManager()