hm_ai_qc_report_tool/modules/reporting/routes.py
nickviljoen a52d50d549 Reporting: show searched Box folder under the search input
Mirrors the hint pattern just added to Video Master so users can see
exactly which Box folder the search is scanning, with a clickable
link to open it in Box for self-diagnosis when a job number doesn't
turn up.
2026-05-09 20:23:06 +02:00

805 lines
27 KiB
Python

"""
Reporting Module Routes.
Handles all reporting-related endpoints:
- Search and dashboard (sync and async)
- Report viewing (parsed and raw)
- Report export (HTML, CSV, errors-only)
- Progress tracking (SSE and polling)
- Multi-job support
"""
import csv
import io
import logging
import threading
import uuid
from datetime import datetime
from io import BytesIO
from flask import (
render_template, request, jsonify, send_file, current_app,
Response, stream_with_context
)
from .blueprint import reporting_bp
from .aggregator import ReportAggregator
from .result_cache import cache_set, cache_get
from core.utils.report_parser import (
QCReportParser, aggregate_reports,
sanitize_error_for_display, get_designer_friendly_checks
)
from core.utils.progress_tracker import UnifiedProgressTracker
from core.models.qc_report import QCReport
from core.models.database import db
from sqlalchemy import func
logger = logging.getLogger(__name__)
def _safe_path_component(value, fallback):
    """Reduce *value* to a single path component that cannot escape its parent.

    Forward and back slashes are both treated as directory separators and only
    the final segment is kept. Empty, ``.`` and ``..`` results are replaced by
    *fallback*.
    """
    import os

    text = str(value or '').replace('\\', '/').replace('\x00', '')
    name = os.path.basename(text)
    return fallback if name in ('', '.', '..') else name


def _save_search_results(result, job_numbers, is_multi):
    """Save Box reports from a search to the database for history.

    Only reports whose ``source`` is ``'box'`` are considered. Each one is
    written to ``storage/reports/box/<job>/<filename>`` (when it carries HTML
    content) and recorded as a ``box_import`` row. Reports whose ``box_id``
    already appears in a saved row's metadata are skipped.

    Any failure is logged and the session rolled back; nothing is raised.

    Args:
        result: Search result dict. Multi-job results are read from
            ``per_job``; single-job results from ``parsed_reports``.
        job_numbers: Job numbers that were searched. The first one is used
            for single-job results.
        is_multi: True when *result* has the multi-job layout.
    """
    import json
    import os

    try:
        # Pair each report with its job number rather than stamping an extra
        # key onto dicts that are shared with the search-result cache.
        if is_multi:
            tagged = [
                (jn, report)
                for jn, job_data in result.get('per_job', {}).items()
                for report in job_data.get('parsed_reports', [])
            ]
        else:
            single_job = job_numbers[0] if job_numbers else None
            tagged = [
                (single_job, report)
                for report in result.get('parsed_reports', [])
            ]

        saved = 0
        for job_number, report in tagged:
            if report.get('source') != 'box':
                continue
            box_id = report.get('box_id')
            filename = report.get('box_name') or report.get('filename', 'unknown')
            html_content = report.get('html_content', '')

            # Skip if already saved (check by box_id in metadata).
            # NOTE(review): the pattern assumes box_id is serialised as a JSON
            # string; confirm the aggregator never yields non-string ids.
            existing = QCReport.query.filter(
                QCReport.metadata_json.like(f'%"box_id": "{box_id}"%')
            ).first()
            if existing:
                continue

            # Save HTML to disk. The job number comes from user input and the
            # filename from Box, so both are reduced to a single safe path
            # component to keep the write inside storage/reports/box.
            report_dir = os.path.join(
                'storage', 'reports', 'box',
                _safe_path_component(job_number, 'no_job'),
            )
            os.makedirs(report_dir, exist_ok=True)
            report_path = os.path.join(
                report_dir, _safe_path_component(filename, 'unknown')
            )
            if html_content:
                with open(report_path, 'w', encoding='utf-8') as f:
                    f.write(html_content)

            # Save to database (the original, unsanitised names are kept for
            # display and for lookups by job number).
            db_report = QCReport(
                report_type='box_import',
                job_number=job_number,
                filename=filename,
                file_path=report_path,
                score=None,
                status='imported',
                created_by='box_search',
                metadata_json=json.dumps({'box_id': box_id, 'source': 'box'})
            )
            db.session.add(db_report)
            saved += 1

        if saved > 0:
            db.session.commit()
            logger.info(f"Saved {saved} Box reports to database")
    except Exception as e:
        logger.error(f"Error saving search results: {e}")
        db.session.rollback()
