initial commit

This commit is contained in:
michael 2025-09-30 10:37:12 -05:00
commit 9d33b62ff2
85 changed files with 3191 additions and 0 deletions

197
.gitignore vendored Normal file
View file

@ -0,0 +1,197 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
PIPFILE.lock
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
Pipfile.lock
# poetry
poetry.lock
# pdm
.pdm.toml
# PEP 582
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# macOS
.DS_Store
.AppleDouble
.LSOverride
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
# IDEs
.vscode/
.idea/
*.swp
*.swo
*~
.project
.pydevproject
.settings/
# Project-specific
input_bucket/
/tmp/HM_working/
/opt/QC/reports/
# API Keys and Credentials
*.key
*.pem
credentials.json
config.json
secrets.json
# Working directories
tmp/
temp/
*.tmp
# Reports and outputs
*.html
reports/
# PDFs (likely input files for testing)
*.pdf
# Box SDK cache
.box_cache/

70
CLAUDE.md Normal file
View file

@ -0,0 +1,70 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
The H&M Quality Control (HMQC) system is a modular Python application designed to perform quality control checks on PDF files for H&M marketing assets. It uses a modular approach with different specialized check modules to validate assets against criteria like filename formatting, imprint verification, language validation, pricing, censorship requirements, and more.
## Key Components
- **Core Module**: `qc_module.py` - The main engine that loads and runs QC checks based on profiles.
- **Launchers**: Scripts in `/launchers` that execute the QC process, including CLI and Box hotfolder integration.
- **Check Modules**: Individual validation components in `/checks` that implement specific QC criteria.
- **Profiles**: JSON configuration files in `/profiles` that define which checks to run and their parameters.
- **HTML Reporting**: Generated reports showing check results for each processed file.
## Development Commands
### Running QC Checks
Run QC checks on a specific file:
```bash
python launchers/HM_launcher_CLI.py <path_to_input_file> <path_to_save_report_html>
```
### Box Integration
Run the Box hotfolder integration (polls for files and processes them):
```bash
python launchers/ford_qc_box_hotfolder_process.py
```
## Architecture Notes
### Check Module Pattern
All check modules must implement the standard `run_check(config, context, check_id)` function:
- `config`: Dict with the check's configuration parameters from the profile JSON.
- `context`: Shared context dictionary between checks containing results from previous checks.
- `check_id`: String identifier for the specific check being run.
Check modules should return a dictionary with at least a `status` key that can be:
- `passed`: Check succeeded
- `error`: Check failed with an error
- `skipped`: Check was intentionally skipped
### Context Sharing
Results from each check are stored in a shared context dictionary, allowing subsequent checks to build on prior results.
### API Dependencies
- **LlamaParse**: Used for PDF parsing and text extraction
- **DSPy**: Used for AI-based image analysis and content validation
- **BoxSDK**: Used for Box integration in the hotfolder processor
## Important Implementation Details
1. **API Keys**: The code contains hardcoded API keys for OpenAI and LlamaParse that should be properly managed.
2. **Working Directories**: Most checks operate on files in the `/tmp/HM_working` directory.
3. **Report Generation**: The HTML reporter creates reports in both the specified output directory and copies them to `/opt/QC/reports/`.
4. **Path Configuration**: Many paths are hardcoded, like `/opt/QC` which may need adjustment for different environments.
5. **Error Handling**: The system uses a standardized error reporting format in check results, which should be maintained.
6. **AI Integration**: Several checks use OpenAI's GPT models for complex validation tasks through the DSPy framework.

212
checks/HM_censorship.py Executable file
View file

@ -0,0 +1,212 @@
import base64
import glob
import os
import io
from datetime import datetime
from PIL import Image
import dspy
from dspy.teleprompt import MIPROv2
import logging
os.environ["OPENAI_API_KEY"] = "sk-proj-LaFeLI2v1p9TkGOIifAJT3BlbkFJk7SuBc0VkmmrRt5y9cQg"
logging.basicConfig(level=logging.WARN)
class ImageDescriptionSignature(dspy.Signature):
"""Generate a text description from an image."""
image: dspy.Image = dspy.InputField(desc="Base64-encoded image to describe.")
description: str = dspy.OutputField(desc="Text description of clothing coverage.")
class ImageDescription(dspy.Module):
"""Module for generating image descriptions."""
def __init__(self):
super().__init__()
self.describe = dspy.ChainOfThought(ImageDescriptionSignature)
def forward(self, image: str) -> dict:
return self.describe(image=image)
class ImageCensorshipDetectionSignature(dspy.Signature):
"""Determine if image is censored based on description."""
description: str = dspy.InputField(desc="Description of clothing coverage.")
is_censored: bool = dspy.OutputField(desc="True if censored (arms, legs, midriff covered).")
class ImageCensorshipDetection(dspy.Module):
"""Module for censorship detection."""
def __init__(self):
super().__init__()
self.classify = dspy.ChainOfThought(ImageCensorshipDetectionSignature)
def forward(self, description: str) -> dict:
return self.classify(description=description)
def resize_and_encode_image(image_input: str | Image.Image, max_size: int = 1024) -> str:
"""Process and encode image to base64. Accepts file path or PIL Image."""
try:
if isinstance(image_input, str):
# Handle file path input
with Image.open(image_input) as img:
img = img.copy()
else:
# Handle PIL Image input
img = image_input.copy()
img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
if img.mode != 'RGB':
img = img.convert('RGB')
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=85, optimize=True)
buffer.seek(0)
return f"data:image/jpeg;base64,{base64.b64encode(buffer.read()).decode('utf-8')}"
except Exception as e:
print(f"Error processing image: {e}")
return ""
def build_trainset_from_directory(images_dir: str, describer: ImageDescription):
"""Build training set from image directory."""
trainset = []
for img_path in glob.glob(os.path.join(images_dir, "*.png")):
filename = os.path.basename(img_path)
is_censored = "-C" in filename
image_data_uri = resize_and_encode_image(img_path)
if not image_data_uri:
continue
description = describer(image=image_data_uri).description
trainset.append(dspy.Example(
description=description,
is_censored=is_censored
).with_inputs("description"))
return trainset
def run_check(config: dict, context: dict, check_id: str):
"""Censorship check using image from context."""
# Get context data
hm_filename_data = context.get("HM_filename_parse", {})
hm_lang_data = context.get("HM_language_validate", {})
hm_parse_data = context.get("HM_parse", {})
# Get and normalize expected language
raw_language = hm_filename_data.get("parsed", {}).get("language", "")
expected_language = raw_language.lower().replace("_", "-")
censorship_required = hm_lang_data.get("isCensorshipRequired", False)
parsed_image = hm_parse_data.get("parsed_image")
filename = hm_parse_data.get("filename", "unknown_filename")
# Prepare base result
context_result = {
"expected_language": expected_language.upper(),
"censorship_required": censorship_required,
"check_performed": False,
"check_outcome": None,
"test_results": None,
"error": None,
"filename": filename
}
# Validate language data
if not expected_language:
context_result.update({
"error": "Missing language data",
"check_outcome": "invalid_context"
})
context[check_id] = context_result
return {
"status": "error",
"error_message": "Missing language context data",
"details": context_result
}
# Skip non-CEN/GEN checks (case-insensitive)
expected_language_lower = expected_language.lower()
if expected_language_lower not in ["cen", "gen"]:
context_result.update({
"check_outcome": "skipped",
"check_skipped_reason": f"Language {expected_language.upper()} doesn't require censorship checks"
})
context[check_id] = context_result
return {
"status": "skipped",
"details": context_result
}
try:
# Validate config inputs and context image
images_dir = config.get("images_dir")
if not images_dir or not os.path.isdir(images_dir):
raise ValueError(f"Invalid images directory: {images_dir}")
if not parsed_image:
raise ValueError("No parsed image found in HM_parse context")
# Configure DSPy
dspy.configure(lm=dspy.LM(model="openai/gpt-4o-mini"))
dspy.settings.experimental = True
# Train detector
describer = ImageDescription()
trainset = build_trainset_from_directory(images_dir, describer)
teleprompter = MIPROv2(
metric=lambda pred, ex, trace=None: pred.is_censored == ex.is_censored,
auto="light",
num_threads=20,
)
detector = teleprompter.compile(
student=ImageCensorshipDetection().deepcopy(),
trainset=trainset,
requires_permission_to_run=False,
)
# Analyze image from context
image_data_uri = resize_and_encode_image(parsed_image)
if not image_data_uri:
raise ValueError("Failed to process image from context")
description = describer(image=image_data_uri).description
inference = detector(description=description)
test_results = {
"is_censored": inference.is_censored,
"required_censorship": censorship_required,
"censorship_match": inference.is_censored == censorship_required,
"reasoning": inference.reasoning,
"test_image": filename
}
# Update context and determine status
context_result.update({
"check_performed": True,
"test_results": test_results,
"check_outcome": "passed" if test_results["censorship_match"] else "failed"
})
context[check_id] = context_result
if test_results["censorship_match"]:
return {
"status": "passed",
"details": context_result
}
else:
return {
"status": "error",
"error_message": "Censorship requirement mismatch",
"details": {
**context_result,
"expected_censored": censorship_required,
"actual_censored": test_results["is_censored"]
}
}
except Exception as e:
context_result.update({
"error": str(e),
"check_outcome": "errored"
})
context[check_id] = context_result
return {
"status": "error",
"error_message": f"Censorship check failed: {str(e)}",
"details": context_result
}

97
checks/HM_filename_parse.py Executable file
View file

@ -0,0 +1,97 @@
import os
import json
from checks.analyze_with_gpt import analyze_with_gpt # Imports the FUNCTION
import re
def run_check(config: dict, context: dict, check_id: str) -> dict:
"""
QC check that parses filename using GPT and shares results via context.
Now gets filename from HM_parse context instead of direct file access.
"""
# Get parsed filename from HM_parse context
hm_parse_data = context.get("HM_parse", {})
filename = hm_parse_data.get("filename")
if not filename:
return {
"status": "error",
"error_message": "Filename not found in HM_parse context. Ensure HM_parse check runs first."
}
# Remove extension from the filename we got from context
pattern = r'^(?:[^_]+_){5}'
base_name, ext = os.path.splitext(os.path.basename(filename))
short_name = re.sub(pattern, '', base_name)
# GPT prompt to parse the filename
prompt = f"""
Parse this H&M artwork filename: {short_name}
H&M filenames follow this format:
dimensions_format_year_reference-number_language-country.pdf
Example: 21.6x27.9cm_letter_2028_10062-01_en-us.pdf
In this case language = en-us
OR
dimensions_format_reference-number_(GEN|CEN).pdf
Example: 04_10.8x14cm_quarter_letter_1001D_10004-02_GEN.pdf
In this case language = GEN or CEN
Return only a JSON object with these exact keys:
dimensions, format, year, reference, language
For any component that can't be identified, use an empty string.
"""
try:
# Get GPT analysis
gpt_response = analyze_with_gpt(
prompt=prompt,
content="",
images=None,
expect_json=True
)
# Parse and store results in context
parsed = json.loads(gpt_response)
context[check_id] = {
"filename": filename,
"short_name": short_name,
"parsed": {
"dimensions": parsed.get("dimensions", ""),
"format": parsed.get("format", ""),
"year": parsed.get("year", ""),
"reference": parsed.get("reference", ""),
"language": parsed.get("language", "")
}
}
return {
"status": "passed",
"details": {
"message": "Filename parsed and stored in context",
"filename_source": "HM_parse context",
"gpt_response_summary": f"Parsed {len(parsed)} components",
"parsed": {
"dimensions": parsed.get("dimensions", ""),
"format": parsed.get("format", ""),
"year": parsed.get("year", ""),
"reference": parsed.get("reference", ""),
"language": parsed.get("language", "")
}
}
}
except json.JSONDecodeError as e:
return {
"status": "error",
"error_message": f"GPT returned invalid JSON: {str(e)}",
"raw_response": gpt_response[:200] + "..." if gpt_response else None
}
except Exception as e:
return {
"status": "error",
"error_message": f"Filename parsing failed: {str(e)}"
}

