v2.2.0: Gemini video, batch grouping, thumbnails, speed, price fix, printer check

- Video QC: Switch to Google Gemini direct video analysis as default (OpenAI frame grid fallback)
- HM QC: Group reports by batch with collapsible sections, ZIP download per batch
- HM QC: Generate asset thumbnails (150px) displayed in report listings
- Speed: Remove artificial delays, add ThreadPoolExecutor(2) for parallel batch processing
- Price detection: Improved prompt with country context, detect all prices, increased text limit
- New Printer Check module: CSV-to-PDF cross-referencing ported from CrossMatch Rust app

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
nickviljoen 2026-04-16 13:56:07 +02:00
parent 472862329c
commit d036752d17
24 changed files with 2275 additions and 112 deletions

5
app.py
View file

@ -125,6 +125,11 @@ def create_app(config_class=app_config.Config):
app.register_blueprint(campaigns_bp)
logger.info("Campaigns blueprint registered at /campaigns")
# Printer Check
from modules.printer_check import printer_check_bp
app.register_blueprint(printer_check_bp)
logger.info("Printer Check blueprint registered at /printer-check")
@app.route('/')
def root():
"""Render reporting index at root."""

View file

@ -1,4 +1,5 @@
"""QC Report database model."""
import json
from datetime import datetime
from .database import db
@ -79,6 +80,64 @@ class QCReport(db.Model):
query = query.filter_by(report_type=report_type)
return query.order_by(QCReport.created_at.desc()).all()
@staticmethod
def get_by_batch_id(batch_id, report_type=None):
    """Get all reports for a specific batch_id (stored in metadata_json).

    The batch id is matched as a substring of the serialized metadata, so
    the search fragment is built the same way ``json.dumps`` would have
    written it and LIKE wildcards in the id are escaped.

    Args:
        batch_id: Batch identifier to look up. Callers may pass a value
            taken straight from the URL, so it is treated as untrusted.
        report_type: Optional report type to restrict the query to.

    Returns:
        List of QCReport rows for the batch, oldest first. Empty when
        ``batch_id`` is falsy or nothing matches.
    """
    if not batch_id:
        return []

    # json.dumps yields the quoted/escaped form the id has inside
    # metadata_json (e.g. "abc" -> '"abc"').
    fragment = f'"batch_id": {json.dumps(str(batch_id))}'

    # autoescape=True escapes '%' and '_' in the fragment so a crafted id
    # such as '%' cannot match reports belonging to other batches.
    query = QCReport.query.filter(
        QCReport.metadata_json.contains(fragment, autoescape=True)
    )
    if report_type:
        query = query.filter_by(report_type=report_type)
    return query.order_by(QCReport.created_at.asc()).all()
@staticmethod
def get_recent_grouped(limit=100, report_type=None):
    """Fetch recent reports and split them into batches and stand-alone reports.

    Reports whose metadata_json carries a truthy ``batch_id`` are collected
    into one summary dict per batch (report list, pass/fail/warning counts,
    average score). All other reports are returned separately.

    Args:
        limit: Maximum number of recent reports to fetch.
        report_type: Optional report type filter passed to get_recent().

    Returns:
        Tuple ``(batches, individual)``: ``batches`` is a list of summary
        dicts sorted by ``created_at`` descending, ``individual`` is the
        list of reports without a batch id.
    """
    # Maps a report status to the summary counter it increments; statuses
    # not listed here only contribute to 'total'.
    counter_for_status = {
        'passed': 'passed',
        'failed': 'failed',
        'error': 'failed',
        'warning': 'warnings',
    }

    grouped = {}
    ungrouped = []

    for rpt in QCReport.get_recent(limit=limit, report_type=report_type):
        key = None
        if rpt.metadata_json:
            try:
                decoded = json.loads(rpt.metadata_json)
                key = decoded.get('batch_id')
            except (json.JSONDecodeError, TypeError):
                # Unreadable metadata: treat the report as stand-alone.
                pass

        if not key:
            ungrouped.append(rpt)
            continue

        summary = grouped.get(key)
        if summary is None:
            # 'created_at' is taken from the first report seen for the batch.
            summary = {
                'batch_id': key,
                'reports': [],
                'created_at': rpt.created_at,
                'total': 0,
                'passed': 0,
                'failed': 0,
                'warnings': 0,
                'avg_score': 0
            }
            grouped[key] = summary

        summary['reports'].append(rpt)
        summary['total'] += 1
        counter = counter_for_status.get(rpt.status)
        if counter is not None:
            summary[counter] += 1

    # Average only over reports that actually have a score.
    for summary in grouped.values():
        valid_scores = [r.score for r in summary['reports'] if r.score is not None]
        if valid_scores:
            summary['avg_score'] = round(sum(valid_scores) / len(valid_scores), 1)
        else:
            summary['avg_score'] = 0

    newest_first = sorted(
        grouped.values(), key=lambda b: b['created_at'], reverse=True
    )
    return newest_first, ungrouped