@reporting_bp.route('/')
@reporting_bp.route('/index')
def index():
    """Render main reporting page with search interface and recent reports.

    Saved Box reports are summarised per job number (row count and latest
    ``created_at``), newest job first. If the summary query fails the page is
    still rendered, with an empty job listing, and the failure is logged.
    """
    box_jobs = {}
    try:
        # Box reports grouped by job number — aggregate in SQL so every job is
        # represented (a per-row LIMIT could otherwise hide jobs whose rows fall
        # outside the window when one job dominates the recent reports).
        rows = db.session.query(
            QCReport.job_number,
            func.count(QCReport.id).label('count'),
            func.max(QCReport.created_at).label('latest'),
        ).filter_by(report_type='box_import').group_by(
            QCReport.job_number
        ).order_by(func.max(QCReport.created_at).desc()).all()

        for job_number, count, latest in rows:
            # The aggregate may come back as a string rather than a datetime;
            # normalise it, dropping values that are not ISO formatted.
            if isinstance(latest, str):
                try:
                    latest = datetime.fromisoformat(latest)
                except ValueError:
                    latest = None
            box_jobs[job_number or 'No Job Number'] = {
                'count': count, 'latest': latest
            }
    except Exception:
        # Best effort: the page must still load, but do not hide the failure.
        logger.exception("Error loading saved Box report summary")
        box_jobs = {}

    return render_template(
        'reporting/index.html',
        active_tab='reporting',
        box_jobs=box_jobs,
        report_folder_id=current_app.config.get('BOX_REPORT_FOLDER_ID')
    )
@reporting_bp.route('/history/delete/<job_number>', methods=['DELETE'])
def delete_box_reports(job_number):
    """Delete all saved Box reports for a job number.

    Database rows are deleted and committed first; the report files are then
    removed on a best-effort basis. This ordering avoids the state where a
    failure midway rolls the rows back after some of their files have already
    been deleted.

    Returns:
        JSON ``{'success': True, 'deleted': <row count>}`` on success, or
        ``{'error': ...}`` with status 500 if the database delete fails.
    """
    import os

    try:
        reports = QCReport.query.filter_by(
            job_number=job_number, report_type='box_import'
        ).all()
        file_paths = [report.file_path for report in reports if report.file_path]
        for report in reports:
            db.session.delete(report)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error deleting box reports for {job_number}: {e}")
        return jsonify({'error': str(e)}), 500

    # Rows are gone; a file that cannot be removed is logged, not fatal.
    for path in file_paths:
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(f"Could not remove report file {path}: {e}")

    return jsonify({'success': True, 'deleted': len(reports)})
@reporting_bp.route('/history/<job_number>')
def history_dashboard(job_number):
    """View previously saved reports for a job number from the database (no Box re-fetch).

    Every saved report row for the job is loaded newest first, its HTML file
    is read from disk and re-parsed, and the results are aggregated for the
    dashboard template. Rows whose file is missing, or whose HTML cannot be
    parsed, are logged and skipped.
    """
    import os

    try:
        reports = QCReport.query.filter_by(job_number=job_number).order_by(
            QCReport.created_at.desc()
        ).all()
        if not reports:
            return render_template(
                'reporting/dashboard.html',
                active_tab='reporting',
                job_number=job_number,
                error=f'No saved reports found for job {job_number}'
            )

        # Parse saved HTML files
        parsed_reports = []
        for report in reports:
            try:
                # A row may have no path at all; treat that like a missing file.
                if not report.file_path or not os.path.exists(report.file_path):
                    logger.warning(f"Report file not found: {report.file_path}")
                    continue
                with open(report.file_path, 'r', encoding='utf-8') as f:
                    html_content = f.read()
                parser = QCReportParser(html_content)
                parsed_data = parser.parse()
                parsed_data['source'] = report.report_type
                parsed_data['source_label'] = 'Box.com' if report.report_type == 'box_import' else 'QC Tool'
                parsed_data['filename'] = report.filename
                parsed_data['html_content'] = html_content
                parsed_data['score'] = report.score
                parsed_data['status'] = report.status
                parsed_data['created_at'] = report.created_at.isoformat() if report.created_at else None
                parsed_reports.append(parsed_data)
            except Exception as e:
                logger.error(f"Error parsing saved report {report.filename}: {e}")

        aggregated = aggregate_reports(parsed_reports)
        for report in parsed_reports:
            if 'checks' in report:
                report['friendly_checks'] = get_designer_friendly_checks(report)

        # active_tab is passed for consistency with the other dashboard renders.
        return render_template(
            'reporting/dashboard.html',
            active_tab='reporting',
            job_number=job_number,
            reports=parsed_reports,
            aggregated=aggregated,
            statistics={'total': len(parsed_reports)},
            from_history=True
        )
    except Exception as e:
        logger.error(f"Error loading history for {job_number}: {e}")
        return render_template(
            'reporting/dashboard.html',
            active_tab='reporting',
            job_number=job_number,
            error=f'Error loading reports: {str(e)}'
        )
