ac-tool/backend/server/api/jobs.py
Vadym Samoilenko 5231c8bd37 Fix AI model, Language/Country mapping, and Handsontable rendering
- GEMINI_MODEL for AI commands: gemini-2.0-flash-exp → gemini-3-flash-preview
- Language/Country: handle plain 2-letter codes (EN→Language, UK→Country)
  and "EN-UK" split format; previously only split format worked
- Handsontable black screen: add min-h-0 on flex-1 container so height:100%
  resolves correctly inside the flexbox chain

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 16:38:38 +00:00

627 lines
No EOL
20 KiB
Python
Executable file

"""
Jobs API endpoints for file upload and processing management
"""
import logging
import os
import zipfile
from datetime import datetime
from io import BytesIO
from quart import Blueprint, request, jsonify, send_file, g
import csv
from ..auth.middleware import dev_mode_bypass, auth_required, get_user_id
from ..jobs.models import Job, ModelConfiguration
from ..jobs.manager import JobManager
from ..ws.manager import WebSocketManager
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Blueprint for every endpoint in this module; routes are mounted under /api/jobs.
jobs_bp = Blueprint('jobs', __name__, url_prefix='/api/jobs')
@jobs_bp.route('', methods=['POST'])
@dev_mode_bypass
async def create_jobs():
    """
    Create new processing jobs from uploaded files.

    Accepts multipart/form-data with:
        - files: One or more files to process. Several parts may share the
          same form field name; each of them becomes its own job.
        - modelConfig (optional): JSON string with model configuration

    For every created job a ``job.created`` and a ``job.accepted`` event are
    broadcast to the uploading user over WebSocket.

    Returns:
        200 with ``{'jobs': [...], 'errors': [...]}`` when at least one job was
        created (per-file failures are listed in ``errors``); 400 when no
        usable file was sent, ``modelConfig`` is invalid, or every file failed;
        500 on unexpected errors.
    """
    import json

    try:
        job_manager = JobManager.get_instance()
        ws_manager = WebSocketManager()
        user_id = get_user_id()

        files = await request.files
        # request.files is a MultiDict: plain .items() yields only the FIRST
        # file per field name, which would silently drop the rest of a
        # multi-file upload sent under one field (e.g. repeated "files" parts).
        uploads = list(files.items(multi=True)) if files else []
        if not uploads:
            return jsonify({
                'error': 'no_files',
                'message': 'No files provided for upload'
            }), 400
        logger.info(f"Received {len(uploads)} files for upload")

        # Optional model configuration arrives as a JSON string form field
        form_data = await request.form
        model_config_json = form_data.get('modelConfig')
        model_config = None
        if model_config_json:
            try:
                model_config_data = json.loads(model_config_json)
                model_config = ModelConfiguration.from_dict(model_config_data)
            except Exception as e:
                return jsonify({
                    'error': 'invalid_model_config',
                    'message': f'Invalid model configuration: {e}'
                }), 400

        created_jobs = []
        errors = []
        for field_name, file_storage in uploads:
            if not file_storage or not file_storage.filename:
                logger.warning(f"Skipping empty file field: {field_name}")
                continue
            try:
                logger.info(f"Processing file: {file_storage.filename}")
                file_data = file_storage.read()
                job = await job_manager.create_job(
                    file_name=file_storage.filename,
                    file_size=len(file_data),
                    file_data=file_data,
                    user_id=user_id,
                    model_config=model_config
                )
                created_jobs.append(job)
                # Notify the user that the job exists ...
                await ws_manager.broadcast_to_user(user_id, {
                    'type': 'job.created',
                    'job': job.to_dict()
                })
                # ... and that it has entered the processing queue
                await ws_manager.broadcast_to_user(user_id, {
                    'type': 'job.accepted',
                    'jobId': job.id
                })
                logger.info(f"Created job {job.id} for file {file_storage.filename} (user: {user_id})")
            except Exception as e:
                # One bad file must not abort the rest of the batch
                error_msg = f"Failed to process file {file_storage.filename}: {str(e)}"
                errors.append(error_msg)
                logger.error(error_msg)

        if not created_jobs:
            if errors:
                return jsonify({
                    'error': 'upload_failed',
                    'message': 'Failed to process any files',
                    'details': errors
                }), 400
            # Every part had an empty filename, so nothing was actually uploaded
            return jsonify({
                'error': 'no_files',
                'message': 'No files provided for upload'
            }), 400

        return jsonify({
            'jobs': [job.to_dict() for job in created_jobs],
            'errors': errors
        })
    except Exception as e:
        logger.error(f"Job creation failed: {e}", exc_info=True)
        return jsonify({
            'error': 'server_error',
            'message': 'Failed to create jobs'
        }), 500
