ford_qc/checks/extra_carousel_validation_check.py

234 lines
No EOL
8.3 KiB
Python

import os
import json
import logging
from typing import Dict, Any, List, Tuple
def _conditions_of(item):
    """Return the item's ``conditions`` mapping, or ``{}`` when absent or malformed."""
    if not isinstance(item, dict):
        return {}
    conditions = item.get("conditions")
    # A JSON ``null`` (or any non-object) is treated as "no conditions".
    return conditions if isinstance(conditions, dict) else {}


def _classify_coverage(pack_type, coverage):
    """Map a coverage percentage to ``(status, message)`` for the given pack type."""
    if pack_type == "MEC":
        # MEC: must have 100% coverage
        if coverage < 100:
            return (
                "failed",
                f"MEC pack requires 100% extra carousel coverage, found {coverage:.1f}%",
            )
        return "passed", "MEC pack extra carousel validation passed - 100% coverage achieved"

    # BAU: 0-49% fail, 50-99% warning, 100% pass
    if coverage < 50:
        return (
            "failed",
            f"BAU pack extra carousel coverage too low: {coverage:.1f}% (minimum 50% required)",
        )
    if coverage < 100:
        return (
            "warning",
            f"BAU pack extra carousel coverage at {coverage:.1f}% - consider improving to 100%",
        )
    return "passed", "BAU pack extra carousel validation passed - 100% coverage achieved"


