- GEMINI_MODEL for AI commands: gemini-2.0-flash-exp → gemini-3-flash-preview - Language/Country: handle plain 2-letter codes (EN→Language, UK→Country) and "EN-UK" split format; previously only split format worked - Handsontable black screen: add min-h-0 on flex-1 container so height:100% resolves correctly inside the flexbox chain Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
627 lines
No EOL
20 KiB
Python
Executable file
627 lines
No EOL
20 KiB
Python
Executable file
"""
|
|
Jobs API endpoints for file upload and processing management
|
|
"""
|
|
|
|
import asyncio
import csv
import inspect
import json
import logging
import os
import zipfile
from datetime import datetime, timezone
from io import BytesIO

from quart import Blueprint, request, jsonify, send_file, g

from ..auth.middleware import dev_mode_bypass, auth_required, get_user_id
from ..jobs.models import Job, ModelConfiguration
from ..jobs.manager import JobManager
from ..ws.manager import WebSocketManager
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Routes in this module register on this blueprint, so they are all served
# under the /api/jobs prefix.
jobs_bp = Blueprint('jobs', __name__, url_prefix='/api/jobs')
|
|
@jobs_bp.route('', methods=['POST'])
@dev_mode_bypass
async def create_jobs():
    """
    Create new processing jobs from uploaded files.

    Accepts multipart/form-data with:
        - files: One or more files to process. Several files may be sent
          under the same field name; every one of them becomes a job.
        - modelConfig (optional): JSON string with model configuration

    Returns:
        200 with ``{'jobs': [...], 'errors': [...]}``: the created jobs plus
        one message per file that could not be turned into a job.
        400 ``no_files`` if no non-empty file was uploaded,
        400 ``invalid_model_config`` if modelConfig cannot be parsed,
        400 ``upload_failed`` if every file failed,
        500 ``server_error`` on any other unexpected error.
    """
    try:
        job_manager = JobManager.get_instance()
        ws_manager = WebSocketManager()
        user_id = get_user_id()

        files = await request.files

        # request.files is a MultiDict: plain .items() yields only the first
        # file per field name, so a multi-file <input name="files"> would lose
        # all but one upload. multi=True yields every (field, file) pair.
        uploads = []
        for field_name, file_storage in files.items(multi=True):
            if not file_storage or not file_storage.filename:
                logger.warning(f"Skipping empty file field: {field_name}")
                continue
            uploads.append(file_storage)

        # Browsers submit an empty part for an unused file input, so a form
        # holding only empty parts is treated the same as no files at all.
        if not uploads:
            return jsonify({
                'error': 'no_files',
                'message': 'No files provided for upload'
            }), 400

        logger.info(f"Received {len(uploads)} files for upload")

        # Optional model configuration from the form data
        form_data = await request.form
        model_config_json = form_data.get('modelConfig')

        model_config = None
        if model_config_json:
            try:
                model_config = ModelConfiguration.from_dict(json.loads(model_config_json))
            except Exception as e:
                return jsonify({
                    'error': 'invalid_model_config',
                    'message': f'Invalid model configuration: {e}'
                }), 400

        created_jobs = []
        errors = []

        for file_storage in uploads:
            filename = file_storage.filename
            try:
                logger.info(f"Processing file: {filename}")
                file_data = file_storage.read()
                job = await job_manager.create_job(
                    file_name=filename,
                    file_size=len(file_data),
                    file_data=file_data,
                    user_id=user_id,
                    model_config=model_config,
                )
            except Exception as e:
                error_msg = f"Failed to process file {filename}: {str(e)}"
                errors.append(error_msg)
                logger.exception(error_msg)
                continue

            created_jobs.append(job)
            logger.info(f"Created job {job.id} for file {filename} (user: {user_id})")

            # The job already exists at this point; a WebSocket failure must
            # not be reported back to the client as a failed upload.
            try:
                await ws_manager.broadcast_to_user(user_id, {
                    'type': 'job.created',
                    'job': job.to_dict()
                })
                # Broadcast job accepted (when it enters the queue)
                await ws_manager.broadcast_to_user(user_id, {
                    'type': 'job.accepted',
                    'jobId': job.id
                })
            except Exception:
                logger.exception(f"Failed to broadcast creation of job {job.id}")

        if not created_jobs and errors:
            return jsonify({
                'error': 'upload_failed',
                'message': 'Failed to process any files',
                'details': errors
            }), 400

        return jsonify({
            'jobs': [job.to_dict() for job in created_jobs],
            'errors': errors
        })

    except Exception as e:
        logger.error(f"Job creation failed: {e}", exc_info=True)
        return jsonify({
            'error': 'server_error',
            'message': 'Failed to create jobs'
        }), 500