@reporting_bp.route('/search', methods=['POST'])
def search():
    """
    Search for reports by job number (synchronous).

    Expects a JSON body with a ``job_number`` field. A missing, malformed or
    non-object body, or an empty job number, yields a 400 response.

    Returns JSON with the consolidated reports produced by the aggregator,
    their statistics and count, or an empty result with a message when
    nothing is found. Unexpected failures yield a 500 with the error text.
    """
    try:
        # silent=True: a body that is not valid JSON becomes None instead of
        # raising, so it is reported as a 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        # str() tolerates clients that send the job number as a JSON number.
        job_number = str(data.get('job_number') or '').strip()
        if not job_number:
            return jsonify({'error': 'Job number is required'}), 400

        logger.info(f"Searching for job number: {job_number}")

        # Get Box client from app context
        box_client = current_app.get_box_client()

        # Create aggregator and get consolidated reports
        aggregator = ReportAggregator(box_client)
        reports = aggregator.get_consolidated_reports(job_number)
        if not reports:
            return jsonify({
                'job_number': job_number,
                'reports': [],
                'statistics': {'total': 0},
                'message': f'No reports found for job number: {job_number}'
            })

        # Calculate statistics
        statistics = aggregator.get_report_statistics(reports)
        return jsonify({
            'job_number': job_number,
            'reports': reports,
            'statistics': statistics,
            'count': len(reports)
        })
    except Exception as e:
        logger.error(f"Error searching for reports: {e}")
        return jsonify({'error': str(e)}), 500
@reporting_bp.route('/search/async', methods=['POST'])
def search_async():
    """
    Start async search for reports by job number(s).

    Supports comma-separated job numbers for multi-job searches; repeated
    job numbers are collapsed, keeping first-seen order. The search runs in a
    daemon thread inside an application context. Its result is cached under
    ``search_<session_id>`` and Box reports are saved for history.

    Returns session_id for progress tracking. A missing, malformed or
    non-object JSON body, or an empty job number, yields a 400 response.
    """
    try:
        # silent=True: a body that is not valid JSON becomes None instead of
        # raising, so it is reported as a 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        raw_input = str(data.get('job_number') or '').strip()
        if not raw_input:
            return jsonify({'error': 'Job number is required'}), 400

        # Parse comma-separated job numbers; dict.fromkeys de-duplicates while
        # preserving order so "1,1" is a single-job search, not a multi-job one.
        job_numbers = list(dict.fromkeys(
            part.strip() for part in raw_input.split(',') if part.strip()
        ))
        if not job_numbers:
            return jsonify({'error': 'No valid job numbers provided'}), 400

        session_id = str(uuid.uuid4())
        is_multi = len(job_numbers) > 1
        logger.info(f"Async search started: {job_numbers} (session: {session_id})")

        # The worker thread needs the real app object, not the request-bound proxy.
        app = current_app._get_current_object()

        def run_search():
            with app.app_context():
                try:
                    tracker = UnifiedProgressTracker(session_id)
                    box_client = app.get_box_client()
                    aggregator = ReportAggregator(box_client)
                    if is_multi:
                        result = aggregator.get_consolidated_reports_multi(job_numbers, tracker)
                    else:
                        result = aggregator.get_consolidated_reports_with_progress(
                            job_numbers[0], tracker
                        )

                    # Cache results
                    cache_set(f"search_{session_id}", {
                        'result': result,
                        'job_numbers': job_numbers,
                        'is_multi': is_multi
                    })

                    # Save Box reports to database for history
                    _save_search_results(result, job_numbers, is_multi)
                    tracker.complete("Search complete")
                except Exception as e:
                    logger.error(f"Async search error: {e}", exc_info=True)
                    tracker = UnifiedProgressTracker(session_id)
                    tracker.fail(f"Search failed: {str(e)}")

        thread = threading.Thread(target=run_search, daemon=True)
        thread.start()

        return jsonify({
            'success': True,
            'session_id': session_id,
            'job_numbers': job_numbers,
            'is_multi': is_multi,
            'progress_url': f'/reporting/progress/{session_id}'
        })
    except Exception as e:
        logger.error(f"Error starting async search: {e}")
        return jsonify({'error': str(e)}), 500
