hm_ai_qc_report_tool/modules/printer_check/routes.py
nickviljoen d036752d17 v2.2.0: Gemini video, batch grouping, thumbnails, speed, price fix, printer check
- Video QC: Switch to Google Gemini direct video analysis as default (OpenAI frame grid fallback)
- HM QC: Group reports by batch with collapsible sections, ZIP download per batch
- HM QC: Generate asset thumbnails (150px) displayed in report listings
- Speed: Remove artificial delays, add ThreadPoolExecutor(2) for parallel batch processing
- Price detection: Improved prompt with country context, detect all prices, increased text limit
- New Printer Check module: CSV-to-PDF cross-referencing ported from CrossMatch Rust app

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 13:56:07 +02:00

242 lines
8.6 KiB
Python

"""
Printer Check Module Routes.
Handles CSV upload, PDF ZIP upload, region/campaign selection,
processing, and XLSX export.
"""
import os
import json
import uuid
import shutil
import zipfile
import logging
from io import BytesIO
from flask import (
render_template, request, jsonify, send_file, current_app
)
from werkzeug.utils import secure_filename
from .blueprint import printer_check_bp
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Directory under which process() creates one sub-directory per request.
UPLOAD_BASE = 'uploads/printer_check'

# regions.json is looked up in the same directory as this source file.
REGIONS_CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'regions.json')
def _load_regions():
    """Load the regions configuration.

    If ``REGIONS_CONFIG_PATH`` exists it is read as UTF-8 JSON and the
    parsed value is returned as-is. Otherwise the built-in defaults for
    the "EEU" and "CEU" regions are returned.

    Returns:
        dict: In the fallback case, region code -> a dict with the keys
        ``"name"``, ``"countries"`` and ``"groups"``. When loaded from the
        file, whatever that file contains.

    Raises:
        json.JSONDecodeError: The file exists but is not valid JSON.
        OSError: The file exists but cannot be opened or read.
    """
    if os.path.exists(REGIONS_CONFIG_PATH):
        # Decode explicitly as UTF-8 so the result does not depend on the
        # host's locale encoding (which would break non-ASCII labels).
        with open(REGIONS_CONFIG_PATH, 'r', encoding='utf-8') as f:
            return json.load(f)

    # Fallback defaults, used only when no regions.json is present.
    return {
        "EEU": {
            "name": "Eastern Europe",
            "countries": ["AL", "BA", "BG", "CY", "CZ", "EE", "GE", "GR", "HU", "KZ",
                          "LT", "LV", "MK", "ME", "PL", "RO", "RS", "SK", "TR", "UA", "XK"],
            "groups": [
                {"id": "kz_ua", "label": "KZ, UA", "countries": ["KZ", "UA"]},
                {"id": "tr", "label": "TR", "countries": ["TR"]},
                {"id": "rest", "label": "Rest",
                 "countries": ["AL", "BA", "BG", "CY", "CZ", "EE", "GE", "GR", "HU",
                               "LT", "LV", "MK", "ME", "PL", "RO", "RS", "SK", "XK"]},
            ],
        },
        "CEU": {
            "name": "Central Europe",
            "countries": ["DE", "AT", "CH", "NL", "SI"],
            "groups": [
                {"id": "arian", "label": "Arian (AT, CH, SI)", "countries": ["AT", "CH", "SI"]},
                {"id": "kurten", "label": "Kurten (DE, NL)", "countries": ["DE", "NL"]},
            ],
        },
    }
@printer_check_bp.route('/')
@printer_check_bp.route('/index')
def index():
    """Render the Printer Check landing page with the regions config."""
    template_context = {
        'active_tab': 'printer-check',
        'regions': _load_regions(),
    }
    return render_template('printer_check/index.html', **template_context)
@printer_check_bp.route('/api/regions')
def api_regions():
    """Serve the regions configuration as a JSON response."""
    regions_config = _load_regions()
    return jsonify(regions_config)
def _parse_json_list(raw, field_name):
    """Decode a JSON-encoded form field that is expected to hold an array.

    Args:
        raw: The raw form value (a JSON document as text).
        field_name: Name of the form field, used in the error message.

    Returns:
        list: The decoded array; JSON ``null`` is treated as an empty list.

    Raises:
        ValueError: ``raw`` is not valid JSON or does not decode to a list.
    """
    try:
        value = json.loads(raw)
    except (TypeError, ValueError) as exc:  # JSONDecodeError is a ValueError
        raise ValueError(f'{field_name} must be a JSON array') from exc
    if value is None:
        return []
    if not isinstance(value, list):
        raise ValueError(f'{field_name} must be a JSON array')
    return value