|
|
@jobs_bp.route('', methods=['GET'])
@dev_mode_bypass
async def list_jobs():
    """
    List jobs for the current user.

    Query parameters:
        - limit: Maximum number of jobs to return (default: 50, clamped to 1..100)
        - offset: Number of jobs to skip (default: 0, negative values become 0)
        - status: Filter by job phase, case-insensitive (optional)

    Note:
        The status filter is applied to the page returned by the job manager,
        i.e. after limit/offset, so a filtered page can hold fewer than
        ``limit`` jobs even when more matching jobs exist.

    Returns:
        200 with ``{'jobs': [...], 'pagination': {limit, offset, count}}``;
        400 ``invalid_request`` if limit/offset are not integers;
        500 ``server_error`` on unexpected errors.
    """
    try:
        job_manager = JobManager.get_instance()
        user_id = get_user_id()

        # Malformed numbers are a client error, not a server error.
        try:
            limit = int(request.args.get('limit', 50))
            offset = int(request.args.get('offset', 0))
        except (TypeError, ValueError):
            return jsonify({
                'error': 'invalid_request',
                'message': 'limit and offset must be integers'
            }), 400

        # Without clamping, negative values were passed straight to the manager.
        limit = max(1, min(limit, 100))
        offset = max(0, offset)
        status_filter = request.args.get('status')

        jobs = await job_manager.get_user_jobs(user_id, limit, offset)

        if status_filter:
            wanted = status_filter.lower()
            jobs = [job for job in jobs if job.phase.value.lower() == wanted]

        return jsonify({
            'jobs': [job.to_dict() for job in jobs],
            'pagination': {
                'limit': limit,
                'offset': offset,
                'count': len(jobs)
            }
        })

    except Exception as e:
        logger.exception(f"Failed to list jobs: {e}")
        return jsonify({
            'error': 'server_error',
            'message': 'Failed to retrieve jobs'
        }), 500
|
|
@jobs_bp.route('/<job_id>', methods=['GET'])
@dev_mode_bypass
async def get_job(job_id: str):
    """
    Fetch a single job on behalf of the caller.

    Path parameters:
        job_id: Identifier of the job to fetch.

    Returns:
        200 ``{'job': {...}}`` with the job's serialized state;
        404 ``not_found`` if the job manager has no such job;
        403 ``forbidden`` if the job belongs to another user (this check is
        skipped while ``server_config.DEV_MODE`` is set);
        500 ``server_error`` on any unexpected failure.
    """
    try:
        manager = JobManager.get_instance()
        requester = get_user_id()
        found = await manager.get_job(job_id)

        if not found:
            return jsonify({'error': 'not_found', 'message': 'Job not found'}), 404

        # Imported at call time; presumably to avoid an import cycle or to
        # read config after startup (unverified).
        from ..config_runtime import server_config

        enforce_ownership = not server_config.DEV_MODE
        if enforce_ownership and found.user_id != requester:
            return jsonify({'error': 'forbidden', 'message': 'Access denied'}), 403

        payload = {'job': found.to_dict()}
        return jsonify(payload)

    except Exception as exc:
        logger.error(f"Failed to get job {job_id}: {exc}")
        return jsonify({'error': 'server_error', 'message': 'Failed to retrieve job'}), 500