@jobs_bp.route('', methods=['GET'])
@dev_mode_bypass
async def list_jobs():
    """
    List jobs for the current user.

    Query parameters:
        - limit: Maximum number of jobs to return (default: 50, clamped to 1..100)
        - offset: Number of jobs to skip (default: 0, negative values become 0)
        - status: Filter by job phase, case-insensitive (optional)

    The status filter is applied AFTER pagination, so a page can hold fewer
    than ``limit`` jobs even when more matching jobs exist.

    Returns:
        Paginated list of jobs; 400 if ``limit``/``offset`` are not integers;
        500 on unexpected errors.
    """
    try:
        job_manager = JobManager.get_instance()
        user_id = get_user_id()

        # Malformed numbers are a client error, not a server error
        try:
            limit = int(request.args.get('limit', 50))
            offset = int(request.args.get('offset', 0))
        except (TypeError, ValueError):
            return jsonify({
                'error': 'invalid_parameter',
                'message': 'limit and offset must be integers'
            }), 400
        limit = max(1, min(limit, 100))
        offset = max(0, offset)
        status_filter = request.args.get('status')

        jobs = await job_manager.get_user_jobs(user_id, limit, offset)

        if status_filter:
            wanted = status_filter.lower()
            jobs = [job for job in jobs if job.phase.value.lower() == wanted]

        return jsonify({
            'jobs': [job.to_dict() for job in jobs],
            'pagination': {
                'limit': limit,
                'offset': offset,
                'count': len(jobs)
            }
        })
    except Exception as e:
        logger.error(f"Failed to list jobs: {e}", exc_info=True)
        return jsonify({
            'error': 'server_error',
            'message': 'Failed to retrieve jobs'
        }), 500
@jobs_bp.route('/<job_id>', methods=['GET'])
@dev_mode_bypass
async def get_job(job_id: str):
    """
    Fetch a single job by id.

    The requesting user must own the job unless the server runs with
    ``server_config.DEV_MODE`` enabled.

    Returns:
        200 with ``{'job': ...}`` (serialized via ``Job.to_dict()``),
        404 for an unknown id, 403 for a job owned by someone else,
        500 on unexpected errors.
    """
    try:
        manager = JobManager.get_instance()
        requester = get_user_id()

        found = await manager.get_job(job_id)
        if not found:
            return jsonify({'error': 'not_found', 'message': 'Job not found'}), 404

        # Ownership is enforced everywhere except in development mode
        from ..config_runtime import server_config
        if not server_config.DEV_MODE and found.user_id != requester:
            return jsonify({'error': 'forbidden', 'message': 'Access denied'}), 403

        payload = {'job': found.to_dict()}
        return jsonify(payload)
    except Exception as exc:
        logger.error(f"Failed to get job {job_id}: {exc}")
        return jsonify({'error': 'server_error', 'message': 'Failed to retrieve job'}), 500
