- Add utils/config.py module using python-dotenv for centralized config - Externalize Box folder IDs, paths, and timeout settings to .env files - Create .env.example template with all configuration variables - Add separate systemd service files for prod and dev environments - Update ford_qc_box_hotfolder_process.py to use config module - Update qc_engine.py and path_resolver.py with optional config support - Maintain backwards compatibility with hardcoded defaults - Update documentation in CLAUDE.md 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
295 lines
No EOL
11 KiB
Python
295 lines
No EOL
11 KiB
Python
import json
|
|
import os
|
|
import importlib
|
|
import datetime
|
|
import sys
|
|
from typing import Dict, Any
|
|
import argparse
|
|
|
|
# Resolve the directory that contains this script; every local path below is
# anchored to it rather than to the current working directory.
script_dir = os.path.dirname(os.path.abspath(__file__))

# Put the script directory first on sys.path so the imports that follow
# (utils.*, checks.*) are looked up there before anywhere else.
sys.path.insert(0, script_dir)

# Try to load configuration (optional for qc_engine - can run without .env)
try:
    from utils.config import config

    # The fallback 'checks' directory is a sibling of the configured
    # fallback working directory.
    fallback_checks_path = os.path.join(
        os.path.dirname(config.FALLBACK_WORKING_DIR),
        'checks'
    )
    if os.path.exists(fallback_checks_path):
        sys.path.append(fallback_checks_path)
except Exception:
    # Deliberately broad: covers both a missing config module (ImportError is
    # a subclass of Exception) and a module that imports but is not usable
    # (e.g. FALLBACK_WORKING_DIR missing). In either case fall back to the
    # hardcoded path for backwards compatibility.
    fallback_path = '/home/box-cli/FORD_SCRIPTS/FORD_ASSET_PACK_QC_NEW/checks'
    if os.path.exists(fallback_path):
        sys.path.append(fallback_path)

# Add local checks path (appended last, so it is searched after any fallback
# path added above)
sys.path.append(os.path.join(script_dir, 'checks'))

# Create local working directory; exist_ok makes repeated runs a no-op
working_dir = os.path.join(script_dir, 'working')
os.makedirs(working_dir, exist_ok=True)
|
|
|
|
# Import the path resolver
|
|
from utils.path_resolver import update_profile_paths
|
|
|
|
# Import error reporting functionality
|
|
from checks.html_error_reporter import (
|
|
HTMLErrorReporter,
|
|
generate_zip_error_report,
|
|
generate_json_error_report,
|
|
generate_file_access_error_report,
|
|
generate_qc_check_error_report
|
|
)
|
|
|
|
def run_qc_profile(profile_path: str, input_file: str = None) -> Dict[str, Any]:
    """
    Run all QC checks defined in the given QC profile JSON file.

    The profile must be a JSON array; each element is an object with a
    required 'script' entry (module path handed to run_single_check) and an
    optional 'config' object. Checks run in profile order.

    :param profile_path: Path to the QC profile JSON file.
    :param input_file: Optional input file path; when given it is stored under
        the 'input_file' key of every check's config before the check runs.
    :return: A dictionary with the keys 'profile' (base name of the profile
        file), 'timestamp' (UTC, ISO format with a trailing 'Z') and 'checks'
        (one entry per check holding its index, script, config and result).
    :raises ValueError: If the profile is not a JSON array, a check is not an
        object, a check has no 'script' value, or a check's 'config' is
        neither an object nor null.
    """
    # Load the QC profile (I/O and JSON decoding errors propagate to the caller)
    with open(profile_path, 'r', encoding='utf-8') as f:
        profile = json.load(f)

    if not isinstance(profile, list):
        raise ValueError("QC profile must be a JSON array of check definitions.")

    # datetime.utcnow() is deprecated; take an aware UTC "now" and drop the
    # tzinfo so isoformat() keeps producing the offset-free form that gets an
    # explicit 'Z' suffix.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # Prepare the aggregated result structure
    aggregated_results = {
        "profile": os.path.basename(profile_path),
        "timestamp": now_utc.isoformat() + 'Z',
        "checks": []
    }

    # Process each QC check
    for idx, check_def in enumerate(profile, start=1):
        if not isinstance(check_def, dict):
            raise ValueError(f"Check #{idx} must be a dictionary object.")

        script = check_def.get("script")
        if not script:
            raise ValueError(f"Check #{idx} is missing a 'script' value.")

        # Named check_config (not config) so the module-level `config` object
        # imported from utils.config is not shadowed inside this function.
        check_config = check_def.get("config")
        if check_config is None:
            # Covers both a missing key and an explicit JSON null.
            check_config = {}
        elif not isinstance(check_config, dict):
            raise ValueError(f"Check #{idx} has a 'config' value that is not an object.")

        # Inject input_file into config if provided
        if input_file is not None:
            check_config["input_file"] = input_file

        # Run the single check
        result = run_single_check(script, check_config)

        # Append the result to the aggregated results
        aggregated_results["checks"].append({
            "index": idx,
            "script": script,
            "config": check_config,
            "result": result
        })

    return aggregated_results
