hm_ai_qc_report_tool/modules/hm_qc/routes.py
nickviljoen 4aa74b114a HM QC: thread signed-in user into batch executor
Single-file QC populated executor.context['user'] from current_user_email()
in routes.py, but batch QC routed through BatchQCExecutor — which never
accepted a user kwarg or set context['user'] on its per-file QCExecutor
instances. Result: every LLM call from a batched HM QC run logged as
anonymous in the Usage dashboard, only single-file and Video QC runs
showed the user's email.

BatchQCExecutor now takes user and stamps it onto each per-file
executor's context just before execute(), matching the Video QC
batch executor pattern.
2026-05-09 20:40:00 +02:00

822 lines
27 KiB
Python

"""
HM QC Module Routes.
Handles the complete QC workflow:
1. Upload - File upload (PDF/images), single or batch
2. Configure - Profile and check selection
3. Execute - Run QC checks with progress tracking (single or batch)
4. Results - Display scored results and download report
"""
import os
import uuid
import yaml
import logging
from datetime import datetime
from flask import (
render_template, request, jsonify, send_file, current_app,
Response, stream_with_context
)
from werkzeug.utils import secure_filename
from core.auth import current_user_email
from .blueprint import hm_qc_bp
from .scoring import ScoringEngine
from core.utils.progress_tracker import UnifiedProgressTracker
from core.models.qc_report import QCReport
from core.models.database import db
logger = logging.getLogger(__name__)

# File extensions accepted for upload (compared case-insensitively).
ALLOWED_EXTENSIONS = {'pdf', 'jpg', 'jpeg', 'png', 'psd'}
# Upper bound on the number of files accepted by a single upload request.
MAX_BATCH_FILES = 100


def allowed_file(filename):
    """Check whether *filename* carries an allowed extension.

    Args:
        filename: File name to inspect; ``None`` and empty strings are
            accepted and treated as "not allowed".

    Returns:
        True when the text after the last dot, lower-cased, is listed in
        ALLOWED_EXTENSIONS; False otherwise.
    """
    # Uploaded parts can arrive without a filename; reject instead of raising.
    if not filename or '.' not in filename:
        return False
    return filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@hm_qc_bp.route('/')
@hm_qc_bp.route('/index')
def index():
    """Main HM QC page with recent reports grouped by batch.

    Returns:
        The rendered ``hm_qc/index.html`` template. When loading the recent
        reports fails, the page is still rendered with empty lists.
    """
    try:
        batches, individual_reports = QCReport.get_recent_grouped(
            limit=100, report_type='hm_qc'
        )
    except Exception:
        # Best effort: the landing page must still render, but the failure
        # should be visible in the logs rather than silently discarded.
        logger.exception("Failed to load recent HM QC reports for index page")
        batches, individual_reports = [], []

    return render_template(
        'hm_qc/index.html',
        active_tab='hm-qc',
        batches=batches,
        individual_reports=individual_reports
    )
def _safe_upload_name(original_name, position, taken_names):
    """Return a filesystem-safe, collision-free name for one uploaded file.

    Args:
        original_name: Client-supplied filename; must already satisfy
            allowed_file().
        position: 1-based index of the file within the request, used to build
            a fallback name.
        taken_names: Set of lower-cased names already assigned in this
            upload; updated in place.

    Returns:
        The name to store the file under inside the session directory.
    """
    safe_name = secure_filename(original_name)
    if not allowed_file(safe_name):
        # secure_filename() keeps only ASCII characters, so a name whose stem
        # is entirely non-ASCII can lose its "stem.ext" shape. Fall back to a
        # generated stem plus the (already validated) original extension so
        # the file is still found by later allowed_file() directory listings.
        extension = original_name.rsplit('.', 1)[1].lower()
        safe_name = f'upload_{position}.{extension}'

    stem, extension = os.path.splitext(safe_name)
    candidate = safe_name
    suffix = 1
    # Compare case-insensitively so names cannot collide on filesystems that
    # ignore case.
    while candidate.lower() in taken_names:
        candidate = f'{stem}_{suffix}{extension}'
        suffix += 1
    taken_names.add(candidate.lower())
    return candidate