135
checks/HM_imprint_check.py Executable file
View file

@ -0,0 +1,135 @@
import json
from checks.analyze_with_gpt import analyze_with_gpt # Imports the FUNCTION
def run_check(config: dict, context: dict, check_id: str) -> dict:
"""
QC check that verifies imprint codes using context data and stores results in context.
"""
# Get required data from previous checks' context with validation
error_responses = []
# 1. Validate HM_parse data
hm_parse_data = context.get("HM_parse")
if not hm_parse_data or not isinstance(hm_parse_data, dict):
error_responses.append("HM_parse data missing or invalid")
else:
filename = hm_parse_data.get("filename")
text_content = hm_parse_data.get("extracted_text")
if not filename:
error_responses.append("HM_parse: filename missing")
if not text_content:
error_responses.append("HM_parse: extracted text missing")
# 2. Validate HM_filename_parse data
hm_filename_data = context.get("HM_filename_parse")
if not hm_filename_data or not isinstance(hm_filename_data, dict):
error_responses.append("HM_filename_parse data missing or invalid")
else:
parsed_data = hm_filename_data.get("parsed", {})
expected_reference = parsed_data.get("reference", "").strip()
if not expected_reference:
error_responses.append("HM_filename_parse: reference code missing")
if error_responses:
return {
"status": "error",
"error_message": "Missing/invalid context data from previous checks: " + ", ".join(error_responses),
"details": {
"missing_data": error_responses,
"available_context_keys": list(context.keys())
}
}
# Extract validated values
filename = hm_parse_data["filename"]
text_content = hm_parse_data["extracted_text"]
expected_reference = hm_filename_data["parsed"]["reference"].strip()
# Add this line to get the config parameter
skip_imprint_for_ooh = config.get("skip_imprint_for_ooh", True) # <-- THIS WAS MISSING
# Prepare base result structure for context
context_result = {
"filename": filename,
"expected_reference": expected_reference,
"detected_imprint": None,
"match_verified": False,
"skipped": False
}
# 1) Check for OOH skip condition
if skip_imprint_for_ooh and "ooh" in filename.lower():
context_result.update({
"skipped": True,
"skip_reason": "OOH file detected"
})
context[check_id] = context_result
return {
"status": "passed",
"details": {
"message": "Skipped OOH imprint check",
**context_result
}
}
try:
# 2) GPT call to detect imprint
detect_prompt = f"""
Document content:
{text_content}
Find the imprint or reference code in this document.
Return only the code as plain text.
"""
detected_imprint = (analyze_with_gpt(
prompt=detect_prompt,
content="",
images=None
)).strip()
# 3) Substring verification
is_verified = (
str(detected_imprint).strip().lower() in str(expected_reference).strip().lower()
or
str(expected_reference).strip().lower() in str(detected_imprint).strip().lower()
)
verify_response = is_verified
# Store full results in context
context_result.update({
"detected_imprint": detected_imprint,
"match_verified": is_verified,
"verification_response": verify_response
})
context[check_id] = context_result
# Return formatted response
if is_verified and verify_response == True:
return {
"status": "passed",
"details": {
"message": "Imprint reference verified",
**context_result
}
}
else:
error_msg = "Reference validation failed"
if verify_response != "false":
error_msg = f"Invalid verification response: '{verify_response}'"
elif not is_verified:
error_msg = "Reference mismatch or not found"
return {
"status": "error",
"error_message": error_msg,
"details": context_result
}
except Exception as e:
context[check_id] = context_result # Store partial results
return {
"status": "error",
"error_message": f"Imprint check failed: {str(e)}",
"details": context_result
}

179
checks/HM_language_validate.py Executable file
View file

@ -0,0 +1,179 @@
from checks.analyze_with_gpt import analyze_with_gpt
def run_check(config: dict, context: dict, check_id: str) -> dict:
"""
QC check that validates document language against filename code using
context-stored PIL images instead of base64 or filesystem paths.
"""
# --- Validate presence of required context data ---
hm_parse_data = context.get("HM_parse", {})
hm_filename_data = context.get("HM_filename_parse", {})
filename = hm_parse_data.get("filename", "")
text_content = hm_parse_data.get("extracted_text", "")
parsed_image = hm_parse_data.get("parsed_image") # Get PIL image
expected_language = hm_filename_data.get("parsed", {}).get("language", "").lower()
error_messages = []
if not filename:
error_messages.append("Filename missing from HM_parse context")
if not text_content.strip() and not parsed_image:
error_messages.append("Both text content and image data missing from HM_parse context")
if not expected_language:
error_messages.append("Language code missing from HM_filename_parse context")
if error_messages:
return {
"status": "error",
"error_message": "Missing critical context data: " + ", ".join(error_messages),
"details": {
"available_context": {
"HM_parse_keys": list(hm_parse_data.keys()),
"HM_filename_parse_keys": list(hm_filename_data.keys())
}
}
}
# --- Prepare context result structure ---
context_result = {
"filename": filename,
"expected_language": expected_language.upper(),
"detected_language": "UNKNOWN",
"matches": False,
"isCensorshipRequired": False,
"validation_method": None,
"gpt_response": None,
"image_used": False,
"image_format": None
}
# --- Handle special CEN/GEN cases first (case-insensitive) ---
expected_language_lower = expected_language.lower()
if "cen" in expected_language_lower:
context_result.update({
"detected_language": "CEN",
"matches": True,
"isCensorshipRequired": True,
"validation_method": "auto"
})
elif "gen" in expected_language_lower:
context_result.update({
"detected_language": "GEN",
"matches": True,
"isCensorshipRequired": False,
"validation_method": "auto"
})
else:
# --- Prepare multimodal analysis with PIL image ---
images = []
if parsed_image:
images = [parsed_image]
context_result.update({
"image_used": True,
"image_format": parsed_image.format.lower() if parsed_image.format else None
})
# --- GPT-based multimodal language detection ---
try:
prompt = f"""
Analyze both the text content (if present) and document image (if provided) to identify:
1. Primary language (ISO 639-1 code)
2. Regional variation if detectable (ISO 3166-1 alpha-2 country code)
Use format 'xx' or 'xx-YY' (e.g., 'en-EG' for Egyptian English).
Consider:
- Textual content (when available)
- Writing direction
- Typographic conventions
- Cultural references
- Visual layout characteristics
- Any visible text in the image
If uncertain, respond with primary language code only.
If completely unsure, respond 'UNKNOWN'.
Text Content (may be empty):
{text_content}
Respond ONLY with the language code or 'UNKNOWN'.
"""
detected_lang_raw = analyze_with_gpt(
prompt=prompt,
content=text_content,
images=images, # Passing PIL image directly
).strip()
# Handle empty/unexpected responses
if not detected_lang_raw:
raise ValueError("LLM returned an empty or invalid language code.")
# Normalize and validate response
detected_lang = detected_lang_raw.upper()
# Standardize separator for detected language
if "_" in detected_lang:
detected_lang = detected_lang.replace("_", "-")
if "-" in detected_lang:
parts = detected_lang.split("-")
if len(parts[0]) != 2 or len(parts[1]) != 2:
detected_lang = parts[0] # Fall back to primary language
# Standardize expected language to use hyphen
normalized_expected = expected_language.replace("_", "-")
# Compare to expected language (case-insensitive)
expected_primary = normalized_expected.split("-")[0].lower()
detected_primary = detected_lang.split("-")[0].lower()
expected_language_lower = normalized_expected.lower()
detected_lang_lower = detected_lang.lower()
# Check if directly equal (after normalization) or if primary languages match
matches = (
detected_lang_lower == expected_language_lower or
detected_primary == expected_primary
)
context_result.update({
"detected_language": detected_lang,
"matches": matches,
"validation_method": "Multimodal LLM analysis",
"gpt_response": detected_lang_raw
})
except Exception as e:
context_result.update({
"error": str(e),
"validation_method": "failed"
})
context[check_id] = context_result
return {
"status": "error",
"error_message": f"Language detection error: {str(e)}",
"details": context_result
}
# --- Final validation check (case-insensitive) ---
expected_language_upper = context_result["expected_language"].upper()
validation_passed = (
context_result["matches"] or
expected_language_upper in ["CEN", "GEN"]
)
if not validation_passed:
return {
"status": "error",
"error_message": (
f"Language mismatch: Expected {expected_language.upper()}, "
f"detected {context_result['detected_language']}"
),
"details": context_result
}
# --- Successful validation ---
context[check_id] = context_result
return {
"status": "passed",
"details": context_result
}

160
checks/HM_parse.py Executable file
View file

