""" Printer Check Module Routes. Handles CSV upload, PDF ZIP upload, region/campaign selection, processing, and XLSX export. """ import os import json import uuid import shutil import zipfile import logging from io import BytesIO from flask import ( render_template, request, jsonify, send_file, current_app ) from werkzeug.utils import secure_filename from .blueprint import printer_check_bp logger = logging.getLogger(__name__) UPLOAD_BASE = 'uploads/printer_check' REGIONS_CONFIG_PATH = os.path.join( os.path.dirname(__file__), 'regions.json' ) def _load_regions(): """Load regions configuration.""" if os.path.exists(REGIONS_CONFIG_PATH): with open(REGIONS_CONFIG_PATH, 'r') as f: return json.load(f) # Fallback defaults return { "EEU": { "name": "Eastern Europe", "countries": ["AL","BA","BG","CY","CZ","EE","GE","GR","HU","KZ", "LT","LV","MK","ME","PL","RO","RS","SK","TR","UA","XK"], "groups": [ {"id": "kz_ua", "label": "KZ, UA", "countries": ["KZ","UA"]}, {"id": "tr", "label": "TR", "countries": ["TR"]}, {"id": "rest", "label": "Rest", "countries": ["AL","BA","BG","CY","CZ","EE","GE","GR","HU", "LT","LV","MK","ME","PL","RO","RS","SK","XK"]} ] }, "CEU": { "name": "Central Europe", "countries": ["DE","AT","CH","NL","SI"], "groups": [ {"id": "arian", "label": "Arian (AT, CH, SI)", "countries": ["AT","CH","SI"]}, {"id": "kurten", "label": "Kurten (DE, NL)", "countries": ["DE","NL"]} ] } } @printer_check_bp.route('/') @printer_check_bp.route('/index') def index(): """Main Printer Check page.""" regions = _load_regions() return render_template( 'printer_check/index.html', active_tab='printer-check', regions=regions ) @printer_check_bp.route('/api/regions') def api_regions(): """Return regions config as JSON.""" return jsonify(_load_regions()) @printer_check_bp.route('/process', methods=['POST']) def process(): """ Process CSV + PDF ZIP for a given region and country selection. Expects multipart form with: - csv_file: CSV file - pdf_zip: ZIP file containing PDF folder structure - region_code: Selected region code (e.g., "EEU") - selected_countries: JSON array of country codes - selected_campaigns: JSON array of campaign tokens (optional) """ from .services.csv_parser import parse_csv from .services.region_filter import ( filter_by_region, detect_campaigns, filter_by_campaign, normalize_language_column, find_longest_common_string ) from .services.folder_scanner import scan_folder, is_gen_file from .services.matcher import match_rows try: # Validate inputs csv_file = request.files.get('csv_file') pdf_zip = request.files.get('pdf_zip') region_code = request.form.get('region_code') selected_countries_json = request.form.get('selected_countries', '[]') selected_campaigns_json = request.form.get('selected_campaigns', '[]') if not csv_file or not pdf_zip or not region_code: return jsonify({'error': 'Missing required fields: csv_file, pdf_zip, region_code'}), 400 regions = _load_regions() region = regions.get(region_code) if not region: return jsonify({'error': f'Unknown region: {region_code}'}), 400 selected_countries = json.loads(selected_countries_json) selected_campaigns = json.loads(selected_campaigns_json) if not selected_countries: selected_countries = region['countries'] # Create session directory session_id = str(uuid.uuid4()) session_dir = os.path.join(UPLOAD_BASE, session_id) os.makedirs(session_dir, exist_ok=True) # Save and parse CSV csv_path = os.path.join(session_dir, secure_filename(csv_file.filename)) csv_file.save(csv_path) headers, all_rows = parse_csv(csv_path) # Extract PDF ZIP pdf_dir = os.path.join(session_dir, 'pdfs') os.makedirs(pdf_dir, exist_ok=True) zip_path = os.path.join(session_dir, secure_filename(pdf_zip.filename)) pdf_zip.save(zip_path) with zipfile.ZipFile(zip_path, 'r') as zf: zf.extractall(pdf_dir) # Handle macOS __MACOSX artifacts macosx_dir = os.path.join(pdf_dir, '__MACOSX') if os.path.exists(macosx_dir): shutil.rmtree(macosx_dir) # If ZIP contains a single root folder, use that as the PDF root pdf_root = pdf_dir entries = [e for e in os.listdir(pdf_dir) if not e.startswith('.')] if len(entries) == 1 and os.path.isdir(os.path.join(pdf_dir, entries[0])): pdf_root = os.path.join(pdf_dir, entries[0]) # Filter rows by region filtered_rows = filter_by_region(all_rows, selected_countries) # Normalize language column normalize_language_column(filtered_rows) # Detect and filter campaigns detected_campaigns = detect_campaigns(filtered_rows) if selected_campaigns: filtered_rows = filter_by_campaign(filtered_rows, selected_campaigns) # Scan PDF folder all_region_codes = list(regions.keys()) scan_result = scan_folder(pdf_root, region_code, selected_countries, all_region_codes) # Match rows to PDFs match_result = match_rows(filtered_rows, scan_result) # Count stats matched = match_result['statuses'].count('MATCHED') missing = match_result['statuses'].count('MISSING') total = len(match_result['statuses']) # Check for GEN-related warnings has_gen_rows = any( is_gen_file(row[0]) for row in filtered_rows if row ) missing_root_gen = has_gen_rows and not scan_result.get('root_gen_exists', False) # Build response result = { 'success': True, 'session_id': session_id, 'headers': headers, 'total_rows': total, 'matched': matched, 'missing': missing, 'extra_count': len(match_result['extra']), 'detected_campaigns': detected_campaigns, 'selected_campaigns': selected_campaigns, 'folder_layout': scan_result.get('layout', 'unknown'), 'pdfs_found': match_result.get('country_pdfs_total', 0) + len(match_result.get('referenced_gen_pdfs', set())), 'gen_total_in_folder': len(scan_result.get('gen_pdfs', set())), 'match_statuses': match_result['statuses'], 'missing_files': match_result.get('missing_info', []), 'extra_files': match_result.get('extra_info', []), 'misplaced_gen': scan_result.get('misplaced_gen', []), 'duplicate_gen': scan_result.get('duplicate_gen_files', []), 'misplaced_country_files': scan_result.get('misplaced_country_files', []), 'files_at_wrong_level': scan_result.get('files_at_wrong_level', []), 'missing_root_gen': missing_root_gen, 'warnings_count': ( len(scan_result.get('misplaced_gen', [])) + len(scan_result.get('duplicate_gen_files', [])) + len(scan_result.get('misplaced_country_files', [])) + len(scan_result.get('files_at_wrong_level', [])) + (1 if missing_root_gen else 0) ), 'filtered_rows': filtered_rows } return jsonify(result) except Exception as e: logger.error(f"Printer check processing error: {e}", exc_info=True) return jsonify({'error': str(e)}), 500 @printer_check_bp.route('/export', methods=['POST']) def export_xlsx(): """Export filtered results to XLSX.""" from .services.xlsx_writer import write_xlsx try: data = request.get_json() headers = data.get('headers', []) rows = data.get('rows', []) filename = data.get('filename', 'PrinterCheck_Export.xlsx') if not headers or not rows: return jsonify({'error': 'No data to export'}), 400 buffer = BytesIO() write_xlsx(buffer, headers, rows) buffer.seek(0) return send_file( buffer, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=filename ) except Exception as e: logger.error(f"XLSX export error: {e}", exc_info=True) return jsonify({'error': str(e)}), 500