@hm_qc_bp.route('/upload', methods=['GET', 'POST'])
def upload():
    """
    Handle file upload (single or multiple).

    GET: Show upload form.
    POST: Store the uploaded file(s) in a new session directory under
    ``HM_QC_UPLOAD_PATH``. Files are read from the ``files`` field, falling
    back to a single ``file`` field. Files with a disallowed extension are
    skipped.

    Returns:
        GET: the rendered ``hm_qc/upload.html`` template.
        POST: JSON with ``success``, ``session_id``, ``filename``,
        ``filenames``, ``file_count``, ``is_batch`` and ``message``; a JSON
        ``error`` with status 400 for missing/too many/no valid files, or
        500 on unexpected failures.
    """
    if request.method == 'GET':
        try:
            recent_reports = QCReport.get_recent(limit=10, report_type='hm_qc')
        except Exception:
            # Best effort: the form must render even if history is unavailable.
            logger.exception("Failed to load recent HM QC reports for upload page")
            recent_reports = []
        return render_template(
            'hm_qc/upload.html',
            active_tab='hm-qc',
            recent_reports=recent_reports
        )

    # POST - handle file upload
    try:
        # Check for multi-file upload
        files = request.files.getlist('files')
        if not files or (len(files) == 1 and not files[0].filename):
            # Fallback to single file
            if 'file' not in request.files:
                return jsonify({'error': 'No file provided'}), 400
            file = request.files['file']
            if not file.filename:
                return jsonify({'error': 'No file selected'}), 400
            files = [file]

        # Validate count
        if len(files) > MAX_BATCH_FILES:
            return jsonify({
                'error': f'Maximum {MAX_BATCH_FILES} files allowed. Got {len(files)}.'
            }), 400

        # Decide on target names first so nothing touches the disk until the
        # request is known to contain at least one valid file.
        taken_names = set()
        accepted = []
        for position, file in enumerate(files, start=1):
            original_name = file.filename or ''
            if not allowed_file(original_name):
                continue  # Skip invalid files silently
            accepted.append(
                (file, _safe_upload_name(original_name, position, taken_names))
            )

        if not accepted:
            allowed_types = ', '.join(sorted(ALLOWED_EXTENSIONS))
            return jsonify({
                'error': f'No valid files. Allowed types: {allowed_types}'
            }), 400

        # Generate session ID and its directory only for valid uploads
        session_id = str(uuid.uuid4())
        upload_path = os.path.join(
            current_app.config['HM_QC_UPLOAD_PATH'],
            session_id
        )
        os.makedirs(upload_path, exist_ok=True)

        saved_files = []
        for file, filename in accepted:
            file.save(os.path.join(upload_path, filename))
            saved_files.append(filename)

        is_batch = len(saved_files) > 1
        logger.info(f"{'Batch' if is_batch else 'File'} uploaded: {len(saved_files)} file(s) (session: {session_id})")

        return jsonify({
            'success': True,
            'session_id': session_id,
            'filename': saved_files[0] if not is_batch else None,
            'filenames': saved_files,
            'file_count': len(saved_files),
            'is_batch': is_batch,
            'message': f'{len(saved_files)} file(s) uploaded successfully'
        })
    except Exception as e:
        logger.error(f"Upload error: {e}", exc_info=True)
        return jsonify({'error': str(e)}), 500
@hm_qc_bp.route('/configure/<session_id>')
def configure(session_id):
    """
    Show configuration page for selecting profile and checks.

    Args:
        session_id: Upload session ID

    Returns:
        The rendered ``hm_qc/configure.html`` template listing the session's
        uploaded files (an empty list when the session directory is missing
        or lies outside the upload root), or ``hm_qc/error.html`` on failure.
    """
    try:
        # Load available profiles
        profiles = load_profiles()

        # Count uploaded files
        upload_root = os.path.abspath(current_app.config['HM_QC_UPLOAD_PATH'])
        upload_path = os.path.join(
            current_app.config['HM_QC_UPLOAD_PATH'],
            session_id
        )
        # session_id comes from the URL: only list a directory that is a
        # direct child of the upload root, so values such as ".." cannot
        # expose other directories.
        is_session_dir = (
            os.path.dirname(os.path.abspath(upload_path)) == upload_root
            and os.path.isdir(upload_path)
        )
        if is_session_dir:
            # Sorted so the page shows files in a stable order.
            files = sorted(f for f in os.listdir(upload_path) if allowed_file(f))
        else:
            files = []

        return render_template(
            'hm_qc/configure.html',
            active_tab='hm-qc',
            session_id=session_id,
            profiles=profiles,
            file_count=len(files),
            filenames=files
        )
    except Exception as e:
        logger.error(f"Configure error: {e}", exc_info=True)
        return render_template(
            'hm_qc/error.html',
            active_tab='hm-qc',
            error=str(e)
        )