@staticmethod
def get_recent(limit=50, report_type=None):
"""

View file

@ -238,6 +238,128 @@ class LLMConfig:
f"Vision API call failed after {max_retries} attempts: {str(last_exception)}"
)
@classmethod
def call_video_api(
    cls,
    prompt: str,
    video_path: str,
    provider: str = 'google',
    model: str = 'gemini-2.5-flash',
    max_retries: int = 3,
    log_usage: bool = True,
    usage_context: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
    """
    Unified video analysis API call. Currently only Google Gemini supports
    direct video file analysis.

    Args:
        prompt: Text prompt for the LLM
        video_path: Path to the video file on disk
        provider: LLM provider name (only 'google' supported for video)
        model: Model name to use
        max_retries: Maximum number of retry attempts
        log_usage: Whether to log usage to database
        usage_context: Context for usage logging

    Returns:
        Dictionary with response text and metadata

    Raises:
        ConfigurationError: If a provider other than 'google' is requested.
        Exception: If every attempt fails; the last underlying error is
            attached as ``__cause__``.
    """
    if provider != 'google':
        raise ConfigurationError(
            "Direct video analysis is only supported with Google Gemini. "
            f"Provider '{provider}' does not support video input."
        )

    cls.validate_configuration(provider)

    last_exception = None
    for attempt in range(max_retries):
        # Keep the try body down to the call that is actually retried.
        try:
            response = cls._call_google_video(prompt, video_path, model)
        except Exception as e:
            last_exception = e
            if attempt < max_retries - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                time.sleep(2 ** attempt)
            continue

        if log_usage:
            # Usage logging is best-effort and must never fail the call.
            try:
                from core.models.usage_log import UsageLog
                ctx = usage_context or {}
                UsageLog.log_call(
                    provider=provider,
                    model=model,
                    tokens=response.get('tokens_used'),
                    user=ctx.get('user'),
                    module=ctx.get('module', 'video_qc'),
                    check_name=ctx.get('check_name'),
                    session_id=ctx.get('session_id'),
                    success=True
                )
            except Exception as log_err:
                print(f"Warning: Failed to log usage: {log_err}")

        return response

    # Chain the last failure so its traceback is preserved for debugging.
    raise Exception(
        f"Video API call failed after {max_retries} attempts: {str(last_exception)}"
    ) from last_exception
@classmethod
def _call_google_video(
    cls,
    prompt: str,
    video_path: str,
    model: str
) -> Dict[str, Any]:
    """Upload video to Google Gemini and analyze it directly.

    Args:
        prompt: Text prompt sent together with the video.
        video_path: Path to the video file on disk.
        model: Gemini model name.

    Returns:
        Dict with keys 'text', 'model', 'provider' and 'tokens_used'
        ('tokens_used' is None when usage metadata is unavailable).

    Raises:
        TimeoutError: If Gemini is still processing the upload after the
            polling deadline.
        RuntimeError: If Gemini reports that processing failed.
    """
    import time as _time

    import google.generativeai as genai

    # Upper bound on how long we wait for Gemini to process the upload;
    # without it a stuck upload would block the caller forever.
    processing_timeout_s = 600
    poll_interval_s = 2

    api_key = os.getenv('GOOGLE_API_KEY')
    genai.configure(api_key=api_key)

    # Upload video file to Gemini
    print(f"Uploading video to Gemini: {video_path}")
    video_file = genai.upload_file(path=video_path)

    try:
        # Wait for the file to be processed
        deadline = _time.monotonic() + processing_timeout_s
        while video_file.state.name == "PROCESSING":
            if _time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Gemini video processing timed out after "
                    f"{processing_timeout_s}s: {video_path}"
                )
            print("Waiting for video processing...")
            _time.sleep(poll_interval_s)
            video_file = genai.get_file(video_file.name)

        if video_file.state.name == "FAILED":
            raise RuntimeError(
                f"Gemini video processing failed: {video_path} "
                f"(state={video_file.state.name})"
            )

        print(f"Video uploaded and ready: {video_file.uri}")

        # Generate content with the video
        gen_model = genai.GenerativeModel(model)
        response = gen_model.generate_content([prompt, video_file])

        tokens_used = None
        if hasattr(response, 'usage_metadata') and response.usage_metadata:
            try:
                tokens_used = response.usage_metadata.total_token_count
            except Exception:
                pass

        return {
            'text': response.text,
            'model': model,
            'provider': 'google',
            'tokens_used': tokens_used
        }
    finally:
        # Always remove the uploaded file, including on failure/timeout,
        # so retries do not accumulate orphaned uploads.
        try:
            genai.delete_file(video_file.name)
        except Exception:
            pass  # Best-effort cleanup
@classmethod
def _call_openai_vision(
cls,

View file

@ -7,6 +7,7 @@ and rate limiting between batches.
import os
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Any
from .executor import QCExecutor
from core.utils.progress_tracker import UnifiedProgressTracker
@ -15,8 +16,9 @@ logger = logging.getLogger(__name__)
# Default batch configuration
DEFAULT_BATCH_SIZE = 10
BATCH_COOLDOWN_SECONDS = 2
BATCH_COOLDOWN_SECONDS = 1
MAX_FILES = 100
MAX_CONCURRENT_FILES = 2
class BatchQCExecutor:
@ -34,7 +36,8 @@ class BatchQCExecutor:
profile: Dict[str, Any],
job_number: str = None,
batch_size: int = DEFAULT_BATCH_SIZE,
campaign_id: str = None
campaign_id: str = None,
batch_id: str = None
):
"""
Initialize batch executor.
@ -46,12 +49,14 @@ class BatchQCExecutor:
job_number: Optional job number for reporting
batch_size: Number of files per batch (default 10)
campaign_id: Optional campaign ID to load presentation guidelines
batch_id: Optional batch ID for grouping reports from the same upload
"""
self.session_id = session_id
self.file_paths = file_paths[:MAX_FILES]
self.profile = profile
self.job_number = job_number
self.campaign_id = campaign_id
self.batch_id = batch_id
self.batch_size = batch_size
self.progress = UnifiedProgressTracker(session_id)
self.results = []
@ -96,24 +101,30 @@ class BatchQCExecutor:
}
)
# Process each file in the batch
for file_path in batch:
file_result = self._process_single_file(file_path, completed_files, total_files)
self.results.append(file_result)
completed_files += 1
# Process files in the batch concurrently (up to MAX_CONCURRENT_FILES at a time)
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_FILES) as pool:
future_to_path = {
pool.submit(self._process_single_file, fp, completed_files + idx, total_files): fp
for idx, fp in enumerate(batch)
}
for future in as_completed(future_to_path):
file_path = future_to_path[future]
file_result = future.result()
self.results.append(file_result)
completed_files += 1
# Update progress per file
self.progress.update(
(completed_files / total_files) * 95,
f"Batch {batch_num}/{total_batches}: Processed {completed_files} of {total_files} files",
details={
'batch': batch_num,
'total_batches': total_batches,
'files_completed': completed_files,
'total_files': total_files,
'current_file': os.path.basename(file_path)
}
)
# Update progress per file
self.progress.update(
(completed_files / total_files) * 95,
f"Batch {batch_num}/{total_batches}: Processed {completed_files} of {total_files} files",
details={
'batch': batch_num,
'total_batches': total_batches,
'files_completed': completed_files,
'total_files': total_files,
'current_file': os.path.basename(file_path)
}
)
# Cooldown between batches (skip after last batch)
if batch_idx < total_batches - 1:
@ -175,7 +186,8 @@ class BatchQCExecutor:
file_path=file_path,
profile=self.profile,
job_number=self.job_number,
campaign_id=self.campaign_id
campaign_id=self.campaign_id,
batch_id=self.batch_id
)
result = executor.execute()

View file

@ -102,11 +102,13 @@ class PriceCurrencyCheck(BaseCheck):
# Build result
score = 100.0
issues = []
all_prices = price_info.get('all_prices', [])
details = {
'language': language,
'country_code': country_code,
'detected_currency': currency,
'detected_price': price_info.get('price_value'),
'all_prices_found': all_prices,
'confidence': price_info.get('confidence', 0),
**validation
}
@ -166,19 +168,34 @@ class PriceCurrencyCheck(BaseCheck):
def _detect_prices(self, file_path: str, context: Dict[str, Any]) -> dict:
"""Use LLM to detect prices and currency in the image."""
prompt = """Analyze this image for price and currency information.
filename_data = context.get('filename_data', {})
language = filename_data.get('language', '')
country_code = filename_data.get('country_code', '')
Extract any prices shown and identify the currency used.
Be flexible in recognizing different formats (e.g., "$100", "LE 699", "€20", "29,99 лв.").
Look for prices on product labels, overlays, banners, or any text in the image.
country_hint = ""
if language or country_code:
country_hint = f"""
CONTEXT: This image is for the market/language "{language or country_code}".
Use this context to help identify the correct currency and price format for this region.
"""
prompt = f"""Analyze this marketing image carefully for ALL price and currency information.
{country_hint}
INSTRUCTIONS:
1. Look at the ENTIRE image for ALL visible prices product prices, sale prices, original prices, promotional prices
2. Focus on the MAIN/PRIMARY price (usually the largest or most prominently displayed)
3. Do NOT confuse dates, phone numbers, product codes, or percentages with prices
4. Be flexible recognizing different formats (e.g., "$100", "LE 699", "€20", "29,99 лв.", "Kr 199", "99.90 zł")
5. If there are multiple prices, report the PRIMARY one but list all others in all_prices
Return ONLY valid JSON (no markdown fences) with:
- currency_found: 3-letter currency code (e.g., "BGN", "EUR", "USD") or "NOT_FOUND"
- currency_symbol: the actual symbol shown (e.g., "лв.", "", "$") or null
- price_value: detected numerical value as string or null
- currency_found: 3-letter ISO currency code (e.g., "BGN", "EUR", "USD", "PLN", "SEK") or "NOT_FOUND" if no price visible
- currency_symbol: the actual symbol/text shown (e.g., "лв.", "", "$", "", "Kr") or null
- price_value: the primary detected numerical value as string or null
- symbol_position: "before" or "after" the price, or null
- format_valid: boolean - is the price properly formatted
- confidence: confidence score 0-1
- format_valid: boolean - is the price properly formatted for the detected currency
- confidence: confidence score 0.0-1.0 for the primary price detection
- all_prices: array of all detected prices as strings (e.g., ["29,99 лв.", "39,99 лв."]) or empty array
"""
try:
response = LLMConfig.call_vision_api(
@ -280,17 +297,23 @@ Return ONLY valid JSON (no markdown fences) with:
currency = price_info.get('currency_found', '')
price_value = price_info.get('price_value', '')
prompt = f"""Compare the price detected in this asset against the campaign media plan / pricing sheet.
all_prices = price_info.get('all_prices', [])
all_prices_str = ', '.join(all_prices) if all_prices else price_value
Detected price: {price_value} {currency}
prompt = f"""Compare the price(s) detected in this asset against the campaign media plan / pricing sheet.
Detected primary price: {price_value} {currency}
All prices found in image: {all_prices_str}
Region/Language: {language}
Campaign pricing sheet:
{pricing_text[:6000]}
{pricing_text[:10000]}
Check if ANY of the detected prices match what the campaign sheet specifies for this region/language.
Return JSON with:
- price_matches_campaign: true/false
- expected_price: the expected price from the campaign sheet for this region (or null)
- matched_price: which detected price matched (or null)
- reason: brief explanation
"""
client = LLMConfig.get_client('openai', 'gpt-4o')

View file

@ -6,7 +6,6 @@ Supports context sharing between checks and parallel execution where possible.
"""
import os
import json
import time
import logging
from datetime import datetime
from typing import Dict, List, Any
@ -25,7 +24,7 @@ class QCExecutor:
"""
def __init__(self, session_id: str, file_path: str, profile: Dict[str, Any],
job_number: str = None, campaign_id: str = None):
job_number: str = None, campaign_id: str = None, batch_id: str = None):
"""
Initialize executor.
@ -35,12 +34,14 @@ class QCExecutor:
profile: Profile configuration
job_number: Optional job number for reporting
campaign_id: Optional campaign ID to load presentation guidelines
batch_id: Optional batch ID for grouping reports from the same upload
"""
self.session_id = session_id
self.file_path = file_path
self.profile = profile
self.job_number = job_number
self.campaign_id = campaign_id
self.batch_id = batch_id
self.context = {} # Shared context between checks
self.results = {} # Check results
@ -90,9 +91,6 @@ class QCExecutor:
logger.info(f"Check {check.name}: {result['status']} (score: {result['score']})")
# Small delay for demo purposes
time.sleep(0.5)
# Calculate overall score
self.progress.update(85, "Calculating overall score...")
overall_score = ScoringEngine.calculate_overall_score(self.results, self.profile)
@ -104,9 +102,13 @@ class QCExecutor:
self.progress.update(90, "Generating report...")
report_path = self._generate_report(overall_score, overall_status)
# Generate thumbnail
thumbnail_path = self._generate_thumbnail()
# Save to database
self.progress.update(95, "Saving to database...")
db_report = self._save_to_database(overall_score, overall_status, report_path)
db_report = self._save_to_database(overall_score, overall_status, report_path,
thumbnail_path=thumbnail_path)
# Complete
self.progress.complete(f"QC completed with score: {overall_score}")
@ -206,6 +208,50 @@ class QCExecutor:
except Exception as e:
logger.error(f"Failed to load campaign context: {e}")
def _generate_thumbnail(self) -> str:
    """Generate a thumbnail for the asset being checked.

    JPEG/PNG assets are opened directly; for PDFs the first page is
    rendered via pdf2image. The result is a 150px-wide JPEG stored under
    storage/thumbnails/<session_id>.jpg.

    Returns:
        Path to the thumbnail file, or None when the asset type is not
        supported or generation fails (failures are logged, not raised).
    """
    try:
        from PIL import Image

        ext = os.path.splitext(self.file_path)[1].lower()
        source_img = None

        try:
            if ext in ('.jpg', '.jpeg', '.png'):
                source_img = Image.open(self.file_path)
            elif ext == '.pdf':
                try:
                    from pdf2image import convert_from_path
                    pages = convert_from_path(self.file_path, first_page=1, last_page=1, dpi=72)
                    if pages:
                        source_img = pages[0]
                except Exception:
                    # PDF rendering is optional; fall through to "no thumbnail".
                    pass

            if source_img is None:
                return None

            # Resize to 150px wide, maintaining aspect ratio
            target_w = 150
            ratio = target_w / source_img.width
            # Clamp to 1px so extremely wide images do not yield a zero
            # height, which resize() rejects.
            target_h = max(1, int(source_img.height * ratio))
            thumb = source_img.resize((target_w, target_h), Image.Resampling.LANCZOS)

            # JPEG cannot store alpha/palette modes.
            if thumb.mode != 'RGB':
                thumb = thumb.convert('RGB')

            thumb_dir = os.path.join('storage', 'thumbnails')
            os.makedirs(thumb_dir, exist_ok=True)
            thumb_filename = f"{self.session_id}.jpg"
            thumb_path = os.path.join(thumb_dir, thumb_filename)
            thumb.save(thumb_path, 'JPEG', quality=80)

            logger.info(f"Thumbnail generated: {thumb_path}")
            return thumb_path
        finally:
            # Image.open keeps the underlying file open; release it.
            if source_img is not None:
                source_img.close()

    except Exception as e:
        logger.warning(f"Thumbnail generation failed: {e}")
        return None
def _create_checks(self) -> List[Any]:
"""
Create check instances from profile.
@ -360,7 +406,8 @@ class QCExecutor:
return html
def _save_to_database(self, overall_score: float, overall_status: str, report_path: str) -> QCReport:
def _save_to_database(self, overall_score: float, overall_status: str, report_path: str,
thumbnail_path: str = None) -> QCReport:
"""
Save report to database.
@ -368,6 +415,7 @@ class QCExecutor:
overall_score: Overall score
overall_status: Overall status
report_path: Path to report file
thumbnail_path: Optional path to thumbnail image
Returns:
QCReport instance
@ -377,7 +425,9 @@ class QCExecutor:
'profile': self.profile.get('name'),
'checks_run': len(self.results),
'session_id': self.session_id,
'campaign_id': self.campaign_id
'campaign_id': self.campaign_id,
'batch_id': self.batch_id,
'thumbnail_path': thumbnail_path
}
report = QCReport(

View file

@ -38,15 +38,18 @@ def allowed_file(filename):
@hm_qc_bp.route('/')
@hm_qc_bp.route('/index')
def index():
"""Main HM QC page with recent reports."""
"""Main HM QC page with recent reports grouped by batch."""
try:
recent_reports = QCReport.get_recent(limit=20, report_type='hm_qc')
batches, individual_reports = QCReport.get_recent_grouped(
limit=100, report_type='hm_qc'
)
except Exception:
recent_reports = []
batches, individual_reports = [], []
return render_template(
'hm_qc/index.html',
active_tab='hm-qc',
recent_reports=recent_reports
batches=batches,
individual_reports=individual_reports
)
@ -312,13 +315,15 @@ def execute_batch():
logger.info(f"Starting batch QC for {len(files)} files (session: {session_id})")
campaign_id = data.get('campaign_id')
batch_id = str(uuid.uuid4())
batch_executor = BatchQCExecutor(
session_id=session_id,
file_paths=file_paths,
profile=profile,
job_number=job_number,
campaign_id=campaign_id
campaign_id=campaign_id,
batch_id=batch_id
)
app = current_app._get_current_object()
@ -408,6 +413,29 @@ def delete_report(report_id):
return jsonify({'error': str(e)}), 500
@hm_qc_bp.route('/thumbnail/<int:report_id>')
def thumbnail(report_id):
    """Serve a thumbnail image for a report.

    Looks up the report, reads 'thumbnail_path' from its metadata_json and
    streams that file as a JPEG. Responds with an empty 404 when the
    report, its metadata, or the file is missing, or when anything fails.
    """
    import json as json_module
    try:
        report = QCReport.query.get(report_id)
        if not report or not report.metadata_json:
            return '', 404

        meta = json_module.loads(report.metadata_json)
        thumb_path = meta.get('thumbnail_path')
        # isfile (not exists): a directory path must not reach send_file.
        if not thumb_path or not os.path.isfile(thumb_path):
            return '', 404

        return send_file(
            os.path.abspath(thumb_path),
            mimetype='image/jpeg',
            max_age=86400  # Cache for 24 hours
        )
    except Exception as e:
        # The page hides broken thumbnails, so still answer 404, but leave
        # a trace instead of swallowing the error silently.
        logger.warning(f"Failed to serve thumbnail for report {report_id}: {e}")
        return '', 404
@hm_qc_bp.route('/report/<int:report_id>/download')
def download_report(report_id):
"""Download a QC report HTML file."""
@ -430,6 +458,36 @@ def download_report(report_id):
return jsonify({'error': str(e)}), 500
@hm_qc_bp.route('/report/batch/<batch_id>/download')
def download_batch(batch_id):
    """Download all reports from a batch as a ZIP file.

    Returns:
        The ZIP as an attachment; 404 JSON when the batch has no reports
        or none of its report files exist on disk; 500 JSON on error.
    """
    import zipfile
    from io import BytesIO

    try:
        reports = QCReport.get_by_batch_id(batch_id, report_type='hm_qc')
        if not reports:
            return jsonify({'error': 'No reports found for this batch'}), 404

        buffer = BytesIO()
        used_names = set()
        with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            for report in reports:
                path = report.file_path
                if not path or not os.path.exists(path):
                    logger.warning(f"Report file missing for batch {batch_id}: {path}")
                    continue

                # Two reports may share a basename; give each archive
                # member a unique name so none shadows another.
                arcname = os.path.basename(path)
                if arcname in used_names:
                    stem, ext = os.path.splitext(arcname)
                    suffix = 1
                    while f"{stem}_{suffix}{ext}" in used_names:
                        suffix += 1
                    arcname = f"{stem}_{suffix}{ext}"
                used_names.add(arcname)
                zf.write(path, arcname)

        # Do not hand out an empty archive as if it were a valid download.
        if not used_names:
            return jsonify({'error': 'No report files found on disk for this batch'}), 404

        buffer.seek(0)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return send_file(
            buffer,
            mimetype='application/zip',
            as_attachment=True,
            download_name=f'QC_Batch_{batch_id[:8]}_{timestamp}.zip'
        )
    except Exception as e:
        logger.error(f"Error downloading batch {batch_id}: {e}")
        return jsonify({'error': str(e)}), 500
@hm_qc_bp.route('/report/<int:report_id>')
def view_report(report_id):
"""View a saved QC report by database ID."""

View file

@ -65,15 +65,95 @@
</div>
</div>
<!-- Previous QC Reports -->
{% if recent_reports %}
<!-- Batch QC Reports -->
{% if batches %}
<div class="row mt-5">
<div class="col-12">
<h4><i class="bi bi-clipboard-check me-2"></i>Previous QC Reports</h4>
<h4><i class="bi bi-collection me-2"></i>Batch Reports</h4>
{% for batch in batches %}
<div class="card mb-3">
<div class="card-header d-flex justify-content-between align-items-center" role="button"
data-bs-toggle="collapse" data-bs-target="#batch-{{ batch.batch_id[:8] }}">
<div>
<i class="bi bi-folder2-open me-2"></i>
<strong>Batch {{ batch.created_at.strftime('%Y-%m-%d %H:%M') }}</strong>
<span class="text-muted ms-2">({{ batch.total }} files)</span>
<span class="badge bg-success ms-2">{{ batch.passed }} passed</span>
{% if batch.warnings %}<span class="badge bg-warning text-dark">{{ batch.warnings }} warnings</span>{% endif %}
{% if batch.failed %}<span class="badge bg-danger">{{ batch.failed }} failed</span>{% endif %}
<span class="ms-2 text-muted">Avg: <strong>{{ batch.avg_score }}</strong></span>
</div>
<div>
<a href="{{ url_for('hm_qc.download_batch', batch_id=batch.batch_id) }}"
class="btn btn-sm btn-outline-primary" title="Download all as ZIP" onclick="event.stopPropagation();">
<i class="bi bi-download me-1"></i>Download All
</a>
</div>
</div>
<div class="collapse" id="batch-{{ batch.batch_id[:8] }}">
<div class="card-body p-0">
<table class="table table-hover mb-0">
<thead class="table-light">
<tr>
<th style="width:50px"></th>
<th>Filename</th>
<th>Score</th>
<th>Status</th>
<th></th>
</tr>
</thead>
<tbody>
{% for report in batch.reports %}
<tr>
<td>
<img src="{{ url_for('hm_qc.thumbnail', report_id=report.id) }}"
alt="" style="width:40px;height:40px;object-fit:cover;border-radius:4px;"
onerror="this.style.display='none'">
</td>
<td>{{ report.filename }}</td>
<td>
{% if report.score is not none %}
<strong>{{ '%.0f' % report.score }}</strong>
{% else %}-{% endif %}
</td>
<td>
<span class="badge bg-{{ 'success' if report.status == 'passed' else 'warning' if report.status == 'warning' else 'danger' }}">
{{ report.status }}
</span>
</td>
<td class="text-nowrap">
<a href="{{ url_for('hm_qc.view_report', report_id=report.id) }}" class="btn btn-sm btn-outline-primary me-1" title="View">
<i class="bi bi-eye"></i>
</a>
<a href="{{ url_for('hm_qc.download_report', report_id=report.id) }}" class="btn btn-sm btn-outline-secondary me-1" title="Download">
<i class="bi bi-download"></i>
</a>
<button class="btn btn-sm btn-outline-danger delete-report-btn" data-id="{{ report.id }}" data-name="{{ report.filename }}" title="Delete">
<i class="bi bi-trash"></i>
</button>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
{% endfor %}
</div>
</div>
{% endif %}
<!-- Individual QC Reports -->
{% if individual_reports %}
<div class="row mt-4">
<div class="col-12">
<h4><i class="bi bi-clipboard-check me-2"></i>Individual Reports</h4>
<div class="table-responsive">
<table class="table table-hover">
<thead class="table-dark">
<tr>
<th style="width:50px"></th>
<th>Filename</th>
<th>Job #</th>
<th>Score</th>
@ -83,16 +163,19 @@
</tr>
</thead>
<tbody>
{% for report in recent_reports %}
{% for report in individual_reports %}
<tr>
<td>
<img src="{{ url_for('hm_qc.thumbnail', report_id=report.id) }}"
alt="" style="width:40px;height:40px;object-fit:cover;border-radius:4px;"
onerror="this.style.display='none'">
</td>
<td>{{ report.filename }}</td>
<td>{{ report.job_number or '-' }}</td>
<td>
{% if report.score is not none %}
<strong>{{ '%.0f' % report.score }}</strong>
{% else %}
-
{% endif %}
{% else %}-{% endif %}
</td>
<td>
<span class="badge bg-{{ 'success' if report.status == 'passed' else 'warning' if report.status == 'warning' else 'danger' }}">

View file

@ -0,0 +1,4 @@
"""Printer Check Module - CSV-to-PDF cross-referencing for print orders."""
from .blueprint import printer_check_bp
__all__ = ['printer_check_bp']

View file

@ -0,0 +1,18 @@
"""
Printer Check Module Blueprint.
Cross-references CSV order sheets against PDF folder structures
to identify matched, missing, and extra files per region/country.
"""
from flask import Blueprint
# Blueprint for the Printer Check module. Its views are mounted under the
# /printer-check URL prefix, with module-local templates and static files.
printer_check_bp = Blueprint(
'printer_check',
__name__,
template_folder='templates',
static_folder='static',
static_url_path='/printer-check/static',
url_prefix='/printer-check'
)
# Imported last on purpose: the routes module imports printer_check_bp from
# this module, so the blueprint must exist before routes is loaded. The
# import is needed only for its side effect of attaching the view functions.
from . import routes

View file

@ -0,0 +1,98 @@
{
"EEU": {
"name": "Eastern Europe",
"countries": [
"AL",
"BA",
"BG",
"CY",
"CZ",
"EE",
"GE",
"GR",
"HU",
"KZ",
"LT",
"LV",
"MK",
"ME",
"PL",
"RO",
"RS",
"SK",
"TR",
"UA",
"XK"
],
"groups": [
{
"id": "kz_ua",
"label": "KZ, UA",
"countries": [
"KZ",
"UA"
]
},
{
"id": "tr",
"label": "TR",
"countries": [
"TR"
]
},
{
"id": "rest",
"label": "Rest",
"countries": [
"AL",
"BA",
"BG",
"CY",
"CZ",
"EE",
"GE",
"GR",
"HU",
"LT",
"LV",
"MK",
"ME",
"PL",
"RO",
"RS",
"SK",
"XK"
]
}
]
},
"CEU": {
"name": "Central Europe",
"countries": [
"DE",
"AT",
"CH",
"NL",
"SI"
],
"groups": [
{
"id": "arian",
"label": "Arian (AT, CH, SI)",
"countries": [
"AT",
"CH",
"SI"
]
},
{
"id": "kurten",
"label": "Kurten (DE, NL)",
"countries": [
"DE",
"NL"
]
}
]
}
}

View file

@ -0,0 +1,242 @@
"""
Printer Check Module Routes.
Handles CSV upload, PDF ZIP upload, region/campaign selection,
processing, and XLSX export.
"""
import os
import json
import uuid
import shutil
import zipfile
import logging
from io import BytesIO
from flask import (
render_template, request, jsonify, send_file, current_app
)
from werkzeug.utils import secure_filename
from .blueprint import printer_check_bp
logger = logging.getLogger(__name__)
UPLOAD_BASE = 'uploads/printer_check'
REGIONS_CONFIG_PATH = os.path.join(
os.path.dirname(__file__), 'regions.json'
)
def _load_regions():
    """Return the regions configuration.

    Reads regions.json (REGIONS_CONFIG_PATH) when the file exists;
    otherwise returns the built-in default configuration.
    """
    if not os.path.exists(REGIONS_CONFIG_PATH):
        # Built-in defaults used when no config file is present.
        eastern_europe = {
            "name": "Eastern Europe",
            "countries": ["AL", "BA", "BG", "CY", "CZ", "EE", "GE", "GR", "HU", "KZ",
                          "LT", "LV", "MK", "ME", "PL", "RO", "RS", "SK", "TR", "UA", "XK"],
            "groups": [
                {"id": "kz_ua", "label": "KZ, UA", "countries": ["KZ", "UA"]},
                {"id": "tr", "label": "TR", "countries": ["TR"]},
                {"id": "rest", "label": "Rest",
                 "countries": ["AL", "BA", "BG", "CY", "CZ", "EE", "GE", "GR", "HU",
                               "LT", "LV", "MK", "ME", "PL", "RO", "RS", "SK", "XK"]},
            ],
        }
        central_europe = {
            "name": "Central Europe",
            "countries": ["DE", "AT", "CH", "NL", "SI"],
            "groups": [
                {"id": "arian", "label": "Arian (AT, CH, SI)", "countries": ["AT", "CH", "SI"]},
                {"id": "kurten", "label": "Kurten (DE, NL)", "countries": ["DE", "NL"]},
            ],
        }
        return {"EEU": eastern_europe, "CEU": central_europe}

    with open(REGIONS_CONFIG_PATH, 'r') as config_file:
        return json.load(config_file)
@printer_check_bp.route('/')
@printer_check_bp.route('/index')
def index():
    """Render the Printer Check landing page with the regions configuration."""
    template_context = {
        'active_tab': 'printer-check',
        'regions': _load_regions(),
    }
    return render_template('printer_check/index.html', **template_context)
@printer_check_bp.route('/api/regions')
def api_regions():
    """Expose the regions configuration as a JSON response."""
    regions_config = _load_regions()
    return jsonify(regions_config)
@printer_check_bp.route('/process', methods=['POST'])
def process():
    """
    Process CSV + PDF ZIP for a given region and country selection.

    Expects multipart form with:
    - csv_file: CSV file
    - pdf_zip: ZIP file containing PDF folder structure
    - region_code: Selected region code (e.g., "EEU")
    - selected_countries: JSON array of country codes
    - selected_campaigns: JSON array of campaign tokens (optional)

    Returns:
        JSON with match statistics, warnings and the filtered rows on
        success; 400 JSON for invalid input; 500 JSON on unexpected errors.
    """
    from .services.csv_parser import parse_csv
    from .services.region_filter import (
        filter_by_region, detect_campaigns, filter_by_campaign,
        normalize_language_column, find_longest_common_string
    )
    from .services.folder_scanner import scan_folder, is_gen_file
    from .services.matcher import match_rows

    # Tracked outside the try so a failed run can remove its upload directory.
    session_dir = None
    try:
        # Validate inputs
        csv_file = request.files.get('csv_file')
        pdf_zip = request.files.get('pdf_zip')
        region_code = request.form.get('region_code')
        selected_countries_json = request.form.get('selected_countries', '[]')
        selected_campaigns_json = request.form.get('selected_campaigns', '[]')

        if not csv_file or not pdf_zip or not region_code:
            return jsonify({'error': 'Missing required fields: csv_file, pdf_zip, region_code'}), 400

        regions = _load_regions()
        region = regions.get(region_code)
        if not region:
            return jsonify({'error': f'Unknown region: {region_code}'}), 400

        # Malformed selections are a client error, not a server failure.
        try:
            selected_countries = json.loads(selected_countries_json)
            selected_campaigns = json.loads(selected_campaigns_json)
        except ValueError:
            return jsonify({
                'error': 'selected_countries and selected_campaigns must be valid JSON arrays'
            }), 400
        if not isinstance(selected_countries, list) or not isinstance(selected_campaigns, list):
            return jsonify({
                'error': 'selected_countries and selected_campaigns must be valid JSON arrays'
            }), 400

        if not selected_countries:
            selected_countries = region['countries']

        # Create session directory
        session_id = str(uuid.uuid4())
        session_dir = os.path.join(UPLOAD_BASE, session_id)
        os.makedirs(session_dir, exist_ok=True)

        # Save and parse CSV. secure_filename() can return '' (e.g. for a
        # name made only of unsafe characters); fall back to a fixed name so
        # the save target is never the session directory itself.
        csv_name = secure_filename(csv_file.filename or '') or 'input.csv'
        csv_path = os.path.join(session_dir, csv_name)
        csv_file.save(csv_path)
        headers, all_rows = parse_csv(csv_path)

        # Extract PDF ZIP
        pdf_dir = os.path.join(session_dir, 'pdfs')
        os.makedirs(pdf_dir, exist_ok=True)
        zip_name = secure_filename(pdf_zip.filename or '') or 'pdfs.zip'
        zip_path = os.path.join(session_dir, zip_name)
        pdf_zip.save(zip_path)

        try:
            with zipfile.ZipFile(zip_path, 'r') as zf:
                zf.extractall(pdf_dir)
        except zipfile.BadZipFile:
            shutil.rmtree(session_dir, ignore_errors=True)
            return jsonify({'error': 'pdf_zip is not a valid ZIP archive'}), 400

        # Handle macOS __MACOSX artifacts
        macosx_dir = os.path.join(pdf_dir, '__MACOSX')
        if os.path.exists(macosx_dir):
            shutil.rmtree(macosx_dir)

        # If ZIP contains a single root folder, use that as the PDF root
        pdf_root = pdf_dir
        entries = [e for e in os.listdir(pdf_dir) if not e.startswith('.')]
        if len(entries) == 1 and os.path.isdir(os.path.join(pdf_dir, entries[0])):
            pdf_root = os.path.join(pdf_dir, entries[0])

        # Filter rows by region
        filtered_rows = filter_by_region(all_rows, selected_countries)

        # Normalize language column
        normalize_language_column(filtered_rows)

        # Detect and filter campaigns
        detected_campaigns = detect_campaigns(filtered_rows)
        if selected_campaigns:
            filtered_rows = filter_by_campaign(filtered_rows, selected_campaigns)

        # Scan PDF folder
        all_region_codes = list(regions.keys())
        scan_result = scan_folder(pdf_root, region_code, selected_countries, all_region_codes)

        # Match rows to PDFs
        match_result = match_rows(filtered_rows, scan_result)

        # Count stats
        matched = match_result['statuses'].count('MATCHED')
        missing = match_result['statuses'].count('MISSING')
        total = len(match_result['statuses'])

        # Check for GEN-related warnings
        has_gen_rows = any(
            is_gen_file(row[0]) for row in filtered_rows if row
        )
        missing_root_gen = has_gen_rows and not scan_result.get('root_gen_exists', False)

        # Build response
        result = {
            'success': True,
            'session_id': session_id,
            'headers': headers,
            'total_rows': total,
            'matched': matched,
            'missing': missing,
            'extra_count': len(match_result['extra']),
            'detected_campaigns': detected_campaigns,
            'selected_campaigns': selected_campaigns,
            'folder_layout': scan_result.get('layout', 'unknown'),
            'pdfs_found': match_result.get('country_pdfs_total', 0) + len(match_result.get('referenced_gen_pdfs', set())),
            'gen_total_in_folder': len(scan_result.get('gen_pdfs', set())),
            'match_statuses': match_result['statuses'],
            'missing_files': match_result.get('missing_info', []),
            'extra_files': match_result.get('extra_info', []),
            'misplaced_gen': scan_result.get('misplaced_gen', []),
            'duplicate_gen': scan_result.get('duplicate_gen_files', []),
            'misplaced_country_files': scan_result.get('misplaced_country_files', []),
            'files_at_wrong_level': scan_result.get('files_at_wrong_level', []),
            'missing_root_gen': missing_root_gen,
            'warnings_count': (
                len(scan_result.get('misplaced_gen', [])) +
                len(scan_result.get('duplicate_gen_files', [])) +
                len(scan_result.get('misplaced_country_files', [])) +
                len(scan_result.get('files_at_wrong_level', [])) +
                (1 if missing_root_gen else 0)
            ),
            'filtered_rows': filtered_rows
        }

        return jsonify(result)

    except Exception as e:
        logger.error(f"Printer check processing error: {e}", exc_info=True)
        # Do not leave partially processed uploads behind after a failure.
        if session_dir:
            shutil.rmtree(session_dir, ignore_errors=True)
        return jsonify({'error': str(e)}), 500
@printer_check_bp.route('/export', methods=['POST'])
def export_xlsx():
    """Export filtered results to XLSX.

    Expects a JSON object with 'headers' (list), 'rows' (list) and an
    optional 'filename'.

    Returns:
        The XLSX file as an attachment; 400 JSON when the body is not a
        JSON object or contains no data; 500 JSON on unexpected errors.
    """
    from .services.xlsx_writer import write_xlsx

    try:
        # silent=True: a missing or malformed body yields None instead of
        # raising inside the handler and being reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        headers = data.get('headers') or []
        rows = data.get('rows') or []
        filename = data.get('filename') or 'PrinterCheck_Export.xlsx'

        if not headers or not rows:
            return jsonify({'error': 'No data to export'}), 400

        buffer = BytesIO()
        write_xlsx(buffer, headers, rows)
        buffer.seek(0)

        return send_file(
            buffer,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            as_attachment=True,
            download_name=filename
        )

    except Exception as e:
        logger.error(f"XLSX export error: {e}", exc_info=True)
        return jsonify({'error': str(e)}), 500

View file

@ -0,0 +1,92 @@
"""CSV parser with automatic delimiter detection.
Port of the Rust CrossMatch parser.rs module. Parses CSV/TSV files
with auto-detected delimiters, trimming whitespace and skipping empty rows.
"""
import csv
import logging
from typing import List, Tuple
logger = logging.getLogger(__name__)
def detect_delimiter(first_line: str) -> str:
    """Pick the field delimiter of a CSV/TSV file from its first line.

    A tab is chosen only when the line holds strictly more tabs than
    commas; a tie (including a line with neither character) falls back
    to a comma.

    Args:
        first_line: The first line of the file.

    Returns:
        '\\t' when tabs outnumber commas, ',' otherwise.
    """
    n_tabs, n_commas = (first_line.count(ch) for ch in ('\t', ','))
    if n_tabs > n_commas:
        chosen = '\t'
    else:
        chosen = ','
    logger.debug(
        "Delimiter detection: tabs=%d, commas=%d -> using %r",
        n_tabs, n_commas, chosen
    )
    return chosen
def parse_csv(file_path: str) -> Tuple[List[str], List[List[str]]]:
    """Parse a CSV/TSV file whose delimiter is detected automatically.

    The delimiter is detected from the first line, then every line is fed
    to ``csv.reader`` so quoted fields are honoured. Each cell is stripped
    of surrounding whitespace; data rows that are entirely empty after
    stripping are dropped. The first row is always taken as the header.

    Args:
        file_path: Path to the CSV or TSV file (UTF-8, optional BOM).

    Returns:
        A tuple of (headers, rows) where headers is a list of column
        names and rows is a list of lists of cell values.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If the file is empty or its first row has no cells.
    """
    logger.info("Parsing CSV file: %s", file_path)
    # newline='' passes line endings to the csv module untranslated, which
    # its documentation requires so that line breaks embedded in quoted
    # fields are kept exactly as written.
    with open(file_path, 'r', encoding='utf-8-sig', newline='') as f:
        raw_lines = f.readlines()
    if not raw_lines:
        raise ValueError(f"CSV file is empty: {file_path}")
    # Detect delimiter from the first line
    delimiter = detect_delimiter(raw_lines[0])
    # csv.reader (rather than str.split) gives proper quoting support
    reader = csv.reader(raw_lines, delimiter=delimiter)
    headers: List[str] = [cell.strip() for cell in next(reader, [])]
    rows: List[List[str]] = []
    for row in reader:
        trimmed = [cell.strip() for cell in row]
        # Skip rows that are entirely empty (also covers blank lines)
        if any(trimmed):
            rows.append(trimmed)
    if not headers:
        raise ValueError(f"No header row found in CSV file: {file_path}")
    logger.info(
        "Parsed %d data rows with %d columns from %s",
        len(rows), len(headers), file_path
    )
    return headers, rows

View file

@ -0,0 +1,357 @@
"""Folder scanner for detecting PDF file layout and cataloguing files.
Port of the Rust CrossMatch scanner.rs module. Scans a folder structure
to detect multi-region or country-level layouts, catalogues PDFs by
country, identifies GEN files, and flags misplaced or duplicate files.
"""
import logging
import os
import re
from pathlib import Path
from typing import Dict, List, Optional, Set
logger = logging.getLogger(__name__)
# Pattern for the locale suffix at the end of a PDF filename: _xx-CC.pdf
# (2-letter language code, hyphen, 2-letter country code). IGNORECASE makes
# every part, including the '.pdf' extension, match in any letter case.
# Group 1 is the language code, group 2 the country code.
_LOCALE_SUFFIX_PATTERN = re.compile(r'_([a-z]{2})-([A-Z]{2})\.pdf$', re.IGNORECASE)
def is_gen_file(filename: str) -> bool:
    """Tell whether *filename* names a GEN (generic) file.

    The final extension, if any, is dropped and the remaining stem is
    checked case-insensitively for a trailing '_gen'.

    Args:
        filename: The filename to check (with or without extension).

    Returns:
        True if the stem ends with '_gen' in any letter case.
    """
    lowered_stem = Path(filename).stem.lower()
    # Comparing the last four characters is the same test as endswith('_gen').
    return lowered_stem[-4:] == '_gen'
def extract_country_from_filename(filename: str) -> Optional[str]:
    """Read the country code out of a filename's trailing locale suffix.

    The suffix has the form _xx-CC.pdf at the very end of the name, where
    xx is a 2-letter language code and CC a 2-letter country code (any
    letter case).

    Args:
        filename: The filename to parse.

    Returns:
        The country code in uppercase (e.g. 'DE', 'SE'), or None when the
        filename carries no locale suffix.
    """
    found = _LOCALE_SUFFIX_PATTERN.search(filename)
    return found.group(2).upper() if found else None
def scan_folder(
    root_path: str,
    region_code: str,
    region_countries: List[str],
    all_region_codes: List[str]
) -> Dict:
    """Catalogue the PDFs under *root_path* for one target region.

    The layout is decided from the immediate subfolders of the root,
    compared case-insensitively:
        - 'multi-region': at least one subfolder is named like a known
          region code; country folders are then read from inside the
          subfolder matching ``region_code``.
        - 'country-level': at least one subfolder is named like a country
          of the target region.
        - 'flat': neither of the above; no country folders are scanned.

    For every layout a root-level GEN folder is collected, PDFs lying
    directly in the root are flagged, and GEN files recorded in more than
    one location are listed as duplicates. When the root is not a directory
    or cannot be listed, the untouched empty result is returned.

    Args:
        root_path: Path to the root folder to scan.
        region_code: The target region code (e.g. 'EMEA').
        region_countries: Country codes belonging to the target region.
        all_region_codes: All known region codes, used for layout detection.

    Returns:
        Dictionary with keys:
            layout (str): 'multi-region', 'country-level', or 'flat'
            country_pdfs (dict): country code -> set of lowercase PDF names
            gen_pdfs (set): lowercase names of PDFs found in GEN folders
            root_gen_exists (bool): whether a root-level GEN folder was found
            duplicate_gen_files (list): GEN filenames seen in several locations
            misplaced_gen (list): dicts with 'filename' and 'location'
            misplaced_country_files (list): dicts with 'filename',
                'expected_country', 'found_in'
            files_at_wrong_level (list): dicts with 'filename' and 'location'
            all_pdf_locations (dict): lowercase filename -> list of locations
            country_folder_labels (dict): country code -> folder name as found
    """
    logger.info("Scanning folder: %s (region=%s)", root_path, region_code)
    result: Dict = {
        'layout': 'flat',
        'country_pdfs': {},
        'gen_pdfs': set(),
        'root_gen_exists': False,
        'duplicate_gen_files': [],
        'misplaced_gen': [],
        'misplaced_country_files': [],
        'files_at_wrong_level': [],
        'all_pdf_locations': {},
        'country_folder_labels': {},
    }

    if not os.path.isdir(root_path):
        logger.warning("Root path does not exist or is not a directory: %s", root_path)
        return result

    target_countries = {code.upper() for code in region_countries}
    known_regions = {code.upper() for code in all_region_codes}

    # Immediate children of the root; only directories matter for the layout
    try:
        children = os.listdir(root_path)
    except OSError as e:
        logger.error("Failed to list directory %s: %s", root_path, e)
        return result
    subdirs = [
        name for name in children
        if os.path.isdir(os.path.join(root_path, name))
    ]
    present = {name.upper() for name in subdirs}

    # Region folders take precedence over country folders
    if not present.isdisjoint(known_regions):
        result['layout'] = 'multi-region'
        logger.info("Detected multi-region layout")
        _scan_multi_region(
            root_path, subdirs, region_code, target_countries, result
        )
    elif not present.isdisjoint(target_countries):
        result['layout'] = 'country-level'
        logger.info("Detected country-level layout")
        _scan_country_level(root_path, subdirs, target_countries, result)
    else:
        result['layout'] = 'flat'
        logger.info("Detected flat layout (no recognized subfolders)")

    # Layout-independent passes: root GEN folder, stray root PDFs, duplicates
    _scan_gen_folder(root_path, result)
    _scan_root_level_pdfs(root_path, result)
    _detect_duplicate_gen(result)

    logger.info(
        "Scan complete: layout=%s, %d countries, %d GEN files, "
        "%d misplaced_gen, %d misplaced_country, %d wrong_level",
        result['layout'],
        len(result['country_pdfs']),
        len(result['gen_pdfs']),
        len(result['misplaced_gen']),
        len(result['misplaced_country_files']),
        len(result['files_at_wrong_level']),
    )
    return result
def _scan_multi_region(
    root_path: str,
    subdirs: List[str],
    region_code: str,
    region_countries_upper: Set[str],
    result: Dict
) -> None:
    """Scan a multi-region tree: locate the region folder, then its children.

    The first root subfolder whose name equals ``region_code`` (ignoring
    case) is used. PDFs lying directly inside it are flagged as being at
    the wrong level, a 'GEN' child folder is collected as GEN PDFs, and
    every child named like a target country is scanned as a country folder.
    Nothing is recorded when the region folder is absent or unreadable.
    """
    wanted = region_code.upper()
    region_folder = next((d for d in subdirs if d.upper() == wanted), None)
    if not region_folder:
        logger.warning("Region folder '%s' not found in %s", region_code, root_path)
        return

    region_path = os.path.join(root_path, region_folder)
    # PDFs sitting directly in the region folder are at the wrong level
    _scan_level_pdfs(region_path, f"{region_folder}/", result)

    try:
        children = os.listdir(region_path)
    except OSError as e:
        logger.error("Failed to list region directory %s: %s", region_path, e)
        return

    for child in children:
        child_path = os.path.join(region_path, child)
        if not os.path.isdir(child_path):
            continue
        code = child.upper()
        if code == 'GEN':
            # Region-level GEN folder
            _collect_gen_pdfs(child_path, result)
        elif code in region_countries_upper:
            # Country folder of the target region
            result['country_folder_labels'][code] = child
            _scan_country_folder(
                child_path, code,
                f"{region_folder}/{child}/", result
            )
def _scan_country_level(
    root_path: str,
    subdirs: List[str],
    region_countries_upper: Set[str],
    result: Dict
) -> None:
    """Scan a country-level tree, where country folders sit directly at root.

    Each root subfolder named like a target country (ignoring case) is
    recorded under its original spelling and scanned as a country folder.
    """
    for folder_name in subdirs:
        code = folder_name.upper()
        # 'GEN' is left to _scan_gen_folder; unknown folders are ignored
        if code == 'GEN' or code not in region_countries_upper:
            continue
        result['country_folder_labels'][code] = folder_name
        _scan_country_folder(
            os.path.join(root_path, folder_name), code, f"{folder_name}/", result
        )
def _scan_country_folder(
    folder_path: str,
    expected_country: str,
    location_prefix: str,
    result: Dict
) -> None:
    """Scan a single country folder (recursively) for PDFs.

    Every PDF found is recorded in ``all_pdf_locations``. GEN files are
    reported in ``misplaced_gen`` and not added to the country set. Any
    other PDF is added to ``country_pdfs[expected_country]``; if its
    filename locale suffix names a different country it is additionally
    reported in ``misplaced_country_files``.

    Args:
        folder_path: Path of the country folder on disk.
        expected_country: Uppercase country code this folder stands for.
        location_prefix: Display prefix for locations, ending in '/'.
        result: Scan result dictionary, updated in place.
    """
    country_set = result['country_pdfs'].setdefault(expected_country, set())
    folder_label = location_prefix.rstrip('/')
    for dirpath, _dirnames, filenames in os.walk(folder_path):
        # The location only depends on the directory, so compute it once.
        # Comparing against os.curdir (instead of testing for a trailing
        # '.') keeps subfolders whose own name ends with a dot labelled
        # correctly.
        rel_dir = os.path.relpath(dirpath, folder_path)
        if rel_dir == os.curdir:
            location = folder_label
        else:
            location = location_prefix + rel_dir
        for fn in filenames:
            fn_lower = fn.lower()
            if not fn_lower.endswith('.pdf'):
                continue
            # Track all PDF locations
            result['all_pdf_locations'].setdefault(fn_lower, []).append(location)
            # A GEN file inside a country folder is misplaced
            if is_gen_file(fn):
                result['misplaced_gen'].append({
                    'filename': fn,
                    'location': location,
                })
                continue
            # Flag files whose locale suffix names another country; they
            # are still counted as part of this folder's contents.
            file_country = extract_country_from_filename(fn)
            if file_country and file_country != expected_country:
                result['misplaced_country_files'].append({
                    'filename': fn,
                    'expected_country': file_country,
                    'found_in': expected_country,
                })
            country_set.add(fn_lower)
def _scan_gen_folder(root_path: str, result: Dict) -> None:
"""Scan the root-level GEN folder for GEN PDFs."""
gen_path = os.path.join(root_path, 'GEN')
if not os.path.isdir(gen_path):
# Try case-insensitive match
for entry in os.listdir(root_path):
if entry.upper() == 'GEN' and os.path.isdir(os.path.join(root_path, entry)):
gen_path = os.path.join(root_path, entry)
break
else:
return
result['root_gen_exists'] = True
_collect_gen_pdfs(gen_path, result)
def _collect_gen_pdfs(gen_path: str, result: Dict) -> None:
"""Collect GEN PDFs from a GEN folder into the result."""
for dirpath, _dirnames, filenames in os.walk(gen_path):
for fn in filenames:
if fn.lower().endswith('.pdf'):
fn_lower = fn.lower()
result['gen_pdfs'].add(fn_lower)
location = f"GEN/{os.path.relpath(dirpath, gen_path)}"
if location.endswith('.'):
location = 'GEN'
result['all_pdf_locations'].setdefault(fn_lower, []).append(location)
def _scan_root_level_pdfs(root_path: str, result: Dict) -> None:
"""Flag any PDFs found directly at the root level (wrong level)."""
try:
for entry in os.listdir(root_path):
entry_path = os.path.join(root_path, entry)
if os.path.isfile(entry_path) and entry.lower().endswith('.pdf'):
result['files_at_wrong_level'].append({
'filename': entry,
'location': 'root',
})
result['all_pdf_locations'].setdefault(
entry.lower(), []
).append('root')
except OSError as e:
logger.error("Error scanning root for PDFs: %s", e)
def _scan_level_pdfs(folder_path: str, location_label: str, result: Dict) -> None:
"""Flag PDFs found directly at a given folder level (not in subfolders)."""
try:
for entry in os.listdir(folder_path):
entry_path = os.path.join(folder_path, entry)
if os.path.isfile(entry_path) and entry.lower().endswith('.pdf'):
result['files_at_wrong_level'].append({
'filename': entry,
'location': location_label.rstrip('/'),
})
result['all_pdf_locations'].setdefault(
entry.lower(), []
).append(location_label.rstrip('/'))
except OSError as e:
logger.error("Error scanning %s for PDFs: %s", folder_path, e)
def _detect_duplicate_gen(result: Dict) -> None:
"""Detect GEN files that appear in multiple locations."""
for fn_lower, locations in result['all_pdf_locations'].items():
if is_gen_file(fn_lower) and len(locations) > 1:
result['duplicate_gen_files'].append(fn_lower)

View file

@ -0,0 +1,161 @@
"""CSV-to-folder matcher for printer check cross-referencing.
Port of the Rust CrossMatch matcher.rs module. Matches CSV rows against
scanned PDF files to identify matched, missing, and extra files.
"""
import logging
from typing import Dict, List, Set
from .folder_scanner import is_gen_file
logger = logging.getLogger(__name__)
# Match status constants: the values stored under 'status' in each entry of
# the 'statuses' list returned by match_rows().
STATUS_MATCHED = 'MATCHED'
STATUS_MISSING = 'MISSING'
def match_rows(
    filtered_rows: List[List[str]],
    scan_result: Dict,
    filename_col: int = 0,
    country_col: int = 7
) -> Dict:
    """Match CSV rows against scanned PDF folder contents.

    For each row, determines whether the expected PDF file exists in the
    scanned folder structure. GEN files are looked up in ``gen_pdfs``;
    all other files are looked up in ``country_pdfs`` under the row's
    country. Filenames are compared in lowercase with a '.pdf' extension
    appended when the CSV value lacks one.

    Country-folder PDFs that no CSV row references are reported as extras,
    sorted by name within each country so the output is stable between runs.

    Rows too short to hold both the filename and country columns are
    skipped and get no entry in ``statuses``.

    Args:
        filtered_rows: List of row data from the CSV (already filtered).
        scan_result: Dictionary returned by folder_scanner.scan_folder().
        filename_col: Column index for the filename (default 0).
        country_col: Column index for the country code (default 7).

    Returns:
        Dictionary with keys:
            statuses (list): Dicts with 'filename', 'country', 'status'
                for each processed row.
            missing (list): Lowercase filenames that are missing.
            missing_info (list): Dicts with 'filename', 'country',
                'expected_location' for missing files.
            extra (list): Lowercase filenames found in country folders
                but not in the CSV.
            extra_info (list): Dicts with 'filename', 'country',
                'location' for extra files.
            referenced_gen_pdfs (set): Lowercase GEN filenames referenced
                by CSV rows.
            country_pdfs_total (int): Total count of country PDFs across
                all scanned countries.
    """
    logger.info("Matching %d rows against scan result", len(filtered_rows))
    country_pdfs: Dict[str, Set[str]] = scan_result.get('country_pdfs', {})
    gen_pdfs: Set[str] = scan_result.get('gen_pdfs', set())

    statuses: List[Dict] = []
    missing: List[str] = []
    missing_info: List[Dict] = []
    referenced_gen_pdfs: Set[str] = set()
    referenced_country_pdfs: Dict[str, Set[str]] = {}
    matched_count = 0

    last_needed_col = max(filename_col, country_col)
    for row in filtered_rows:
        if len(row) <= last_needed_col:
            continue
        filename = row[filename_col].strip()
        fn_lower = filename.lower()
        # Ensure filename has .pdf extension for matching
        if fn_lower.endswith('.pdf'):
            filename_display = filename
        else:
            fn_lower += '.pdf'
            filename_display = filename + '.pdf'

        # Also classify on the normalized name: for an extension-less CSV
        # value containing a dot (e.g. 'x_1.5_GEN') the raw name's stem is
        # cut at that dot and the '_gen' ending would be missed.
        if is_gen_file(filename) or is_gen_file(fn_lower):
            label = 'GEN'
            referenced_gen_pdfs.add(fn_lower)
            available = gen_pdfs
        else:
            label = row[country_col].strip().upper()
            referenced_country_pdfs.setdefault(label, set()).add(fn_lower)
            available = country_pdfs.get(label, set())

        found = fn_lower in available
        statuses.append({
            'filename': filename_display,
            'country': label,
            'status': STATUS_MATCHED if found else STATUS_MISSING,
        })
        if found:
            matched_count += 1
        else:
            missing.append(fn_lower)
            missing_info.append({
                'filename': filename_display,
                'country': label,
                'expected_location': f'{label}/',
            })

    # Find extra files: in country folders but not referenced by CSV
    extra: List[str] = []
    extra_info: List[Dict] = []
    for country, pdf_set in country_pdfs.items():
        referenced = referenced_country_pdfs.get(country, set())
        # sorted() makes the report order independent of set iteration order
        for fn_lower in sorted(fn for fn in pdf_set if fn not in referenced):
            extra.append(fn_lower)
            extra_info.append({
                'filename': fn_lower,
                'country': country,
                'location': f'{country}/',
            })

    # Count total country PDFs
    country_pdfs_total = sum(len(s) for s in country_pdfs.values())
    logger.info(
        "Match complete: %d matched, %d missing, %d extra, %d GEN referenced",
        matched_count, len(missing), len(extra), len(referenced_gen_pdfs)
    )
    return {
        'statuses': statuses,
        'missing': missing,
        'missing_info': missing_info,
        'extra': extra,
        'extra_info': extra_info,
        'referenced_gen_pdfs': referenced_gen_pdfs,
        'country_pdfs_total': country_pdfs_total,
    }

View file

@ -0,0 +1,283 @@
"""Region-based filtering and campaign detection for printer check CSV data.
Port of the Rust CrossMatch filter.rs module. Provides functions for filtering
rows by region/country, detecting campaigns from filenames, normalizing
language columns, and finding common filename substrings.
"""
import logging
import re
from typing import List, Optional
logger = logging.getLogger(__name__)
# Column indices (0-based) matching the expected CSV layout
FILENAME_COL_INDEX = 0  # filename; campaign tokens are extracted from it
LANGUAGE_COL_INDEX = 6  # language/locale; rewritten by normalize_language_column
COUNTRY_COL_INDEX = 7  # country code; compared by filter_by_region
CODE_COL_INDEX = 3  # code; filter_by_region drops rows where it is empty
ALT_LANGUAGE_SOURCE_COL_INDEX = 10  # optional column read for the KZ/MK locale rules
# Substrings to ignore when computing the longest common string
# (removed from each filename by find_longest_common_string)
_IGNORED_SUBSTRINGS = [
    "100cm_cut_out_vinyl_black",
    "100cm_cut_out_vinyl_red",
    "50x70cm_Poster",
    "62x80cm_Poster",
    "70x100cm_Poster",
]
# Pattern matching product IDs like _XXXXX-XX (5+ digits, hyphen, 2+ digits)
_PRODUCT_ID_PATTERN = re.compile(r'_\d{5,}-\d{2,}')
# Campaign token pattern: 4 digits optionally followed by one uppercase letter,
# enclosed in underscores; group 1 is the token without the underscores
_CAMPAIGN_PATTERN = re.compile(r'_(\d{4}[A-Z]?)_')
def filter_by_region(
    rows: List[List[str]],
    countries: List[str]
) -> List[List[str]]:
    """Keep the rows that belong to the given countries and carry a code.

    A row is kept when it is long enough to hold both columns, its country
    column (index 7) equals one of ``countries`` ignoring case and
    surrounding whitespace, and its code column (index 3) is non-empty
    after stripping.

    Args:
        rows: List of row data (list of cell values).
        countries: List of country codes to include.

    Returns:
        Filtered list of rows, in input order.
    """
    wanted = {c.upper() for c in countries}
    last_needed_col = max(COUNTRY_COL_INDEX, CODE_COL_INDEX)
    filtered = [
        row for row in rows
        if len(row) > last_needed_col
        and row[COUNTRY_COL_INDEX].strip().upper() in wanted
        and row[CODE_COL_INDEX].strip()
    ]
    logger.info(
        "Region filter: %d rows -> %d rows (countries: %s)",
        len(rows), len(filtered), countries
    )
    return filtered
def extract_campaign_from_filename(filename: str) -> Optional[str]:
    """Pull the first campaign token out of a filename.

    A token is 4 digits, optionally followed by one uppercase letter, and
    must be enclosed in underscores (_DDDD_ or _DDDDA_).

    Args:
        filename: The filename to scan.

    Returns:
        The token without its underscores (e.g. '1022B', '4116'), or None
        when the filename contains no such token.
    """
    found = _CAMPAIGN_PATTERN.search(filename)
    return found.group(1) if found else None
def detect_campaigns(rows: List[List[str]]) -> List[str]:
    """Collect the distinct campaign tokens found in the filename column.

    Empty rows are ignored; filenames without a token contribute nothing.

    Args:
        rows: List of row data.

    Returns:
        Sorted list of unique campaign tokens.
    """
    tokens = {
        extract_campaign_from_filename(row[FILENAME_COL_INDEX])
        for row in rows
        if row
    }
    # Filenames without a token yield None; drop that placeholder
    tokens.discard(None)
    result = sorted(tokens)
    logger.info("Detected %d campaigns: %s", len(result), result)
    return result
def filter_by_campaign(
    rows: List[List[str]],
    selected_campaigns: List[str]
) -> List[List[str]]:
    """Keep the rows whose filename carries one of the selected campaigns.

    Empty rows and rows whose filename has no campaign token are dropped.

    Args:
        rows: List of row data.
        selected_campaigns: Campaign tokens to include.

    Returns:
        Filtered list of rows, in input order.
    """
    wanted = set(selected_campaigns)
    filtered = []
    for row in rows:
        token = extract_campaign_from_filename(row[FILENAME_COL_INDEX]) if row else None
        if not token or token not in wanted:
            continue
        filtered.append(row)
    logger.info(
        "Campaign filter: %d rows -> %d rows (campaigns: %s)",
        len(rows), len(filtered), selected_campaigns
    )
    return filtered
def normalize_language_column(rows: List[List[str]]) -> None:
    """Rewrite the language column (and filename locale) of rows in place.

    Rules, applied per row that is long enough to hold the filename,
    language and country columns:
        - Filename ending in '_GEN.pdf' (any case): language becomes 'GEN'
          and nothing else is touched.
        - Country KZ: if the alternate-language column (10) contains 'KZ'
          the locale is 'kk-KZ'; otherwise, if it contains 'RU', the locale
          is 'ru-KZ'; otherwise the row is left alone.
        - Country MK: 'sq-MK' if the alternate-language column contains
          'AL', else 'mk-MK'.
    When a locale is chosen it is written to the language column and the
    filename's trailing _xx-CC.pdf suffix, if present, is replaced with it.
    A missing alternate-language column is treated as empty.

    Args:
        rows: List of row data, mutated in-place.
    """
    suffix_re = re.compile(r'_([a-z]{2}-[A-Z]{2})(\.pdf)$', re.IGNORECASE)
    last_needed_col = max(FILENAME_COL_INDEX, LANGUAGE_COL_INDEX, COUNTRY_COL_INDEX)
    for row in rows:
        if len(row) <= last_needed_col:
            continue
        filename = row[FILENAME_COL_INDEX].strip()

        # GEN files only get their language column set
        if filename.upper().endswith('_GEN.PDF'):
            row[LANGUAGE_COL_INDEX] = 'GEN'
            continue

        country = row[COUNTRY_COL_INDEX].strip().upper()
        if len(row) > ALT_LANGUAGE_SOURCE_COL_INDEX:
            alt_col = row[ALT_LANGUAGE_SOURCE_COL_INDEX].strip().upper()
        else:
            alt_col = ''

        new_locale: Optional[str] = None
        if country == 'KZ':
            # Kazakhstan: a 'KZ' marker wins over 'RU'
            if 'KZ' in alt_col:
                new_locale = 'kk-KZ'
            elif 'RU' in alt_col:
                new_locale = 'ru-KZ'
        elif country == 'MK':
            # North Macedonia: Albanian when marked 'AL', Macedonian otherwise
            new_locale = 'sq-MK' if 'AL' in alt_col else 'mk-MK'

        if not new_locale:
            continue

        row[LANGUAGE_COL_INDEX] = new_locale
        # Swap the locale suffix in the filename, keeping the extension
        new_filename = suffix_re.sub(f'_{new_locale}\\2', filename)
        if new_filename != filename:
            row[FILENAME_COL_INDEX] = new_filename
            logger.debug(
                "Rewrote filename locale: %s -> %s", filename, new_filename
            )
    logger.info("Normalized language column for %d rows", len(rows))
def find_longest_common_string(filenames: List[str]) -> str:
    """Find the longest common substring across all filenames.

    Pre-processes filenames by:
        1. Stripping .pdf extension
        2. Stripping trailing _xx-XX locale suffix
        3. Removing ignored substrings (poster sizes, vinyl types)
        4. Removing _XXXXX-XX product ID patterns

    Then finds the longest substring (minimum 3 characters) that appears
    in ALL processed filenames. Candidates made up entirely of digits and
    underscores are skipped. Among equally long results, the one starting
    earliest in the shortest processed filename wins.

    Args:
        filenames: List of filenames to analyze.

    Returns:
        The longest common substring, or empty string if none found.
    """
    if not filenames:
        return ''

    # Pre-process filenames
    locale_suffix = re.compile(r'_[a-z]{2}-[A-Z]{2}$')
    processed = []
    for fn in filenames:
        # Strip .pdf extension (case-insensitive)
        if fn.lower().endswith('.pdf'):
            fn = fn[:-4]
        # Strip trailing locale suffix
        fn = locale_suffix.sub('', fn)
        # Remove ignored substrings
        for ignored in _IGNORED_SUBSTRINGS:
            fn = fn.replace(ignored, '')
        # Remove product ID patterns
        fn = _PRODUCT_ID_PATTERN.sub('', fn)
        processed.append(fn)

    # Every common substring must occur in the shortest name, so its
    # substrings are the only candidates.
    shortest = min(processed, key=len)
    # Identical processed names need to be checked only once.
    distinct = set(processed)
    best = ''
    for start in range(len(shortest)):
        # Only candidates of at least 3 characters that would beat the
        # current best are worth testing.
        first_end = start + max(3, len(best) + 1)
        for end in range(first_end, len(shortest) + 1):
            candidate = shortest[start:end]
            # Skip candidates that are all digits and underscores
            if all(c.isdigit() or c == '_' for c in candidate):
                continue
            # Once a candidate is missing from some name, every longer
            # candidate from the same start is missing too.
            if not all(candidate in p for p in distinct):
                break
            best = candidate
    logger.debug("Longest common string across %d filenames: %r", len(filenames), best)
    return best

View file

@ -0,0 +1,64 @@
"""XLSX writer for exporting filtered printer check data.
Port of the Rust CrossMatch writer.rs module. Writes filtered CSV data
to an Excel workbook using openpyxl with formatted headers and
auto-sized columns.
"""
import logging
from typing import List
from openpyxl import Workbook
from openpyxl.styles import Font
from openpyxl.utils import get_column_letter
logger = logging.getLogger(__name__)
def write_xlsx(
    output_path: str,
    headers: List[str],
    filtered_rows: List[List[str]]
) -> None:
    """Write headers and rows to a single-sheet XLSX workbook.

    The sheet is named "Filtered Data". Headers go on row 1 in bold, data
    rows follow from row 2. Each header's column is sized to the header
    length plus 4, with a minimum width of 8.

    Args:
        output_path: Destination handed to ``Workbook.save``.
        headers: List of column header strings.
        filtered_rows: List of row data (list of cell values).

    Raises:
        OSError: If the file cannot be written.
    """
    logger.info(
        "Writing XLSX: %d rows, %d columns -> %s",
        len(filtered_rows), len(headers), output_path
    )
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "Filtered Data"

    # Row 1: bold headers
    header_font = Font(bold=True)
    for column, title in enumerate(headers, start=1):
        sheet.cell(row=1, column=column, value=title).font = header_font

    # Rows 2..n: one sheet row per data row
    for row_number, cells in enumerate(filtered_rows, start=2):
        for column, cell_value in enumerate(cells, start=1):
            sheet.cell(row=row_number, column=column, value=cell_value)

    # Column widths follow the header text, never narrower than 8
    for column, title in enumerate(headers, start=1):
        dimension = sheet.column_dimensions[get_column_letter(column)]
        dimension.width = max(len(title) + 4, 8)

    workbook.save(output_path)
    logger.info("XLSX written successfully: %s", output_path)

View file

@ -0,0 +1,362 @@
{% extends "base.html" %}
{% block title %}Printer Check{% endblock %}
{% block content %}
<div class="container-fluid mt-4">
<div class="row">
<!-- Left Panel: Configuration -->
<div class="col-md-4">
<div class="card">
<div class="card-header">
<h5 class="mb-0"><i class="bi bi-printer me-2"></i>Printer Check</h5>
</div>
<div class="card-body">
<form id="printerCheckForm">
<!-- Region Selection -->
<div class="mb-3">
<label for="regionSelect" class="form-label">Region</label>
<select class="form-select" id="regionSelect">
<option value="" disabled selected>Choose a region...</option>
{% for code, region in regions.items() %}
<option value="{{ code }}" data-countries='{{ region.countries | tojson }}'
data-groups='{{ region.groups | tojson if region.groups else "[]" }}'>
{{ region.name }} ({{ code }})
</option>
{% endfor %}
</select>
</div>
<!-- Country Groups -->
<div class="mb-3" id="groupsSection" style="display:none;">
<label class="form-label">Country Groups</label>
<div id="countryGroups"></div>
</div>
<!-- Campaign Filter -->
<div class="mb-3" id="campaignSection" style="display:none;">
<label class="form-label">Campaigns</label>
<div class="text-muted small mb-1" id="campaignInfo"></div>
<div id="campaignGroups"></div>
</div>
<!-- CSV Upload -->
<div class="mb-3">
<label class="form-label">CSV File</label>
<input type="file" class="form-control" id="csvFile" accept=".csv,.tsv,.txt">
</div>
<!-- PDF ZIP Upload -->
<div class="mb-3">
<label class="form-label">PDF Folder (ZIP)</label>
<input type="file" class="form-control" id="pdfZip" accept=".zip">
<div class="form-text">Upload a ZIP containing the PDF folder structure.</div>
</div>
<button type="submit" class="btn btn-hm-primary w-100" id="processBtn" disabled>
<i class="bi bi-play-circle me-2"></i>Process
</button>
</form>
</div>
</div>
</div>
<!-- Right Panel: Results -->
<div class="col-md-8">
<!-- Welcome State -->
<div id="welcomeState" class="card">
<div class="card-body text-center p-5">
<i class="bi bi-printer" style="font-size: 4rem; color: var(--hm-yellow);"></i>
<h3 class="mt-3">Ready to Scan</h3>
<p class="text-muted">Select a region, upload your CSV and PDF folder, then click Process.</p>
</div>
</div>
<!-- Loading State -->
<div id="loadingState" class="card" style="display:none;">
<div class="card-body text-center p-5">
<div class="spinner-border text-warning" role="status"></div>
<p class="mt-3">Processing files...</p>
</div>
</div>
<!-- Results State -->
<div id="resultsState" style="display:none;">
<!-- Stats Cards -->
<div class="row mb-3">
<div class="col-3">
<div class="card text-center stat-card" data-filter="all" role="button">
<div class="card-body py-2">
<h4 id="statTotal" class="mb-0">0</h4>
<small class="text-muted">All</small>
</div>
</div>
</div>
<div class="col-3">
<div class="card text-center stat-card" data-filter="matched" role="button">
<div class="card-body py-2">
<h4 id="statMatched" class="mb-0 text-success">0</h4>
<small class="text-muted">Matched</small>
</div>
</div>
</div>
<div class="col-3">
<div class="card text-center stat-card" data-filter="missing" role="button">
<div class="card-body py-2">
<h4 id="statMissing" class="mb-0 text-danger">0</h4>
<small class="text-muted">Missing</small>
</div>
</div>
</div>
<div class="col-3">
<div class="card text-center stat-card" data-filter="extra" role="button">
<div class="card-body py-2">
<h4 id="statExtra" class="mb-0 text-warning">0</h4>
<small class="text-muted">Extra</small>
</div>
</div>
</div>
</div>
<!-- Warnings -->
<div id="warningsSection" class="alert alert-warning" style="display:none;">
<h6><i class="bi bi-exclamation-triangle me-2"></i>Warnings</h6>
<ul id="warningsList" class="mb-0"></ul>
</div>
<!-- Results Table -->
<div class="card">
<div class="card-header d-flex justify-content-between align-items-center">
<span>Results</span>
<button class="btn btn-sm btn-outline-primary" id="exportBtn" style="display:none;">
<i class="bi bi-download me-1"></i>Export XLSX
</button>
</div>
<div class="card-body p-0">
<div class="table-responsive" style="max-height:500px;overflow-y:auto;">
<table class="table table-sm table-hover mb-0" id="resultsTable">
<thead class="table-dark sticky-top">
<tr id="resultsHead"></tr>
</thead>
<tbody id="resultsBody"></tbody>
</table>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
{% endblock %}
{% block extra_scripts %}
<script>
// Cached references to the form controls and panels used by the handlers below.
const regionSelect = document.getElementById('regionSelect');
const groupsSection = document.getElementById('groupsSection');
const countryGroups = document.getElementById('countryGroups');
const campaignSection = document.getElementById('campaignSection');
const processBtn = document.getElementById('processBtn');
// Page state: presumably the latest processing response and the active
// stat-card filter — NOTE(review): confirm against the code that sets them.
let lastResult = null;
let currentFilter = 'all';
// Region selection: rebuild the country-group checkboxes for the chosen region
regionSelect.addEventListener('change', function() {
    const opt = this.options[this.selectedIndex];
    const groups = JSON.parse(opt.dataset.groups || '[]');
    countryGroups.innerHTML = '';
    // Build the checkboxes with DOM APIs rather than an HTML string, so
    // quotes, '<' or '&' in a group's label, id or country list cannot
    // break the markup.
    groups.forEach(g => {
        const wrapper = document.createElement('div');
        wrapper.className = 'form-check';

        const input = document.createElement('input');
        input.className = 'form-check-input country-group-cb';
        input.type = 'checkbox';
        // The group's countries travel as JSON in the value; getSelectedCountries parses it
        input.value = JSON.stringify(g.countries);
        input.id = `grp-${g.id}`;
        input.checked = true;

        const label = document.createElement('label');
        label.className = 'form-check-label';
        label.htmlFor = input.id;
        label.textContent = g.label;

        wrapper.appendChild(input);
        wrapper.appendChild(label);
        countryGroups.appendChild(wrapper);
    });
    // The section is only shown when the region defines groups
    groupsSection.style.display = groups.length > 0 ? '' : 'none';
    updateProcessBtn();
});
/**
 * Collect the country codes to submit with the form.
 *
 * When at least one country-group checkbox is ticked, returns the
 * de-duplicated union of the JSON arrays stored in their values. Otherwise
 * falls back to the JSON array in the selected region option's
 * data-countries attribute (or an empty array if that is absent).
 *
 * @returns {Array} list of country entries
 */
function getSelectedCountries() {
    const tickedBoxes = document.querySelectorAll('.country-group-cb:checked');
    if (tickedBoxes.length > 0) {
        const unique = new Set();
        for (const box of tickedBoxes) {
            JSON.parse(box.value).forEach(code => unique.add(code));
        }
        return Array.from(unique);
    }
    // Nothing ticked: use every country listed on the selected region.
    const selectedOption = regionSelect.options[regionSelect.selectedIndex];
    return JSON.parse(selectedOption.dataset.countries || '[]');
}
/**
 * Enable the Process button only when a region, a CSV file and a PDF ZIP are
 * all present, and set its caption according to whether a region is chosen.
 */
function updateProcessBtn() {
    const regionChosen = regionSelect.value;
    const csvPicked = document.getElementById('csvFile').files.length > 0;
    const zipPicked = document.getElementById('pdfZip').files.length > 0;
    const ready = regionChosen && csvPicked && zipPicked;
    processBtn.disabled = !ready;
    if (regionChosen) {
        processBtn.textContent = 'Process';
    } else {
        processBtn.textContent = 'Select a region to start';
    }
}
// Re-evaluate the button whenever either file input changes.
['csvFile', 'pdfZip'].forEach(inputId => {
    document.getElementById(inputId).addEventListener('change', updateProcessBtn);
});
// Process form: upload the CSV and PDF ZIP, then render the server's comparison.
document.getElementById('printerCheckForm').addEventListener('submit', async function(e) {
    e.preventDefault();
    const welcomeState = document.getElementById('welcomeState');
    const resultsState = document.getElementById('resultsState');
    const loadingState = document.getElementById('loadingState');
    welcomeState.style.display = 'none';
    resultsState.style.display = 'none';
    loadingState.style.display = '';
    processBtn.disabled = true;

    const formData = new FormData();
    formData.append('csv_file', document.getElementById('csvFile').files[0]);
    formData.append('pdf_zip', document.getElementById('pdfZip').files[0]);
    formData.append('region_code', regionSelect.value);
    formData.append('selected_countries', JSON.stringify(getSelectedCountries()));
    // Campaign checkboxes only exist after a previous run detected several campaigns.
    const selCampaigns = Array.from(
        document.querySelectorAll('.campaign-cb:checked'),
        cb => cb.value
    );
    formData.append('selected_campaigns', JSON.stringify(selCampaigns));

    try {
        const resp = await fetch(`${BASE_URL}/printer-check/process`, {
            method: 'POST',
            body: formData
        });
        let data;
        try {
            data = await resp.json();
        } catch (parseErr) {
            // The body was not JSON (e.g. an HTML error page from a proxy or
            // an upload-size rejection); report the HTTP status instead of a
            // JSON syntax error.
            throw new Error(`Server returned ${resp.status} ${resp.statusText}`.trim());
        }
        if (!resp.ok || !data || !data.success) {
            throw new Error((data && data.error) || `Processing failed (HTTP ${resp.status})`);
        }
        lastResult = data;
        renderResults(data);
    } catch (err) {
        alert('Error: ' + err.message);
        welcomeState.style.display = '';
    } finally {
        loadingState.style.display = 'none';
        processBtn.disabled = false;
    }
});
/**
 * Populate the results pane from a /printer-check/process response.
 *
 * Fills the four stat counters, rebuilds the campaign checkboxes when more
 * than one campaign was detected, lists structural warnings, renders the
 * unfiltered table and wires the stat cards as filter buttons.
 *
 * All server-supplied strings (filenames, folder names, campaign names) are
 * inserted as text nodes, never as HTML, because they originate from the
 * uploaded CSV/ZIP.
 *
 * @param {Object} data parsed JSON response of the process endpoint
 */
function renderResults(data) {
    document.getElementById('resultsState').style.display = '';
    document.getElementById('statTotal').textContent = data.total_rows;
    document.getElementById('statMatched').textContent = data.matched;
    document.getElementById('statMissing').textContent = data.missing;
    document.getElementById('statExtra').textContent = data.extra_count;
    document.getElementById('exportBtn').style.display = '';

    // Show campaigns if detected
    if (data.detected_campaigns && data.detected_campaigns.length > 1) {
        campaignSection.style.display = '';
        const info = document.getElementById('campaignInfo');
        info.textContent = `${data.detected_campaigns.length} campaigns detected`;
        const grp = document.getElementById('campaignGroups');
        grp.innerHTML = '';
        data.detected_campaigns.forEach(c => {
            const div = document.createElement('div');
            div.className = 'form-check form-check-inline';

            const input = document.createElement('input');
            input.className = 'form-check-input campaign-cb';
            input.type = 'checkbox';
            input.value = c;
            input.id = `camp-${c}`;
            input.checked = true;

            const label = document.createElement('label');
            label.className = 'form-check-label';
            label.htmlFor = input.id;
            label.textContent = c;

            div.appendChild(input);
            div.appendChild(label);
            grp.appendChild(div);
        });
    }

    // Warnings
    const warnings = [];
    (data.misplaced_gen || []).forEach(w => warnings.push(`Misplaced GEN: ${w.filename} in ${w.found_in} (expected ${w.expected_in})`));
    (data.duplicate_gen || []).forEach(w => warnings.push(`Duplicate GEN: ${w.filename} in ${(w.locations || []).join(', ')}`));
    (data.misplaced_country_files || []).forEach(w => warnings.push(`Wrong folder: ${w.filename} in ${w.found_in} (expected ${w.expected_in})`));
    (data.files_at_wrong_level || []).forEach(w => warnings.push(`Wrong level: ${w.filename} in ${w.found_in} (expected ${w.expected_in})`));
    if (data.missing_root_gen) warnings.push('Missing Root/GEN folder — GEN assets referenced in CSV but no GEN folder found');

    const warningsSection = document.getElementById('warningsSection');
    const warningsList = document.getElementById('warningsList');
    warningsList.innerHTML = '';
    warnings.forEach(text => {
        const li = document.createElement('li');
        li.textContent = text;
        warningsList.appendChild(li);
    });
    warningsSection.style.display = warnings.length > 0 ? '' : 'none';

    // Table: every new result starts unfiltered, so drop any highlight left
    // over from a previous run to keep the cards and the table in agreement.
    const statCards = document.querySelectorAll('.stat-card');
    currentFilter = 'all';
    statCards.forEach(c => c.classList.remove('border-primary'));
    renderTable(data, 'all');

    // Stat card click filtering. Assigning onclick (rather than calling
    // addEventListener) keeps exactly one handler per card no matter how many
    // times results are rendered.
    statCards.forEach(card => {
        card.onclick = function() {
            currentFilter = this.dataset.filter;
            statCards.forEach(c => c.classList.remove('border-primary'));
            this.classList.add('border-primary');
            renderTable(lastResult, currentFilter);
        };
    });
}
/**
 * Render the results table for the given filter.
 *
 * - 'extra' lists data.extra_files (filename / found_in).
 * - 'matched' and 'missing' show only rows whose status equals MATCHED or
 *   MISSING respectively; any other filter value shows every row.
 *
 * Rows are built as DOM nodes and attached in one operation, so cell values
 * are never parsed as HTML and the table is not re-serialized per row.
 *
 * @param {Object} data   parsed response of the process endpoint
 * @param {string} filter value of a stat card's data-filter attribute
 */
function renderTable(data, filter) {
    const head = document.getElementById('resultsHead');
    const body = document.getElementById('resultsBody');

    // Append a <td> holding either a ready-made node or plain text.
    const addCell = (tr, content) => {
        const td = document.createElement('td');
        if (content instanceof Node) {
            td.appendChild(content);
        } else {
            td.textContent = content == null ? '' : String(content);
        }
        tr.appendChild(td);
    };

    const fragment = document.createDocumentFragment();

    if (filter === 'extra') {
        head.innerHTML = '<th>Filename</th><th>Found In</th>';
        (data.extra_files || []).forEach(f => {
            const tr = document.createElement('tr');
            addCell(tr, f.filename);
            addCell(tr, f.found_in);
            fragment.appendChild(tr);
        });
        body.innerHTML = '';
        body.appendChild(fragment);
        return;
    }

    head.innerHTML = '<th>Status</th><th>Filename</th><th>Country</th>';
    const rows = data.filtered_rows || [];
    const statuses = data.match_statuses || [];
    // Badge colour per known status; anything else (including the UNKNOWN
    // fallback for a row without a status) is shown as-is in a neutral badge
    // instead of being mislabelled as MISSING.
    const badgeClass = { MATCHED: 'bg-success', MISSING: 'bg-danger' };

    rows.forEach((row, i) => {
        const status = statuses[i] || 'UNKNOWN';
        if (filter === 'matched' && status !== 'MATCHED') return;
        if (filter === 'missing' && status !== 'MISSING') return;

        const badge = document.createElement('span');
        badge.className = `badge ${badgeClass[status] || 'bg-secondary'}`;
        badge.textContent = status;

        const tr = document.createElement('tr');
        addCell(tr, badge);
        // NOTE(review): columns 0 and 7 are presumably the filename and
        // country columns of the CSV row, matching the header labels above —
        // confirm against the server's row layout.
        addCell(tr, row[0] || '');
        addCell(tr, row[7] || '');
        fragment.appendChild(tr);
    });
    body.innerHTML = '';
    body.appendChild(fragment);
}
// Export: POST the last result's headers and rows, then download the returned XLSX.
document.getElementById('exportBtn').addEventListener('click', async function() {
    if (!lastResult) return;
    try {
        const resp = await fetch(`${BASE_URL}/printer-check/export`, {
            method: 'POST',
            headers: {'Content-Type': 'application/json'},
            body: JSON.stringify({
                headers: lastResult.headers,
                rows: lastResult.filtered_rows,
                filename: 'PrinterCheck_Export.xlsx'
            })
        });
        if (!resp.ok) throw new Error('Export failed');
        const blob = await resp.blob();
        const url = URL.createObjectURL(blob);
        const a = document.createElement('a');
        a.href = url;
        a.download = 'PrinterCheck_Export.xlsx';
        // Attach the link before clicking and release the object URL only
        // after the current task finishes: some browsers ignore clicks on
        // detached anchors or abort the download if the URL is revoked
        // synchronously.
        a.style.display = 'none';
        document.body.appendChild(a);
        a.click();
        a.remove();
        setTimeout(() => URL.revokeObjectURL(url), 0);
    } catch (err) {
        alert('Export error: ' + err.message);
    }
});
</script>
{% endblock %}

View file

@ -6,6 +6,7 @@ Consolidates reports from multiple sources:
- Database reports (HM QC reports generated in-platform)
"""
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import List, Dict, Any, Optional
from core.models.qc_report import QCReport
@ -64,20 +65,26 @@ class ReportAggregator:
_update(15, f"Found {len(consolidated_reports)} reports. Downloading...")
# Download and parse each report
parsed_reports = []
# Download and parse reports in parallel (up to 2 at a time)
parsed_reports = [None] * len(consolidated_reports)
total = len(consolidated_reports)
for i, report_info in enumerate(consolidated_reports):
progress_pct = 15 + ((i + 1) / total) * 75 # 15% to 90%
_update(
progress_pct,
f"Downloading/parsing report {i + 1} of {total}: {report_info['filename']}",
details={'current_file': report_info['filename'], 'file_index': i + 1, 'total_files': total}
)
parsed = self._download_and_parse_report(report_info)
parsed_reports.append(parsed)
with ThreadPoolExecutor(max_workers=2) as pool:
future_to_idx = {
pool.submit(self._download_and_parse_report, report_info): i
for i, report_info in enumerate(consolidated_reports)
}
completed = 0
for future in as_completed(future_to_idx):
idx = future_to_idx[future]
parsed_reports[idx] = future.result()
completed += 1
progress_pct = 15 + (completed / total) * 75 # 15% to 90%
_update(
progress_pct,
f"Downloaded/parsed {completed} of {total} reports",
details={'files_completed': completed, 'total_files': total}
)
# Generate summary
_update(95, "Generating summary...")

View file

@ -23,7 +23,7 @@ class VideoQCExecutor:
"""Execute video QC checks with frame extraction and AI analysis."""
def __init__(self, session_id: str, file_path: str, job_number: str = None,
llm_provider: str = 'openai', llm_model: str = 'gpt-4o',
llm_provider: str = 'google', llm_model: str = 'gemini-2.5-flash',
user: str = None, campaign_id: str = None):
self.session_id = session_id
self.file_path = file_path
@ -36,6 +36,11 @@ class VideoQCExecutor:
self.results = {}
self.campaign_context = {}
@property
def _use_direct_video(self) -> bool:
"""Whether to use direct video analysis (Gemini) vs frame grid fallback.

True only when ``self.llm_provider`` is ``'google'``; any other provider
value selects the frame-grid path.
"""
return self.llm_provider == 'google'
def execute(self) -> Dict[str, Any]:
"""Run the full video QC pipeline."""
try:
@ -45,19 +50,35 @@ class VideoQCExecutor:
if self.campaign_id:
self._load_campaign_context()
# Step 1: Extract frames (0-30%)
self.progress.update(5, "Extracting frames from video...")
frame_paths, duration = self._extract_frames()
grid_path = None
frame_paths = []
duration = 0
if not frame_paths:
self.progress.fail("Failed to extract frames from video")
return {'error': 'Frame extraction failed'}
if self._use_direct_video:
# Direct video analysis via Gemini — skip frame extraction
self.progress.update(5, "Preparing video for Gemini analysis...")
self.progress.update(20, f"Extracted {len(frame_paths)} frames. Creating grid...")
# Still need duration for report metadata
try:
from modules.video_qc.checks.legacy.video_parse import get_video_metadata
metadata = get_video_metadata(self.file_path)
duration = metadata['duration']
except Exception:
duration = 0
# Step 2: Create grid image (30-35%)
grid_path = self._create_grid(frame_paths, duration)
self.progress.update(35, "Frame grid created. Running AI checks...")
self.progress.update(15, "Uploading video to Gemini for direct analysis...")
else:
# Fallback: Extract frames and create grid (OpenAI path)
self.progress.update(5, "Extracting frames from video...")
frame_paths, duration = self._extract_frames()
if not frame_paths:
self.progress.fail("Failed to extract frames from video")
return {'error': 'Frame extraction failed'}
self.progress.update(20, f"Extracted {len(frame_paths)} frames. Creating grid...")
grid_path = self._create_grid(frame_paths, duration)
self.progress.update(35, "Frame grid created. Running AI checks...")
# Step 3: Visual Quality check (35-60%)
self.progress.update(40, "Running visual quality check (language & legibility)...")
@ -240,8 +261,8 @@ class VideoQCExecutor:
grid.save(grid_path, 'JPEG', quality=90)
return grid_path
def _run_visual_quality_check(self, grid_path: str) -> Dict[str, Any]:
"""Run visual quality AI check on the frame grid."""
def _run_visual_quality_check(self, grid_path: str = None) -> Dict[str, Any]:
"""Run visual quality AI check on the video (direct) or frame grid (fallback)."""
# Build campaign guidelines section if available
campaign_guidelines = ""
if self.campaign_context.get('parsed_content'):
@ -260,8 +281,13 @@ When evaluating, specifically check:
- Are fonts and typography consistent with the guidelines?
"""
if self._use_direct_video:
video_context = "Watch this video carefully from start to finish."
else:
video_context = "You are looking at a grid of frames extracted from a video (1 frame per second, labeled with timestamps)."
prompt = f"""You are a strict visual quality control inspector for H&M marketing video content.
You are looking at a grid of frames extracted from a video (1 frame per second, labeled with timestamps).
{video_context}
{campaign_guidelines}
EVALUATE THE FOLLOWING (in order of importance):
@ -312,18 +338,29 @@ Respond in JSON:
}"""
try:
response = LLMConfig.call_vision_api(
prompt=prompt,
image_asset=grid_path,
provider=self.llm_provider,
model=self.llm_model,
usage_context={
'module': 'video_qc',
'check_name': 'visual_quality',
'user': self.user,
'session_id': self.session_id
}
)
usage_context = {
'module': 'video_qc',
'check_name': 'visual_quality',
'user': self.user,
'session_id': self.session_id
}
if self._use_direct_video:
response = LLMConfig.call_video_api(
prompt=prompt,
video_path=self.file_path,
provider=self.llm_provider,
model=self.llm_model,
usage_context=usage_context
)
else:
response = LLMConfig.call_vision_api(
prompt=prompt,
image_asset=grid_path,
provider=self.llm_provider,
model=self.llm_model,
usage_context=usage_context
)
result_data = self._parse_response(response.get('text', ''))
score = result_data.get('score', 75.0)
@ -338,6 +375,7 @@ Respond in JSON:
'language_detected': result_data.get('language_detected'),
'language_consistent': result_data.get('language_consistent'),
'issues': result_data.get('issues', []),
'analysis_method': 'direct_video' if self._use_direct_video else 'frame_grid',
'llm_provider': self.llm_provider,
'llm_model': self.llm_model,
'tokens_used': response.get('tokens_used')
@ -357,10 +395,15 @@ Respond in JSON:
'weight': 50
}
def _run_censorship_check(self, grid_path: str) -> Dict[str, Any]:
"""Run censorship/body coverage AI check on the frame grid."""
prompt = """You are a content compliance inspector for H&M marketing video content.
You are looking at a grid of frames extracted from a video (1 frame per second).
def _run_censorship_check(self, grid_path: str = None) -> Dict[str, Any]:
"""Run censorship/body coverage AI check on the video (direct) or frame grid (fallback)."""
if self._use_direct_video:
video_context = "Watch this video carefully from start to finish."
else:
video_context = "You are looking at a grid of frames extracted from a video (1 frame per second)."
prompt = f"""You are a content compliance inspector for H&M marketing video content.
{video_context}
EVALUATE BODY COVERAGE AND CONTENT APPROPRIATENESS:
@ -395,18 +438,29 @@ Respond in JSON:
}"""
try:
response = LLMConfig.call_vision_api(
prompt=prompt,
image_asset=grid_path,
provider=self.llm_provider,
model=self.llm_model,
usage_context={
'module': 'video_qc',
'check_name': 'censorship',
'user': self.user,
'session_id': self.session_id
}
)
usage_context = {
'module': 'video_qc',
'check_name': 'censorship',
'user': self.user,
'session_id': self.session_id
}
if self._use_direct_video:
response = LLMConfig.call_video_api(
prompt=prompt,
video_path=self.file_path,
provider=self.llm_provider,
model=self.llm_model,
usage_context=usage_context
)
else:
response = LLMConfig.call_vision_api(
prompt=prompt,
image_asset=grid_path,
provider=self.llm_provider,
model=self.llm_model,
usage_context=usage_context
)
result_data = self._parse_response(response.get('text', ''))
score = result_data.get('score', 75.0)
@ -508,7 +562,7 @@ Respond in JSON:
<div class="header">
<h1>Video QC Report</h1>
<p><strong>File:</strong> {os.path.basename(self.file_path)}</p>
<p><strong>Duration:</strong> {duration:.1f}s | <strong>Frames Analyzed:</strong> {frame_count}</p>
<p><strong>Duration:</strong> {duration:.1f}s | <strong>Analysis:</strong> {'Direct Video (Gemini)' if self._use_direct_video else f'{frame_count} Frames Grid'}</p>
<p><strong>Job Number:</strong> {self.job_number or 'N/A'}</p>
<p><strong>Date:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<div class="score">{score:.1f}</div>