@reporting_bp.route('/progress/<session_id>')
def progress_stream(session_id):
    """Stream search progress for *session_id* as server-sent events.

    The response is sent as ``text/event-stream`` with caching and proxy
    buffering disabled via headers. If the stream cannot be set up, a JSON
    error with status 500 is returned instead.
    """
    sse_headers = {
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no'
    }
    try:
        progress_tracker = UnifiedProgressTracker(session_id)
        event_source = stream_with_context(progress_tracker.stream_progress())
        return Response(
            event_source,
            mimetype='text/event-stream',
            headers=sse_headers
        )
    except Exception as exc:
        logger.error(f"Progress stream error: {exc}")
        return jsonify({'error': str(exc)}), 500
@reporting_bp.route('/api/progress/<session_id>')
def progress_poll(session_id):
    """Return the current progress for *session_id* as JSON.

    Polling fallback for clients that do not consume the SSE stream. Any
    failure is logged and reported as a JSON error with status 500.
    """
    try:
        snapshot = UnifiedProgressTracker(session_id).get_progress()
        return jsonify(snapshot)
    except Exception as exc:
        logger.error(f"Progress poll error: {exc}")
        return jsonify({'error': str(exc)}), 500
@reporting_bp.route('/dashboard/<job_number>')
def dashboard(job_number):
    """
    Render consolidated dashboard for a specific job number.

    Tries to load from cache (async search result) first, using the
    ``session_id`` query parameter, and falls back to a synchronous fetch.
    A cached result is only used when it is a single-job result whose
    recorded job numbers include *job_number*; otherwise a stale or foreign
    ``session_id`` would show another job's reports under this job's heading.
    """
    try:
        logger.info(f"Loading dashboard for job number: {job_number}")

        # Check for cached async result via session_id query param
        session_id = request.args.get('session_id')
        cached = cache_get(f"search_{session_id}") if session_id else None
        cache_usable = bool(
            cached
            and not cached.get('is_multi')
            and job_number in (cached.get('job_numbers') or [])
        )

        if cache_usable:
            result = cached['result']
        else:
            # Synchronous fallback
            box_client = current_app.get_box_client()
            aggregator = ReportAggregator(box_client)
            result = aggregator.get_consolidated_reports_with_progress(job_number)

        parsed_reports = result['parsed_reports']
        aggregated = result['aggregated']
        statistics = result['statistics']

        if not parsed_reports:
            return render_template(
                'reporting/dashboard.html',
                active_tab='reporting',
                job_number=job_number,
                error=f'No reports found for job number: {job_number}'
            )

        # Generate designer-friendly check data for each report
        for report in parsed_reports:
            if 'checks' in report:
                report['friendly_checks'] = get_designer_friendly_checks(report)

        return render_template(
            'reporting/dashboard.html',
            active_tab='reporting',
            job_number=job_number,
            reports=parsed_reports,
            aggregated=aggregated,
            statistics=statistics,
            report_count=len(parsed_reports)
        )
    except Exception as e:
        logger.error(f"Error loading dashboard: {e}")
        return render_template(
            'reporting/dashboard.html',
            active_tab='reporting',
            job_number=job_number,
            error=str(e)
        )