|
|
|
|
|
|
def run_single_check(script: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Import and run a single QC check module.

    The module specified by 'script' should implement a 'run_check(config: dict) -> dict' function.

    Problems with the module itself (not importable, no callable run_check,
    non-dict return value) are reported as a result dictionary with
    "status": "error" rather than raised. Exceptions raised inside run_check,
    and import failures other than ModuleNotFoundError, are not caught here
    and propagate to the caller.

    :param script: The dotted path to the check script module (e.g. "checks.image_resolution_check")
    :param config: Configuration dictionary for the check.
    :return: A dictionary representing the QC check result.
    """
    try:
        module = importlib.import_module(script)
    except ModuleNotFoundError as e:
        return {
            "status": "error",
            "error_message": f"Module not found: {script}",
            "exception": str(e)
        }

    # Require a callable, not just an attribute: a module-level
    # `run_check = None` (or any other non-function) would otherwise surface
    # as a TypeError instead of a structured error result.
    run_check = getattr(module, "run_check", None)
    if not callable(run_check):
        return {
            "status": "error",
            "error_message": f"Module '{script}' does not implement run_check(config)."
        }

    result = run_check(config)
    if not isinstance(result, dict):
        return {
            "status": "error",
            "error_message": f"run_check did not return a dictionary for script '{script}'."
        }

    return result
|
|
|
|
|
|
def write_results_report(results: Dict[str, Any], reports_dir: str = "reports") -> str:
    """
    Write the aggregated QC results to a timestamped directory in JSON format.

    The report is written to '<reports_dir>/<YYYYMMDD_HHMMSS>/qc_results.json',
    where the directory name is the current UTC time. The directory is created
    if needed; because the name has one-second resolution, two calls within
    the same second share a directory and the later call overwrites the file.

    :param results: The aggregated QC results dictionary.
    :param reports_dir: The base directory for reports.
    :return: Path to the written report file.
    """
    # datetime.utcnow() is deprecated; an aware UTC datetime formats to the
    # same string with this strftime pattern.
    timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H%M%S")
    output_dir = os.path.join(reports_dir, timestamp)
    os.makedirs(output_dir, exist_ok=True)

    report_path = os.path.join(output_dir, "qc_results.json")
    with open(report_path, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=4)

    return report_path
|
|
|
|
|
|
# Command-line entry point: resolve the profile paths, run every check in the
# profile, and write the JSON results report. Any failure produces an error
# report via the html_error_reporter helpers and exits with status 1.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run QC checks using a QC profile.")
    parser.add_argument("profile", help="Path to the QC profile JSON file.")
    parser.add_argument("--reports_dir", default="reports", help="Directory to store QC reports.")
    parser.add_argument("--input_file", default=None, help="Optional input file to pass to checks.")

    args = parser.parse_args()

    # Determine output filename for potential error reports
    input_filename = os.path.basename(args.input_file) if args.input_file else "qc_engine"

    # Bound before the try so the cleanup in `finally` can test it directly,
    # even when update_profile_paths() fails before assigning it.
    temp_profile_path = None

    # NOTE: sys.exit() raises SystemExit, which is not an Exception subclass,
    # so the exits inside the inner handlers pass through the outer
    # `except Exception` below and still trigger the `finally` cleanup.
    try:
        # Update the profile with correct working directory path
        try:
            temp_profile_path = update_profile_paths(args.profile)
        except Exception as profile_error:
            error_msg = f"Failed to load QC profile '{args.profile}': {str(profile_error)}"
            print(f"Error: {error_msg}", file=sys.stderr)

            # Generate HTML error report for profile loading failure
            error_report_path = HTMLErrorReporter.generate_error_report(
                error_type='qc_profile',
                error_message=error_msg,
                filename=input_filename,
                reports_dir=args.reports_dir,
                error_details={'profile_path': args.profile},
                exception_info=str(profile_error)
            )
            print(f"Error report generated: {error_report_path}")
            sys.exit(1)

        try:
            # Run the QC checks with the updated profile
            aggregated_results = run_qc_profile(temp_profile_path, args.input_file)
        except Exception as qc_error:
            error_msg = f"QC processing failed: {str(qc_error)}"
            print(f"Error: {error_msg}", file=sys.stderr)

            # Determine error type based on exception details. This is a
            # keyword heuristic on the exception text; the first matching
            # branch wins, so the order of the branches matters.
            lowered = str(qc_error).lower()
            error_type = 'unknown'
            if 'zip' in lowered or 'extract' in lowered:
                error_type = 'zip_extraction'
            elif 'json' in lowered or 'parse' in lowered:
                error_type = 'json_parsing'
            elif 'file' in lowered or 'path' in lowered:
                error_type = 'file_access'
            elif 'check' in lowered or 'module' in lowered:
                error_type = 'check_execution'

            # Generate appropriate error report
            if error_type == 'zip_extraction':
                error_report_path = generate_zip_error_report(
                    filename=input_filename,
                    reports_dir=args.reports_dir,
                    error_message=error_msg,
                    exception=qc_error
                )
            elif error_type == 'json_parsing':
                error_report_path = generate_json_error_report(
                    filename=input_filename,
                    reports_dir=args.reports_dir,
                    error_message=error_msg,
                    json_file="linkingrecord.json",
                    exception=qc_error
                )
            elif error_type == 'file_access':
                error_report_path = generate_file_access_error_report(
                    filename=input_filename,
                    reports_dir=args.reports_dir,
                    error_message=error_msg,
                    file_path=args.input_file,
                    exception=qc_error
                )
            elif error_type == 'check_execution':
                error_report_path = generate_qc_check_error_report(
                    filename=input_filename,
                    reports_dir=args.reports_dir,
                    error_message=error_msg,
                    failed_check="qc_processing",
                    exception=qc_error
                )
            else:
                error_report_path = HTMLErrorReporter.generate_error_report(
                    error_type='unknown',
                    error_message=error_msg,
                    filename=input_filename,
                    reports_dir=args.reports_dir,
                    error_details={'input_file': args.input_file, 'profile': args.profile},
                    exception_info=str(qc_error)
                )

            print(f"Error report generated: {error_report_path}")
            sys.exit(1)

        try:
            # Write the results report
            report_path = write_results_report(aggregated_results, args.reports_dir)
            print(f"QC run complete. Report written to: {report_path}")
        except Exception as report_error:
            error_msg = f"Failed to write QC results report: {str(report_error)}"
            print(f"Error: {error_msg}", file=sys.stderr)

            # Generate HTML error report for report writing failure
            error_report_path = HTMLErrorReporter.generate_error_report(
                error_type='unknown',
                error_message=error_msg,
                filename=input_filename,
                reports_dir=args.reports_dir,
                error_details={'stage': 'report_writing', 'target_dir': args.reports_dir},
                exception_info=str(report_error)
            )
            print(f"Error report generated: {error_report_path}")
            sys.exit(1)

    except Exception as unexpected_error:
        # Reached when something above raised outside its own handler, e.g.
        # when generating one of the error reports itself failed.
        error_msg = f"Unexpected error during QC processing: {str(unexpected_error)}"
        print(f"Critical Error: {error_msg}", file=sys.stderr)

        # Generate HTML error report for unexpected failure
        try:
            error_report_path = HTMLErrorReporter.generate_error_report(
                error_type='unknown',
                error_message=error_msg,
                filename=input_filename,
                reports_dir=args.reports_dir,
                error_details={'input_file': args.input_file, 'profile': args.profile},
                exception_info=str(unexpected_error)
            )
            print(f"Error report generated: {error_report_path}")
        except Exception as final_error:
            print(f"Critical failure: Unable to generate error report: {final_error}", file=sys.stderr)

        sys.exit(1)

    finally:
        # Clean up the temporary profile if one was created. The original
        # profile is never deleted: removal only happens when the resolved
        # path differs from the path given on the command line.
        try:
            if (
                temp_profile_path is not None
                and temp_profile_path != args.profile
                and os.path.exists(temp_profile_path)
            ):
                os.remove(temp_profile_path)
        except Exception as cleanup_error:
            # Best effort: a leftover temp file must not change the exit
            # status, but leave a trace instead of failing silently.
            print(
                f"Warning: could not remove temporary profile '{temp_profile_path}': {cleanup_error}",
                file=sys.stderr
            )