ford_qc/qc_engine.py
michael 53e06fff63 Add environment configuration support for multi-deployment (prod/dev)
- Add utils/config.py module using python-dotenv for centralized config
- Externalize Box folder IDs, paths, and timeout settings to .env files
- Create .env.example template with all configuration variables
- Add separate systemd service files for prod and dev environments
- Update ford_qc_box_hotfolder_process.py to use config module
- Update qc_engine.py and path_resolver.py with optional config support
- Maintain backwards compatibility with hardcoded defaults
- Update documentation in CLAUDE.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-18 06:10:43 -06:00

295 lines
No EOL
11 KiB
Python

import json
import os
import importlib
import datetime
import sys
from typing import Dict, Any
import argparse
# Resolve the directory that holds this script; the local paths below hang off it.
script_dir = os.path.dirname(os.path.abspath(__file__))

# Put the script directory first on sys.path so it is searched before anything else.
sys.path.insert(0, script_dir)

# Configuration is optional here: qc_engine can run without a .env file.
try:
    from utils.config import config

    # The extra checks directory is 'checks' under the parent of the
    # configured fallback working directory.
    fallback_checks_path = os.path.join(
        os.path.dirname(config.FALLBACK_WORKING_DIR), 'checks'
    )
    if os.path.exists(fallback_checks_path):
        sys.path.append(fallback_checks_path)
except Exception:
    # Config module missing or unusable: fall back to the hardcoded
    # location for backwards compatibility.
    fallback_path = '/home/box-cli/FORD_SCRIPTS/FORD_ASSET_PACK_QC_NEW/checks'
    if os.path.exists(fallback_path):
        sys.path.append(fallback_path)

# The checks directory next to this script is always appended, after any
# fallback path added above.
sys.path.append(os.path.join(script_dir, 'checks'))

# Make sure the local working directory exists.
working_dir = os.path.join(script_dir, 'working')
os.makedirs(working_dir, exist_ok=True)

# Profile path rewriting used by the command-line entry point.
from utils.path_resolver import update_profile_paths

# Error report generators used by the command-line entry point.
from checks.html_error_reporter import (
    HTMLErrorReporter,
    generate_zip_error_report,
    generate_json_error_report,
    generate_file_access_error_report,
    generate_qc_check_error_report
)
def run_qc_profile(profile_path: str, input_file: str = None) -> Dict[str, Any]:
    """
    Run all QC checks defined in the given QC profile JSON file.

    The profile must be a JSON array of objects. Each object needs a
    ``script`` value (passed to ``run_single_check``) and may carry a
    ``config`` object; a missing or ``null`` config is treated as empty.
    Checks are executed in profile order.

    :param profile_path: Path to the QC profile JSON file.
    :param input_file: Optional input file path. When given, it is stored
        under the ``input_file`` key of every check's config.
    :return: A dictionary with the profile file name (``profile``), a UTC
        ``timestamp`` ending in ``Z``, and a ``checks`` list holding one
        entry per check (``index``, ``script``, ``config``, ``result``).
    :raises ValueError: If the profile is not a JSON array, a check is not
        an object, a check has no ``script``, or ``config`` is not an object.
    """
    # Load the QC profile
    with open(profile_path, 'r', encoding='utf-8') as f:
        profile = json.load(f)
    if not isinstance(profile, list):
        raise ValueError("QC profile must be a JSON array of check definitions.")

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop the tzinfo so the serialized form stays "<ISO>Z"
    # rather than "<ISO>+00:00Z".
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # Prepare the aggregated result structure
    aggregated_results = {
        "profile": os.path.basename(profile_path),
        "timestamp": now_utc.isoformat() + 'Z',
        "checks": []
    }

    # Process each QC check
    for idx, check_def in enumerate(profile, start=1):
        if not isinstance(check_def, dict):
            raise ValueError(f"Check #{idx} must be a dictionary object.")

        script = check_def.get("script")
        if not script:
            raise ValueError(f"Check #{idx} is missing a 'script' value.")

        # Named check_config (not config) so the module-level `config`
        # import is not shadowed. An explicit JSON null means "no options".
        check_config = check_def.get("config")
        if check_config is None:
            check_config = {}
        elif not isinstance(check_config, dict):
            # Fail with the same exception type as the other profile
            # validation errors instead of a TypeError on item assignment.
            raise ValueError(f"Check #{idx} has a 'config' value that is not an object.")

        # Inject input_file into config if provided
        if input_file is not None:
            check_config["input_file"] = input_file

        # Run the single check and record its outcome
        result = run_single_check(script, check_config)
        aggregated_results["checks"].append({
            "index": idx,
            "script": script,
            "config": check_config,
            "result": result
        })

    return aggregated_results