|
|
@jobs_bp.route('/<job_id>/download', methods=['GET'])
@dev_mode_bypass
async def download_job_result(job_id: str):
    """
    Download the CSV result file for a completed job.

    Returns:
        The CSV file as an attachment named ``<original name>-results.csv``;
        404 ``not_found`` if the job does not exist;
        403 ``forbidden`` if it belongs to another user (skipped in DEV_MODE);
        400 ``not_ready`` if there is no output file yet;
        500 ``server_error`` on unexpected errors.
    """
    try:
        job_manager = JobManager.get_instance()
        user_id = get_user_id()

        job = await job_manager.get_job(job_id)
        if not job:
            return jsonify({
                'error': 'not_found',
                'message': 'Job not found'
            }), 404

        # Check if user owns this job (skip check in dev mode)
        from ..config_runtime import server_config
        if not server_config.DEV_MODE and job.user_id != user_id:
            return jsonify({
                'error': 'forbidden',
                'message': 'Access denied'
            }), 403

        # isfile rather than exists: a directory at output_path is not a result.
        if not job.output_path or not os.path.isfile(job.output_path):
            return jsonify({
                'error': 'not_ready',
                'message': 'Job result not available'
            }), 400

        # basename drops any directory part a client may have sent in the name.
        base_name = os.path.splitext(os.path.basename(job.file_name))[0]
        download_filename = f"{base_name}-results.csv"

        # Newer Quart releases (aligned with Flask 2.x) call this parameter
        # ``download_name``; older ones use ``attachment_filename``. Passing the
        # one the installed version lacks raises TypeError, so pick the one the
        # installed version has.
        name_kwarg = (
            'download_name'
            if 'download_name' in inspect.signature(send_file).parameters
            else 'attachment_filename'
        )
        return await send_file(
            job.output_path,
            mimetype='text/csv',
            as_attachment=True,
            **{name_kwarg: download_filename},
        )

    except Exception as e:
        logger.exception(f"Download failed for job {job_id}: {e}")
        return jsonify({
            'error': 'server_error',
            'message': 'Failed to download result'
        }), 500
|
|
@jobs_bp.route('/<job_id>/logs', methods=['GET'])
@dev_mode_bypass
async def get_job_logs(job_id: str):
    """
    Get log entries for a specific job.

    Query parameters:
        - limit: Maximum number of log entries (default: 100, clamped to
          0..1000). The last ``limit`` entries of ``job.logs`` are returned
          (presumably the newest; confirm the manager appends in order).
        - level: Case-insensitive log level filter (optional)

    Returns:
        200 ``{'logs': [...], 'count': n}``;
        400 ``invalid_request`` if ``limit`` is not an integer;
        404 ``not_found`` / 403 ``forbidden`` (ownership check skipped in
        DEV_MODE); 500 ``server_error`` on unexpected errors.
    """
    try:
        job_manager = JobManager.get_instance()
        user_id = get_user_id()

        job = await job_manager.get_job(job_id)
        if not job:
            return jsonify({
                'error': 'not_found',
                'message': 'Job not found'
            }), 404

        # Check if user owns this job (skip check in dev mode)
        from ..config_runtime import server_config
        if not server_config.DEV_MODE and job.user_id != user_id:
            return jsonify({
                'error': 'forbidden',
                'message': 'Access denied'
            }), 403

        try:
            limit = int(request.args.get('limit', 100))
        except (TypeError, ValueError):
            return jsonify({
                'error': 'invalid_request',
                'message': 'limit must be an integer'
            }), 400
        # Clamp: logs[-limit:] with limit=0 is logs[0:] (everything), and a
        # negative limit would slice from the wrong end.
        limit = max(0, min(limit, 1000))
        level_filter = request.args.get('level')

        # Copy so the slice below works whatever sequence type job.logs is.
        logs = list(job.logs)

        if level_filter:
            wanted = level_filter.lower()
            logs = [log for log in logs if log.level.lower() == wanted]

        # Keep the last `limit` entries; limit=0 yields an empty list.
        logs = logs[max(len(logs) - limit, 0):]

        return jsonify({
            'logs': [log.to_dict() for log in logs],
            'count': len(logs)
        })

    except Exception as e:
        logger.exception(f"Failed to get logs for job {job_id}: {e}")
        return jsonify({
            'error': 'server_error',
            'message': 'Failed to retrieve job logs'
        }), 500