@reporting_bp.route('/dashboard/multi')
def dashboard_multi():
    """
    Render the dashboard for a multi-job search.

    The ``session_id`` query parameter must identify a cached multi-job
    result; otherwise the template is rendered with an explanatory error.
    Each cached report that has ``checks`` gets a ``friendly_checks`` entry
    before rendering.
    """
    template_name = 'reporting/dashboard_multi.html'

    def render_error(message):
        # All failure paths share the same template and tab.
        return render_template(
            template_name,
            active_tab='reporting',
            error=message
        )

    try:
        session_id = request.args.get('session_id')
        if not session_id:
            return render_error('No session ID provided. Please search first.')

        cached = cache_get(f"search_{session_id}")
        if not (cached and cached.get('is_multi')):
            return render_error('Search results expired or not found. Please search again.')

        result = cached['result']
        job_numbers = cached['job_numbers']

        # Attach designer-friendly check data to every report of every job.
        for job_entry in result['per_job'].values():
            for parsed in job_entry['parsed_reports']:
                if 'checks' in parsed:
                    parsed['friendly_checks'] = get_designer_friendly_checks(parsed)

        return render_template(
            template_name,
            active_tab='reporting',
            result=result,
            job_numbers=job_numbers,
            session_id=session_id
        )
    except Exception as exc:
        logger.error(f"Error loading multi-job dashboard: {exc}")
        return render_error(str(exc))
@reporting_bp.route('/api/report/<file_id>')
def get_report(file_id):
    """Download a report from Box by file id and return its parsed form as JSON.

    The downloaded bytes are decoded as UTF-8 and run through
    ``QCReportParser``. Any failure is logged and returned as a JSON error
    with status 500.
    """
    try:
        raw_bytes = current_app.get_box_client().download_file(file_id)
        report_html = raw_bytes.decode('utf-8')
        return jsonify(QCReportParser(report_html).parse())
    except Exception as exc:
        logger.error(f"Error getting report {file_id}: {exc}")
        return jsonify({'error': str(exc)}), 500
@reporting_bp.route('/api/report/<file_id>/raw')
def get_raw_report(file_id):
    """Get raw HTML report.

    Returns the bytes downloaded from Box for *file_id* with an HTML content
    type. On failure a minimal HTML error page is returned with status 500;
    the exception text is HTML-escaped because it may echo request-controlled
    values such as the file id.
    """
    import html

    try:
        box_client = current_app.get_box_client()
        content = box_client.download_file(file_id)
        return content, 200, {'Content-Type': 'text/html; charset=utf-8'}
    except Exception as e:
        logger.error(f"Error getting raw report {file_id}: {e}")
        message = html.escape(str(e))
        return f'<html><body><h1>Error</h1><p>{message}</p></body></html>', 500
@reporting_bp.route('/export/html/<job_number>')
def export_html(job_number):
    """Build a combined HTML report for a job and send it as a download.

    Reports are searched for and downloaded from Box, parsed, aggregated and
    rendered through the ``reporting/pdf_export.html`` template. Reports that
    fail to download or parse are logged and left out. Responds 404 when the
    Box search finds nothing and 500 on unexpected errors.
    """
    try:
        logger.info(f"Exporting HTML for job number: {job_number}")
        client = current_app.get_box_client()
        matches = client.search_by_job_number(job_number)
        if not matches:
            return jsonify({'error': f'No reports found for job number: {job_number}'}), 404

        collected = []
        for entry in matches:
            try:
                page = client.download_file(entry['id']).decode('utf-8')
                parsed = QCReportParser(page).parse()
                parsed['html_content'] = page
            except Exception as exc:
                logger.error(f"Error parsing report for HTML export: {exc}")
            else:
                collected.append(parsed)

        summary = aggregate_reports(collected)
        document = render_template(
            'reporting/pdf_export.html',
            job_number=job_number,
            reports=collected,
            aggregated=summary,
            generated_at=datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        )

        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        payload = BytesIO(document.encode('utf-8'))
        return send_file(
            payload,
            mimetype='text/html',
            as_attachment=True,
            download_name=f'QC_Report_{job_number}_{stamp}.html'
        )
    except Exception as exc:
        logger.error(f"Error exporting HTML: {exc}")
        return jsonify({'error': str(exc)}), 500