@jobs_bp.route('/<job_id>/download', methods=['GET'])
@dev_mode_bypass
async def download_job_result(job_id: str):
    """
    Stream a job's CSV result back as a file attachment.

    The attachment is named ``<original file stem>-results.csv``.

    Returns:
        The CSV file; 404 for an unknown id, 403 for a foreign job (outside
        DEV_MODE), 400 when the job has no output file on disk yet, 500 on
        unexpected errors.
    """
    try:
        manager = JobManager.get_instance()
        requester = get_user_id()

        found = await manager.get_job(job_id)
        if not found:
            return jsonify({'error': 'not_found', 'message': 'Job not found'}), 404

        from ..config_runtime import server_config
        if not server_config.DEV_MODE and found.user_id != requester:
            return jsonify({'error': 'forbidden', 'message': 'Access denied'}), 403

        # A result is only downloadable once its output file exists on disk
        result_ready = bool(found.output_path) and os.path.exists(found.output_path)
        if not result_ready:
            return jsonify({'error': 'not_ready', 'message': 'Job result not available'}), 400

        stem, _extension = os.path.splitext(found.file_name)
        # NOTE(review): newer Quart releases name this keyword `download_name`;
        # confirm `attachment_filename` is accepted by the installed version.
        return await send_file(
            found.output_path,
            mimetype='text/csv',
            as_attachment=True,
            attachment_filename=f"{stem}-results.csv",
        )
    except Exception as exc:
        logger.error(f"Download failed for job {job_id}: {exc}")
        return jsonify({'error': 'server_error', 'message': 'Failed to download result'}), 500
@jobs_bp.route('/<job_id>/logs', methods=['GET'])
@dev_mode_bypass
async def get_job_logs(job_id: str):
    """
    Get logs for a specific job.

    Query parameters:
        - limit: Maximum number of log entries; the last ``limit`` entries are
          returned (default: 100, clamped to 0..1000; 0 returns none)
        - level: Filter by log level, case-insensitive (optional)

    Returns:
        ``{'logs': [...], 'count': n}``; 404 for an unknown job, 403 for a
        foreign job (outside DEV_MODE), 400 if ``limit`` is not an integer,
        500 on unexpected errors.
    """
    try:
        job_manager = JobManager.get_instance()
        user_id = get_user_id()

        job = await job_manager.get_job(job_id)
        if not job:
            return jsonify({
                'error': 'not_found',
                'message': 'Job not found'
            }), 404

        # Check if user owns this job (skip check in dev mode)
        from ..config_runtime import server_config
        if not server_config.DEV_MODE and job.user_id != user_id:
            return jsonify({
                'error': 'forbidden',
                'message': 'Access denied'
            }), 403

        try:
            limit = int(request.args.get('limit', 100))
        except (TypeError, ValueError):
            return jsonify({
                'error': 'invalid_parameter',
                'message': 'limit must be an integer'
            }), 400
        limit = max(0, min(limit, 1000))
        level_filter = request.args.get('level')

        logs = list(job.logs)
        if level_filter:
            wanted = level_filter.lower()
            logs = [log for log in logs if log.level.lower() == wanted]

        # Keep only the last `limit` entries. limit == 0 must be special-cased:
        # logs[-0:] is the WHOLE list, not an empty one.
        logs = logs[-limit:] if limit else []

        return jsonify({
            'logs': [log.to_dict() for log in logs],
            'count': len(logs)
        })
    except Exception as e:
        logger.error(f"Failed to get logs for job {job_id}: {e}", exc_info=True)
        return jsonify({
            'error': 'server_error',
            'message': 'Failed to retrieve job logs'
        }), 500
