hm_video_ai_qc_tool/web/qc_runner.py
2025-12-31 12:59:50 +02:00

222 lines
6.3 KiB
Python

"""
QC Runner - Background QC execution with real-time progress events
Runs QC checks in background threads and emits progress updates
via thread-safe queues for Server-Sent Events (SSE) streaming.
"""
import threading
from queue import Queue
import json
import os
import sys
import datetime
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import config
import qc_module
# Global dict to track job progress: {job_id: Queue}
# Filled by start_qc_job() and read by get_progress_queue(); no code visible
# in this file removes entries.
progress_queues = {}
# Global dict to track job results: {job_id: report_path}
# Written by _run_qc_checks() only after report generation succeeds, and read
# by get_job_result().
job_results = {}
def start_qc_job(job_id, profile_path, video_path, selected_checks, check_config):
    """
    Launch a QC run on a background thread and register its progress queue

    The queue is stored in ``progress_queues`` under ``job_id`` before the
    worker thread is started, so it can be fetched with get_progress_queue()
    once this function returns. Nothing is returned.

    Args:
        job_id: Unique job identifier
        profile_path: Path to QC profile JSON
        video_path: Path to video file
        selected_checks: List of check IDs to run
        check_config: Dict of custom check configurations
    """
    events = Queue()
    progress_queues[job_id] = events

    worker_args = (job_id, profile_path, video_path, selected_checks, check_config, events)
    # Daemon thread: it does not keep the interpreter alive at shutdown.
    worker = threading.Thread(target=_run_qc_checks, args=worker_args, daemon=True)
    worker.start()
def get_progress_queue(job_id):
    """Return the progress queue registered for ``job_id``, or None if there is none"""
    try:
        return progress_queues[job_id]
    except KeyError:
        return None
def get_job_result(job_id):
    """Return the recorded result (report path) for ``job_id``, or None if none is stored"""
    try:
        report_path = job_results[job_id]
    except KeyError:
        report_path = None
    return report_path
def _run_qc_checks(job_id, profile_path, video_path, selected_checks, check_config, queue):
    """
    Worker body for one QC job: load the profile, run the checks, build the report

    Runs on the thread created by start_qc_job() and should not be called
    directly. Progress is reported through ``queue``. Any exception is turned
    into a 'system' / 'error' event instead of being raised, and a final
    ``None`` is always queued to mark the end of the event stream.
    """
    try:
        video_name = os.path.basename(video_path)
        _emit_progress(queue, 'system', 'started', f'Starting QC on {video_name}')

        # Read the full profile from disk
        with open(profile_path, 'r', encoding='utf-8') as profile_file:
            profile_checks = json.load(profile_file)

        # Keep only the checks the caller selected, in profile order
        checks = []
        for candidate in profile_checks:
            if candidate['id'] in selected_checks:
                checks.append(candidate)

        # Merge per-check overrides into each selected check's config
        for check in checks:
            override_key = check['id']
            if override_key in check_config:
                check['config'].update(check_config[override_key])

        results = _execute_qc_profile(checks, video_path, queue)

        _emit_progress(queue, 'system', 'generating_report', 'Generating HTML report')
        report_path = _generate_web_report(results, video_path)

        # Record the report path, then announce it (the path is the message)
        job_results[job_id] = report_path
        _emit_progress(queue, 'system', 'completed', report_path)
    except Exception as exc:
        _emit_progress(queue, 'system', 'error', str(exc))
    finally:
        # None is the end-of-stream marker (closes the SSE stream)
        queue.put(None)
def _execute_qc_profile(checks, video_path, queue):
    """
    Execute QC profile with progress callbacks

    Each check is run through ``qc_module.run_single_check`` with a copy of
    its config (plus ``input_file``) and a ``context`` dict shared by every
    check in the run. Exactly one entry is recorded per check: if the call
    raises, or its return value cannot be read as a dict with ``.get``, the
    entry holds an error result instead.

    Args:
        checks: List of check definitions (dicts with 'id', 'script', 'config')
        video_path: Path to video file
        queue: Queue for progress events
    Returns:
        dict: QC results with 'profile', 'timestamp' and 'checks' keys
    """
    context = {}
    # utcnow() is deprecated since Python 3.12. Take an aware UTC "now" and
    # drop the tzinfo so isoformat() keeps the same "...Z" shape as before.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    results = {
        "profile": "HM_video",
        "timestamp": now_utc.isoformat() + 'Z',
        "checks": []
    }
    for idx, check_def in enumerate(checks, start=1):
        check_id = check_def['id']
        check_script = check_def['script']
        # Copy so the injected input_file does not leak into the definition
        check_config = check_def['config'].copy()
        check_config['input_file'] = video_path

        description = check_config.get('description', check_id)
        _emit_progress(queue, check_id, 'running', f'Running: {description}')

        try:
            result = qc_module.run_single_check(check_script, check_config, context, check_id)
            # Reading the status stays inside the try: a malformed result is
            # reported as an error for this check.
            status = result.get('status', 'unknown')
            if status == 'passed':
                message = f'{check_id} passed'
            elif status == 'error':
                error_text = result.get("error_message", "Unknown error")
                message = f'{check_id} error: {error_text}'
            elif status == 'skipped':
                message = f'{check_id} skipped'
            else:
                message = f'{check_id} completed with status: {status}'
        except Exception as e:
            result = {
                'status': 'error',
                'error_message': f'Exception during check execution: {str(e)}'
            }
            status = 'error'
            message = str(e)

        # Record once, after the outcome is known, so a failure while
        # interpreting the result cannot leave two entries for one check.
        results['checks'].append({
            'index': idx,
            'id': check_id,
            'script': check_script,
            'config': check_config,
            'result': result
        })
        _emit_progress(queue, check_id, status, message)
    return results
def _generate_web_report(results, video_path):
    """
    Render the QC results as a web-styled HTML report

    Ensures ``config.REPORTS_DIR`` exists, then delegates to
    ``WebHTMLReporter.generate_report``.

    Args:
        results: QC results dict
        video_path: Path to video file (only its base name is passed on)
    Returns:
        The value returned by ``WebHTMLReporter.generate_report``
        (documented in this module as the path to the generated report)
    """
    # Imported at call time rather than at module import
    from utils.web_report import WebHTMLReporter

    output_dir = config.REPORTS_DIR
    os.makedirs(output_dir, exist_ok=True)

    source_name = os.path.basename(video_path)
    return WebHTMLReporter.generate_report(
        json_data=results,
        reports_dir=output_dir,
        input_filename=source_name,
    )
def _emit_progress(queue, check_id, status, message):
    """
    Put one progress event on the queue

    The event is a dict with the keys 'check_id', 'status' and 'message'.

    Args:
        queue: Queue object
        check_id: Check identifier or 'system'
        status: Status string (running, passed, error, skipped, etc.)
        message: Progress message
    """
    queue.put(dict(check_id=check_id, status=status, message=message))