@reporting_bp.route('/export/html/<job_number>/errors')
def export_html_errors_only(job_number):
    """Build an HTML export containing only the reports that have errors.

    A report is kept when its parsed ``summary`` has an ``error`` count above
    zero. Reports that fail to download or parse are logged and left out.
    Responds 404 when the Box search finds nothing or no report has errors,
    and 500 on unexpected errors.
    """
    try:
        logger.info(f"Exporting error reports only for job number: {job_number}")
        client = current_app.get_box_client()
        matches = client.search_by_job_number(job_number)
        if not matches:
            return jsonify({'error': f'No reports found for job number: {job_number}'}), 404

        failing = []
        for entry in matches:
            try:
                page = client.download_file(entry['id']).decode('utf-8')
                parsed = QCReportParser(page).parse()
                parsed['html_content'] = page
                error_count = parsed.get('summary', {}).get('error', 0)
                if error_count > 0:
                    failing.append(parsed)
            except Exception as exc:
                logger.error(f"Error parsing report for error export: {exc}")

        if not failing:
            return jsonify({'error': f'No error reports found for job number: {job_number}'}), 404

        summary = aggregate_reports(failing)
        document = render_template(
            'reporting/pdf_export.html',
            job_number=job_number,
            reports=failing,
            aggregated=summary,
            generated_at=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            errors_only=True
        )

        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        payload = BytesIO(document.encode('utf-8'))
        return send_file(
            payload,
            mimetype='text/html',
            as_attachment=True,
            download_name=f'QC_Report_{job_number}_ERRORS_ONLY_{stamp}.html'
        )
    except Exception as exc:
        logger.error(f"Error exporting error reports: {exc}")
        return jsonify({'error': str(exc)}), 500
@reporting_bp.route('/export/csv/<job_number>')
def export_csv(job_number):
    """
    Send every check of every report for a job as a CSV download.

    Columns: Job Number, Filename, Box Link, Check Name, Status,
    Issue Description, Action Required.

    Parsed reports come from the cached search identified by the optional
    ``session_id`` query parameter, or are fetched fresh. The file is encoded
    as UTF-8 with a byte-order mark. Responds 404 when no reports are found
    and 500 on unexpected errors.
    """
    try:
        logger.info(f"Exporting CSV for job number: {job_number}")

        # Try cache first
        reports = _get_parsed_reports(job_number, request.args.get('session_id'))
        if not reports:
            return jsonify({'error': f'No reports found for job number: {job_number}'}), 404

        csv_text = _generate_csv(_build_csv_rows(job_number, reports))
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return send_file(
            BytesIO(csv_text.encode('utf-8-sig')),
            mimetype='text/csv',
            as_attachment=True,
            download_name=f'QC_Report_{job_number}_{stamp}.csv'
        )
    except Exception as exc:
        logger.error(f"Error exporting CSV: {exc}")
        return jsonify({'error': str(exc)}), 500
@reporting_bp.route('/export/csv/<job_number>/errors')
def export_csv_errors(job_number):
    """Send a CSV download limited to checks that did not pass or skip.

    Parsed reports come from the cached search identified by the optional
    ``session_id`` query parameter, or are fetched fresh. Responds 404 when no
    reports are found or no rows remain after filtering, and 500 on
    unexpected errors.
    """
    try:
        logger.info(f"Exporting error CSV for job number: {job_number}")
        reports = _get_parsed_reports(job_number, request.args.get('session_id'))
        if not reports:
            return jsonify({'error': f'No reports found for job number: {job_number}'}), 404

        error_rows = _build_csv_rows(job_number, reports, errors_only=True)
        if not error_rows:
            return jsonify({'error': f'No errors found for job number: {job_number}'}), 404

        csv_text = _generate_csv(error_rows)
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return send_file(
            BytesIO(csv_text.encode('utf-8-sig')),
            mimetype='text/csv',
            as_attachment=True,
            download_name=f'QC_Report_{job_number}_ERRORS_{stamp}.csv'
        )
    except Exception as exc:
        logger.error(f"Error exporting error CSV: {exc}")
        return jsonify({'error': str(exc)}), 500