@jobs_bp.route('/<job_id>', methods=['DELETE'])
@dev_mode_bypass
async def delete_job(job_id: str):
    """
    Delete a job via ``JobManager.delete_job``.

    That call presumably also removes the job's files; confirm in the manager.
    On success a ``job.deleted`` event is broadcast to the requesting user
    over WebSocket.

    Returns:
        200 with a confirmation message; 404 for an unknown id, 403 for a
        foreign job (outside DEV_MODE), 500 if the manager reports failure
        or an unexpected error occurs.
    """
    try:
        manager = JobManager.get_instance()
        sockets = WebSocketManager()
        requester = get_user_id()

        target = await manager.get_job(job_id)
        if not target:
            return jsonify({'error': 'not_found', 'message': 'Job not found'}), 404

        from ..config_runtime import server_config
        if not server_config.DEV_MODE and target.user_id != requester:
            return jsonify({'error': 'forbidden', 'message': 'Access denied'}), 403

        removed = await manager.delete_job(job_id)
        if not removed:
            return jsonify({'error': 'deletion_failed', 'message': 'Failed to delete job'}), 500

        await sockets.broadcast_to_user(requester, {'type': 'job.deleted', 'jobId': job_id})
        logger.info(f"Deleted job {job_id} (user: {requester})")
        return jsonify({'message': 'Job deleted successfully'})
    except Exception as exc:
        logger.error(f"Failed to delete job {job_id}: {exc}")
        return jsonify({'error': 'server_error', 'message': 'Failed to delete job'}), 500
@jobs_bp.route('/batch-download', methods=['POST'])
@dev_mode_bypass
async def batch_download():
    """
    Download multiple job results as a ZIP file.

    Expects a JSON object body:
        {
            "jobIds": ["job1", "job2", "job3"]
        }

    The following are skipped with a warning:
        - unknown jobs;
        - jobs owned by another user (outside DEV_MODE);
        - jobs without an output file.

    Repeated ids are included only once. Each CSV is stored in the archive as
    ``<original file stem>-<first 8 chars of job id>.csv``.

    Returns:
        ZIP file containing CSV results; 400 when the body is malformed or no
        requested job had a downloadable result; 500 on unexpected errors.
    """
    import asyncio
    from datetime import timezone

    def _read_bytes(path):
        # Blocking read; run in the default executor so the event loop stays free
        with open(path, 'rb') as csv_file:
            return csv_file.read()

    try:
        job_manager = JobManager.get_instance()
        user_id = get_user_id()

        # silent=True: a missing or malformed JSON body yields None instead of
        # raising, so it is reported as a 400 rather than surfacing as a 500.
        data = await request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'error': 'invalid_request',
                'message': 'Request body must be a JSON object'
            }), 400
        job_ids = data.get('jobIds', [])
        if not job_ids:
            return jsonify({
                'error': 'invalid_request',
                'message': 'No job IDs provided'
            }), 400
        if not isinstance(job_ids, list) or not all(isinstance(j, str) for j in job_ids):
            return jsonify({
                'error': 'invalid_request',
                'message': 'jobIds must be a list of strings'
            }), 400

        from ..config_runtime import server_config

        loop = asyncio.get_running_loop()
        zip_buffer = BytesIO()
        csv_files = []
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            # dict.fromkeys de-duplicates while keeping request order; a repeated
            # id would otherwise write a duplicate member into the archive.
            for job_id in dict.fromkeys(job_ids):
                job = await job_manager.get_job(job_id)
                if not job:
                    logger.warning(f"Job {job_id} not found for batch download")
                    continue
                if not server_config.DEV_MODE and job.user_id != user_id:
                    logger.warning(f"User {user_id} denied access to job {job_id}")
                    continue
                if not job.output_path or not os.path.exists(job.output_path):
                    logger.warning(f"Job {job_id} has no output file")
                    continue

                base_name = os.path.splitext(job.file_name)[0]
                csv_filename = f"{base_name}-{job_id[:8]}.csv"
                csv_content = await loop.run_in_executor(None, _read_bytes, job.output_path)
                zip_file.writestr(csv_filename, csv_content)
                csv_files.append(csv_filename)

        if not csv_files:
            return jsonify({
                'error': 'no_results',
                'message': 'No completed jobs found for download'
            }), 400

        zip_buffer.seek(0)
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        zip_filename = f"batch_results_{timestamp}.zip"
        return await send_file(
            zip_buffer,
            as_attachment=True,
            attachment_filename=zip_filename,
            mimetype='application/zip'
        )
    except Exception as e:
        logger.error(f"Batch download failed: {e}", exc_info=True)
        return jsonify({
            'error': 'server_error',
            'message': 'Failed to create batch download'
        }), 500