@ -0,0 +1,160 @@
import os
import shutil
import re
import logging
from PIL import Image # Added for PIL image handling
# Optional: If llama_parse does any async under the hood and you're in Jupyter
# or some environment with an existing event loop, nest_asyncio can help.
import nest_asyncio
nest_asyncio.apply()
from llama_parse import LlamaParse
os.environ['LLAMA_CLOUD_API_KEY'] = 'llx-BmHqsgAhrUWpNJDhl25POaxe0WvwwyiwHcRpACKbJch50Lu2'
def _parse_pdf(pdf_path: str, working_dir: str):
"""
Internal helper (synchronous):
1) Extract text from PDF.
2) Generate images for each page and store as PIL images.
3) Return PIL images and extracted text.
"""
# --- 1) TEXT PARSER (synchronous) ---
parser_text = LlamaParse(
result_type="text",
add_page_breaks=False,
parsing_instruction=(
"Extract all text from the PDF. Ignore extra metadata or slug information."
),
premium_mode=False,
)
documents = parser_text.load_data(pdf_path)
if not documents:
raise RuntimeError("No text found or PDF is empty")
extracted_text = "\n".join(doc.text for doc in documents)
text_file_path = os.path.join(working_dir, "extracted_text.txt")
with open(text_file_path, "w", encoding="utf-8") as f:
f.write(extracted_text)
# --- 2) IMAGE GENERATION AND PIL HANDLING ---
parser_multimodal = LlamaParse(
result_type="markdown",
add_page_breaks=False,
parsing_instruction=(
"Generate page images of the PDF document, no special text instructions."
),
use_vendor_multimodal_model=True,
vendor_multimodal_model_name="openai-gpt4o",
premium_mode=False,
)
md_json_objs = parser_multimodal.get_json_result(pdf_path)
image_dicts = parser_multimodal.get_images(md_json_objs, download_path=working_dir)
image_paths = [img_info["path"] for img_info in image_dicts]
pil_images = []
if image_paths:
# Rename first image
original_first_image_path = image_paths[0]
renamed_image_path = os.path.join(working_dir, "parsed_test_image.jpg")
os.rename(original_first_image_path, renamed_image_path)
image_paths[0] = renamed_image_path
# Load images as PIL objects
for img_path in image_paths:
try:
with Image.open(img_path) as img:
pil_images.append(img.copy()) # Copy the image to avoid file handle issues
except Exception as e:
logging.error(f"Error loading image {img_path}: {e}")
continue
return {
"text_file": text_file_path,
"images": pil_images,
"extracted_text": extracted_text
}
def run_check(config: dict, context: dict, check_id: str):
"""
QC check that handles PDF parsing with PIL image storage
"""
input_file = config.get("input_file")
working_dir = config.get("working_dir", "working")
if not input_file or not os.path.isfile(input_file):
return {
"status": "error",
"error_message": f"PDF file '{input_file}' not provided or does not exist."
}
try:
if os.path.exists(working_dir):
for item in os.listdir(working_dir):
item_path = os.path.join(working_dir, item)
if os.path.isfile(item_path) or os.path.islink(item_path):
os.remove(item_path)
else:
shutil.rmtree(item_path)
else:
os.makedirs(working_dir)
except Exception as e:
return {
"status": "error",
"error_message": f"Failed to prepare working directory '{working_dir}': {e}"
}
if not re.search(r"\.pdf$", input_file, re.IGNORECASE):
return {
"status": "error",
"error_message": f"Input file '{input_file}' does not appear to be a PDF."
}
try:
filename = os.path.basename(input_file)
context[check_id] = {
"filename": filename,
"input_file_path": input_file
}
except Exception as e:
return {
"status": "error",
"error_message": f"Failed to write input filename to context: {e}"
}
try:
parse_results = _parse_pdf(input_file, working_dir)
context[check_id].update({
"extracted_text": parse_results["extracted_text"],
"parsed_image": parse_results["images"][0] if parse_results["images"] else None,
"all_images": parse_results["images"],
"text_file_path": parse_results["text_file"]
})
return {
"status": "passed",
"details": {
"message": "PDF parsed successfully.",
"working_dir": working_dir,
"text_file": parse_results["text_file"],
"image_count": len(parse_results["images"])
}
}
except Exception as e:
context[check_id].update({
"error": str(e),
"partial_extraction": True
})
return {
"status": "error",
"error_message": f"Failed to parse PDF '{input_file}': {e}"
}

183
checks/HM_price_currency_check.py Executable file
View file

@ -0,0 +1,183 @@
import json
from checks.analyze_with_gpt import analyze_with_gpt
def run_check(config: dict, context: dict, check_id: str) -> dict:
"""
QC check that validates currency/price using context-stored PIL images,
aligning image handling with how it's done in the language check.
"""
# Get required context data
hm_parse_data = context.get("HM_parse", {})
hm_filename_data = context.get("HM_filename_parse", {})
hm_lang_data = context.get("HM_language_validate", {})
text_content = hm_parse_data.get("extracted_text", "")
parsed_image = hm_parse_data.get("parsed_image") # PIL image
# Get and normalize expected language
raw_language = hm_filename_data.get("parsed", {}).get("language", "")
expected_language = raw_language.lower().replace("_", "-")
lang_matches = hm_lang_data.get("matches", False)
censored = hm_lang_data.get("isCensorshipRequired", False)
# Validate inputs from context
error_messages = []
if not text_content and not parsed_image:
error_messages.append("Both text content and image data missing from HM_parse context")
if not expected_language:
error_messages.append("Missing language code from HM_filename_parse context")
if error_messages:
return {
"status": "error",
"error_message": "Context validation failed: " + ", ".join(error_messages)
}
# Prepare context result structure
context_result = {
"expected_region": expected_language.upper(),
"censorship_required": censored,
"language_matches": lang_matches,
"currency_found": None,
"price_value": None,
"format_valid": False,
"matches_region": False,
"validation_steps": [],
"image_used": False,
"image_format": None
}
try:
# 1) Check language-based pass conditions first (case-insensitive)
expected_language_lower = expected_language.lower()
if expected_language_lower in ["cen", "gen"]:
context_result.update({
"validation_steps": ["Language-based auto-pass"],
"matches_region": True # Bypass currency check
})
context[check_id] = context_result
return {
"status": "skipped",
"details": context_result
}
# 2) Prepare images for analysis using a PIL image
images = []
if parsed_image:
images = [parsed_image]
context_result.update({
"image_used": True,
"image_format": parsed_image.format.lower() if parsed_image.format else None
})
context_result["validation_steps"].append("Including document image for analysis")
# 3) Price detection with multimodal support
price_prompt = f"""I have an image that may contain price and currency information. Please analyze the image and extract any price and currency mentioned. If no price or currency is present in the image, confirm that instead. Ensure the extracted data includes both the numeric value (e.g., “699”) and the currency identifier (e.g., “USD,” “LE,” “EUR”). Be flexible in recognizing different formats (e.g., “$100,” “LE 699,” “€20”) and ensure accuracy.
Analyze both text (if present) and image (if provided) to detect currency and prices.
Document text content (may be empty):
---
{text_content}
---
Return JSON with:
- currency_found: 3-letter currency code or 'NOT_FOUND'
- price_value: detected numerical value or null
- format_valid: boolean indicating proper formatting
- confidence: confidence score 0-1
"""
price_resp = analyze_with_gpt(
prompt=price_prompt,
content=text_content,
images=images,
expect_json=True
)
price_info = json.loads(price_resp)
context_result.update({
"currency_found": price_info.get("currency_found"),
"price_value": price_info.get("price_value"),
"format_valid": price_info.get("format_valid", False),
"confidence_score": price_info.get("confidence", 0),
"validation_steps": ["Multimodal price detection completed"]
})
# 4) Handle no price found case
if price_info.get("currency_found") == "NOT_FOUND":
context_result.update({
"matches_region": True,
"validation_steps": ["No price found - skipping check"]
})
context[check_id] = context_result
return {
"status": "skipped",
"details": context_result
}
# 5) Currency-region validation
currency = price_info["currency_found"]
# Ensure the language code is standardized for the prompt
normalized_language = expected_language.replace("_", "-")
region_prompt = f"""Verify if {currency} matches {normalized_language} region.
Consider:
- Standard currency codes for the region
- Common alternative currency notations
- Historical currency usage if relevant
- Any visual cues from document layout/style
- Note that language codes can use either hyphen or underscore separators (e.g., en-US or en_US)
Return JSON with:
- matches_region: boolean
- reason: brief explanation
"""
match_resp = analyze_with_gpt(
prompt=region_prompt,
content="",
images=None,
expect_json=True
)
match_info = json.loads(match_resp)
context_result.update({
"matches_region": match_info.get("matches_region", False),
"region_validation_reason": match_info.get("reason", ""),
"validation_steps": ["Price detected", "Region validation completed"]
})
context[check_id] = context_result
# Final validation
validation_passed = (
context_result["format_valid"] and
context_result["matches_region"] and
context_result["confidence_score"] >= 0.7
)
if validation_passed:
return {"status": "passed", "details": context_result}
else:
return {
"status": "error" if context_result["confidence_score"] < 0.7 else "failed",
"error_message": "Currency/price validation failed",
"details": context_result
}
except json.JSONDecodeError as e:
context_result["error"] = f"GPT response parse error: {str(e)}"
context[check_id] = context_result
return {
"status": "error",
"error_message": "Failed to parse GPT response",
"details": context_result
}
except Exception as e:
context_result["error"] = str(e)
context[check_id] = context_result
return {
"status": "error",
"error_message": f"Validation process failed: {str(e)}",
"details": context_result
}

0
checks/__init__.py Executable file
View file

62
checks/analyze_with_gpt.py Executable file
View file

@ -0,0 +1,62 @@
from openai import OpenAI # Changed from AsyncOpenAI to OpenAI
import base64
from PIL import Image
from io import BytesIO
import logging
# Set up logging
logging.basicConfig(level=logging.WARN)
logger = logging.getLogger(__name__)
# Initialize OpenAI client with synchronous client
#client = OpenAI(api_key="sk-proj-LaFeLI2v1p9TkGOIifAJT3BlbkFJk7SuBc0VkmmrRt5y9cQg") # Sync client
client = OpenAI(api_key="sk-svcacct-yRvRUPzN0Bq2-CJgZl4tgklRcHCfBsiMUhbK308vyQj91q-Q3wqfEHlBPXZ6QyeryHT3BlbkFJxErLrQ1ycFtrcU0xoXXxweoMwcUKxpQSNiN98L9d4AtIlmnNQtotgeuBf2iqpg7_AA") #QC specific key
def pil_image_to_base64(image):
buffered = BytesIO()
image.save(buffered, format="PNG")
return base64.b64encode(buffered.getvalue()).decode("utf-8")
def analyze_with_gpt(prompt: str, content: str, images, expect_json: bool = False) -> str:
"""Synchronous version using regular OpenAI client"""
try:
if expect_json:
system_prompt = "You are a document analysis assistant. Always respond with valid JSON only. Reply with the raw JSON only, no code block."
else:
system_prompt = "You are a document analysis assistant. Provide clear, concise responses."
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"{prompt}\n\nDocument content:\n{content}"}
]
logger.info(f"\nIMAGES:\n{images}")
if images:
for image in images:
image_base64 = pil_image_to_base64(image)
messages.append({
"role": "user",
"content": [{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{image_base64}"
}
}]
})
# Synchronous API call
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
temperature=0.2
)
logger.info(f"prompt messages: \n\n {messages}\n\n")
result = response.choices[0].message.content.strip()
logger.info(f"OpenAI result: \n\n{result}\n\n")
return result
except Exception as e:
logger.error(f"OpenAI API error: {str(e)}")
return f"Error making OPENAI call: {str(e)}"

20
checks/business_data_check.py Executable file
View file

@ -0,0 +1,20 @@
def run_check(config):
required_fields = config.get("required_fields", [])
# Pretend we have some data dictionary to validate:
data = {
"campaign_id": "12345",
"budget": 5000,
"start_date": "2024-01-01"
}
# Check if all required fields are present
missing_fields = [field for field in required_fields if field not in data]
passes = len(missing_fields) == 0
return {
"status": "passed" if passes else "failed",
"details": {
"required_fields": required_fields,
"missing_fields": missing_fields,
"data_sample": data
}
}

View file

