hm_ai_qc_report_tool/modules/video_qc/routes.py
nickviljoen 57dbefe4f2 Video QC routes: accept .srt uploads + pre-flight pairing endpoint
Adds .srt to ALLOWED_EXTENSIONS; introduces is_video() / is_srt() helpers.
New /pairing-preview/<session_id> endpoint returns pair_map + unpaired lists
for the configure UI. Batch execute threads srt_paths into BatchVideoQCExecutor.
2026-05-15 20:45:55 +02:00

556 lines
18 KiB
Python

"""
Video QC Module Routes.
Handles video QC workflow:
1. Upload - Video file upload (single or batch)
2. Configure - LLM provider selection
3. Execute - Frame extraction + AI checks with progress (single or batch)
4. Results - Display scored results
"""
import os
import uuid
import threading
import logging
from datetime import datetime
from flask import (
render_template, request, jsonify, send_file, current_app,
Response, stream_with_context
)
from werkzeug.utils import secure_filename
from core.auth import current_user_email
from .blueprint import video_qc_bp
from core.utils.progress_tracker import UnifiedProgressTracker
from core.models.qc_report import QCReport
from core.models.database import db
logger = logging.getLogger(__name__)
ALLOWED_VIDEO_EXTENSIONS = {'mp4', 'mov', 'avi', 'mkv'}
ALLOWED_SRT_EXTENSIONS = {'srt'}
ALLOWED_EXTENSIONS = ALLOWED_VIDEO_EXTENSIONS | ALLOWED_SRT_EXTENSIONS
MAX_BATCH_FILES = 50
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def is_video(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_VIDEO_EXTENSIONS
def is_srt(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_SRT_EXTENSIONS
@video_qc_bp.route('/')
@video_qc_bp.route('/index')
def index():
"""Main Video QC page with recent reports grouped by batch."""
try:
batches, individual_reports = QCReport.get_recent_grouped(
limit=100, report_type='video_qc'
)
except Exception:
batches, individual_reports = [], []
return render_template(
'video_qc/index.html',
active_tab='video-qc',
batches=batches,
individual_reports=individual_reports
)
@video_qc_bp.route('/upload', methods=['GET', 'POST'])
def upload():
"""Handle video file upload (single or multiple)."""
if request.method == 'GET':
return render_template('video_qc/upload.html', active_tab='video-qc')
try:
session_id = str(uuid.uuid4())
upload_path = os.path.join(
current_app.config['VIDEO_QC_UPLOAD_PATH'], session_id
)
os.makedirs(upload_path, exist_ok=True)
saved_files = []
# Multi-file upload
files = request.files.getlist('files')
if not files or (len(files) == 1 and files[0].filename == ''):
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
single = request.files['file']
if single.filename == '':
return jsonify({'error': 'No file selected'}), 400
files = [single]
if len(files) > MAX_BATCH_FILES:
return jsonify({
'error': f'Maximum {MAX_BATCH_FILES} files allowed. Got {len(files)}.'
}), 400
for f in files:
if not allowed_file(f.filename):
continue
filename = secure_filename(f.filename)
filepath = os.path.join(upload_path, filename)
f.save(filepath)
saved_files.append(filename)
if not saved_files:
return jsonify({
'error': f'No valid videos. Allowed types: {", ".join(ALLOWED_EXTENSIONS)}'
}), 400
is_batch = len(saved_files) > 1
logger.info(
f"{'Batch' if is_batch else 'Video'} uploaded: {len(saved_files)} file(s) "
f"(session: {session_id})"
)
return jsonify({
'success': True,
'session_id': session_id,
'filename': saved_files[0] if not is_batch else None,
'filenames': saved_files,
'file_count': len(saved_files),
'is_batch': is_batch
})
except Exception as e:
logger.error(f"Upload error: {e}")
return jsonify({'error': str(e)}), 500
@video_qc_bp.route('/configure/<session_id>')
def configure(session_id):
"""Show configuration page."""
upload_path = os.path.join(
current_app.config['VIDEO_QC_UPLOAD_PATH'], session_id
)
files = []
if os.path.exists(upload_path):
files = [f for f in os.listdir(upload_path) if allowed_file(f)]
return render_template(
'video_qc/configure.html',
active_tab='video-qc',
session_id=session_id,
filename=files[0] if files else 'Unknown',
filenames=files,
file_count=len(files)
)
@video_qc_bp.route('/pairing-preview/<session_id>')
def pairing_preview(session_id):
"""Pre-flight: return pair_map + unpaired counts for the configure UI."""
upload_path = os.path.join(
current_app.config['VIDEO_QC_UPLOAD_PATH'], session_id
)
if not os.path.isdir(upload_path):
return jsonify({'error': 'Session upload folder not found'}), 404
files = os.listdir(upload_path)
video_paths = [os.path.join(upload_path, f) for f in files if is_video(f)]
srt_paths = [os.path.join(upload_path, f) for f in files if is_srt(f)]
if not srt_paths:
return jsonify({
'pairs': [{'video': os.path.basename(v), 'srt': None} for v in video_paths],
'unpaired_srts': [],
'unpaired_videos': [os.path.basename(v) for v in video_paths],
'srt_count': 0,
})
from .utils.srt_pairing import pair_batch
pair_map, unpaired_srts, unpaired_videos = pair_batch(video_paths, srt_paths)
return jsonify({
'pairs': [
{'video': os.path.basename(v), 'srt': os.path.basename(s) if s else None}
for v, s in pair_map.items()
],
'unpaired_srts': [os.path.basename(s) for s in unpaired_srts],
'unpaired_videos': [os.path.basename(v) for v in unpaired_videos],
'srt_count': len(srt_paths),
})
@video_qc_bp.route('/execute', methods=['POST'])
def execute():
"""Start single-file video QC execution."""
from .executor import VideoQCExecutor
try:
data = request.get_json()
session_id = data.get('session_id')
job_number = data.get('job_number')
llm_provider = data.get('llm_provider', 'google')
if not session_id:
return jsonify({'error': 'Missing session_id'}), 400
provider_models = {'openai': 'gpt-4o', 'google': 'gemini-2.5-flash'}
llm_model = provider_models.get(llm_provider, 'gemini-2.5-flash')
upload_path = os.path.join(
current_app.config['VIDEO_QC_UPLOAD_PATH'], session_id
)
files = [f for f in os.listdir(upload_path) if allowed_file(f)]
if not files:
return jsonify({'error': 'No video file found'}), 404
file_path = os.path.join(upload_path, files[0])
user_email = current_user_email()
campaign_id = data.get('campaign_id')
pricing_reference_id = data.get('pricing_reference_id')
executor = VideoQCExecutor(
session_id=session_id,
file_path=file_path,
job_number=job_number,
llm_provider=llm_provider,
llm_model=llm_model,
user=user_email,
campaign_id=campaign_id,
pricing_reference_id=pricing_reference_id
)
app = current_app._get_current_object()
def run():
with app.app_context():
executor.execute()
thread = threading.Thread(target=run)
thread.daemon = True
thread.start()
return jsonify({
'success': True,
'session_id': session_id
})
except Exception as e:
logger.error(f"Execute error: {e}")
return jsonify({'error': str(e)}), 500
@video_qc_bp.route('/execute/batch', methods=['POST'])
def execute_batch():
"""Start batch video QC execution for multiple files."""
from .batch_executor import BatchVideoQCExecutor
try:
data = request.get_json()
session_id = data.get('session_id')
job_number = data.get('job_number')
llm_provider = data.get('llm_provider', 'google')
if not session_id:
return jsonify({'error': 'Missing session_id'}), 400
provider_models = {'openai': 'gpt-4o', 'google': 'gemini-2.5-flash'}
llm_model = provider_models.get(llm_provider, 'gemini-2.5-flash')
upload_path = os.path.join(
current_app.config['VIDEO_QC_UPLOAD_PATH'], session_id
)
files = os.listdir(upload_path)
video_files = [f for f in files if is_video(f)]
srt_files = [f for f in files if is_srt(f)]
if not video_files:
return jsonify({'error': 'No video files found'}), 404
file_paths = [os.path.join(upload_path, f) for f in video_files]
srt_paths = [os.path.join(upload_path, f) for f in srt_files]
user_email = current_user_email()
campaign_id = data.get('campaign_id')
pricing_reference_id = data.get('pricing_reference_id')
batch_id = str(uuid.uuid4())
app = current_app._get_current_object()
batch_executor = BatchVideoQCExecutor(
session_id=session_id,
file_paths=file_paths,
srt_paths=srt_paths,
job_number=job_number,
llm_provider=llm_provider,
llm_model=llm_model,
user=user_email,
campaign_id=campaign_id,
pricing_reference_id=pricing_reference_id,
batch_id=batch_id,
app=app
)
def run_batch():
with app.app_context():
result = batch_executor.execute()
tracker = UnifiedProgressTracker(session_id)
progress = tracker.get_progress()
if progress['status'] == 'completed':
tracker.update(
100,
progress['message'],
details={'batch_results': result}
)
thread = threading.Thread(target=run_batch)
thread.daemon = True
thread.start()
logger.info(f"Starting batch Video QC for {len(files)} files (session: {session_id})")
return jsonify({
'success': True,
'session_id': session_id,
'file_count': len(files),
'message': f'Batch Video QC started for {len(files)} files'
})
except Exception as e:
logger.error(f"Batch execute error: {e}")
return jsonify({'error': str(e)}), 500
@video_qc_bp.route('/progress/<session_id>')
def progress_stream(session_id):
"""SSE progress stream."""
try:
tracker = UnifiedProgressTracker(session_id)
return Response(
stream_with_context(tracker.stream_progress()),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no'
}
)
except Exception as e:
logger.error(f"Progress stream error: {e}")
return jsonify({'error': str(e)}), 500
@video_qc_bp.route('/api/progress/<session_id>')
def progress_poll(session_id):
"""Polling progress endpoint."""
try:
tracker = UnifiedProgressTracker(session_id)
return jsonify(tracker.get_progress())
except Exception as e:
logger.error(f"Progress poll error: {e}")
return jsonify({'error': str(e)}), 500
@video_qc_bp.route('/results/<session_id>')
def results(session_id):
"""Show single-file results page."""
try:
tracker = UnifiedProgressTracker(session_id)
progress = tracker.get_progress()
report = QCReport.query.filter(
QCReport.metadata_json.like(f'%{session_id}%')
).order_by(QCReport.created_at.desc()).first()
html_content = None
if report and report.file_path and os.path.exists(report.file_path):
with open(report.file_path, 'r', encoding='utf-8') as f:
html_content = f.read()
return render_template(
'video_qc/results.html',
active_tab='video-qc',
session_id=session_id,
progress=progress,
report=report,
html_content=html_content
)
except Exception as e:
logger.error(f"Results error: {e}")
return render_template(
'video_qc/results.html',
active_tab='video-qc',
session_id=session_id,
error=str(e)
)
@video_qc_bp.route('/results/batch/<session_id>')
def results_batch(session_id):
"""Show batch Video QC results."""
try:
# Force a fresh DB read — per-file saves come from the background
# thread (and possibly a different gunicorn worker), so the request
# session may otherwise serve stale views of the QCReport table.
# This mirrors the pattern in UnifiedProgressTracker.get_progress.
db.session.expire_all()
db.session.commit()
tracker = UnifiedProgressTracker(session_id)
progress = tracker.get_progress()
# All reports from this batch session
reports = QCReport.query.filter(
QCReport.metadata_json.like(f'%{session_id}%')
).order_by(QCReport.created_at.desc()).all()
# Extract batch_id from first report's metadata
batch_id = None
if reports:
try:
import json as json_module
meta = json_module.loads(reports[0].metadata_json or '{}')
batch_id = meta.get('batch_id')
except Exception:
pass
batch_results = None
details = progress.get('details')
if details and isinstance(details, dict):
batch_results = details.get('batch_results')
return render_template(
'video_qc/results.html',
active_tab='video-qc',
session_id=session_id,
is_batch=True,
progress=progress,
reports=reports,
batch_id=batch_id,
batch_results=batch_results
)
except Exception as e:
logger.error(f"Batch results error: {e}")
return render_template(
'video_qc/results.html',
active_tab='video-qc',
session_id=session_id,
is_batch=True,
error=str(e)
)
@video_qc_bp.route('/report/<int:report_id>')
def view_report(report_id):
"""View a saved video QC report."""
try:
report = QCReport.query.get(report_id)
if not report:
return render_template('video_qc/results.html', error='Report not found'), 404
html_content = None
if report.file_path and os.path.exists(report.file_path):
with open(report.file_path, 'r', encoding='utf-8') as f:
html_content = f.read()
return render_template(
'video_qc/view_report.html',
active_tab='video-qc',
report=report,
html_content=html_content
)
except Exception as e:
logger.error(f"View report error: {e}")
return render_template('video_qc/results.html', error=str(e)), 500
@video_qc_bp.route('/report/<int:report_id>', methods=['DELETE'])
def delete_report(report_id):
"""Delete a video QC report."""
try:
report = QCReport.query.get(report_id)
if not report:
return jsonify({'error': 'Report not found'}), 404
if report.file_path and os.path.exists(report.file_path):
os.remove(report.file_path)
db.session.delete(report)
db.session.commit()
return jsonify({'success': True})
except Exception as e:
db.session.rollback()
return jsonify({'error': str(e)}), 500
@video_qc_bp.route('/report/<int:report_id>/download')
def download_report(report_id):
"""Download a video QC report HTML file."""
try:
report = QCReport.query.get(report_id)
if not report:
return jsonify({'error': 'Report not found'}), 404
if not report.file_path or not os.path.exists(report.file_path):
return jsonify({'error': 'Report file not found on disk'}), 404
return send_file(
os.path.abspath(report.file_path),
as_attachment=True,
download_name=os.path.basename(report.file_path)
)
except Exception as e:
logger.error(f"Error downloading report {report_id}: {e}")
return jsonify({'error': str(e)}), 500
@video_qc_bp.route('/report/batch/<batch_id>/download')
def download_batch(batch_id):
"""Download all reports from a batch as a ZIP file."""
import zipfile
from io import BytesIO
try:
reports = QCReport.get_by_batch_id(batch_id, report_type='video_qc')
if not reports:
return jsonify({'error': 'No reports found for this batch'}), 404
buffer = BytesIO()
with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
for report in reports:
if report.file_path and os.path.exists(report.file_path):
zf.write(report.file_path, os.path.basename(report.file_path))
buffer.seek(0)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
return send_file(
buffer,
mimetype='application/zip',
as_attachment=True,
download_name=f'VideoQC_Batch_{batch_id[:8]}_{timestamp}.zip'
)
except Exception as e:
logger.error(f"Error downloading batch {batch_id}: {e}")
return jsonify({'error': str(e)}), 500
@video_qc_bp.route('/report/batch/<batch_id>', methods=['DELETE'])
def delete_batch(batch_id):
"""Delete all reports in a batch."""
try:
reports = QCReport.get_by_batch_id(batch_id, report_type='video_qc')
if not reports:
return jsonify({'error': 'No reports found for this batch'}), 404
for report in reports:
if report.file_path and os.path.exists(report.file_path):
os.remove(report.file_path)
db.session.delete(report)
db.session.commit()
logger.info(f"Deleted video batch {batch_id} ({len(reports)} reports)")
return jsonify({'success': True, 'deleted': len(reports)})
except Exception as e:
db.session.rollback()
logger.error(f"Error deleting batch {batch_id}: {e}")
return jsonify({'error': str(e)}), 500