@jobs_bp.route('/cleanup', methods=['POST'])
@dev_mode_bypass
async def cleanup_expired():
    """
    Run ``JobManager.cleanup_expired_jobs`` on demand (maintenance endpoint).

    NOTE(review): the only guard visible here is ``@dev_mode_bypass``; there is
    no admin/role check in this module. Confirm that is intended.

    Returns:
        200 with ``itemsRemoved`` set to the count the manager reports;
        500 on failure.
    """
    try:
        removed = await JobManager.get_instance().cleanup_expired_jobs()
        logger.info(f"Manual cleanup completed: {removed} items removed")
        payload = {'message': 'Cleanup completed', 'itemsRemoved': removed}
        return jsonify(payload)
    except Exception as exc:
        logger.error(f"Cleanup failed: {exc}")
        return jsonify({'error': 'server_error', 'message': 'Failed to perform cleanup'}), 500
@jobs_bp.route('/stats', methods=['GET'])
@dev_mode_bypass
async def get_job_stats():
    """
    Get job processing statistics for the current user.

    Only the jobs returned by ``get_user_jobs(user_id, limit=1000)`` are
    counted, so users with more than 1000 jobs get partial statistics.

    Phases are compared case-insensitively, matching the status filter in the
    list endpoint. Any phase other than COMPLETED or FAILED counts as active.
    Missing (``None``) summary values are treated as 0.

    Returns:
        ``{'stats': {...}}`` with job counts, the success rate
        (completed / total, 0 when there are no jobs), total extracted assets
        and total USD cost rounded to 4 decimals; 500 on unexpected errors.
    """
    try:
        job_manager = JobManager.get_instance()
        user_id = get_user_id()

        all_jobs = await job_manager.get_user_jobs(user_id, limit=1000)

        # Single pass over the jobs instead of one comprehension per metric
        completed_jobs = 0
        failed_jobs = 0
        total_assets = 0
        total_cost = 0
        for job in all_jobs:
            phase = str(job.phase.value).upper()
            if phase == 'COMPLETED':
                completed_jobs += 1
            elif phase == 'FAILED':
                failed_jobs += 1
            if job.summary:
                total_assets += job.summary.assets_extracted or 0
                total_cost += job.summary.cost_usd_total or 0

        total_jobs = len(all_jobs)
        active_jobs = total_jobs - completed_jobs - failed_jobs

        return jsonify({
            'stats': {
                'totalJobs': total_jobs,
                'completedJobs': completed_jobs,
                'failedJobs': failed_jobs,
                'activeJobs': active_jobs,
                'successRate': completed_jobs / total_jobs if total_jobs > 0 else 0,
                'totalAssetsExtracted': total_assets,
                'totalCostUsd': round(total_cost, 4)
            }
        })
    except Exception as e:
        logger.error(f"Failed to get job stats: {e}", exc_info=True)
        return jsonify({
            'error': 'server_error',
            'message': 'Failed to retrieve statistics'
        }), 500
# Two-letter codes treated as languages when a market value is a bare 2-letter
# code; any other 2-letter value (e.g. UK, US) is treated as a country code.
_DELIVERABLE_LANG_CODES = frozenset({
    'EN', 'DE', 'FR', 'NL', 'ES', 'IT', 'PT', 'PL', 'RU', 'JA', 'ZH', 'AR', 'KO',
})