|
|
@jobs_bp.route('/<job_id>', methods=['DELETE'])
@dev_mode_bypass
async def delete_job(job_id: str):
    """
    Delete a job through the job manager (which is expected to remove the
    job's files as well).

    Returns:
        200 ``{'message': 'Job deleted successfully'}`` once the manager
        reports success; 404 ``not_found``; 403 ``forbidden`` (ownership
        check skipped in DEV_MODE); 500 ``deletion_failed`` if the manager
        reports failure; 500 ``server_error`` on unexpected errors.
    """
    try:
        job_manager = JobManager.get_instance()
        ws_manager = WebSocketManager()
        user_id = get_user_id()

        job = await job_manager.get_job(job_id)
        if not job:
            return jsonify({
                'error': 'not_found',
                'message': 'Job not found'
            }), 404

        # Check if user owns this job (skip check in dev mode)
        from ..config_runtime import server_config
        if not server_config.DEV_MODE and job.user_id != user_id:
            return jsonify({
                'error': 'forbidden',
                'message': 'Access denied'
            }), 403

        success = await job_manager.delete_job(job_id)
        if not success:
            return jsonify({
                'error': 'deletion_failed',
                'message': 'Failed to delete job'
            }), 500

        logger.info(f"Deleted job {job_id} (user: {user_id})")

        # The job is already gone; a WebSocket failure must not turn a
        # successful delete into a 500 for the client.
        try:
            await ws_manager.broadcast_to_user(user_id, {
                'type': 'job.deleted',
                'jobId': job_id
            })
        except Exception:
            logger.exception(f"Failed to broadcast deletion of job {job_id}")

        return jsonify({
            'message': 'Job deleted successfully'
        })

    except Exception as e:
        logger.exception(f"Failed to delete job {job_id}: {e}")
        return jsonify({
            'error': 'server_error',
            'message': 'Failed to delete job'
        }), 500
|
|
@jobs_bp.route('/batch-download', methods=['POST'])
@dev_mode_bypass
async def batch_download():
    """
    Download multiple job results as a ZIP file.

    Expects a JSON body:
        {
            "jobIds": ["job1", "job2", "job3"]
        }

    The following jobs are skipped with a logged warning: jobs that do not
    exist, jobs that belong to another user (ownership check skipped in
    DEV_MODE), and jobs with no output file. Duplicate IDs are zipped once.

    Returns:
        A ZIP attachment ``batch_results_<UTC timestamp>.zip`` holding one
        ``<original name>-<first 8 chars of job id>.csv`` per job;
        400 ``invalid_request`` for a missing/malformed body or jobIds;
        400 ``no_results`` if none of the requested jobs had a result;
        500 ``server_error`` on unexpected errors.
    """
    try:
        job_manager = JobManager.get_instance()
        user_id = get_user_id()

        # silent=True: a non-JSON or malformed body yields None instead of
        # raising, so it is reported as a 400 below rather than crashing.
        data = await request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'error': 'invalid_request',
                'message': 'Request body must be a JSON object'
            }), 400

        job_ids = data.get('jobIds', [])
        if not job_ids:
            return jsonify({
                'error': 'invalid_request',
                'message': 'No job IDs provided'
            }), 400

        # A bare string would otherwise be iterated character by character.
        if not isinstance(job_ids, list) or not all(isinstance(j, str) for j in job_ids):
            return jsonify({
                'error': 'invalid_request',
                'message': 'jobIds must be a list of strings'
            }), 400

        # Drop duplicates (first occurrence wins) to avoid duplicate ZIP entries.
        job_ids = list(dict.fromkeys(job_ids))

        from ..config_runtime import server_config

        def _read_bytes(path):
            # Executed in the loop's default executor (see run_in_executor below).
            with open(path, 'rb') as fh:
                return fh.read()

        loop = asyncio.get_running_loop()
        zip_buffer = BytesIO()
        csv_files = []

        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for job_id in job_ids:
                job = await job_manager.get_job(job_id)
                if not job:
                    logger.warning(f"Job {job_id} not found for batch download")
                    continue

                if not server_config.DEV_MODE and job.user_id != user_id:
                    logger.warning(f"User {user_id} denied access to job {job_id}")
                    continue

                if not job.output_path or not os.path.isfile(job.output_path):
                    logger.warning(f"Job {job_id} has no output file")
                    continue

                base_name = os.path.splitext(os.path.basename(job.file_name))[0]
                csv_filename = f"{base_name}-{job_id[:8]}.csv"

                # Blocking disk read happens off the event loop.
                csv_content = await loop.run_in_executor(None, _read_bytes, job.output_path)
                zip_file.writestr(csv_filename, csv_content)
                csv_files.append(csv_filename)

        if not csv_files:
            return jsonify({
                'error': 'no_results',
                'message': 'No completed jobs found for download'
            }), 400

        zip_buffer.seek(0)

        # Aware UTC time; datetime.utcnow() is deprecated since Python 3.12.
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        zip_filename = f"batch_results_{timestamp}.zip"

        # Newer Quart names this parameter ``download_name``; older releases
        # use ``attachment_filename``. Use whichever the installed one accepts.
        name_kwarg = (
            'download_name'
            if 'download_name' in inspect.signature(send_file).parameters
            else 'attachment_filename'
        )
        return await send_file(
            zip_buffer,
            mimetype='application/zip',
            as_attachment=True,
            **{name_kwarg: zip_filename},
        )

    except Exception as e:
        logger.exception(f"Batch download failed: {e}")
        return jsonify({
            'error': 'server_error',
            'message': 'Failed to create batch download'
        }), 500