View file

@ -115,13 +115,13 @@ def execute():
data = request.get_json()
session_id = data.get('session_id')
job_number = data.get('job_number')
llm_provider = data.get('llm_provider', 'openai')
llm_provider = data.get('llm_provider', 'google')
if not session_id:
return jsonify({'error': 'Missing session_id'}), 400
provider_models = {'openai': 'gpt-4o', 'google': 'gemini-2.5-flash'}
llm_model = provider_models.get(llm_provider, 'gpt-4o')
llm_model = provider_models.get(llm_provider, 'gemini-2.5-flash')
upload_path = os.path.join(
current_app.config['VIDEO_QC_UPLOAD_PATH'], session_id

View file

@ -31,8 +31,8 @@
<div class="mb-3">
<label for="llmProvider" class="form-label">AI Provider</label>
<select class="form-select" id="llmProvider">
<option value="openai" selected>OpenAI GPT-4o</option>
<option value="google">Google Gemini</option>
<option value="google" selected>Google Gemini (Direct Video)</option>
<option value="openai">OpenAI GPT-4o (Frame Grid)</option>
</select>
</div>
@ -66,7 +66,7 @@
<div class="card-header"><i class="bi bi-camera-video me-2"></i>Video File</div>
<div class="card-body">
<p><i class="bi bi-file-earmark-play me-2"></i><strong>{{ filename }}</strong></p>
<small class="text-muted">Frames will be extracted at 1 per second and analyzed by AI</small>
<small class="text-muted">Gemini analyzes the video directly. OpenAI extracts frames at 1 per second.</small>
</div>
</div>
</div>

View file

@ -87,6 +87,15 @@
<span class="badge bg-warning text-dark ms-2 beta-badge">BETA</span>
</a>
</li>
<li class="nav-item" role="presentation">
<a class="nav-link {% if active_tab == 'printer-check' %}active{% endif %}"
href="{{ url_for('printer_check.index') }}"
data-tab="printer-check"
role="tab">
<i class="bi bi-printer me-2"></i>
Printer Check
</a>
</li>
<li class="nav-item" role="presentation">
<a class="nav-link {% if active_tab == 'campaigns' %}active{% endif %}"
href="{{ url_for('campaigns.index') }}"