def _extract_pdf_zip(zip_path, pdf_dir):
    """Unpack ``zip_path`` into ``pdf_dir`` and return the folder to scan.

    Returns:
        str: ``pdf_dir`` itself, or - when the archive unpacked to exactly
        one non-hidden entry and that entry is a directory - the path of
        that single directory.

    Raises:
        zipfile.BadZipFile: ``zip_path`` is not a readable ZIP archive.
    """
    os.makedirs(pdf_dir, exist_ok=True)
    # NOTE(review): nothing here limits the size of the extracted content;
    # consider a cap if uploads can come from untrusted users.
    with zipfile.ZipFile(zip_path, 'r') as zf:
        zf.extractall(pdf_dir)

    # Handle macOS __MACOSX artifacts
    macosx_dir = os.path.join(pdf_dir, '__MACOSX')
    if os.path.exists(macosx_dir):
        shutil.rmtree(macosx_dir)

    # If ZIP contains a single root folder, use that as the PDF root
    entries = [e for e in os.listdir(pdf_dir) if not e.startswith('.')]
    if len(entries) == 1:
        only_entry = os.path.join(pdf_dir, entries[0])
        if os.path.isdir(only_entry):
            return only_entry
    return pdf_dir


@printer_check_bp.route('/process', methods=['POST'])
def process():
    """
    Process CSV + PDF ZIP for a given region and country selection.

    Expects multipart form with:
    - csv_file: CSV file
    - pdf_zip: ZIP file containing PDF folder structure
    - region_code: Selected region code (e.g., "EEU")
    - selected_countries: JSON array of country codes (optional; an empty
      array selects every country of the region)
    - selected_campaigns: JSON array of campaign tokens (optional)

    Returns:
        A JSON response. On success it carries ``success``, ``session_id``,
        the CSV ``headers``, match counters, the warning lists taken from
        the folder scan, and ``filtered_rows``. Status 400 is returned for
        missing fields, an unknown region, malformed JSON arrays or an
        invalid ZIP archive; status 500 for any other failure.

    The uploads are stored in a fresh ``UPLOAD_BASE/<session_id>``
    directory. That directory is kept when the request succeeds and is
    removed when the request fails, because the client never learns the
    session id in that case.
    """
    # Imported lazily, as in the original module layout.
    from .services.csv_parser import parse_csv
    from .services.region_filter import (
        filter_by_region, detect_campaigns, filter_by_campaign,
        normalize_language_column,
    )
    from .services.folder_scanner import scan_folder, is_gen_file
    from .services.matcher import match_rows

    session_dir = None
    completed = False
    try:
        # Validate inputs
        csv_file = request.files.get('csv_file')
        pdf_zip = request.files.get('pdf_zip')
        region_code = request.form.get('region_code')
        selected_countries_json = request.form.get('selected_countries', '[]')
        selected_campaigns_json = request.form.get('selected_campaigns', '[]')

        if not csv_file or not pdf_zip or not region_code:
            return jsonify({'error': 'Missing required fields: csv_file, pdf_zip, region_code'}), 400

        regions = _load_regions()
        region = regions.get(region_code)
        if not region:
            return jsonify({'error': f'Unknown region: {region_code}'}), 400

        # Malformed client JSON is a client error, not a server error.
        try:
            selected_countries = _parse_json_list(
                selected_countries_json, 'selected_countries')
            selected_campaigns = _parse_json_list(
                selected_campaigns_json, 'selected_campaigns')
        except ValueError as exc:
            return jsonify({'error': str(exc)}), 400

        if not selected_countries:
            selected_countries = region['countries']

        # Create session directory
        session_id = str(uuid.uuid4())
        session_dir = os.path.join(UPLOAD_BASE, session_id)
        os.makedirs(session_dir, exist_ok=True)

        # Save and parse CSV. Fixed server-side names are used instead of
        # the client-supplied ones: secure_filename() can return '' (e.g.
        # for an all-non-ASCII name) and the two uploads could otherwise
        # collide with each other or with the 'pdfs' directory.
        csv_path = os.path.join(session_dir, 'input.csv')
        csv_file.save(csv_path)
        headers, all_rows = parse_csv(csv_path)

        # Extract PDF ZIP
        zip_path = os.path.join(session_dir, 'input.zip')
        pdf_zip.save(zip_path)
        pdf_root = _extract_pdf_zip(zip_path, os.path.join(session_dir, 'pdfs'))

        # Filter rows by region
        filtered_rows = filter_by_region(all_rows, selected_countries)

        # Normalize language column (return value unused; presumably
        # mutates the rows in place - confirm in region_filter)
        normalize_language_column(filtered_rows)

        # Detect and filter campaigns
        detected_campaigns = detect_campaigns(filtered_rows)
        if selected_campaigns:
            filtered_rows = filter_by_campaign(filtered_rows, selected_campaigns)

        # Scan PDF folder
        all_region_codes = list(regions.keys())
        scan_result = scan_folder(pdf_root, region_code, selected_countries, all_region_codes)

        # Match rows to PDFs
        match_result = match_rows(filtered_rows, scan_result)

        # Count stats
        statuses = match_result['statuses']
        matched = statuses.count('MATCHED')
        missing = statuses.count('MISSING')
        total = len(statuses)

        # Check for GEN-related warnings (first column of each non-empty row)
        has_gen_rows = any(
            is_gen_file(row[0]) for row in filtered_rows if row
        )
        missing_root_gen = has_gen_rows and not scan_result.get('root_gen_exists', False)

        # Build response
        result = {
            'success': True,
            'session_id': session_id,
            'headers': headers,
            'total_rows': total,
            'matched': matched,
            'missing': missing,
            'extra_count': len(match_result['extra']),
            'detected_campaigns': detected_campaigns,
            'selected_campaigns': selected_campaigns,
            'folder_layout': scan_result.get('layout', 'unknown'),
            'pdfs_found': match_result.get('country_pdfs_total', 0) + len(match_result.get('referenced_gen_pdfs', set())),
            'gen_total_in_folder': len(scan_result.get('gen_pdfs', set())),
            'match_statuses': statuses,
            'missing_files': match_result.get('missing_info', []),
            'extra_files': match_result.get('extra_info', []),
            'misplaced_gen': scan_result.get('misplaced_gen', []),
            'duplicate_gen': scan_result.get('duplicate_gen_files', []),
            'misplaced_country_files': scan_result.get('misplaced_country_files', []),
            'files_at_wrong_level': scan_result.get('files_at_wrong_level', []),
            'missing_root_gen': missing_root_gen,
            'warnings_count': (
                len(scan_result.get('misplaced_gen', [])) +
                len(scan_result.get('duplicate_gen_files', [])) +
                len(scan_result.get('misplaced_country_files', [])) +
                len(scan_result.get('files_at_wrong_level', [])) +
                (1 if missing_root_gen else 0)
            ),
            'filtered_rows': filtered_rows
        }
        response = jsonify(result)
        completed = True
        return response
    except zipfile.BadZipFile:
        # A corrupt or non-ZIP upload is the client's mistake.
        return jsonify({'error': 'pdf_zip is not a valid ZIP archive'}), 400
    except Exception as e:
        logger.exception("Printer check processing error: %s", e)
        return jsonify({'error': str(e)}), 500
    finally:
        # Do not leave orphaned uploads behind when the request failed.
        if session_dir is not None and not completed:
            shutil.rmtree(session_dir, ignore_errors=True)
