222 lines
6.3 KiB
Python
222 lines
6.3 KiB
Python
"""
|
|
QC Runner - Background QC execution with real-time progress events
|
|
|
|
Runs QC checks in background threads and emits progress updates
|
|
via thread-safe queues for Server-Sent Events (SSE) streaming.
|
|
"""
|
|
|
|
import threading
|
|
from queue import Queue
|
|
import json
|
|
import os
|
|
import sys
|
|
import datetime
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
import config
|
|
import qc_module
|
|
|
|
# Registry of live progress channels, keyed by job ID: {job_id: Queue}.
# Entries are added by start_qc_job() and read by get_progress_queue().
progress_queues = {}

# Registry of finished jobs, keyed by job ID: {job_id: report_path}.
# Entries are added by the worker thread once the report has been generated.
job_results = {}
|
|
|
|
|
|
def start_qc_job(job_id, profile_path, video_path, selected_checks, check_config):
    """
    Launch a QC run for one job on a background daemon thread.

    A fresh progress queue is registered under ``job_id`` in
    ``progress_queues`` before the worker is started, so it can be fetched
    with get_progress_queue() as soon as this call returns. The function
    itself returns immediately and yields no value.

    Args:
        job_id: Unique job identifier
        profile_path: Path to QC profile JSON
        video_path: Path to video file
        selected_checks: List of check IDs to run
        check_config: Dict of custom check configurations
    """
    events = Queue()
    progress_queues[job_id] = events

    # The worker receives the queue directly instead of looking it up again.
    worker_args = (job_id, profile_path, video_path, selected_checks, check_config, events)
    worker = threading.Thread(target=_run_qc_checks, args=worker_args, daemon=True)
    worker.start()
|
|
|
|
|
|
def get_progress_queue(job_id):
    """
    Look up the progress queue registered for a job.

    Returns:
        The queue stored in ``progress_queues`` for ``job_id``, or None when
        no queue has been registered under that ID.
    """
    registered_queue = progress_queues.get(job_id, None)
    return registered_queue
|
|
|
|
|
|
def get_job_result(job_id):
    """
    Look up the stored result (report path) for a job.

    Returns:
        The value stored in ``job_results`` for ``job_id``, or None when no
        result has been recorded for that ID.
    """
    stored_report = job_results.get(job_id, None)
    return stored_report
|
|
|
|
|
|
def _run_qc_checks(job_id, profile_path, video_path, selected_checks, check_config, queue):
    """
    Worker body for a QC job: load the profile, run the checks, build the report.

    Intended to run on the background thread created by start_qc_job(); do
    not call it directly.

    Progress is reported through ``queue`` as 'system' events ('started',
    'generating_report', then 'completed' carrying the report path). Any
    exception is converted into a single 'system' / 'error' event whose
    message is ``str(exception)``. In every case a final ``None`` is put on
    the queue as the end-of-stream marker.

    Args:
        job_id: Key under which the report path is stored in ``job_results``
        profile_path: Path to the QC profile JSON (a list of check definitions)
        video_path: Path to video file
        selected_checks: Collection of check IDs to run
        check_config: Dict mapping check ID to config overrides
        queue: Queue receiving the progress events
    """
    try:
        video_name = os.path.basename(video_path)
        _emit_progress(queue, 'system', 'started', f'Starting QC on {video_name}')

        # Read every check definition from the profile
        with open(profile_path, 'r', encoding='utf-8') as profile_file:
            profile_entries = json.load(profile_file)

        # First pass: keep only the checks that were selected
        active_checks = []
        for entry in profile_entries:
            if entry['id'] in selected_checks:
                active_checks.append(entry)

        # Second pass: overlay per-check overrides onto the profile config
        for entry in active_checks:
            entry_id = entry['id']
            if entry_id not in check_config:
                continue
            entry['config'].update(check_config[entry_id])

        # Individual check progress is emitted by the executor itself
        qc_results = _execute_qc_profile(active_checks, video_path, queue)

        _emit_progress(queue, 'system', 'generating_report', 'Generating HTML report')
        html_report = _generate_web_report(qc_results, video_path)

        # Publish the result before announcing completion
        job_results[job_id] = html_report
        _emit_progress(queue, 'system', 'completed', html_report)

    except Exception as exc:
        _emit_progress(queue, 'system', 'error', str(exc))

    finally:
        # None is the end-of-stream marker (closes the SSE stream)
        queue.put(None)