@hm_qc_bp.route('/execute', methods=['POST'])
def execute():
    """
    Start QC execution (single file) in a background thread.

    Expects a JSON object with:
    - session_id: Upload session ID (required)
    - profile: Selected profile name (required)
    - job_number: Job number for reporting (optional)
    - llm_provider: Provider applied to every check that already declares
      an ``llm_provider`` (optional)
    - campaign_id, pricing_reference_id: Passed through to QCExecutor
      (optional)

    Note: a ``checks`` key in the payload is not read by this handler.

    Returns:
        JSON with ``success``, ``session_id``, ``message`` and
        ``progress_url``. A JSON ``error`` with status 400 for a missing or
        malformed payload, 404 when the session has no usable file or the
        profile is unknown, and 500 on unexpected failures.
    """
    import threading
    from .executor import QCExecutor

    try:
        # silent=True yields None instead of raising on a bad/missing JSON
        # body, so the client gets a 400 rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Missing required parameters'}), 400

        session_id = data.get('session_id')
        profile_name = data.get('profile')
        job_number = data.get('job_number')
        llm_provider = data.get('llm_provider')

        if not (isinstance(session_id, str) and session_id
                and isinstance(profile_name, str) and profile_name):
            return jsonify({'error': 'Missing required parameters'}), 400

        # Get file path from session
        upload_root = os.path.abspath(current_app.config['HM_QC_UPLOAD_PATH'])
        upload_path = os.path.join(
            current_app.config['HM_QC_UPLOAD_PATH'],
            session_id
        )
        # session_id is client-supplied: accept only a directory that is a
        # direct child of the upload root (blocks "../" style traversal).
        is_session_dir = (
            os.path.dirname(os.path.abspath(upload_path)) == upload_root
            and os.path.isdir(upload_path)
        )
        if not is_session_dir:
            return jsonify({'error': 'Uploaded file not found'}), 404

        # Sorted so the selected file does not depend on directory order.
        files = sorted(f for f in os.listdir(upload_path) if allowed_file(f))
        if not files:
            return jsonify({'error': 'Uploaded file not found'}), 404
        file_path = os.path.join(upload_path, files[0])

        # Load profile
        profiles = load_profiles()
        profile = profiles.get(profile_name)
        if not profile:
            return jsonify({'error': f'Profile "{profile_name}" not found'}), 404

        # Override LLM provider if user selected one
        if llm_provider:
            provider_models = {'openai': 'gpt-4o', 'google': 'gemini-2.5-flash'}
            for check in profile.get('checks') or []:
                if check.get('llm_provider'):
                    check['llm_provider'] = llm_provider
                    # Unknown providers keep the model configured on the check.
                    check['llm_model'] = provider_models.get(llm_provider, check.get('llm_model'))

        logger.info(f"Starting QC execution for session {session_id} with profile {profile_name}")

        campaign_id = data.get('campaign_id')
        pricing_reference_id = data.get('pricing_reference_id')

        executor = QCExecutor(
            session_id=session_id,
            file_path=file_path,
            profile=profile,
            job_number=job_number,
            campaign_id=campaign_id,
            pricing_reference_id=pricing_reference_id
        )
        # Pass logged-in user to context for usage tracking. This is resolved
        # here, before the worker thread starts.
        executor.context['user'] = current_user_email()

        app = current_app._get_current_object()

        def run_with_context():
            with app.app_context():
                try:
                    executor.execute()
                except Exception:
                    # Nothing awaits this thread, so record the failure here.
                    logger.exception(f"QC execution failed for session {session_id}")

        thread = threading.Thread(target=run_with_context, daemon=True)
        thread.start()

        return jsonify({
            'success': True,
            'session_id': session_id,
            'message': 'QC execution started',
            'progress_url': f'/hm-qc/progress/{session_id}'
        })
    except Exception as e:
        logger.error(f"Execute error: {e}", exc_info=True)
        return jsonify({'error': str(e)}), 500