def _split_market(market):
    """Split a ``language_country_market`` cell into ``(language, country)``.

    - "EN-UK" → ("EN", "UK"), split on the first '-' and upper-cased
    - "EN" / "UK" → language or country, depending on _DELIVERABLE_LANG_CODES
    - "English" → ("English", ""), longer free text goes to Language as-is
    - "" or None → ("", "")
    """
    market = (market or '').strip()
    if '-' in market:
        lang, country = market.split('-', 1)
        return lang.strip().upper(), country.strip().upper()
    if len(market) == 2:
        code = market.upper()
        return (code, '') if code in _DELIVERABLE_LANG_CODES else ('', code)
    return market, ''


def _parse_quantity(value):
    """Parse a quantity cell; blank, missing or unparseable values become 1."""
    text = '' if value is None else str(value).strip()
    if not text:
        return 1
    try:
        return int(text)
    except ValueError:
        pass
    try:
        # Accept float-formatted integers such as "2.0"
        return int(float(text))
    except (ValueError, OverflowError):
        return 1


def _cell(row, key, default=''):
    """Return ``row[key]``, or *default* when the column is absent or None.

    csv.DictReader fills missing trailing cells of short rows with None.
    """
    value = row.get(key)
    return default if value is None else value


def _read_deliverables(path):
    """Read the brief-extractor output CSV at *path* as AC Deliverable dicts."""
    deliverables = []
    # utf-8-sig also accepts a leading BOM (e.g. CSVs re-saved by Excel), which
    # would otherwise turn the first header into '\ufefftitle'.
    with open(path, newline='', encoding='utf-8-sig') as f:
        for row in csv.DictReader(f):
            lang, country = _split_market(row.get('language_country_market'))
            # Map brief-extractor CSV columns → AC Deliverable schema
            deliverables.append({
                'Number': '',
                'Title': _cell(row, 'title'),
                'Status': _cell(row, 'status') or 'Booked',
                'Category': _cell(row, 'category'),
                'Media': _cell(row, 'media'),
                'Sub-media': _cell(row, 'asset_type'),
                'Format': _cell(row, 'technical_specifications'),
                'Supply date': _cell(row, 'review_date'),
                'Live date': _cell(row, 'live_date'),
                'Language': lang,
                'Country': country,
                'Quantity': _parse_quantity(row.get('quantity')),
                # Extra brief fields kept for review UI
                '_brief_title': _cell(row, 'title'),
                '_brand_identifier': _cell(row, 'brand_identifier'),
                '_priority': _cell(row, 'priority_level'),
            })
    return deliverables


@jobs_bp.route('/<job_id>/deliverables', methods=['GET'])
@auth_required
async def get_job_deliverables(job_id: str):
    """
    Return extracted deliverables from a completed job as JSON rows
    ready for the Review → Import flow.

    Reads the job's output CSV and maps its columns to the AC Deliverable
    schema. Malformed cells (short rows, non-numeric quantities) fall back to
    defaults instead of failing the whole request.

    Returns:
        ``{'deliverables': [...], 'count': n}``; 404 for an unknown job,
        403 for a foreign job (outside DEV_MODE), 400 if no output file
        exists yet, 500 on unexpected errors.
    """
    import asyncio

    try:
        job_manager = JobManager.get_instance()
        user_id = get_user_id()
        job = await job_manager.get_job(job_id)
        if not job:
            return jsonify({'error': 'not_found'}), 404
        from ..config_runtime import server_config
        if not server_config.DEV_MODE and job.user_id != user_id:
            return jsonify({'error': 'forbidden'}), 403
        if not job.output_path or not os.path.exists(job.output_path):
            return jsonify({'error': 'not_ready', 'message': 'Job not completed yet'}), 400

        # File reading and CSV parsing are blocking; keep them off the event loop
        loop = asyncio.get_running_loop()
        deliverables = await loop.run_in_executor(None, _read_deliverables, job.output_path)
        return jsonify({'deliverables': deliverables, 'count': len(deliverables)})
    except Exception as e:
        logger.error(f"Failed to get deliverables for job {job_id}: {e}", exc_info=True)
        return jsonify({'error': 'server_error'}), 500