""" Reporting Module Routes. Handles all reporting-related endpoints: - Search and dashboard (sync and async) - Report viewing (parsed and raw) - Report export (HTML, CSV, errors-only) - Progress tracking (SSE and polling) - Multi-job support """ import csv import io import logging import threading import uuid from datetime import datetime from io import BytesIO from flask import ( render_template, request, jsonify, send_file, current_app, Response, stream_with_context ) from .blueprint import reporting_bp from .aggregator import ReportAggregator from .result_cache import cache_set, cache_get from core.utils.report_parser import ( QCReportParser, aggregate_reports, sanitize_error_for_display, get_designer_friendly_checks ) from core.utils.progress_tracker import UnifiedProgressTracker from core.models.qc_report import QCReport from core.models.database import db from sqlalchemy import func logger = logging.getLogger(__name__) def _save_search_results(result, job_numbers, is_multi): """Save Box reports from a search to the database for history.""" try: import os import json if is_multi: all_parsed = [] for jn, job_data in result.get('per_job', {}).items(): for report in job_data.get('parsed_reports', []): report['_job_number'] = jn all_parsed.append(report) else: all_parsed = result.get('parsed_reports', []) for report in all_parsed: report['_job_number'] = job_numbers[0] if job_numbers else None saved = 0 for report in all_parsed: if report.get('source') != 'box': continue box_id = report.get('box_id') filename = report.get('box_name') or report.get('filename', 'unknown') job_number = report.get('_job_number') html_content = report.get('html_content', '') # Skip if already saved (check by box_id in metadata) existing = QCReport.query.filter( QCReport.metadata_json.like(f'%"box_id": "{box_id}"%') ).first() if existing: continue # Save HTML to disk report_dir = os.path.join('storage', 'reports', 'box', job_number or 'no_job') os.makedirs(report_dir, exist_ok=True) report_path = os.path.join(report_dir, filename) if html_content: with open(report_path, 'w', encoding='utf-8') as f: f.write(html_content) # Save to database db_report = QCReport( report_type='box_import', job_number=job_number, filename=filename, file_path=report_path, score=None, status='imported', created_by='box_search', metadata_json=json.dumps({'box_id': box_id, 'source': 'box'}) ) db.session.add(db_report) saved += 1 if saved > 0: db.session.commit() logger.info(f"Saved {saved} Box reports to database") except Exception as e: logger.error(f"Error saving search results: {e}") db.session.rollback() @reporting_bp.route('/') @reporting_bp.route('/index') def index(): """Render main reporting page with search interface and recent reports.""" try: # Box reports grouped by job number — aggregate in SQL so every job is # represented (a per-row LIMIT could otherwise hide jobs whose rows fall # outside the window when one job dominates the recent reports). rows = db.session.query( QCReport.job_number, func.count(QCReport.id).label('count'), func.max(QCReport.created_at).label('latest'), ).filter_by(report_type='box_import').group_by( QCReport.job_number ).order_by(func.max(QCReport.created_at).desc()).all() box_jobs = {} for job_number, count, latest in rows: if isinstance(latest, str): try: latest = datetime.fromisoformat(latest) except ValueError: latest = None box_jobs[job_number or 'No Job Number'] = { 'count': count, 'latest': latest } except Exception: box_jobs = {} return render_template( 'reporting/index.html', active_tab='reporting', box_jobs=box_jobs, report_folder_id=current_app.config.get('BOX_REPORT_FOLDER_ID') ) @reporting_bp.route('/history/delete/', methods=['DELETE']) def delete_box_reports(job_number): """Delete all saved Box reports for a job number.""" import os try: reports = QCReport.query.filter_by( job_number=job_number, report_type='box_import' ).all() for report in reports: if report.file_path and os.path.exists(report.file_path): os.remove(report.file_path) db.session.delete(report) db.session.commit() return jsonify({'success': True, 'deleted': len(reports)}) except Exception as e: db.session.rollback() logger.error(f"Error deleting box reports for {job_number}: {e}") return jsonify({'error': str(e)}), 500 @reporting_bp.route('/history/') def history_dashboard(job_number): """View previously saved reports for a job number from the database (no Box re-fetch).""" try: reports = QCReport.query.filter_by(job_number=job_number).order_by( QCReport.created_at.desc() ).all() if not reports: return render_template( 'reporting/dashboard.html', job_number=job_number, error=f'No saved reports found for job {job_number}' ) # Parse saved HTML files parsed_reports = [] for report in reports: try: import os if os.path.exists(report.file_path): with open(report.file_path, 'r', encoding='utf-8') as f: html_content = f.read() parser = QCReportParser(html_content) parsed_data = parser.parse() parsed_data['source'] = report.report_type parsed_data['source_label'] = 'Box.com' if report.report_type == 'box_import' else 'QC Tool' parsed_data['filename'] = report.filename parsed_data['html_content'] = html_content parsed_data['score'] = report.score parsed_data['status'] = report.status parsed_data['created_at'] = report.created_at.isoformat() if report.created_at else None parsed_reports.append(parsed_data) else: logger.warning(f"Report file not found: {report.file_path}") except Exception as e: logger.error(f"Error parsing saved report {report.filename}: {e}") aggregated = aggregate_reports(parsed_reports) for report in parsed_reports: if 'checks' in report: report['friendly_checks'] = get_designer_friendly_checks(report) return render_template( 'reporting/dashboard.html', job_number=job_number, reports=parsed_reports, aggregated=aggregated, statistics={'total': len(parsed_reports)}, from_history=True ) except Exception as e: logger.error(f"Error loading history for {job_number}: {e}") return render_template( 'reporting/dashboard.html', job_number=job_number, error=f'Error loading reports: {str(e)}' ) @reporting_bp.route('/search', methods=['POST']) def search(): """ Search for reports by job number (synchronous). Returns JSON with consolidated reports from Box.com and database. """ try: data = request.get_json() job_number = data.get('job_number', '').strip() if not job_number: return jsonify({'error': 'Job number is required'}), 400 logger.info(f"Searching for job number: {job_number}") # Get Box client from app context box_client = current_app.get_box_client() # Create aggregator and get consolidated reports aggregator = ReportAggregator(box_client) reports = aggregator.get_consolidated_reports(job_number) if not reports: return jsonify({ 'job_number': job_number, 'reports': [], 'statistics': {'total': 0}, 'message': f'No reports found for job number: {job_number}' }) # Calculate statistics statistics = aggregator.get_report_statistics(reports) return jsonify({ 'job_number': job_number, 'reports': reports, 'statistics': statistics, 'count': len(reports) }) except Exception as e: logger.error(f"Error searching for reports: {e}") return jsonify({'error': str(e)}), 500 @reporting_bp.route('/search/async', methods=['POST']) def search_async(): """ Start async search for reports by job number(s). Supports comma-separated job numbers for multi-job searches. Returns session_id for progress tracking. """ try: data = request.get_json() raw_input = data.get('job_number', '').strip() if not raw_input: return jsonify({'error': 'Job number is required'}), 400 # Parse comma-separated job numbers job_numbers = [j.strip() for j in raw_input.split(',') if j.strip()] if not job_numbers: return jsonify({'error': 'No valid job numbers provided'}), 400 session_id = str(uuid.uuid4()) is_multi = len(job_numbers) > 1 logger.info(f"Async search started: {job_numbers} (session: {session_id})") # Start background thread app = current_app._get_current_object() def run_search(): with app.app_context(): try: tracker = UnifiedProgressTracker(session_id) box_client = app.get_box_client() aggregator = ReportAggregator(box_client) if is_multi: result = aggregator.get_consolidated_reports_multi(job_numbers, tracker) else: result = aggregator.get_consolidated_reports_with_progress( job_numbers[0], tracker ) # Cache results cache_set(f"search_{session_id}", { 'result': result, 'job_numbers': job_numbers, 'is_multi': is_multi }) # Save Box reports to database for history _save_search_results(result, job_numbers, is_multi) tracker.complete("Search complete") except Exception as e: logger.error(f"Async search error: {e}", exc_info=True) tracker = UnifiedProgressTracker(session_id) tracker.fail(f"Search failed: {str(e)}") thread = threading.Thread(target=run_search) thread.daemon = True thread.start() return jsonify({ 'success': True, 'session_id': session_id, 'job_numbers': job_numbers, 'is_multi': is_multi, 'progress_url': f'/reporting/progress/{session_id}' }) except Exception as e: logger.error(f"Error starting async search: {e}") return jsonify({'error': str(e)}), 500 @reporting_bp.route('/progress/') def progress_stream(session_id): """SSE stream for search progress tracking.""" try: tracker = UnifiedProgressTracker(session_id) return Response( stream_with_context(tracker.stream_progress()), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no' } ) except Exception as e: logger.error(f"Progress stream error: {e}") return jsonify({'error': str(e)}), 500 @reporting_bp.route('/api/progress/') def progress_poll(session_id): """Polling fallback for progress tracking.""" try: tracker = UnifiedProgressTracker(session_id) progress = tracker.get_progress() return jsonify(progress) except Exception as e: logger.error(f"Progress poll error: {e}") return jsonify({'error': str(e)}), 500 @reporting_bp.route('/dashboard/') def dashboard(job_number): """ Render consolidated dashboard for a specific job number. Tries to load from cache (async search result) first, falls back to synchronous fetch. """ try: logger.info(f"Loading dashboard for job number: {job_number}") # Check for cached async result via session_id query param session_id = request.args.get('session_id') cached = None if session_id: cached = cache_get(f"search_{session_id}") if cached and not cached.get('is_multi'): result = cached['result'] parsed_reports = result['parsed_reports'] aggregated = result['aggregated'] statistics = result['statistics'] else: # Synchronous fallback box_client = current_app.get_box_client() aggregator = ReportAggregator(box_client) result = aggregator.get_consolidated_reports_with_progress(job_number) parsed_reports = result['parsed_reports'] aggregated = result['aggregated'] statistics = result['statistics'] if not parsed_reports: return render_template( 'reporting/dashboard.html', active_tab='reporting', job_number=job_number, error=f'No reports found for job number: {job_number}' ) # Generate designer-friendly check data for each report for report in parsed_reports: if 'checks' in report: report['friendly_checks'] = get_designer_friendly_checks(report) return render_template( 'reporting/dashboard.html', active_tab='reporting', job_number=job_number, reports=parsed_reports, aggregated=aggregated, statistics=statistics, report_count=len(parsed_reports) ) except Exception as e: logger.error(f"Error loading dashboard: {e}") return render_template( 'reporting/dashboard.html', active_tab='reporting', job_number=job_number, error=str(e) ) @reporting_bp.route('/dashboard/multi') def dashboard_multi(): """ Render multi-job dashboard. Requires session_id query parameter pointing to cached multi-job result. """ try: session_id = request.args.get('session_id') if not session_id: return render_template( 'reporting/dashboard_multi.html', active_tab='reporting', error='No session ID provided. Please search first.' ) cached = cache_get(f"search_{session_id}") if not cached or not cached.get('is_multi'): return render_template( 'reporting/dashboard_multi.html', active_tab='reporting', error='Search results expired or not found. Please search again.' ) result = cached['result'] job_numbers = cached['job_numbers'] # Generate friendly checks for each report for job_number, job_data in result['per_job'].items(): for report in job_data['parsed_reports']: if 'checks' in report: report['friendly_checks'] = get_designer_friendly_checks(report) return render_template( 'reporting/dashboard_multi.html', active_tab='reporting', result=result, job_numbers=job_numbers, session_id=session_id ) except Exception as e: logger.error(f"Error loading multi-job dashboard: {e}") return render_template( 'reporting/dashboard_multi.html', active_tab='reporting', error=str(e) ) @reporting_bp.route('/api/report/') def get_report(file_id): """Get parsed report data as JSON.""" try: box_client = current_app.get_box_client() content = box_client.download_file(file_id) html_content = content.decode('utf-8') parser = QCReportParser(html_content) parsed_data = parser.parse() return jsonify(parsed_data) except Exception as e: logger.error(f"Error getting report {file_id}: {e}") return jsonify({'error': str(e)}), 500 @reporting_bp.route('/api/report//raw') def get_raw_report(file_id): """Get raw HTML report.""" try: box_client = current_app.get_box_client() content = box_client.download_file(file_id) return content, 200, {'Content-Type': 'text/html; charset=utf-8'} except Exception as e: logger.error(f"Error getting raw report {file_id}: {e}") return f'