@hm_qc_bp.route('/execute/batch', methods=['POST'])
def execute_batch():
    """
    Start batch QC execution for multiple files in a background thread.

    Expects a JSON object with:
    - session_id: Upload session ID (required)
    - profile: Selected profile name (required)
    - job_number: Job number for reporting (optional)
    - llm_provider: Provider applied to every check that already declares
      an ``llm_provider`` (optional)
    - campaign_id, pricing_reference_id: Passed through to BatchQCExecutor
      (optional)

    Returns:
        JSON with ``success``, ``session_id``, ``file_count``, ``message``
        and ``progress_url``. A JSON ``error`` with status 400 for a missing
        or malformed payload, 404 when the session has no usable files or
        the profile is unknown, and 500 on unexpected failures.
    """
    import threading
    from .batch_executor import BatchQCExecutor

    try:
        # silent=True yields None instead of raising on a bad/missing JSON
        # body, so the client gets a 400 rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Missing required parameters'}), 400

        session_id = data.get('session_id')
        profile_name = data.get('profile')
        job_number = data.get('job_number')
        llm_provider = data.get('llm_provider')

        if not (isinstance(session_id, str) and session_id
                and isinstance(profile_name, str) and profile_name):
            return jsonify({'error': 'Missing required parameters'}), 400

        upload_root = os.path.abspath(current_app.config['HM_QC_UPLOAD_PATH'])
        upload_path = os.path.join(
            current_app.config['HM_QC_UPLOAD_PATH'],
            session_id
        )
        # session_id is client-supplied: accept only a directory that is a
        # direct child of the upload root (blocks "../" style traversal).
        is_session_dir = (
            os.path.dirname(os.path.abspath(upload_path)) == upload_root
            and os.path.isdir(upload_path)
        )
        if not is_session_dir:
            return jsonify({'error': 'No uploaded files found'}), 404

        # Sorted so files are processed in a stable order.
        files = sorted(f for f in os.listdir(upload_path) if allowed_file(f))
        if not files:
            return jsonify({'error': 'No uploaded files found'}), 404
        file_paths = [os.path.join(upload_path, f) for f in files]

        profiles = load_profiles()
        profile = profiles.get(profile_name)
        if not profile:
            return jsonify({'error': f'Profile "{profile_name}" not found'}), 404

        # Override LLM provider if user selected one
        if llm_provider:
            provider_models = {'openai': 'gpt-4o', 'google': 'gemini-2.5-flash'}
            for check in profile.get('checks') or []:
                if check.get('llm_provider'):
                    check['llm_provider'] = llm_provider
                    # Unknown providers keep the model configured on the check.
                    check['llm_model'] = provider_models.get(llm_provider, check.get('llm_model'))

        logger.info(f"Starting batch QC for {len(files)} files (session: {session_id})")

        campaign_id = data.get('campaign_id')
        pricing_reference_id = data.get('pricing_reference_id')
        batch_id = str(uuid.uuid4())

        app = current_app._get_current_object()

        batch_executor = BatchQCExecutor(
            session_id=session_id,
            file_paths=file_paths,
            profile=profile,
            job_number=job_number,
            campaign_id=campaign_id,
            pricing_reference_id=pricing_reference_id,
            batch_id=batch_id,
            # Resolved here, before the worker thread starts.
            user=current_user_email(),
            app=app
        )

        def run_batch():
            with app.app_context():
                try:
                    result = batch_executor.execute()

                    # Store results in progress details for retrieval
                    tracker = UnifiedProgressTracker(session_id)
                    progress = tracker.get_progress()
                    if progress['status'] == 'completed':
                        tracker.update(
                            100,
                            progress['message'],
                            details={'batch_results': result}
                        )
                except Exception:
                    # Nothing awaits this thread, so record the failure here.
                    logger.exception(f"Batch QC failed for session {session_id}")

        thread = threading.Thread(target=run_batch, daemon=True)
        thread.start()

        return jsonify({
            'success': True,
            'session_id': session_id,
            'file_count': len(files),
            'message': f'Batch QC started for {len(files)} files',
            'progress_url': f'/hm-qc/progress/{session_id}'
        })
    except Exception as e:
        logger.error(f"Batch execute error: {e}", exc_info=True)
        return jsonify({'error': str(e)}), 500
@hm_qc_bp.route('/progress/<session_id>')
def progress_stream(session_id):
    """Stream progress updates for *session_id* as ``text/event-stream``.

    Returns:
        A streaming Response wrapping the tracker's ``stream_progress()``
        output, or a JSON ``error`` with status 500 if setting it up fails.
    """
    # Sent with the stream; presumably meant to stop caches and reverse
    # proxies from holding events back (confirm against the deployment).
    sse_headers = {
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no'
    }
    try:
        events = UnifiedProgressTracker(session_id).stream_progress()
        return Response(
            stream_with_context(events),
            mimetype='text/event-stream',
            headers=sse_headers
        )
    except Exception as e:
        logger.error(f"Progress stream error: {e}")
        return jsonify({'error': str(e)}), 500
@hm_qc_bp.route('/api/progress/<session_id>')
def progress_poll(session_id):
    """Return the current progress for *session_id* as JSON (polling fallback).

    Returns:
        The tracker's ``get_progress()`` result serialized as JSON, or a JSON
        ``error`` with status 500 on failure.
    """
    try:
        return jsonify(UnifiedProgressTracker(session_id).get_progress())
    except Exception as e:
        logger.error(f"Progress poll error: {e}")
        return jsonify({'error': str(e)}), 500
@hm_qc_bp.route('/report/<int:report_id>', methods=['DELETE'])
def delete_report(report_id):
    """Delete a QC report by database ID.

    The database row is removed and committed first; the report file and the
    thumbnail referenced by the report's ``thumbnail_path`` metadata are then
    removed from disk on a best-effort basis.

    Returns:
        ``{'success': True}`` on success, a JSON ``error`` with status 404
        when the report does not exist, or 500 when the database delete fails.
    """
    import json as json_module

    try:
        report = QCReport.query.get(report_id)
        if not report:
            return jsonify({'error': 'Report not found'}), 404

        # Collect paths before the row is deleted; files are only removed
        # once the commit has succeeded, so a failed commit cannot leave a
        # row pointing at a file that no longer exists.
        stale_paths = [report.file_path]
        if report.metadata_json:
            try:
                meta = json_module.loads(report.metadata_json)
            except (TypeError, ValueError):
                meta = None
            if isinstance(meta, dict):
                stale_paths.append(meta.get('thumbnail_path'))

        db.session.delete(report)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error deleting report {report_id}: {e}", exc_info=True)
        return jsonify({'error': str(e)}), 500

    for path in stale_paths:
        if not path:
            continue
        try:
            os.remove(path)
        except FileNotFoundError:
            pass  # Already gone; nothing to clean up.
        except (OSError, TypeError, ValueError) as e:
            # The report itself is deleted; a leftover file is not fatal.
            logger.warning(f"Could not remove file {path!r} for report {report_id}: {e}")

    return jsonify({'success': True})