@reporting_bp.route('/export/csv/multi')
def export_csv_multi():
    """Send one CSV covering every job of a cached multi-job search.

    Requires a ``session_id`` query parameter that identifies a cached
    multi-job result. The download name lists up to three job numbers and
    appends ``_+N`` for the remainder. Responds 400 without a session id, 404
    when the cache entry is missing, not multi-job, or yields no rows, and
    500 on unexpected errors.
    """
    try:
        session_id = request.args.get('session_id')
        if not session_id:
            return jsonify({'error': 'Session ID required'}), 400

        cached = cache_get(f"search_{session_id}")
        if not (cached and cached.get('is_multi')):
            return jsonify({'error': 'Multi-job results not found or expired'}), 404

        per_job = cached['result']['per_job']
        combined = [
            row
            for job_number, job_entry in per_job.items()
            for row in _build_csv_rows(job_number, job_entry['parsed_reports'])
        ]
        if not combined:
            return jsonify({'error': 'No data to export'}), 404

        csv_text = _generate_csv(combined)

        searched = cached['job_numbers']
        label = '_'.join(searched[:3])
        if len(searched) > 3:
            label += f'_+{len(searched) - 3}'

        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return send_file(
            BytesIO(csv_text.encode('utf-8-sig')),
            mimetype='text/csv',
            as_attachment=True,
            download_name=f'QC_Report_Multi_{label}_{stamp}.csv'
        )
    except Exception as exc:
        logger.error(f"Error exporting multi-job CSV: {exc}")
        return jsonify({'error': str(exc)}), 500
# --- Helper functions ---
def _get_parsed_reports(job_number, session_id=None):
    """Get parsed reports from cache or by fetching fresh.

    A cached search is used only when it is a single-job result whose
    recorded job numbers include *job_number*. Otherwise a ``session_id``
    belonging to another search would export that job's reports under this
    job's name.

    Args:
        job_number: Job number whose reports are wanted.
        session_id: Optional id of a previous async search to reuse.

    Returns:
        The ``parsed_reports`` value of the cached or freshly fetched result.
    """
    # Try cache
    if session_id:
        cached = cache_get(f"search_{session_id}")
        if (
            cached
            and not cached.get('is_multi')
            and job_number in (cached.get('job_numbers') or [])
        ):
            return cached['result']['parsed_reports']

    # Fetch fresh
    box_client = current_app.get_box_client()
    aggregator = ReportAggregator(box_client)
    result = aggregator.get_consolidated_reports_with_progress(job_number)
    return result['parsed_reports']
def _build_csv_rows(job_number, parsed_reports, errors_only=False):
    """Flatten parsed reports into one CSV row dictionary per check.

    Each check is passed through ``sanitize_error_for_display``. With
    *errors_only* set, checks whose status is ``passed`` or ``skipped`` are
    dropped. ``Action Required`` is filled in only for ``error``, ``warning``
    and ``failed`` checks. ``Box Link`` is empty when the report has no
    ``box_id``.
    """
    ignorable = ('passed', 'skipped')
    actionable = ('error', 'warning', 'failed')

    csv_rows = []
    for parsed in parsed_reports:
        display_name = parsed.get('filename', parsed.get('box_name', 'Unknown'))
        file_id = parsed.get('box_id')
        link = ''
        if file_id:
            link = f'https://app.box.com/file/{file_id}'

        for raw_check in parsed.get('checks', []):
            info = sanitize_error_for_display(raw_check)
            if errors_only and info['status'] in ignorable:
                continue
            csv_rows.append({
                'Job Number': job_number,
                'Filename': display_name,
                'Box Link': link,
                'Check Name': info['check_name_display'],
                'Status': info['status_text'],
                'Issue Description': info['error_summary'],
                'Action Required': info['action_required'] if info['status'] in actionable else ''
            })
    return csv_rows
def _generate_csv(rows):
    """Serialise row dictionaries to CSV text.

    Returns an empty string when *rows* is empty. Otherwise the text is a
    header line followed by one line per row, with columns in a fixed order.
    """
    if not rows:
        return ''

    columns = [
        'Job Number',
        'Filename',
        'Box Link',
        'Check Name',
        'Status',
        'Issue Description',
        'Action Required',
    ]
    buffer = io.StringIO()
    csv_writer = csv.DictWriter(buffer, fieldnames=columns)
    csv_writer.writeheader()
    for row in rows:
        csv_writer.writerow(row)
    return buffer.getvalue()