def run_check(config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Enhanced Extra Carousel Images Validation Check

    Validates that carousel/extra images match their WERS codes and meet
    coverage requirements:
    - BAU: 0-49% fail, 50-99% warning, 100% pass
    - MEC: 0-99% fail, 100% pass

    A pack is classified as MEC when any item has
    ``conditions.experienceCondition == "2d-background"``; otherwise it is BAU.
    Only items whose conditions have ``viewtype == "carousel"`` and
    ``imagetype == "extra"`` are validated. Items that are not objects, or
    whose ``conditions`` is missing/null, are treated as having no conditions.

    Expected config:
    - working_dir (str): Directory where linkingrecord.json and images reside,
      default: "working"
    - linkingrecord_filename (str): Name of the linking record file,
      default: linkingrecord.json

    Returns:
        A dict whose "status" key is one of:
        - "passed" if coverage meets requirements (or no carousel/extra items exist)
        - "warning" for BAU packs with 50-99% coverage
        - "failed" if coverage is below requirements
        - "error" for structural issues (missing/invalid file, bad structure,
          or an error reported while validating a record)
        Error results carry "error_message"; all other results carry "details".
    """
    working_dir = config.get("working_dir", "working")
    linkingrecord_filename = config.get("linkingrecord_filename", "linkingrecord.json")
    linkingrecord_path = os.path.join(working_dir, linkingrecord_filename)

    if not os.path.exists(linkingrecord_path):
        return {
            "status": "error",
            "error_message": f"Linking record '{linkingrecord_filename}' not found in {working_dir}."
        }

    try:
        with open(linkingrecord_path, 'r', encoding='utf-8') as f:
            linkingrecord = json.load(f)
    except json.JSONDecodeError as e:
        return {
            "status": "error",
            "error_message": f"Invalid JSON in {linkingrecord_filename}: {str(e)}"
        }
    except Exception as e:
        # Boundary of the check: any read failure is reported, not raised.
        return {
            "status": "error",
            "error_message": f"Error reading {linkingrecord_filename}: {str(e)}"
        }

    # The top-level JSON value may be anything (list, number, null, ...), so
    # verify it is an object before looking up "items".
    if not isinstance(linkingrecord, dict) or not isinstance(linkingrecord.get("items"), list):
        return {
            "status": "error",
            "error_message": "Invalid linkingrecord.json structure: 'items' missing or not a list."
        }
    items = linkingrecord["items"]

    # Detect pack type (MEC vs BAU)
    is_mec = any(
        _conditions_of(item).get("experienceCondition") == "2d-background"
        for item in items
    )
    pack_type = "MEC" if is_mec else "BAU"

    # Find all carousel/extra items, remembering their position in "items"
    carousel_extra_items = []
    for item_index, item in enumerate(items):
        conditions = _conditions_of(item)
        if conditions.get("viewtype") == "carousel" and conditions.get("imagetype") == "extra":
            carousel_extra_items.append((item_index, item))

    # If no carousel/extra items found, this pack doesn't use extra carousel images
    if not carousel_extra_items:
        return {
            "status": "passed",
            "details": {
                "message": "No carousel/extra items found - validation not applicable.",
                "pack_type": pack_type,
                "validation_type": "not_applicable"
            }
        }

    # Validate each carousel/extra item
    record_results = []
    total_features = 0
    matched_count = 0
    missing_features = []
    for item_index, item in carousel_extra_items:
        records = item.get("records") or []
        if not isinstance(records, list):
            return {
                "status": "error",
                "error_message": f"Invalid 'records' in carousel/extra item {item_index}: expected a list."
            }
        for record_index, record in enumerate(records):
            result = validate_record_features(record, working_dir, item_index, record_index)
            if result["status"] == "error":
                # The first record-level error aborts the whole check.
                return {
                    "status": "error",
                    "error_message": result["error_message"]
                }
            record_results.append(result)
            total_features += len(result["features"])
            matched_count += len(result["matched_features"])
            missing_features.extend(result["missing_features"])

    # Reached only when the carousel/extra items contain no records at all
    # (a record with empty features has already produced an error above).
    if total_features == 0:
        return {
            "status": "error",
            "error_message": "No features found in any carousel/extra records. Features array cannot be empty."
        }

    # Coverage is computed over every feature occurrence across all records.
    overall_coverage = (matched_count / total_features) * 100
    status, message = _classify_coverage(pack_type, overall_coverage)

    # Build comprehensive result
    result = {
        "status": status,
        "details": {
            "message": message,
            "pack_type": pack_type,
            "overall_coverage": round(overall_coverage, 1),
            "total_features_across_all_records": total_features,
            "matched_features_count": matched_count,
            "validation_summary": f"{matched_count}/{total_features} features matched ({overall_coverage:.1f}% coverage)",
            "record_results": record_results
        }
    }

    # Add de-duplicated, sorted missing features summary if any
    if missing_features:
        result["details"]["missing_features_summary"] = sorted(set(missing_features))
    return result
def validate_record_features(record: Dict[str, Any], working_dir: str, item_index: int, record_index: int) -> Dict[str, Any]:
    """
    Validate features in a single record against available image files.

    A feature counts as matched when it appears (case-sensitive substring)
    in the name of at least one image file found anywhere under
    ``working_dir`` whose name also contains 'extra' (case-insensitive).
    Image files are those ending in .jpg, .jpeg, .png or .avif
    (case-insensitive). An empty-string feature is always reported as
    missing, because it would otherwise match every filename.

    Args:
        record: Single record from carousel/extra item
        working_dir: Directory containing image files (scanned recursively)
        item_index: Index of the parent item
        record_index: Index of this record within the item

    Returns:
        Dict with validation results for this record. On success it has
        "status": "success" plus "record_index", "item_index",
        "total_features", "coverage" (percentage rounded to one decimal),
        "features", "matched_features" and "missing_features". On failure it
        has "status": "error" and "error_message".
    """
    location = f"(item {item_index}, record {record_index})"

    # A record that is not an object has no usable features either.
    features = record.get("features", []) if isinstance(record, dict) else None

    # Check for empty or missing features
    if not features:
        return {
            "status": "error",
            "error_message": f"Empty or missing 'features' array in carousel/extra record {location}"
        }

    # A bare string would otherwise be iterated character by character.
    if not isinstance(features, list):
        return {
            "status": "error",
            "error_message": f"Invalid 'features' value in carousel/extra record {location}: expected a list"
        }

    # Substring matching below requires every feature to be a string.
    non_string = [feature for feature in features if not isinstance(feature, str)]
    if non_string:
        return {
            "status": "error",
            "error_message": f"Non-string feature(s) in carousel/extra record {location}: {non_string!r}"
        }

    # Collect image files in the working directory tree. Only files containing
    # 'extra' are kept, to avoid false matches with MEC/BAU backgrounds.
    # NOTE(review): the tree is rescanned on every call; callers validating
    # many records may want to cache the listing.
    image_extensions = ('.jpg', '.jpeg', '.png', '.avif')
    extra_files = []
    try:
        for _root, _dirs, files in os.walk(working_dir):
            for name in files:
                lowered = name.lower()
                if lowered.endswith(image_extensions) and 'extra' in lowered:
                    extra_files.append(name)
    except Exception as e:
        return {
            "status": "error",
            "error_message": f"Error scanning for image files: {str(e)}"
        }

    # Check each feature for matching images using case-sensitive substring search
    matched_features = []
    missing_features = []
    for feature in features:
        # ``"" in name`` is always True, so an empty code must never match.
        if feature and any(feature in name for name in extra_files):
            matched_features.append(feature)
        else:
            missing_features.append(feature)

    # Calculate coverage for this record (features is non-empty here)
    coverage = (len(matched_features) / len(features)) * 100
    return {
        "status": "success",
        "record_index": record_index,
        "item_index": item_index,
        "total_features": len(features),
        "coverage": round(coverage, 1),
        "features": features,  # Include full features list for aggregation
        "matched_features": matched_features,
        "missing_features": missing_features
    }
def check_feature_in_filename(feature: str, filename: str) -> bool:
    """
    Report whether a WERS code occurs inside a filename.

    The comparison is a plain, case-sensitive substring test; neither
    argument is normalised before comparing.

    Args:
        feature: WERS code to look for
        filename: Filename to inspect

    Returns:
        bool: True when ``feature`` is a substring of ``filename``, else False
    """
    if feature in filename:
        return True
    return False