@hm_qc_bp.route('/thumbnail/<int:report_id>')
def thumbnail(report_id):
    """Serve the thumbnail image recorded in a report's metadata.

    Returns:
        The file at the metadata's ``thumbnail_path`` sent as ``image/jpeg``,
        or an empty 404 response when the report, its metadata or the file is
        missing, or when any error occurs.
    """
    import json as json_module

    not_found = ('', 404)
    try:
        report = QCReport.query.get(report_id)
        if not (report and report.metadata_json):
            return not_found

        thumb_path = json_module.loads(report.metadata_json).get('thumbnail_path')
        if thumb_path and os.path.exists(thumb_path):
            return send_file(
                os.path.abspath(thumb_path),
                mimetype='image/jpeg',
                max_age=86400  # Cache for 24 hours
            )
        return not_found
    except Exception:
        return not_found
@hm_qc_bp.route('/report/<int:report_id>/download')
def download_report(report_id):
    """Send a stored QC report file as an attachment.

    Returns:
        The report file as a download named after its basename, a JSON
        ``error`` with status 404 when the report or its file is missing, or
        500 on unexpected failures.
    """
    from flask import send_file

    try:
        report = QCReport.query.get(report_id)
        if not report:
            return jsonify({'error': 'Report not found'}), 404

        report_file = report.file_path
        if not (report_file and os.path.exists(report_file)):
            return jsonify({'error': 'Report file not found on disk'}), 404

        return send_file(
            os.path.abspath(report_file),
            as_attachment=True,
            download_name=os.path.basename(report_file)
        )
    except Exception as e:
        logger.error(f"Error downloading report {report_id}: {e}")
        return jsonify({'error': str(e)}), 500
@hm_qc_bp.route('/report/batch/<batch_id>/download')
def download_batch(batch_id):
    """Download all reports from a batch as a ZIP file.

    Reports whose file is missing on disk are left out of the archive.

    Returns:
        An in-memory ZIP attachment, a JSON ``error`` with status 404 when
        the batch has no reports or none of their files exist on disk, or 500
        on unexpected failures.
    """
    import zipfile
    from io import BytesIO

    try:
        reports = QCReport.get_by_batch_id(batch_id, report_type='hm_qc')
        if not reports:
            return jsonify({'error': 'No reports found for this batch'}), 404

        buffer = BytesIO()
        used_names = set()
        with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            for report in reports:
                if not (report.file_path and os.path.isfile(report.file_path)):
                    continue
                # Entries are stored flat by basename; give repeated basenames
                # a numeric suffix so one entry cannot shadow another.
                arcname = os.path.basename(report.file_path)
                stem, extension = os.path.splitext(arcname)
                suffix = 1
                while arcname in used_names:
                    arcname = f'{stem}_{suffix}{extension}'
                    suffix += 1
                used_names.add(arcname)
                zf.write(report.file_path, arcname)

        if not used_names:
            # Avoid handing the user an empty archive.
            return jsonify({'error': 'Report files not found on disk'}), 404

        buffer.seek(0)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        return send_file(
            buffer,
            mimetype='application/zip',
            as_attachment=True,
            download_name=f'QC_Batch_{batch_id[:8]}_{timestamp}.zip'
        )
    except Exception as e:
        logger.error(f"Error downloading batch {batch_id}: {e}", exc_info=True)
        return jsonify({'error': str(e)}), 500