@ -0,0 +1,81 @@
import os
import json
def run_check(config):
"""
Verify that there is at least one 'colour' imagetype in linkingrecord.json,
and that all referenced colour images exist in the working directory.
Expected config:
- working_dir (str): Directory where linkingrecord.json and images reside.
- linkingrecord_filename (str): The name of the linking record file. Default: linkingrecord.json
Returns:
- "passed" if at least one colour imagetype item is found and all its files exist
- "failed" if no colour item is found or if any referenced colour image files are missing
- "error" if linkingrecord.json is not found or invalid
"""
working_dir = config.get("working_dir", "working")
linkingrecord_filename = config.get("linkingrecord_filename", "linkingrecord.json")
linkingrecord_path = os.path.join(working_dir, linkingrecord_filename)
if not os.path.exists(linkingrecord_path):
return {
"status": "error",
"error_message": f"Linking record '{linkingrecord_filename}' not found in {working_dir}."
}
# Load linkingrecord.json
with open(linkingrecord_path, 'r', encoding='utf-8') as f:
linkingrecord = json.load(f)
if "items" not in linkingrecord or not isinstance(linkingrecord["items"], list):
return {
"status": "error",
"error_message": "Invalid linkingrecord.json structure: 'items' missing or not a list."
}
found_colour = False
missing_files = set()
# Iterate over items to find those with imagetype == "colour"
for item in linkingrecord["items"]:
conditions = item.get("conditions", {})
if conditions.get("imagetype") == "colour":
found_colour = True
records = item.get("records", [])
for record in records:
assets = record.get("assets", [])
for asset in assets:
filename = asset.get("filename")
if filename:
file_path = os.path.join(working_dir, filename)
if not os.path.exists(file_path):
missing_files.add(filename)
if not found_colour:
# No colour imagetype found at all
return {
"status": "failed",
"details": {
"message": "No 'colour' imagetype found in linkingrecord."
}
}
if missing_files:
# Found colour items but some files are missing
return {
"status": "failed",
"details": {
"missing_files": sorted(list(missing_files))
}
}
# If we reach here, we found colour and all files exist
return {
"status": "passed",
"details": {
"message": "At least one 'colour' imagetype found and all colour image files exist."
}
}

152
checks/file_size_check.py Executable file
View file

@ -0,0 +1,152 @@
import os
import json
def run_check(config):
"""
Check that all present images do not exceed their maximum allowed file size.
Expected config:
- working_dir (str): Directory where linkingrecord.json and images reside.
- linkingrecord_filename (str): Name of the linking record file, default: linkingrecord.json
Changes:
- Deduplicate the list of failures by filename.
Logic:
- Determine asset type from conditions (viewtype, imagetype, experienceCondition).
- Map asset type to a max size.
- Check each existing image file. If any exceed max size, fail and list them.
- Ignore missing files.
- If conditions cannot determine asset type, return an error.
"""
working_dir = config.get("working_dir", "working")
linkingrecord_filename = config.get("linkingrecord_filename", "linkingrecord.json")
linkingrecord_path = os.path.join(working_dir, linkingrecord_filename)
if not os.path.exists(linkingrecord_path):
return {
"status": "error",
"error_message": f"Linking record '{linkingrecord_filename}' not found in {working_dir}."
}
# Load the linking record
with open(linkingrecord_path, 'r', encoding='utf-8') as f:
linkingrecord = json.load(f)
if "items" not in linkingrecord or not isinstance(linkingrecord["items"], list):
return {
"status": "error",
"error_message": "Invalid linkingrecord.json: 'items' missing or not a list."
}
# Max sizes in KB
max_sizes_kb = {
"Base Exterior Images": 600,
"Base Interior Images": 1024, # 1MB
"Engine and Transmission Images": 1024, # 1MB
"Interior Layered Option Images": 1024, # 1MB
"Exterior Layered Option Images": 600,
"Option Carousel Images": 500,
"Powertrain Image": 400,
"Showroom Images": 300,
"Colour Chips": 50,
"Bodystyle Images": 200,
"Series Images": 300,
"Trim Images": 700
}
def get_asset_type(conditions):
viewtype = conditions.get("viewtype")
imagetype = conditions.get("imagetype", None)
experience = conditions.get("experienceCondition")
# Determine asset type
if viewtype == "interior" and imagetype == "layeroptint":
return "Interior Layered Option Images"
if viewtype == "exterior" and imagetype == "layeroptext":
return "Exterior Layered Option Images"
if experience == "2d-background" and viewtype in ["exterior", "interior"] and imagetype is None:
return "Engine and Transmission Images"
if viewtype == "exterior" and imagetype is None:
return "Base Exterior Images"
if viewtype == "interior" and imagetype is None:
return "Base Interior Images"
if viewtype == "carousel":
if imagetype == "extra":
return "Option Carousel Images"
if imagetype == "powertrain":
return "Powertrain Image"
if imagetype == "colour":
return "Colour Chips"
if imagetype == "bodystyle":
return "Bodystyle Images"
if imagetype == "series":
return "Series Images"
if imagetype == "trim":
return "Trim Images"
if viewtype == "exterior" and imagetype == "showroom":
return "Showroom Images"
# No match
return None
# Use a dictionary to avoid duplicates (key by filename)
failed_images_dict = {}
# Iterate over items
for item in linkingrecord["items"]:
conditions = item.get("conditions", {})
asset_type = get_asset_type(conditions)
if asset_type is None:
return {
"status": "error",
"error_message": f"Could not determine asset type for conditions: {conditions}"
}
max_size_kb = max_sizes_kb[asset_type]
max_size_bytes = max_size_kb * 1024
records = item.get("records", [])
for record in records:
assets = record.get("assets", [])
for asset in assets:
filename = asset.get("filename")
if not filename:
continue
file_path = os.path.join(working_dir, filename)
if not os.path.exists(file_path):
# Ignore missing files
continue
# Check file size
try:
file_size = os.path.getsize(file_path)
except OSError:
# If we can't get size, skip
continue
if file_size > max_size_bytes:
if filename not in failed_images_dict:
failed_images_dict[filename] = {
"filename": filename,
"asset_type": asset_type,
"file_size_bytes": file_size,
"max_size_bytes": max_size_bytes
}
if failed_images_dict:
return {
"status": "failed",
"details": {
"message": "Some files exceed the maximum allowed size.",
"failed_images": list(failed_images_dict.values())
}
}
return {
"status": "passed",
"details": {
"message": "All present images are within the allowed file size."
}
}

212
checks/html_reporter.py Executable file
View file

@ -0,0 +1,212 @@
import os
import json
from datetime import datetime
class HTMLReporter:
@staticmethod
def generate_report(json_data: dict, reports_dir: str, input_filename: str) -> str:
"""
Updated method signature with proper parameters
"""
try:
# Create directory if needed
os.makedirs(reports_dir, exist_ok=True)
# Generate filename
date_str = datetime.now().isoformat(timespec='seconds').replace(":", "-")
safe_name = (input_filename.rpartition('.')[0].replace('.', '_') + ('.' + input_filename.rpartition('.')[2] if input_filename.rpartition('.')[1] else input_filename)).replace(" ", "_").replace(":", "-").split('.')[0]
report_filename = f"{safe_name}_{date_str}_QC.html"
output_path = os.path.join(reports_dir, report_filename)
# Generate HTML content
html_content = HTMLReporter._build_html_template(json_data, input_filename)
# Write to file
with open(output_path, 'w', encoding='utf-8') as f:
f.write(html_content)
# Extract campaign path and create QC directory
# Pattern: /data/BOXSYNC/HM_AME/HM/CAMPAIGNS/{campaign_id}/JOBS/{job_id}/LOGS
path_parts = reports_dir.split('/')
if 'CAMPAIGNS' in path_parts:
campaign_index = path_parts.index('CAMPAIGNS')
if len(path_parts) > campaign_index + 1:
campaign_id = path_parts[campaign_index + 1]
# Create path to QC folder in the campaign directory
qc_dir = '/'.join(path_parts[:campaign_index + 2]) + '/QC/'
# Create QC directory if it doesn't exist
os.makedirs(qc_dir, exist_ok=True)
# Full path for the QC report
qc_report_path = os.path.join(qc_dir, report_filename)
# Write to QC directory
with open(qc_report_path, 'w', encoding='utf-8') as f:
f.write(html_content)
# Write to generic reports dir for testing - comment if not used
report_path = os.path.join("/opt/QC/reports/", report_filename)
with open(report_path, 'w', encoding='utf-8') as f:
f.write(html_content)
return output_path
except Exception as e:
raise RuntimeError(f"HTML report generation failed: {str(e)}") from e
@staticmethod
def _build_html_template(json_data: dict, input_filename: str) -> str:
"""Construct the full HTML template with data"""
return f'''
<!DOCTYPE html>
<html lang="en">
{HTMLReporter._build_head(input_filename, json_data['timestamp'])}
<body>
<div class="container py-4">
{HTMLReporter._build_header(input_filename, json_data['timestamp'])}
<div class="accordion" id="checksAccordion">
{''.join(HTMLReporter._generate_check_html(check) for check in json_data['checks'])}
</div>
</div>
{HTMLReporter._build_scripts()}
</body>
</html>
'''
@staticmethod
def _build_head(filename: str, timestamp: str) -> str:
"""Build the HTML head section"""
return f'''
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>QC Report - {filename}</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
{HTMLReporter._get_css_styles()}
</style>
</head>
'''
@staticmethod
def _get_css_styles() -> str:
"""Return CSS styles"""
return '''
.status-badge { font-size: 0.8rem; padding: 0.35em 0.65em; }
.check-card { margin-bottom: 1rem; }
.details-list { list-style-type: none; padding-left: 1.5rem; }
.details-list li { margin-bottom: 0.5rem; }
.nested-details { padding-left: 1.5rem; margin-top: 0.5rem; border-left: 2px solid #dee2e6; }
.error-section { background-color: #fff3cd; border-radius: 4px; padding: 1rem; margin: 1rem 0; }
'''
@staticmethod
def _build_header(filename: str, timestamp: str) -> str:
"""Build the header section"""
return f'''
<header class="mb-4">
<h1 class="display-4">QC Report: {filename}</h1>
<p class="text-muted">Generated at: {datetime.fromisoformat(timestamp.rstrip('Z')).strftime('%Y-%m-%d %H:%M:%S')}</p>
</header>
'''
@staticmethod
def _build_scripts() -> str:
"""Include required JavaScript"""
return '''
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
'''
@staticmethod
def _generate_check_html(check: dict) -> str:
"""Generate HTML for an individual check"""
status_color = {
'passed': 'success',
'error': 'danger',
'failed': 'warning',
'skipped': 'secondary'
}.get(check['result']['status'].lower(), 'secondary')
return f'''
<div class="accordion-item check-card">
<h2 class="accordion-header" id="heading{check['index']}">
<button class="accordion-button collapsed" type="button" data-bs-toggle="collapse"
data-bs-target="#collapse{check['index']}" aria-expanded="false"
aria-controls="collapse{check['index']}">
<span class="badge bg-{status_color} status-badge me-2">{check['result']['status'].upper()}</span>
{check['id']}: {check['config'].get('description', 'No description')}
</button>
</h2>
<div id="collapse{check['index']}" class="accordion-collapse collapse"
aria-labelledby="heading{check['index']}" data-bs-parent="#checksAccordion">
<div class="accordion-body">
{HTMLReporter._generate_error_section(check['result'])}
<h5>Configuration</h5>
<ul class="details-list">
{HTMLReporter._format_details(check['config'])}
</ul>
<h5>Results</h5>
<ul class="details-list">
{HTMLReporter._format_details(check['result'].get('details', {}))}
</ul>
</div>
</div>
</div>
'''
@staticmethod
def _generate_error_section(result: dict) -> str:
"""Generate error section if present"""
if 'error_message' not in result:
return ''
return f'''
<div class="error-section">
<h5 class="text-danger">Error:</h5>
<p>{result['error_message']}</p>
{HTMLReporter._format_details(result.get('details', {}))}
</div>
'''
@staticmethod
def _format_details(details: dict, level: int = 0) -> str:
"""Recursively format nested details"""
items = []
for key, value in details.items():
if key == 'error_message':
continue
if isinstance(value, dict):
items.append(f'''
<li>
<strong>{key.title()}:</strong>
<div class="nested-details">
{HTMLReporter._format_details(value, level+1)}
</div>
</li>
''')
elif isinstance(value, list):
list_items = ''.join(f'<li>{item}</li>' for item in value)
items.append(f'''
<li>
<strong>{key.title()}:</strong>
<ul>{list_items}</ul>
</li>
''')
else:
items.append(f'<li><strong>{key.title()}:</strong> {value}</li>')
return '\n'.join(items)
if __name__ == "__main__":
# Example usage
import sys
if len(sys.argv) != 3:
print("Usage: python html_reporter.py <input_json> <output_html>")
sys.exit(1)
with open(sys.argv[1]) as f:
data = json.load(f)
HTMLReporter.generate_report(data, sys.argv[2])