|
|
@jobs_bp.route('/cleanup', methods=['POST'])
@dev_mode_bypass
async def cleanup_expired():
    """
    Trigger an on-demand purge of expired jobs and their files.

    Intended as an admin/maintenance endpoint.

    NOTE(review): the route is guarded only by ``dev_mode_bypass``; nothing
    visible here restricts it to administrators, and
    ``cleanup_expired_jobs()`` is called without any user scope. Confirm
    this is intended.

    Returns:
        200 ``{'message': 'Cleanup completed', 'itemsRemoved': <count>}``
        carrying whatever ``cleanup_expired_jobs()`` reported, or
        500 ``server_error`` if the cleanup raised.
    """
    try:
        removed = await JobManager.get_instance().cleanup_expired_jobs()
        logger.info(f"Manual cleanup completed: {removed} items removed")
        return jsonify({'message': 'Cleanup completed', 'itemsRemoved': removed})
    except Exception as exc:
        logger.error(f"Cleanup failed: {exc}")
        return jsonify({'error': 'server_error', 'message': 'Failed to perform cleanup'}), 500
|
|
@jobs_bp.route('/stats', methods=['GET'])
@dev_mode_bypass
async def get_job_stats():
    """
    Get job processing statistics for the current user.

    Only the jobs returned by ``get_user_jobs(user_id, limit=1000)`` are
    counted, so the totals cover at most 1000 jobs per user.

    Returns:
        200 with ``{'stats': {...}}``. The stats hold:
            - job counts by outcome
            - the success rate (completed / total, 0 when there are no jobs)
            - total assets extracted
            - total cost in USD, rounded to 4 decimal places
        500 ``server_error`` on unexpected errors.
    """
    try:
        job_manager = JobManager.get_instance()
        user_id = get_user_id()

        all_jobs = await job_manager.get_user_jobs(user_id, limit=1000)
        total_jobs = len(all_jobs)

        # Compare phases case-insensitively, as the list endpoint's status
        # filter does. Anything neither completed nor failed counts as active.
        phases = [job.phase.value.upper() for job in all_jobs]
        completed_jobs = phases.count('COMPLETED')
        failed_jobs = phases.count('FAILED')
        active_jobs = total_jobs - completed_jobs - failed_jobs

        # A missing (None) metric counts as zero instead of making sum() raise.
        summaries = [job.summary for job in all_jobs if job.summary]
        total_assets = sum(s.assets_extracted or 0 for s in summaries)
        total_cost = sum(s.cost_usd_total or 0 for s in summaries)

        return jsonify({
            'stats': {
                'totalJobs': total_jobs,
                'completedJobs': completed_jobs,
                'failedJobs': failed_jobs,
                'activeJobs': active_jobs,
                'successRate': completed_jobs / total_jobs if total_jobs > 0 else 0,
                'totalAssetsExtracted': total_assets,
                'totalCostUsd': round(total_cost, 4)
            }
        })

    except Exception as e:
        logger.exception(f"Failed to get job stats: {e}")
        return jsonify({
            'error': 'server_error',
            'message': 'Failed to retrieve statistics'
        }), 500
|
|
# Two-letter codes recognised as languages when a market cell holds a bare
# code; any other two-letter value is taken to be a country (e.g. UK, US).
_LANG_CODES = frozenset({
    'EN', 'DE', 'FR', 'NL', 'ES', 'IT', 'PT', 'PL', 'RU', 'JA', 'ZH', 'AR', 'KO',
})