def run_single_check(script: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Load one QC check module by name and execute it.

    The target module is expected to expose ``run_check(config: dict) -> dict``.
    A module that cannot be found, lacks ``run_check``, or returns a
    non-dictionary yields an error dictionary instead of an exception.
    Anything raised by ``run_check`` itself is not caught here.

    :param script: The dotted path to the check script module (e.g. "checks.image_resolution_check")
    :param config: Configuration dictionary handed to the check unchanged.
    :return: The check's own result dictionary, or an error dictionary with
        ``status`` set to ``"error"`` and an ``error_message``.
    """
    def _failure(message: str, **extra: Any) -> Dict[str, Any]:
        # Every error result shares the same two leading keys.
        return {"status": "error", "error_message": message, **extra}

    try:
        check_module = importlib.import_module(script)
    except ModuleNotFoundError as missing:
        return _failure(f"Module not found: {script}", exception=str(missing))

    try:
        check_fn = check_module.run_check
    except AttributeError:
        return _failure(f"Module '{script}' does not implement run_check(config).")

    outcome = check_fn(config)
    if isinstance(outcome, dict):
        return outcome
    return _failure(f"run_check did not return a dictionary for script '{script}'.")
def write_results_report(results: Dict[str, Any], reports_dir: str = "reports") -> str:
    """
    Write the aggregated QC results to a timestamped directory in JSON format.

    The report is written to ``<reports_dir>/<YYYYMMDD_HHMMSS>/qc_results.json``
    using the current UTC time. The timestamp has one-second resolution and
    the directory is created with ``exist_ok=True``, so two calls within the
    same second write to the same file and the later one overwrites it.

    :param results: The aggregated QC results dictionary (must be JSON-serializable).
    :param reports_dir: The base directory for reports; created if missing.
    :return: Path to the written report file.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now"
    # renders identically with this strftime pattern.
    timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H%M%S")
    output_dir = os.path.join(reports_dir, timestamp)
    os.makedirs(output_dir, exist_ok=True)

    report_path = os.path.join(output_dir, "qc_results.json")
    with open(report_path, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=4)
    return report_path
def _classify_qc_error(message: str) -> str:
    """Map an exception message to an error-report category by keyword.

    Matching is case-insensitive and the categories are tried in the order
    listed below; the first category with a matching keyword wins.

    :param message: Text of the exception raised during QC processing.
    :return: One of 'zip_extraction', 'json_parsing', 'file_access',
        'check_execution' or 'unknown'.
    """
    lowered = message.lower()
    keyword_table = (
        ('zip_extraction', ('zip', 'extract')),
        ('json_parsing', ('json', 'parse')),
        ('file_access', ('file', 'path')),
        ('check_execution', ('check', 'module')),
    )
    for error_type, keywords in keyword_table:
        if any(keyword in lowered for keyword in keywords):
            return error_type
    return 'unknown'


def _report_qc_failure(qc_error, error_msg, input_filename, args):
    """Generate the error report matching the category of a QC failure.

    :param qc_error: The exception raised while running the QC profile.
    :param error_msg: Human-readable message to put in the report.
    :param input_filename: File name the report is attributed to.
    :param args: Parsed command-line arguments (reports_dir, input_file, profile).
    :return: Whatever the selected report generator returns; the caller
        prints it as the report path.
    """
    error_type = _classify_qc_error(str(qc_error))
    if error_type == 'zip_extraction':
        return generate_zip_error_report(
            filename=input_filename,
            reports_dir=args.reports_dir,
            error_message=error_msg,
            exception=qc_error
        )
    if error_type == 'json_parsing':
        return generate_json_error_report(
            filename=input_filename,
            reports_dir=args.reports_dir,
            error_message=error_msg,
            json_file="linkingrecord.json",
            exception=qc_error
        )
    if error_type == 'file_access':
        return generate_file_access_error_report(
            filename=input_filename,
            reports_dir=args.reports_dir,
            error_message=error_msg,
            file_path=args.input_file,
            exception=qc_error
        )
    if error_type == 'check_execution':
        return generate_qc_check_error_report(
            filename=input_filename,
            reports_dir=args.reports_dir,
            error_message=error_msg,
            failed_check="qc_processing",
            exception=qc_error
        )
    return HTMLErrorReporter.generate_error_report(
        error_type='unknown',
        error_message=error_msg,
        filename=input_filename,
        reports_dir=args.reports_dir,
        error_details={'input_file': args.input_file, 'profile': args.profile},
        exception_info=str(qc_error)
    )


def main() -> None:
    """Command-line entry point: resolve the profile, run it, write the report.

    Each stage (profile path update, QC run, report writing) has its own
    handler that prints the error to stderr, generates an error report and
    exits with status 1. Any other exception is handled the same way by the
    outer handler. The temporary profile, if one was created, is removed in
    all cases.
    """
    parser = argparse.ArgumentParser(description="Run QC checks using a QC profile.")
    parser.add_argument("profile", help="Path to the QC profile JSON file.")
    parser.add_argument("--reports_dir", default="reports", help="Directory to store QC reports.")
    parser.add_argument("--input_file", default=None, help="Optional input file to pass to checks.")
    args = parser.parse_args()

    # Determine output filename for potential error reports
    input_filename = os.path.basename(args.input_file) if args.input_file else "qc_engine"

    # Initialised up front so the cleanup in `finally` can test it directly
    # instead of probing locals().
    temp_profile_path = None
    try:
        # Update the profile with correct working directory path
        try:
            temp_profile_path = update_profile_paths(args.profile)
        except Exception as profile_error:
            error_msg = f"Failed to load QC profile '{args.profile}': {str(profile_error)}"
            print(f"Error: {error_msg}", file=sys.stderr)
            # Generate HTML error report for profile loading failure
            error_report_path = HTMLErrorReporter.generate_error_report(
                error_type='qc_profile',
                error_message=error_msg,
                filename=input_filename,
                reports_dir=args.reports_dir,
                error_details={'profile_path': args.profile},
                exception_info=str(profile_error)
            )
            print(f"Error report generated: {error_report_path}")
            sys.exit(1)

        try:
            # Run the QC checks with the updated profile
            aggregated_results = run_qc_profile(temp_profile_path, args.input_file)
        except Exception as qc_error:
            error_msg = f"QC processing failed: {str(qc_error)}"
            print(f"Error: {error_msg}", file=sys.stderr)
            error_report_path = _report_qc_failure(qc_error, error_msg, input_filename, args)
            print(f"Error report generated: {error_report_path}")
            sys.exit(1)

        try:
            # Write the results report
            report_path = write_results_report(aggregated_results, args.reports_dir)
            print(f"QC run complete. Report written to: {report_path}")
        except Exception as report_error:
            error_msg = f"Failed to write QC results report: {str(report_error)}"
            print(f"Error: {error_msg}", file=sys.stderr)
            # Generate HTML error report for report writing failure
            error_report_path = HTMLErrorReporter.generate_error_report(
                error_type='unknown',
                error_message=error_msg,
                filename=input_filename,
                reports_dir=args.reports_dir,
                error_details={'stage': 'report_writing', 'target_dir': args.reports_dir},
                exception_info=str(report_error)
            )
            print(f"Error report generated: {error_report_path}")
            sys.exit(1)
    except Exception as unexpected_error:
        # Reached when a stage handler itself fails (e.g. report generation
        # raises). sys.exit() raises SystemExit, which is not an Exception
        # subclass, so the deliberate exits above pass straight through.
        error_msg = f"Unexpected error during QC processing: {str(unexpected_error)}"
        print(f"Critical Error: {error_msg}", file=sys.stderr)
        # Generate HTML error report for unexpected failure
        try:
            error_report_path = HTMLErrorReporter.generate_error_report(
                error_type='unknown',
                error_message=error_msg,
                filename=input_filename,
                reports_dir=args.reports_dir,
                error_details={'input_file': args.input_file, 'profile': args.profile},
                exception_info=str(unexpected_error)
            )
            print(f"Error report generated: {error_report_path}")
        except Exception as final_error:
            print(f"Critical failure: Unable to generate error report: {final_error}", file=sys.stderr)
        sys.exit(1)
    finally:
        # Clean up the temporary profile if one was created. Never delete the
        # caller's own profile file.
        try:
            if (
                temp_profile_path is not None
                and temp_profile_path != args.profile
                and os.path.exists(temp_profile_path)
            ):
                os.remove(temp_profile_path)
        except Exception:
            # Deliberately best-effort: a failed cleanup must not mask the
            # exit status of the run.
            pass


if __name__ == "__main__":
    main()