|
|
|
|
|
|
def _format_check_message(check_id, status, result):
    """Build the human-readable progress message for a finished check."""
    if status == 'passed':
        return f'✓ {check_id} passed'
    if status == 'error':
        return f'✗ {check_id} error: {result.get("error_message", "Unknown error")}'
    if status == 'skipped':
        return f'⊘ {check_id} skipped'
    return f'{check_id} completed with status: {status}'


def _execute_qc_profile(checks, video_path, queue):
    """
    Execute QC profile with progress callbacks

    Each check is run in order through ``qc_module.run_single_check``. A
    'running' event is emitted before a check starts and a final event
    (carrying the check's status) after it ends. Exactly one entry per check
    is appended to the returned results, whether the check succeeded, raised,
    or returned something that is not usable as a result mapping.

    Args:
        checks: List of check definitions (each needs 'id', 'script', 'config')
        video_path: Path to video file, injected into each config as 'input_file'
        queue: Queue for progress events

    Returns:
        dict: QC results in standard format ('profile', 'timestamp', 'checks')

    Raises:
        KeyError: If a check definition lacks 'id', 'script' or 'config'.
    """
    # Aware "now" replaces the deprecated utcnow(); tzinfo is dropped so the
    # serialized form stays the same naive ISO-8601 string with a 'Z' suffix.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # The same context dict is handed to every check in this run.
    context = {}
    results = {
        "profile": "HM_video",
        "timestamp": now_utc.isoformat() + 'Z',
        "checks": [],
    }

    for idx, check_def in enumerate(checks, start=1):
        check_id = check_def['id']
        check_script = check_def['script']
        # Shallow copy so injecting the input file does not touch the definition
        check_config = check_def['config'].copy()

        # Inject input file
        check_config['input_file'] = video_path

        # Emit start event
        description = check_config.get('description', check_id)
        _emit_progress(queue, check_id, 'running', f'Running: {description}')

        try:
            result = qc_module.run_single_check(check_script, check_config, context, check_id)
            # Interpret the result inside the try so a malformed return value
            # (e.g. None) becomes an error entry instead of being recorded
            # twice: once raw and once as an error.
            status = result.get('status', 'unknown')
            message = _format_check_message(check_id, status, result)
        except Exception as e:
            # Handle check execution error
            result = {
                'status': 'error',
                'error_message': f'Exception during check execution: {str(e)}',
            }
            status = 'error'
            message = str(e)

        # Store result, then emit the completion/error event
        results['checks'].append({
            'index': idx,
            'id': check_id,
            'script': check_script,
            'config': check_config,
            'result': result,
        })
        _emit_progress(queue, check_id, status, message)

    return results
|
|
|
|
|
|
def _generate_web_report(results, video_path):
    """
    Render the QC results as a web-styled HTML report.

    The reports directory (``config.REPORTS_DIR``) is created if it does not
    exist yet, then rendering is delegated to
    ``WebHTMLReporter.generate_report``.

    Args:
        results: QC results dict
        video_path: Path to video file; only its base name is passed on

    Returns:
        str: Path to generated report
    """
    # Imported lazily, at call time rather than at module import time
    from utils.web_report import WebHTMLReporter

    output_dir = config.REPORTS_DIR
    os.makedirs(output_dir, exist_ok=True)

    return WebHTMLReporter.generate_report(
        json_data=results,
        reports_dir=output_dir,
        input_filename=os.path.basename(video_path),
    )
|
|
|
|
|
|
def _emit_progress(queue, check_id, status, message):
    """
    Put a single progress event on the queue.

    The event is a dict with the keys 'check_id', 'status' and 'message',
    in that order.

    Args:
        queue: Queue object
        check_id: Check identifier or 'system'
        status: Status string (running, passed, error, skipped, etc.)
        message: Progress message
    """
    queue.put({'check_id': check_id, 'status': status, 'message': message})
|