95
checks/image_linking_check.py Executable file
View file

@ -0,0 +1,95 @@
import os
import json
def run_check(config):
"""
Check that all .jpg/.jpeg/.png images (case-insensitive) in the working directory (recursively)
are referenced by at least one record in linkingrecord.json, matching the full path
relative to working_dir.
Expected config:
- working_dir: Directory where linkingrecord.json and image files are located.
- linkingrecord_filename: The name of the linking record file (default: 'linkingrecord.json').
Behavior:
1. Load linkingrecord.json from working_dir.
2. Collect all referenced filenames (items->records->assets->filename).
These are stored as relative paths under working_dir, e.g. "dev/cgw07/yyi/images/...png".
3. Recursively walk through working_dir, gathering all files that end with
.jpg/.jpeg/.png (case-insensitive).
4. For each found image, compute its relative path (e.g., "dev/cgw07/.../image.png")
and check if that path is in the referenced set.
5. If any image is not found in the JSON, return "failed" with a list of unreferenced images.
6. Otherwise, return "passed".
"""
working_dir = config.get("working_dir", "working")
linkingrecord_filename = config.get("linkingrecord_filename", "linkingrecord.json")
linkingrecord_path = os.path.join(working_dir, linkingrecord_filename)
# 1. Verify the linkingrecord.json file exists
if not os.path.exists(linkingrecord_path):
return {
"status": "error",
"error_message": f"Linking record file '{linkingrecord_filename}' not found in {working_dir}."
}
# 2. Load linkingrecord.json
with open(linkingrecord_path, 'r', encoding='utf-8') as f:
linkingrecord = json.load(f)
if "items" not in linkingrecord or not isinstance(linkingrecord["items"], list):
return {
"status": "error",
"error_message": "Invalid linkingrecord.json structure: 'items' missing or not a list."
}
# 3. Collect all referenced paths from linkingrecord.json (normalized)
referenced_files = set()
for item in linkingrecord["items"]:
for record in item.get("records", []):
for asset in record.get("assets", []):
filename = asset.get("filename")
if filename:
# Normalize the path exactly as it's stored relative to working_dir
norm_path = os.path.normpath(filename)
referenced_files.add(norm_path)
# 4. Define image extensions to be considered (lowercase for easy comparison)
image_extensions = {'.jpg', '.jpeg', '.png'}
unreferenced_files = []
# 5. Recursively walk through the directory tree
for root, dirs, files in os.walk(working_dir):
for file in files:
# skip the linking record file itself
if file == linkingrecord_filename:
continue
# check if it has an image extension
ext = os.path.splitext(file)[1].lower()
if ext in image_extensions:
# Compute the path relative to working_dir, then normalize it
abs_path = os.path.join(root, file)
rel_path = os.path.relpath(abs_path, working_dir)
norm_rel_path = os.path.normpath(rel_path)
# Check if the relative path is in our referencing set
if norm_rel_path not in referenced_files:
unreferenced_files.append(norm_rel_path)
# 6. Return results
if unreferenced_files:
return {
"status": "failed",
"details": {
"unreferenced_files": sorted(unreferenced_files)
}
}
return {
"status": "passed",
"details": {
"message": "All .jpg/.jpeg/.png images in the folder (and subfolders) match a path in linkingrecord.json."
}
}

203
checks/image_resolution_check.py Executable file
View file

@ -0,0 +1,203 @@
import os
import json
from PIL import Image
def run_check(config):
"""
Check that each image in linkingrecord.json matches the specified resolution
based on (viewtype, imagetype) conditions and an overall MEC-or-BAU pack
designation. However, if the pack is MEC, certain items will still use the
BAU resolution map under these conditions:
1) viewtype in ("exterior", "interior") AND no experienceCondition
2) imagetype == "powertrain"
MEC detection (pack-level): If ANY item has "experienceCondition": "2d-background",
the entire pack is MEC. Otherwise, the entire pack is BAU.
Expected config:
- working_dir (str): Directory where linkingrecord.json and images reside.
- linkingrecord_filename (str): Name of the linking record file. Default: linkingrecord.json
Returns:
- "passed" if all checked images match their expected resolution
- "failed" if any present images don't match, listing each that failed
- "error" if linkingrecord is missing or invalid
"""
working_dir = config.get("working_dir", "working")
linkingrecord_filename = config.get("linkingrecord_filename", "linkingrecord.json")
linkingrecord_path = os.path.join(working_dir, linkingrecord_filename)
if not os.path.exists(linkingrecord_path):
return {
"status": "error",
"error_message": f"Linking record '{linkingrecord_filename}' not found in {working_dir}."
}
# Load linkingrecord.json
with open(linkingrecord_path, 'r', encoding='utf-8') as f:
linkingrecord = json.load(f)
if "items" not in linkingrecord or not isinstance(linkingrecord["items"], list):
return {
"status": "error",
"error_message": "Invalid linkingrecord.json structure: 'items' missing or not a list."
}
# -------------------------------------------------------------------------
# 1) Determine if ANY item has experienceCondition = "2d-background" => MEC
# -------------------------------------------------------------------------
any_mec = any(
item.get("conditions", {}).get("experienceCondition") == "2d-background"
for item in linkingrecord["items"]
)
pack_experience_type = "MEC" if any_mec else "BAU"
# -------------------------------------------------------------------------
# Define BAU and MEC resolution maps
# -------------------------------------------------------------------------
bau_map = {
("exterior", None): (1600, 900),
("interior", None): (1600, 900),
("exterior", "layeroptionext"): (1600, 900),
("interior", "layeroptionint"): (1600, 900),
("exterior", "showroom"): (768, 432),
("carousel", "extra"): (678, 381),
("carousel", "powertrain"): (678, 381),
("carousel", "colour"): (148, 83),
("carousel", "bodystyle"): (678, 381),
("carousel", "series"): (678, 381),
("carousel", "trim"): (678, 381),
}
mec_map = {
("exterior", None): (1600, 1600),
("interior", None): (1600, 1600),
("exterior", "layeroptionext"): (1600, 1600),
("interior", "layeroptionint"): (1600, 1600),
("exterior", "showroom"): (768, 432), # Same as BAU
("carousel", "extra"): (1280, 720),
("carousel", "powertrain"): (1600, 1600),
("carousel", "colour"): (148, 83), # Not changed
("carousel", "bodystyle"): (678, 381),
("carousel", "series"): (678, 381),
("carousel", "trim"): (678, 381),
}
# -------------------------------------------------------------------------
# Helper to decide which resolution map to use for a given item.
# Even if the entire pack is MEC, we might override to BAU for certain items.
# -------------------------------------------------------------------------
def get_resolution_map_for_item(viewtype, imagetype, item_experience_condition, pack_experience_type):
"""
Returns (resolution_map, used_experience_type_string).
If the pack is MEC, we override to BAU when:
- viewtype in ["exterior", "interior"] AND no experienceCondition
OR
- imagetype == "powertrain"
Otherwise, use the pack_experience_type's map (MEC or BAU).
"""
if pack_experience_type == "MEC":
# Condition #1: viewtype is exterior/interior & no experienceCondition
# Condition #2: imagetype is powertrain
if ((item_experience_condition is None and viewtype in ("exterior", "interior"))
or imagetype == "powertrain"):
return bau_map, "BAU"
else:
return mec_map, "MEC"
else:
# If pack is BAU, always BAU
return bau_map, "BAU"
def get_required_resolution(conditions, pack_experience_type):
"""
Determine the required resolution for the item based on the *actual*
resolution map used, which might be MEC or BAU due to overrides.
Returns ( (width, height), used_experience_type ).
"""
viewtype = conditions.get("viewtype")
imagetype = conditions.get("imagetype", None)
item_experience_condition = conditions.get("experienceCondition")
resolution_map, used_experience_type = get_resolution_map_for_item(
viewtype,
imagetype,
item_experience_condition,
pack_experience_type
)
key = (viewtype, imagetype)
# If the exact key isn't in the map, try a fallback without imagetype
if key not in resolution_map:
fallback_key = (viewtype, None)
if fallback_key in resolution_map:
return resolution_map[fallback_key], used_experience_type
else:
return None, used_experience_type
return resolution_map[key], used_experience_type
# -------------------------------------------------------------------------
# Main check logic
# -------------------------------------------------------------------------
failed_images = []
for item in linkingrecord["items"]:
conditions = item.get("conditions", {})
required_res, used_experience_type = get_required_resolution(conditions, pack_experience_type)
if required_res is None:
# If we don't know what resolution to apply for this item => error
return {
"status": "error",
"error_message": f"No known resolution for conditions: {conditions}"
}
expected_width, expected_height = required_res
records = item.get("records", [])
for record in records:
assets = record.get("assets", [])
for asset in assets:
filename = asset.get("filename")
if not filename:
continue
image_path = os.path.join(working_dir, filename)
if not os.path.exists(image_path):
# Ignore missing files for this check
continue
# Open image and check resolution
try:
with Image.open(image_path) as img:
width, height = img.size
except Exception:
# Can't open or read the file => skip
continue
if width != expected_width or height != expected_height:
failed_images.append({
"filename": filename,
"viewtype": conditions.get("viewtype"),
"imagetype": conditions.get("imagetype"),
"used_experience_type": used_experience_type,
"expected_resolution": f"{expected_width}x{expected_height}",
"actual_resolution": f"{width}x{height}"
})
if failed_images:
return {
"status": "failed",
"details": {
"failed_images": failed_images
}
}
return {
"status": "passed",
"details": {
"message": "All present images match their required resolution."
}
}