@hm_qc_bp.route('/report/batch/<batch_id>', methods=['DELETE'])
def delete_batch(batch_id):
    """Delete all reports in a batch.

    The database rows are removed and committed first; each report file and
    the thumbnail referenced by its ``thumbnail_path`` metadata are then
    removed from disk on a best-effort basis.

    Returns:
        ``{'success': True, 'deleted': <count>}`` on success, a JSON
        ``error`` with status 404 when the batch has no reports, or 500 when
        the database delete fails.
    """
    import json as json_module

    try:
        reports = QCReport.get_by_batch_id(batch_id, report_type='hm_qc')
        if not reports:
            return jsonify({'error': 'No reports found for this batch'}), 404

        # Collect paths first; files are only removed after a successful
        # commit so a rollback cannot leave rows pointing at missing files.
        stale_paths = []
        for report in reports:
            stale_paths.append(report.file_path)
            if report.metadata_json:
                try:
                    meta = json_module.loads(report.metadata_json)
                except (TypeError, ValueError):
                    meta = None  # Unreadable metadata: no thumbnail to remove.
                if isinstance(meta, dict):
                    stale_paths.append(meta.get('thumbnail_path'))
            db.session.delete(report)

        deleted_count = len(reports)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error deleting batch {batch_id}: {e}", exc_info=True)
        return jsonify({'error': str(e)}), 500

    for path in stale_paths:
        if not path:
            continue
        try:
            os.remove(path)
        except FileNotFoundError:
            pass  # Already gone; nothing to clean up.
        except (OSError, TypeError, ValueError) as e:
            # The rows are deleted; a leftover file is not fatal.
            logger.warning(f"Could not remove file {path!r} for batch {batch_id}: {e}")

    logger.info(f"Deleted batch {batch_id} ({deleted_count} reports)")
    return jsonify({'success': True, 'deleted': deleted_count})