Error

{str(e)}

', 500 @reporting_bp.route('/export/html/') def export_html(job_number): """Export combined report as HTML.""" try: logger.info(f"Exporting HTML for job number: {job_number}") box_client = current_app.get_box_client() reports = box_client.search_by_job_number(job_number) if not reports: return jsonify({'error': f'No reports found for job number: {job_number}'}), 404 parsed_reports = [] for report_info in reports: try: content = box_client.download_file(report_info['id']) html_content = content.decode('utf-8') parser = QCReportParser(html_content) parsed_data = parser.parse() parsed_data['html_content'] = html_content parsed_reports.append(parsed_data) except Exception as e: logger.error(f"Error parsing report for HTML export: {e}") aggregated = aggregate_reports(parsed_reports) html_string = render_template( 'reporting/pdf_export.html', job_number=job_number, reports=parsed_reports, aggregated=aggregated, generated_at=datetime.now().strftime('%Y-%m-%d %H:%M:%S') ) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f'QC_Report_{job_number}_{timestamp}.html' return send_file( BytesIO(html_string.encode('utf-8')), mimetype='text/html', as_attachment=True, download_name=filename ) except Exception as e: logger.error(f"Error exporting HTML: {e}") return jsonify({'error': str(e)}), 500 @reporting_bp.route('/export/html//errors') def export_html_errors_only(job_number): """Export error reports only as HTML.""" try: logger.info(f"Exporting error reports only for job number: {job_number}") box_client = current_app.get_box_client() reports = box_client.search_by_job_number(job_number) if not reports: return jsonify({'error': f'No reports found for job number: {job_number}'}), 404 parsed_reports = [] for report_info in reports: try: content = box_client.download_file(report_info['id']) html_content = content.decode('utf-8') parser = QCReportParser(html_content) parsed_data = parser.parse() parsed_data['html_content'] = html_content if parsed_data.get('summary', {}).get('error', 0) > 0: parsed_reports.append(parsed_data) except Exception as e: logger.error(f"Error parsing report for error export: {e}") if not parsed_reports: return jsonify({'error': f'No error reports found for job number: {job_number}'}), 404 aggregated = aggregate_reports(parsed_reports) html_string = render_template( 'reporting/pdf_export.html', job_number=job_number, reports=parsed_reports, aggregated=aggregated, generated_at=datetime.now().strftime('%Y-%m-%d %H:%M:%S'), errors_only=True ) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f'QC_Report_{job_number}_ERRORS_ONLY_{timestamp}.html' return send_file( BytesIO(html_string.encode('utf-8')), mimetype='text/html', as_attachment=True, download_name=filename ) except Exception as e: logger.error(f"Error exporting error reports: {e}") return jsonify({'error': str(e)}), 500 @reporting_bp.route('/export/csv/') def export_csv(job_number): """ Export all checks as CSV for a job number. CSV columns: Job Number, Filename, Box Link, Check Name, Status, Issue Description, Action Required """ try: logger.info(f"Exporting CSV for job number: {job_number}") # Try cache first session_id = request.args.get('session_id') parsed_reports = _get_parsed_reports(job_number, session_id) if not parsed_reports: return jsonify({'error': f'No reports found for job number: {job_number}'}), 404 rows = _build_csv_rows(job_number, parsed_reports) output = _generate_csv(rows) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f'QC_Report_{job_number}_{timestamp}.csv' return send_file( BytesIO(output.encode('utf-8-sig')), mimetype='text/csv', as_attachment=True, download_name=filename ) except Exception as e: logger.error(f"Error exporting CSV: {e}") return jsonify({'error': str(e)}), 500 @reporting_bp.route('/export/csv//errors') def export_csv_errors(job_number): """Export only error/warning checks as CSV.""" try: logger.info(f"Exporting error CSV for job number: {job_number}") session_id = request.args.get('session_id') parsed_reports = _get_parsed_reports(job_number, session_id) if not parsed_reports: return jsonify({'error': f'No reports found for job number: {job_number}'}), 404 rows = _build_csv_rows(job_number, parsed_reports, errors_only=True) if not rows: return jsonify({'error': f'No errors found for job number: {job_number}'}), 404 output = _generate_csv(rows) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f'QC_Report_{job_number}_ERRORS_{timestamp}.csv' return send_file( BytesIO(output.encode('utf-8-sig')), mimetype='text/csv', as_attachment=True, download_name=filename ) except Exception as e: logger.error(f"Error exporting error CSV: {e}") return jsonify({'error': str(e)}), 500 @reporting_bp.route('/export/csv/multi') def export_csv_multi(): """Export multi-job CSV from cached results.""" try: session_id = request.args.get('session_id') if not session_id: return jsonify({'error': 'Session ID required'}), 400 cached = cache_get(f"search_{session_id}") if not cached or not cached.get('is_multi'): return jsonify({'error': 'Multi-job results not found or expired'}), 404 result = cached['result'] all_rows = [] for job_number, job_data in result['per_job'].items(): rows = _build_csv_rows(job_number, job_data['parsed_reports']) all_rows.extend(rows) if not all_rows: return jsonify({'error': 'No data to export'}), 404 output = _generate_csv(all_rows) job_list = '_'.join(cached['job_numbers'][:3]) if len(cached['job_numbers']) > 3: job_list += f'_+{len(cached["job_numbers"]) - 3}' timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f'QC_Report_Multi_{job_list}_{timestamp}.csv' return send_file( BytesIO(output.encode('utf-8-sig')), mimetype='text/csv', as_attachment=True, download_name=filename ) except Exception as e: logger.error(f"Error exporting multi-job CSV: {e}") return jsonify({'error': str(e)}), 500 # --- Helper functions --- def _get_parsed_reports(job_number, session_id=None): """Get parsed reports from cache or by fetching fresh.""" # Try cache if session_id: cached = cache_get(f"search_{session_id}") if cached and not cached.get('is_multi'): return cached['result']['parsed_reports'] # Fetch fresh box_client = current_app.get_box_client() aggregator = ReportAggregator(box_client) result = aggregator.get_consolidated_reports_with_progress(job_number) return result['parsed_reports'] def _build_csv_rows(job_number, parsed_reports, errors_only=False): """Build CSV rows from parsed reports.""" rows = [] for report in parsed_reports: filename = report.get('filename', report.get('box_name', 'Unknown')) box_id = report.get('box_id') box_link = f'https://app.box.com/file/{box_id}' if box_id else '' checks = report.get('checks', []) for check in checks: sanitized = sanitize_error_for_display(check) if errors_only and sanitized['status'] in ('passed', 'skipped'): continue rows.append({ 'Job Number': job_number, 'Filename': filename, 'Box Link': box_link, 'Check Name': sanitized['check_name_display'], 'Status': sanitized['status_text'], 'Issue Description': sanitized['error_summary'], 'Action Required': sanitized['action_required'] if sanitized['status'] in ('error', 'warning', 'failed') else '' }) return rows def _generate_csv(rows): """Generate CSV string from row dictionaries.""" if not rows: return '' output = io.StringIO() fieldnames = ['Job Number', 'Filename', 'Box Link', 'Check Name', 'Status', 'Issue Description', 'Action Required'] writer = csv.DictWriter(output, fieldnames=fieldnames) writer.writeheader() writer.writerows(rows) return output.getvalue()