67
checks/missing_images_check.py Executable file
View file

@ -0,0 +1,67 @@
import os
import json
def run_check(config):
"""
Check for missing images defined in linkingrecord.json.
Expected config:
- working_dir: Directory where linkingrecord.json and extracted files are located.
- linkingrecord_filename: The name of the linking record file (default: 'linkingrecord.json').
Behavior:
- Load linkingrecord.json from working_dir.
- Iterate through items->records->assets->filename.
- Check if file exists at working_dir/filename.
- If any missing files, return "failed" with a list of missing files.
- Otherwise, return "passed".
"""
working_dir = config.get("working_dir", "working")
linkingrecord_filename = config.get("linkingrecord_filename", "linkingrecord.json")
linkingrecord_path = os.path.join(working_dir, linkingrecord_filename)
if not os.path.exists(linkingrecord_path):
return {
"status": "error",
"error_message": f"Linking record file '{linkingrecord_filename}' not found in {working_dir}."
}
# Load the linking record JSON
with open(linkingrecord_path, 'r', encoding='utf-8') as f:
linkingrecord = json.load(f)
if "items" not in linkingrecord or not isinstance(linkingrecord["items"], list):
return {
"status": "error",
"error_message": "Invalid linkingrecord.json structure: 'items' missing or not a list."
}
missing_files = set()
# Iterate over all items and their records
for item in linkingrecord["items"]:
records = item.get("records", [])
for record in records:
assets = record.get("assets", [])
for asset in assets:
filename = asset.get("filename")
if filename:
file_path = os.path.join(working_dir, filename)
if not os.path.exists(file_path):
missing_files.add(filename)
if missing_files:
return {
"status": "failed",
"details": {
"missing_files": sorted(list(missing_files))
}
}
return {
"status": "passed",
"details": {
"message": "All referenced images exist."
}
}

View file

@ -0,0 +1,142 @@
import os
import json
def run_check(config):
"""
Special Requirements MEC/BAU Check
Logic:
- Check if this is MEC scenario (any item has experienceCondition="2d-background").
* If MEC:
- Find exterior item with experienceCondition="2d-background".
- In its records, look for angle=30 and an asset filename containing "powertrain" and "mec_30_0".
- If not found, fail.
* If BAU (no "2d-background"):
- Find carousel/powertrain item.
- Ensure it has at least one asset in its records.
- If not found, fail.
"""
working_dir = config.get("working_dir", "working")
linkingrecord_filename = config.get("linkingrecord_filename", "linkingrecord.json")
linkingrecord_path = os.path.join(working_dir, linkingrecord_filename)
if not os.path.exists(linkingrecord_path):
return {
"status": "error",
"error_message": f"Linking record '{linkingrecord_filename}' not found in {working_dir}."
}
with open(linkingrecord_path, 'r', encoding='utf-8') as f:
linkingrecord = json.load(f)
if "items" not in linkingrecord or not isinstance(linkingrecord["items"], list):
return {
"status": "error",
"error_message": "Invalid linkingrecord.json: 'items' missing or not a list."
}
items = linkingrecord["items"]
# Detect if MEC scenario:
# MEC if any item has experienceCondition="2d-background"
is_mec = any(
item.get("conditions", {}).get("experienceCondition") == "2d-background"
for item in items
)
if is_mec:
# MEC scenario:
# Find exterior item with experienceCondition="2d-background"
exterior_mec_items = [
item for item in items
if item.get("conditions", {}).get("viewtype") == "exterior"
and item.get("conditions", {}).get("experienceCondition") == "2d-background"
]
if not exterior_mec_items:
return {
"status": "failed",
"details": {
"message": "MEC scenario detected but no exterior item with experienceCondition='2d-background' found."
}
}
# Check each exterior MEC items records for angle=30 and a powertrain mec image
found_powertrain_mec_30 = False
for item in exterior_mec_items:
records = item.get("records", [])
for record in records:
if record.get("angle") == 30:
# Check assets for a powertrain mec image
for asset in record.get("assets", []):
filename = asset.get("filename", "")
# Check if filename indicates powertrain mec_30_0 image
# Adjust pattern if needed:
if "powertrain" in filename and ("mec_30_0" in filename or "30_0_mec" in filename):
found_powertrain_mec_30 = True
break
if found_powertrain_mec_30:
break
if found_powertrain_mec_30:
break
if not found_powertrain_mec_30:
return {
"status": "failed",
"details": {
"message": "MEC scenario: No angle=30 powertrain mec_30_0 image found in exterior MEC section."
}
}
# If found, we pass
return {
"status": "passed",
"details": {
"message": "MEC scenario: angle=30 powertrain mec image found."
}
}
else:
# BAU scenario:
# Need to find a carousel/powertrain item with at least one asset
carousel_powertrain_items = [
item for item in items
if item.get("conditions", {}).get("viewtype") == "carousel"
and item.get("conditions", {}).get("imagetype") == "powertrain"
]
if not carousel_powertrain_items:
return {
"status": "failed",
"details": {
"message": "BAU scenario: No carousel/powertrain section found."
}
}
# Check for at least one asset
found_asset = False
for item in carousel_powertrain_items:
records = item.get("records", [])
for record in records:
assets = record.get("assets", [])
if assets:
found_asset = True
break
if found_asset:
break
if not found_asset:
return {
"status": "failed",
"details": {
"message": "BAU scenario: carousel/powertrain found but no assets present."
}
}
return {
"status": "passed",
"details": {
"message": "BAU scenario: carousel/powertrain section with assets found."
}
}

View file

@ -0,0 +1,78 @@
import os
import zipfile
import shutil
def run_check(config):
# We expect config to contain:
# - input_file: The path to the zip file to unzip
# - expected_file: The name of the file we expect to find inside the zip
# - working_dir: The directory to extract files into
input_file = config.get("input_file")
expected_file = config.get("expected_file", "linkingrecord.json")
working_dir = config.get("working_dir", "working")
if not input_file or not os.path.exists(input_file):
return {
"status": "error",
"error_message": "Input file not provided or does not exist."
}
"""
Ensure `working_dir` exists and is empty.
If it doesn't exist, create it.
If it does exist, delete all of its contents.
"""
if os.path.exists(working_dir):
# Remove all contents of the existing directory
try:
for item in os.listdir(working_dir):
item_path = os.path.join(working_dir, item)
if os.path.isfile(item_path) or os.path.islink(item_path):
os.remove(item_path)
else:
shutil.rmtree(item_path)
except Exception as e:
return {
"status": "error",
"error_message": f"Failed to clean working directory: {e}"
}
else:
# Create working directory if it doesn't exist
try:
os.makedirs(working_dir)
except Exception as e:
return {
"status": "error",
"error_message": f"Failed to create working directory: {e}"
}
# Verify that input_file is a zip by attempting to open it
if not zipfile.is_zipfile(input_file):
return {
"status": "error",
"error_message": f"Input file '{input_file}' is not a valid zip file."
}
# Extract the zip contents into the working directory
with zipfile.ZipFile(input_file, 'r') as zf:
zf.extractall(working_dir)
# Check if the expected_file is present in working_dir
expected_path = os.path.join(working_dir, expected_file)
if not os.path.exists(expected_path):
return {
"status": "failed",
"details": {
"missing_file": expected_file,
"message": f"{expected_file} not found after extraction."
}
}
return {
"status": "passed",
"details": {
"message": f"{expected_file} found in {working_dir}.",
"extracted_dir": working_dir
}
}

42
launchers/HM_launcher_CLI.py Executable file
View file

@ -0,0 +1,42 @@
import sys
import os
import logging
sys.path.append('/opt/QC')
sys.path.append('/opt/QC/checks')
import qc_module # Ensure qc_module is on your PYTHONPATH or installed
import json
PROFILE_PATH = "/opt/QC/profiles/HM.json"
REPORT_PATH = sys.argv[2]
def main():
# Optional: Set up basic logging if you want to log any info or debug messages
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
# Check command-line args
if len(sys.argv) != 3:
print(f"Usage: {os.path.basename(sys.argv[0])} <path_to_input_file> <path_to_save_report_html>", file=sys.stderr)
sys.exit(1)
file_path = sys.argv[1]
# Validate that the file exists
if not os.path.isfile(file_path):
print(f"Error: File '{file_path}' does not exist.", file=sys.stderr)
sys.exit(1)
try:
# Run QC checks and capture the JSON string result
qc_report_json_str = qc_module.run_qc_checks(PROFILE_PATH, file_path, REPORT_PATH)
# Print the JSON string to stdout
print(qc_report_json_str)
except Exception as e:
logging.error(f"Error running QC checks on '{file_path}': {str(e)}")
sys.exit(1)
if __name__ == '__main__':
main()

View file