def _split_market(market):
    """
    Split a ``language_country_market`` cell into ``(language, country)``.

    - "EN-UK" or "en_gb": split on the first '-' (or, if there is none, the
      first '_') and upper-case both halves -> ("EN", "UK") / ("EN", "GB")
    - a bare two-letter code: language if it is in ``_LANG_CODES``,
      otherwise country ("en" -> ("EN", ""), "UK" -> ("", "UK"))
    - anything longer ("English"): kept verbatim as the language
    - empty: ("", "")
    """
    market = market.strip()
    for sep in ('-', '_'):
        if sep in market:
            lang, _, country = market.partition(sep)
            return lang.strip().upper(), country.strip().upper()
    if len(market) == 2:
        code = market.upper()
        return (code, '') if code in _LANG_CODES else ('', code)
    return market, ''


def _parse_quantity(raw):
    """
    Parse a CSV ``quantity`` cell into an int.

    Blank or missing cells give 1. Integral decimals such as "2.0" are
    accepted. Any other unparseable value also gives 1, so one bad cell does
    not fail the whole response.
    """
    text = (raw or '').strip()
    if not text:
        return 1
    try:
        return int(text)
    except ValueError:
        pass
    try:
        number = float(text)
    except ValueError:
        return 1
    return int(number) if number.is_integer() else 1


def _read_deliverables(path):
    """
    Read a brief-extractor output CSV and map each row to the AC Deliverable
    schema. Returns one dict per CSV row, in file order.
    """
    def cell(row, key):
        # DictReader fills cells missing from short rows with None (its
        # default restval); normalise those, and absent columns, to ''.
        return row.get(key) or ''

    deliverables = []
    # utf-8-sig strips a leading byte-order mark if present. With plain utf-8
    # the BOM would be glued to the first header name and that column lost.
    with open(path, newline='', encoding='utf-8-sig') as f:
        for row in csv.DictReader(f):
            lang, country = _split_market(cell(row, 'language_country_market'))
            title = cell(row, 'title')
            deliverables.append({
                'Number': '',
                'Title': title,
                'Status': cell(row, 'status') or 'Booked',
                'Category': cell(row, 'category'),
                'Media': cell(row, 'media'),
                'Sub-media': cell(row, 'asset_type'),
                'Format': cell(row, 'technical_specifications'),
                'Supply date': cell(row, 'review_date'),
                'Live date': cell(row, 'live_date'),
                'Language': lang,
                'Country': country,
                'Quantity': _parse_quantity(row.get('quantity')),
                # Extra brief fields kept for the review UI
                '_brief_title': title,
                '_brand_identifier': cell(row, 'brand_identifier'),
                '_priority': cell(row, 'priority_level'),
            })
    return deliverables


@jobs_bp.route('/<job_id>/deliverables', methods=['GET'])
@auth_required
async def get_job_deliverables(job_id: str):
    """
    Return extracted deliverables from a completed job as JSON rows
    ready for the Review → Import flow.

    Reads the output CSV and maps its columns to the AC Deliverable schema.

    NOTE(review): unlike the other routes in this module, this one is
    decorated with ``auth_required`` rather than ``dev_mode_bypass``;
    confirm that is intended.

    Returns:
        200 ``{'deliverables': [...], 'count': n}``; 404 ``not_found``;
        403 ``forbidden`` (ownership check skipped in DEV_MODE);
        400 ``not_ready`` if the job has no output file;
        500 ``server_error`` on unexpected errors.
    """
    try:
        job_manager = JobManager.get_instance()
        user_id = get_user_id()

        job = await job_manager.get_job(job_id)
        if not job:
            return jsonify({'error': 'not_found', 'message': 'Job not found'}), 404

        from ..config_runtime import server_config
        if not server_config.DEV_MODE and job.user_id != user_id:
            return jsonify({'error': 'forbidden', 'message': 'Access denied'}), 403

        if not job.output_path or not os.path.isfile(job.output_path):
            return jsonify({'error': 'not_ready', 'message': 'Job not completed yet'}), 400

        # Parse the CSV in the default executor so the file I/O does not
        # block the event loop.
        loop = asyncio.get_running_loop()
        deliverables = await loop.run_in_executor(None, _read_deliverables, job.output_path)

        return jsonify({'deliverables': deliverables, 'count': len(deliverables)})

    except Exception as e:
        logger.exception(f"Failed to get deliverables for job {job_id}: {e}")
        return jsonify({'error': 'server_error', 'message': 'Failed to retrieve deliverables'}), 500