@printer_check_bp.route('/export', methods=['POST'])
def export_xlsx():
    """Export filtered results to XLSX.

    Expects a JSON object body with:
    - headers: non-empty list of column headers
    - rows: non-empty list of rows
    - filename: download name (optional, defaults to
      "PrinterCheck_Export.xlsx")

    Returns:
        The workbook as an attachment on success. A JSON error with status
        400 when the body is not a JSON object or when ``headers`` or
        ``rows`` is empty, and with status 500 on any other failure.
    """
    from .services.xlsx_writer import write_xlsx

    default_name = 'PrinterCheck_Export.xlsx'
    try:
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising, so the problem can be reported as a 400 rather than being
        # converted to a 500 by the broad handler below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        headers = data.get('headers', [])
        rows = data.get('rows', [])
        if not headers or not rows:
            return jsonify({'error': 'No data to export'}), 400

        # A null, empty or non-string filename falls back to the default;
        # an attachment needs a usable download name.
        filename = data.get('filename')
        if not isinstance(filename, str) or not filename:
            filename = default_name

        buffer = BytesIO()
        write_xlsx(buffer, headers, rows)
        buffer.seek(0)  # rewind so send_file streams from the start
        return send_file(
            buffer,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            as_attachment=True,
            download_name=filename
        )
    except Exception as e:
        logger.exception("XLSX export error: %s", e)
        return jsonify({'error': str(e)}), 500