@ -0,0 +1,146 @@
#!/usr/bin/env python3
import os
import json
import datetime
import sys
import fcntl
import logging
from logging.handlers import RotatingFileHandler
from boxsdk import JWTAuth, Client
# This import points to the module containing the run_qc_checks() function.
# Replace "qc_module" with the actual name of your module if it's different.
import qc_module
# ------------------------------------------------------------------------------
# 1. BOX CONFIGURATION & AUTHENTICATION
# ------------------------------------------------------------------------------
BOX_CLI_CONFIG_PATH = 'ford_box_config.json'
# Folder IDs
SOURCE_FOLDER_ID = '303321023292' # Move files *from* this folder...
REPORT_FOLDER_ID = '303321539397' # ...and upload reports *to* this folder
# Local folder path to which files will be downloaded
LOCAL_DOWNLOAD_PATH = 'download_tmp' # <-- Change to a valid directory
# Path to your QC profile used by run_qc_checks()
PROFILE_PATH = "profiles/ford_bnp.json" # <-- Update to your real profile path
# Lock file path
LOCK_FILE_PATH = '/tmp/ford_qc_script.lock' # Prevent concurrent runs
# ------------------------------------------------------------------------------
# 2. SET UP LOGGING
# ------------------------------------------------------------------------------
os.makedirs('log', exist_ok=True)
LOG_FILE_PATH = 'log/ford_qc_script.log'
logging.basicConfig(
filename=LOG_FILE_PATH,
level=logging.INFO, # We'll log at INFO level for our script
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# -------------------------------------------------------------------------
# RAISE BOX PYTHON SDK LOG LEVEL SO WE DON'T SEE THE DETAILED HTTP PAYLOAD
# -------------------------------------------------------------------------
logging.getLogger('boxsdk').setLevel(logging.WARNING)
# ^ This will suppress the verbose "GET/POST" and request/response logs.
# If you still see logs from submodules, you could also do:
# logging.getLogger('boxsdk.network').setLevel(logging.WARNING)
# logging.getLogger('boxsdk.auth').setLevel(logging.WARNING)
def main():
logging.info("Script started.")
# Acquire an exclusive, non-blocking lock so only one instance can run
with open(LOCK_FILE_PATH, 'w') as lock_file:
try:
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
logging.warning("Another instance of the script is currently running. Exiting.")
sys.exit(1)
# ------------------------------------------------------------------------------
# 3. AUTHENTICATE WITH BOX
# ------------------------------------------------------------------------------
logging.info("Authenticating with Box...")
auth = JWTAuth.from_settings_file(BOX_CLI_CONFIG_PATH)
client = Client(auth)
# Ensure the local download directory exists
os.makedirs(LOCAL_DOWNLOAD_PATH, exist_ok=True)
# ------------------------------------------------------------------------------
# 4. DOWNLOAD (MOVE) ONLY .ZIP FILES FROM BOX SOURCE FOLDER
# ------------------------------------------------------------------------------
logging.info(f"Retrieving items from Box folder {SOURCE_FOLDER_ID}.")
source_folder = client.folder(folder_id=SOURCE_FOLDER_ID).get()
items = source_folder.get_items(limit=1000)
for item in items:
# Only handle files that end with .zip
if item.type == 'file' and item.name.lower().endswith('.zip'):
local_file_path = os.path.join(LOCAL_DOWNLOAD_PATH, item.name)
logging.info(f"Downloading .zip file from Box: {item.name} -> {local_file_path}")
with open(local_file_path, 'wb') as local_file:
item.download_to(local_file)
# Optionally delete the file from Box to simulate "move"
logging.info(f"Deleting file from Box folder: {item.name}")
item.delete()
# ------------------------------------------------------------------------------
# 5. PROCESS EACH DOWNLOADED .ZIP FILE WITH qc_module.run_qc_checks()
# ------------------------------------------------------------------------------
local_files = os.listdir(LOCAL_DOWNLOAD_PATH)
for file_name in local_files:
file_path = os.path.join(LOCAL_DOWNLOAD_PATH, file_name)
# Skip if it's not a regular file or doesn't end with .zip
if not (os.path.isfile(file_path) and file_name.lower().endswith('.zip')):
continue
logging.info(f"Processing local file: {file_path}")
try:
# run_qc_checks() returns a JSON string with the QC results
qc_report_json_str = qc_module.run_qc_checks(PROFILE_PATH, file_path)
# Create a unique JSON filename
timestamp = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
report_file_name = f"report_{timestamp}_{file_name}.json"
local_report_path = os.path.join(LOCAL_DOWNLOAD_PATH, report_file_name)
# Write the returned JSON string to a local file
with open(local_report_path, 'w', encoding='utf-8') as report_file:
report_file.write(qc_report_json_str)
logging.info(f"QC checks complete. JSON report: {local_report_path}")
# ------------------------------------------------------------------------------
# 6. UPLOAD THE REPORT JSON FILE TO BOX
# ------------------------------------------------------------------------------
logging.info(f"Uploading report to Box folder {REPORT_FOLDER_ID}: {report_file_name}")
client.folder(REPORT_FOLDER_ID).upload(
local_report_path,
file_name=report_file_name
)
# ------------------------------------------------------------------------------
# 7. CLEAN UP
# ------------------------------------------------------------------------------
os.remove(file_path) # Remove the original .zip file
os.remove(local_report_path) # Remove the generated report file
logging.info(f"Removed local files: {file_path} and {local_report_path}")
except Exception as e:
logging.error(f"Error processing {file_path}: {e}")
logging.info("All done!")
if __name__ == '__main__':
main()

57
profiles/HM.json Executable file
View file

@ -0,0 +1,57 @@
[
{
"id": "HM_parse",
"script": "checks.HM_parse",
"config": {
"description": "Parses document with Llamaparse, returning both text extraction and image of document",
"input_file": "supplied by launcher script",
"working_dir": "/tmp/HM_working"
}
},
{
"id": "HM_filename_parse",
"script": "checks.HM_filename_parse",
"config": {
"description": "Parses filename into constituent pieces",
"working_dir": "/tmp/HM_working",
"filename_path": "filename.txt"
}
},
{
"id": "HM_imprint_check",
"script": "checks.HM_imprint_check",
"config": {
"description": "Checks imprint on document against relevant portion of filename",
"working_dir": "/tmp/HM_working",
"test_image_path": "HM_working/parsed_test_image.jpg"
}
},
{
"id": "HM_language_validate",
"script": "checks.HM_language_validate",
"config": {
"description": "Validates language of document content against language code in file name",
"working_dir": "/tmp/HM_working",
"test_image_path": "HM_working/parsed_test_image.jpg"
}
},
{
"id": "HM_price_currency_check",
"script": "checks.HM_price_currency_check",
"config": {
"description": "Validates currency of pricing in document against language detected in previous check",
"working_dir": "/tmp/HM_working",
"test_image_path": "HM_working/parsed_test_image.jpg"
}
},
{
"id": "HM_censorship",
"script": "checks.HM_censorship",
"config": {
"description": "Checks CEN and GEN assets (primarily GEN) to determine whether the image(s) are actually censored for conservative markets. Examines images for full coverage of clothing on all body parts (except face and hands) and fail if too much skin is exposed.",
"images_dir": "/opt/QC/supporting/censorship_trainset",
"working_dir": "/tmp/HM_working",
"test_image_path": "HM_working/parsed_test_image.jpg"
}
}
]

54
profiles/ford_bnp.json Executable file
View file

@ -0,0 +1,54 @@
[
{
"script": "checks.unzip_and_verify_check",
"config": {
"expected_file": "linkingrecord.json",
"working_dir": "working"
}
},
{
"script": "checks.colour_existence_check",
"config": {
"working_dir": "working",
"linkingrecord_filename": "linkingrecord.json"
}
},
{
"script": "checks.missing_images_check",
"config": {
"working_dir": "working",
"linkingrecord_filename": "linkingrecord.json"
}
},
{
"script": "checks.image_resolution_check",
"config": {
"min_width": "defined in script for each asset type",
"min_height": "defined in script for each asset type"
}
},
{
"script": "checks.file_size_check",
"config": {
"max_size_mb": "defined in script for each asset type"
}
},
{
"script": "checks.special_requirements_mec_bau",
"config": {
"mec_pack": "checking for MEC vs BAU logic included in the script"
}
},
{
"script": "checks.image_linking_check",
"config": {
"no_config_needed": "logic contained in check file"
}
},
{
"script": "checks.business_data_check",
"config": {
"required_fields": ["campaign_id", "budget", "start_date"]
}
}
]

139
qc_module.py Executable file
View file

@ -0,0 +1,139 @@
import json
import os
import importlib
import datetime
from typing import Dict, Any, Optional, List
import logging
from html_reporter import HTMLReporter
# Add this near the top with other constants
REPORTS_DIR = os.path.join(os.path.dirname(__file__), "reports") # Or your preferred path
os.makedirs(REPORTS_DIR, exist_ok=True) # Create directory if needed
def run_single_check(script: str, config: Dict[str, Any], context: Dict[str, Any], check_id: str) -> Dict[str, Any]:
"""
Import and run a single QC check module with context support.
The module should implement a 'run_check(config: dict, context: dict, check_id: str) -> dict' function.
:param script: The dotted path to the check script module
:param config: Configuration dictionary for the check
:param context: Shared context dictionary between checks
:param check_id: Unique identifier for this check
:return: QC check result dictionary
"""
try:
module = importlib.import_module(script)
except ModuleNotFoundError as e:
return {
"status": "error",
"error_message": f"Module not found: {script}",
"exception": str(e)
}
if not hasattr(module, "run_check"):
return {
"status": "error",
"error_message": f"Module '{script}' does not implement run_check(config, context, check_id)."
}
try:
result = module.run_check(config=config, context=context, check_id=check_id)
except Exception as e:
return {
"status": "error",
"error_message": f"Check '{check_id}' failed with exception",
"exception": str(e)
}
if not isinstance(result, dict):
return {
"status": "error",
"error_message": f"run_check did not return a dictionary for check '{check_id}'."
}
return result
def run_qc_profile(profile_path: str, input_file: Optional[str] = None) -> Dict[str, Any]:
"""
Run all QC checks defined in the given profile with shared context.
:param profile_path: Path to the QC profile JSON file
:param input_file: Optional input file path for checks
:return: Aggregated results with context-aware checks
"""
with open(profile_path, 'r', encoding='utf-8') as f:
profile = json.load(f)
if not isinstance(profile, list):
raise ValueError("QC profile must be a JSON array of check definitions.")
aggregated_results = {
"profile": os.path.basename(profile_path),
"timestamp": datetime.datetime.utcnow().isoformat() + 'Z',
"checks": [],
"context_snapshot": {} # Final state of the context for debugging
}
context: Dict[str, Any] = {} # Shared context between checks
executed_ids = set() # Track check IDs for uniqueness
for idx, check_def in enumerate(profile, start=1):
if not isinstance(check_def, dict):
raise ValueError(f"Check #{idx} must be a dictionary object.")
script = check_def.get("script")
check_id = check_def.get("id")
config = check_def.get("config", {})
if not script:
raise ValueError(f"Check #{idx} is missing a 'script' value.")
if not check_id:
raise ValueError(f"Check #{idx} is missing required 'id' field.")
if check_id in executed_ids:
raise ValueError(f"Duplicate check ID detected: '{check_id}'")
executed_ids.add(check_id)
# Inject common parameters
if input_file is not None:
config["input_file"] = input_file
# Run the check with shared context
result = run_single_check(script, config, context, check_id)
# Store results with check ID
aggregated_results["checks"].append({
"index": idx,
"id": check_id,
"script": script,
"config": config,
"result": result
})
# Store final context state (consider security implications for production use)
# aggregated_results["context_snapshot"] = context
return aggregated_results
def run_qc_checks(profile_path: str, input_file: str, report_path: str) -> str:
# Your existing QC execution code
json_results = run_qc_profile(profile_path, input_file)
# Generate report path components
reports_dir = report_path
input_filename = os.path.basename(input_file) if input_file else "unknown_file"
#hm_filename_parse_data = context.get("HM_filename_parse", {})
#input_filename = hm_filename_parse_data.get("short_name") if hm_filename_parse_data.get("short_name") else "couldnt_get_short_filename_from_context"
# Call reporter with correct arguments
return HTMLReporter.generate_report(
json_data=json_results,
reports_dir=reports_dir,
input_filename=input_filename
)

118
requirements.txt Executable file
View file

@ -0,0 +1,118 @@
aiohappyeyeballs==2.4.4
aiohttp==3.11.11
aiosignal==1.3.2
alembic==1.14.1
annotated-types==0.7.0
anyio==4.8.0
APScheduler==3.11.0
asyncer==0.0.8
attrs==24.3.0
backoff==2.2.1
cachetools==5.5.1
certifi==2024.12.14
cffi==1.17.1
charset-normalizer==3.4.1
click==8.1.8
cloudpickle==3.1.1
colorlog==6.9.0
cryptography==42.0.8
dataclasses-json==0.6.7
datasets==3.2.0
Deprecated==1.2.15
dill==0.3.8
dirtyjson==1.0.8
diskcache==5.6.3
distro==1.9.0
dnspython==2.7.0
dspy==2.5.43
email_validator==2.2.0
fastapi==0.111.1
fastapi-cli==0.0.7
fastapi-sso==0.10.0
filelock==3.17.0
filetype==1.2.0
frozenlist==1.5.0
fsspec==2024.9.0
greenlet==3.1.1
gunicorn==22.0.0
h11==0.14.0
httpcore==1.0.7
httptools==0.6.4
httpx==0.27.2
huggingface-hub==0.27.1
idna==3.10
importlib_metadata==8.6.1
Jinja2==3.1.5
jiter==0.8.2
joblib==1.4.2
json_repair==0.35.0
jsonschema==4.23.0
jsonschema-specifications==2024.10.1
litellm==1.53.7
llama-index-core==0.12.12
llama-parse==0.5.19
magicattr==0.1.6
Mako==1.3.8
markdown-it-py==3.0.0
MarkupSafe==3.0.2
marshmallow==3.25.1
mdurl==0.1.2
multidict==6.1.0
multiprocess==0.70.16
mypy-extensions==1.0.0
nest-asyncio==1.6.0
networkx==3.4.2
nltk==3.9.1
numpy==2.2.2
oauthlib==3.2.2
openai==1.60.0
optuna==4.2.0
orjson==3.10.15
packaging==24.2
pandas==2.2.3
pillow==11.1.0
propcache==0.2.1
pyarrow==19.0.0
pycparser==2.22
pydantic==2.10.5
pydantic_core==2.27.2
Pygments==2.19.1
PyJWT==2.10.1
PyNaCl==1.5.0
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-multipart==0.0.9
pytz==2024.2
PyYAML==6.0.2
redis==5.2.1
referencing==0.36.1
regex==2024.11.6
requests==2.32.3
rich==13.9.4
rich-toolkit==0.13.2
rpds-py==0.22.3
rq==2.1.0
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
SQLAlchemy==2.0.37
starlette==0.37.2
tenacity==9.0.0
tiktoken==0.8.0
tokenizers==0.21.0
tqdm==4.67.1
typer==0.15.1
typing-inspect==0.9.0
typing_extensions==4.12.2
tzdata==2025.1
tzlocal==5.2
ujson==5.10.0
urllib3==2.3.0
uvicorn==0.22.0
uvloop==0.21.0
watchfiles==1.0.4
websockets==14.2
wrapt==1.17.2
xxhash==3.5.0
yarl==1.18.3
zipp==3.21.0

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.1 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.1 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.3 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.5 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.8 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 742 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 419 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 829 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.9 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 838 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 147 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 579 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.8 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 558 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.7 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 159 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 248 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 47 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 416 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 336 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 73 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 718 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 57 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 303 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 218 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 45 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 343 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 391 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 431 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.2 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 359 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 158 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 828 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.1 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 515 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.3 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 302 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.8 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 665 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.9 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 220 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 MiB

150
utils/input_report.json Executable file
View file

@ -0,0 +1,150 @@
{
"profile": "HM.json",
"timestamp": "2025-01-24T16:48:28.409049Z",
"checks": [
{
"index": 1,
"id": "HM_parse",
"script": "checks.HM_parse",
"config": {
"description": "Parses document with Llamaparse, returning both text extraction and image of document",
"input_file": "input_bucket/10.8x14cm_quarter_letter_1001D_10004-02_CEN.pdf",
"working_dir": "HM_working"
},
"result": {
"status": "passed",
"details": {
"message": "PDF parsed successfully.",
"working_dir": "HM_working",
"text_file": "HM_working/extracted_text.txt",
"image_files": [
"HM_working/parsed_test_image.jpg"
]
}
}
},
{
"index": 2,
"id": "HM_filename_parse",
"script": "checks.HM_filename_parse",
"config": {
"description": "Parses filename into constituent pieces",
"working_dir": "HM_working",
"filename_path": "filename.txt",
"input_file": "input_bucket/10.8x14cm_quarter_letter_1001D_10004-02_CEN.pdf"
},
"result": {
"status": "passed",
"details": {
"message": "Filename parsed and stored in context",
"filename_source": "HM_parse context",
"gpt_response_summary": "Parsed 5 components"
}
}
},
{
"index": 3,
"id": "HM_imprint_check",
"script": "checks.HM_imprint_check",
"config": {
"description": "Checks imprint on document against relevant portion of filename",
"working_dir": "HM_working",
"test_image_path": "HM_working/parsed_test_image.jpg",
"input_file": "input_bucket/10.8x14cm_quarter_letter_1001D_10004-02_CEN.pdf"
},
"result": {
"status": "passed",
"details": {
"message": "Imprint reference verified",
"filename": "10.8x14cm_quarter_letter_1001D_10004-02_CEN.pdf",
"expected_reference": "1001D_10004-02",
"detected_imprint": "1001D_10004-02_GEN",
"match_verified": true,
"skipped": false,
"verification_response": "true"
}
}
},
{
"index": 4,
"id": "HM_language_validate",
"script": "checks.HM_language_validate",
"config": {
"description": "Validates language of document content against language code in file name",
"working_dir": "HM_working",
"test_image_path": "HM_working/parsed_test_image.jpg",
"input_file": "input_bucket/10.8x14cm_quarter_letter_1001D_10004-02_CEN.pdf"
},
"result": {
"status": "passed",
"details": {
"filename": "10.8x14cm_quarter_letter_1001D_10004-02_CEN.pdf",
"expected_language": "CEN",
"detected_language": "CEN",
"matches": true,
"isCensorshipRequired": true,
"validation_method": "auto",
"gpt_response": null
}
}
},
{
"index": 5,
"id": "HM_price_currency_check",
"script": "checks.HM_price_currency_check",
"config": {
"description": "Validates currency of pricing in document against language detected in previous check",
"working_dir": "HM_working",
"test_image_path": "HM_working/parsed_test_image.jpg",
"input_file": "input_bucket/10.8x14cm_quarter_letter_1001D_10004-02_CEN.pdf"
},
"result": {
"status": "passed",
"details": {
"expected_region": "CEN",
"censorship_required": true,
"language_matches": true,
"currency_found": null,
"price_value": null,
"format_valid": false,
"matches_region": true,
"validation_steps": [
"Language-based auto-pass"
]
}
}
},
{
"index": 6,
"id": "HM_censorship",
"script": "checks.HM_censorship",
"config": {
"description": "Checks CEN and GEN assets (primarily GEN) to determine whether the image(s) are actually censored for conservative markets. Examines images for full coverage of clothing on all body parts (except face and hands) and fail if too much skin is exposed.",
"images_dir": "supporting/censorship_trainset",
"working_dir": "HM_working",
"test_image_path": "HM_working/parsed_test_image.jpg",
"input_file": "input_bucket/10.8x14cm_quarter_letter_1001D_10004-02_CEN.pdf"
},
"result": {
"status": "error",
"error_message": "Censorship requirement mismatch",
"details": {
"expected_language": "CEN",
"censorship_required": true,
"check_performed": true,
"check_outcome": "failed",
"test_results": {
"is_censored": false,
"required_censorship": true,
"censorship_match": false,
"reasoning": "The outfit consists of a sports bra that covers the upper body but leaves the midriff exposed, while the high-waisted leggings cover the legs completely. Since the midriff is not covered, the clothing does not meet the criteria for being considered censored.",
"test_image": "parsed_test_image.jpg"
},
"error": null,
"expected_censored": true,
"actual_censored": false
}
}
}
]
}

140
utils/report.py Executable file
View file

@ -0,0 +1,140 @@
import json
from datetime import datetime
def generate_html_report(json_data, output_file):
# Extract input filename from first check
input_file_path = json_data['checks'][0]['config']['input_file']
input_filename = input_file_path.split('/')[-1] # Get just the filename
# HTML template with Bootstrap for styling
html_template = f'''
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>QC Report - {input_filename}</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
.status-badge {{
font-size: 0.8rem;
padding: 0.35em 0.65em;
}}
.check-card {{
margin-bottom: 1rem;
}}
.details-list {{
list-style-type: none;
padding-left: 1.5rem;
}}
.details-list li {{
margin-bottom: 0.5rem;
}}
.nested-details {{
padding-left: 1.5rem;
margin-top: 0.5rem;
border-left: 2px solid #dee2e6;
}}
.error-section {{
background-color: #fff3cd;
border-radius: 4px;
padding: 1rem;
margin: 1rem 0;
}}
</style>
</head>
<body>
<div class="container py-4">
<header class="mb-4">
<h1 class="display-4">QC Report: {input_filename}</h1>
<p class="text-muted">Generated at: {datetime.fromisoformat(json_data["timestamp"]).strftime('%Y-%m-%d %H:%M:%S')}</p>
</header>
<div class="accordion" id="checksAccordion">
{''.join([generate_check_html(check) for check in json_data['checks']])}
</div>
</div>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
</body>
</html>
'''
with open(output_file, 'w') as f:
f.write(html_template)
def generate_check_html(check):
status_color = {
'passed': 'success',
'error': 'danger',
'failed': 'warning'
}.get(check['result']['status'].lower(), 'secondary')
details_html = format_details(check['result'].get('details', {}))
error_html = ''
if 'error_message' in check['result']:
error_html = f'''
<div class="error-section">
<h5 class="text-danger">Error:</h5>
<p>{check['result']['error_message']}</p>
</div>
'''
return f'''
<div class="accordion-item check-card">
<h2 class="accordion-header" id="heading{check['index']}">
<button class="accordion-button collapsed" type="button" data-bs-toggle="collapse"
data-bs-target="#collapse{check['index']}" aria-expanded="false"
aria-controls="collapse{check['index']}">
<span class="badge bg-{status_color} status-badge me-2">{check['result']['status'].upper()}</span>
{check['id']}: {check['config']['description']}
</button>
</h2>
<div id="collapse{check['index']}" class="accordion-collapse collapse"
aria-labelledby="heading{check['index']}" data-bs-parent="#checksAccordion">
<div class="accordion-body">
{error_html}
<h5>Configuration</h5>
<ul class="details-list">
{format_details(check['config'])}
</ul>
<h5>Results</h5>
<ul class="details-list">
{details_html}
</ul>
</div>
</div>
</div>
'''
def format_details(details, level=0):
items = []
for key, value in details.items():
if isinstance(value, dict):
items.append(f'''
<li>
<strong>{key.title()}:</strong>
<div class="nested-details">
{format_details(value, level+1)}
</div>
</li>
''')
elif isinstance(value, list):
list_items = ''.join([f'<li>{item}</li>' for item in value])
items.append(f'''
<li>
<strong>{key.title()}:</strong>
<ul>{list_items}</ul>
</li>
''')
else:
items.append(f'<li><strong>{key.title()}:</strong> {value}</li>')
return '\n'.join(items)
# Example usage
if __name__ == "__main__":
with open('input_report.json') as f:
data = json.load(f)
generate_html_report(data, 'qc_report.html')