@hm_qc_bp.route('/report/consolidated', methods=['POST'])
def consolidated_report():
    """Generate a consolidated HTML report from selected report IDs.

    Expects a JSON object with:
    - report_ids: Non-empty list of QCReport database IDs

    The result contains summary counters, one table row per report and each
    report's stored HTML file embedded in an ``<iframe srcdoc>``.

    Returns:
        The consolidated HTML as a file attachment, a JSON ``error`` with
        status 400 when no IDs were supplied, 404 when none of them match a
        report, or 500 on unexpected failures.
    """
    from html import escape as html_escape
    from io import BytesIO

    def score_text(score, missing):
        """Format *score* without decimals, or return *missing* for None."""
        # The conditional must live outside the replacement field: inside
        # "{score:...}" everything after the colon is the format spec.
        return f'{score:.0f}' if score is not None else missing

    def text(value, missing='-'):
        """HTML-escape *value* as element text; *missing* replaces None/''."""
        return html_escape(str(value)) if value not in (None, '') else missing

    try:
        data = request.get_json(silent=True)
        report_ids = data.get('report_ids') if isinstance(data, dict) else None
        if not report_ids or not isinstance(report_ids, list):
            return jsonify({'error': 'No reports selected'}), 400

        reports = QCReport.query.filter(QCReport.id.in_(report_ids)).order_by(
            QCReport.created_at.asc()
        ).all()

        if not reports:
            return jsonify({'error': 'No reports found'}), 404

        # Build consolidated HTML
        scores = [r.score for r in reports if r.score is not None]
        avg_score = round(sum(scores) / len(scores), 1) if scores else 0
        passed = sum(1 for r in reports if r.status == 'passed')
        warnings = sum(1 for r in reports if r.status == 'warning')
        failed = sum(1 for r in reports if r.status in ('failed', 'error'))

        if avg_score >= 90:
            overall_color = '#28a745'
        elif avg_score >= 70:
            overall_color = '#ffc107'
        else:
            overall_color = '#dc3545'

        job_numbers = sorted({r.job_number for r in reports if r.job_number})
        if job_numbers:
            job_label = html_escape(', '.join(str(j) for j in job_numbers))
        else:
            job_label = 'N/A'

        # Any status other than passed/warning is shown in the failure color.
        status_colors = {'passed': '#28a745', 'warning': '#ffc107'}
        summary_rows = []
        embedded_reports = []
        for r in reports:
            filename = text(r.filename)
            status_label = text((r.status or 'unknown').upper())
            badge_bg = status_colors.get(r.status, '#dc3545')
            badge_fg = 'white' if r.status != 'warning' else 'black'
            table_score = score_text(r.score, '-')
            created = r.created_at.strftime('%Y-%m-%d %H:%M') if r.created_at else '-'
            job_cell = text(r.job_number)

            summary_rows.append(f'''<tr>
<td>{filename}</td>
<td>{job_cell}</td>
<td style="font-weight:bold;">{table_score}</td>
<td><span style="background:{badge_bg};color:{badge_fg};padding:3px 8px;border-radius:4px;font-size:12px;">{status_label}</span></td>
<td>{created}</td>
</tr>''')

            # Embed individual reports
            if r.file_path and os.path.exists(r.file_path):
                with open(r.file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                # The attribute value is entity-decoded by the browser before
                # it is parsed as a document, so "&", "<", ">" and quotes must
                # all be escaped, not only the double quote.
                srcdoc = html_escape(content, quote=True)
                heading_score = score_text(r.score, 'N/A')
                embedded_reports.append(f'''
<div style="page-break-before:always; margin-top:40px; border:1px solid #ddd; border-radius:8px; overflow:hidden;">
<div style="background:#222;color:#FFDD00;padding:10px 20px;font-weight:bold;">
{filename} &mdash; {heading_score} ({status_label})
</div>
<iframe srcdoc="{srcdoc}" style="width:100%;height:600px;border:none;"></iframe>
</div>''')

        rows_html = ''.join(summary_rows)
        individual_html = ''.join(embedded_reports)

        now = datetime.now()
        timestamp = now.strftime('%Y-%m-%d %H:%M:%S')

        document_html = f'''<!DOCTYPE html>
<html><head><meta charset="UTF-8">
<title>Consolidated QC Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; }}
.container {{ max-width: 1200px; margin: 0 auto; background: white; padding: 30px; border-radius: 8px; }}
.header {{ text-align: center; border-bottom: 3px solid #FFDD00; padding-bottom: 20px; margin-bottom: 30px; }}
table {{ width: 100%; border-collapse: collapse; }}
th {{ background: #222; color: #FFDD00; padding: 10px; text-align: left; }}
td {{ padding: 8px 10px; border-bottom: 1px solid #eee; }}
tr:hover {{ background: #f8f9fa; }}
.summary-grid {{ display: flex; gap: 20px; justify-content: center; margin: 20px 0; }}
.summary-box {{ text-align: center; padding: 15px 25px; border-radius: 8px; background: #f8f9fa; }}
.summary-box .num {{ font-size: 2rem; font-weight: bold; }}
.summary-box .label {{ font-size: 0.85rem; color: #666; }}
</style>
</head><body>
<div class="container">
<div class="header">
<h1>Consolidated QC Report</h1>
<p><strong>Generated:</strong> {timestamp}</p>
<p><strong>Job Number(s):</strong> {job_label}</p>
<div class="summary-grid">
<div class="summary-box"><div class="num" style="color:#0d6efd;">{len(reports)}</div><div class="label">Total Files</div></div>
<div class="summary-box"><div class="num" style="color:#28a745;">{passed}</div><div class="label">Passed</div></div>
<div class="summary-box"><div class="num" style="color:#dc3545;">{failed}</div><div class="label">Failed</div></div>
<div class="summary-box"><div class="num" style="color:#ffc107;">{warnings}</div><div class="label">Warnings</div></div>
<div class="summary-box"><div class="num" style="color:{overall_color};">{avg_score}</div><div class="label">Avg Score</div></div>
</div>
</div>
<h2>Summary Table</h2>
<table><thead><tr><th>Filename</th><th>Job #</th><th>Score</th><th>Status</th><th>Date</th></tr></thead>
<tbody>{rows_html}</tbody></table>
<h2 style="margin-top:40px;">Individual Reports</h2>
{individual_html}
</div></body></html>'''

        buffer = BytesIO(document_html.encode('utf-8'))
        ts = now.strftime('%Y%m%d_%H%M%S')

        return send_file(
            buffer,
            mimetype='text/html',
            as_attachment=True,
            download_name=f'QC_Consolidated_{ts}.html'
        )
    except Exception as e:
        logger.error(f"Consolidated report error: {e}", exc_info=True)
        return jsonify({'error': str(e)}), 500
@hm_qc_bp.route('/report/<int:report_id>')
def view_report(report_id):
    """Render a saved QC report looked up by its database ID.

    Returns:
        ``hm_qc/view_report.html`` with the report and the contents of its
        HTML file (``None`` when the file is unavailable);
        ``hm_qc/results.html`` with an error and status 404 for an unknown
        report, or status 500 on failure.
    """
    try:
        report = QCReport.query.get(report_id)
        if not report:
            return render_template('hm_qc/results.html', error='Report not found'), 404

        # Read the HTML report file when it is still present on disk
        report_file = report.file_path
        html_content = None
        if report_file and os.path.exists(report_file):
            with open(report_file, 'r', encoding='utf-8') as fh:
                html_content = fh.read()

        template_args = {
            'active_tab': 'hm-qc',
            'report': report,
            'html_content': html_content,
        }
        return render_template('hm_qc/view_report.html', **template_args)
    except Exception as e:
        logger.error(f"Error viewing report {report_id}: {e}")
        return render_template('hm_qc/results.html', error=str(e)), 500
@hm_qc_bp.route('/results/<session_id>')
def results(session_id):
    """
    Show QC results (single file).

    Args:
        session_id: QC session ID

    Returns:
        ``hm_qc/results.html`` showing the current progress while the run is
        not completed, a "not found" message when no report (or report file)
        exists for the session, or the report HTML once available;
        ``hm_qc/error.html`` on unexpected failures.
    """
    try:
        tracker = UnifiedProgressTracker(session_id)
        progress = tracker.get_progress()

        if progress['status'] != 'completed':
            return render_template(
                'hm_qc/results.html',
                active_tab='hm-qc',
                session_id=session_id,
                results=None,
                progress=progress,
                message=f"QC execution {progress['status']}: {progress.get('message', 'Processing...')}"
            )

        # Find report in database by session_id. autoescape=True makes "%"
        # and "_" in the URL value match literally instead of as wildcards.
        report = QCReport.query.filter(
            QCReport.metadata_json.contains(session_id, autoescape=True)
        ).order_by(QCReport.created_at.desc()).first()

        if not report:
            return render_template(
                'hm_qc/results.html',
                active_tab='hm-qc',
                session_id=session_id,
                results=None,
                message='Report not found'
            )

        # A report row can outlive its file; report that instead of raising.
        if not report.file_path or not os.path.isfile(report.file_path):
            return render_template(
                'hm_qc/results.html',
                active_tab='hm-qc',
                session_id=session_id,
                results=None,
                message='Report file not found on disk'
            )

        # Read report HTML
        with open(report.file_path, 'r', encoding='utf-8') as f:
            report_html = f.read()

        return render_template(
            'hm_qc/results.html',
            active_tab='hm-qc',
            session_id=session_id,
            report=report,
            report_html=report_html,
            message='QC execution completed successfully'
        )
    except Exception as e:
        logger.error(f"Results error: {e}", exc_info=True)
        return render_template(
            'hm_qc/error.html',
            active_tab='hm-qc',
            error=str(e)
        )
@hm_qc_bp.route('/results/batch/<session_id>')
def results_batch(session_id):
    """
    Show batch QC results.

    Args:
        session_id: Batch session ID

    Returns:
        ``hm_qc/results.html`` showing the current progress while the batch
        is not completed, or the batch results, matching reports and batch ID
        once it is; ``hm_qc/error.html`` on unexpected failures.
    """
    import json as json_module

    try:
        tracker = UnifiedProgressTracker(session_id)
        progress = tracker.get_progress()

        if progress['status'] != 'completed':
            return render_template(
                'hm_qc/results.html',
                active_tab='hm-qc',
                session_id=session_id,
                results=None,
                is_batch=True,
                progress=progress,
                message=f"Batch QC {progress['status']}: {progress.get('message', 'Processing...')}"
            )

        # Get batch results from progress details
        batch_results = None
        details = progress.get('details')
        if details and isinstance(details, dict):
            batch_results = details.get('batch_results')

        # Also query database for all reports from this batch session.
        # autoescape=True makes "%" and "_" in the URL value match literally
        # instead of as wildcards.
        reports = QCReport.query.filter(
            QCReport.metadata_json.contains(session_id, autoescape=True)
        ).order_by(QCReport.created_at.desc()).all()

        # Extract batch_id from first report's metadata
        batch_id = None
        if reports:
            try:
                meta = json_module.loads(reports[0].metadata_json or '{}')
            except (TypeError, ValueError):
                # Unreadable metadata only costs the batch_id; the results
                # page is still rendered.
                meta = None
            if isinstance(meta, dict):
                batch_id = meta.get('batch_id')

        return render_template(
            'hm_qc/results.html',
            active_tab='hm-qc',
            session_id=session_id,
            is_batch=True,
            batch_results=batch_results,
            reports=reports,
            batch_id=batch_id,
            message='Batch QC completed'
        )
    except Exception as e:
        logger.error(f"Batch results error: {e}", exc_info=True)
        return render_template(
            'hm_qc/error.html',
            active_tab='hm-qc',
            error=str(e)
        )
@hm_qc_bp.route('/api/profiles')
def api_profiles():
    """Return the available QC profiles as JSON.

    Returns:
        ``{'profiles': ...}`` with the result of load_profiles(), or a JSON
        ``error`` with status 500 on failure.
    """
    try:
        payload = {'profiles': load_profiles()}
        return jsonify(payload)
    except Exception as e:
        logger.error(f"API profiles error: {e}")
        return jsonify({'error': str(e)}), 500
def load_profiles():
    """Load QC profiles from YAML configuration.

    Reads ``profiles/profiles.yaml`` next to this module and returns the
    mapping stored under its top-level ``profiles`` key.

    Returns:
        A dict of profiles. An empty dict is returned (and the problem is
        logged) when the file cannot be read or parsed, or when it does not
        contain a ``profiles`` mapping, so callers can always use ``.get()``.
    """
    profiles_path = os.path.join(
        os.path.dirname(__file__),
        'profiles',
        'profiles.yaml'
    )
    try:
        # Explicit encoding so parsing does not depend on the host locale.
        with open(profiles_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        logger.error(f"Error loading profiles: {e}")
        return {}

    # An empty file parses to None, and "profiles:" with no value yields
    # None as well; neither may reach callers that expect a dict.
    profiles = data.get('profiles') if isinstance(data, dict) else None
    if profiles is None:
        return {}
    if not isinstance(profiles, dict):
        logger.error(
            f"Error loading profiles: 'profiles' in {profiles_path} is not a mapping"
        )
        return {}
    return profiles