ford_qc/qc_module.py
2025-09-03 07:03:21 -05:00

153 lines
No EOL
5.1 KiB
Python
Executable file

import json
import os
import importlib
import datetime
from typing import Dict, Any, Optional
# Import the path resolver
from utils.path_resolver import update_profile_paths
# Import error reporting functionality
from checks.html_error_reporter import (
HTMLErrorReporter,
generate_json_error_report,
generate_file_access_error_report,
generate_qc_check_error_report
)
def run_single_check(script: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Import and run a single QC check module.

    The module specified by 'script' should implement a
    'run_check(config: dict) -> dict' function. Any exception derived from
    Exception that occurs while importing or running the check is not
    propagated; it is reported as a result dictionary with
    ``"status": "error"`` and a human-readable ``"error_message"``.

    :param script: The dotted path to the check script module
        (e.g. "checks.image_resolution_check")
    :param config: Configuration dictionary for the check.
    :return: A dictionary representing the QC check result. It always
        contains a "status" key ("unknown" is filled in when the check
        itself did not provide one).
    """
    try:
        module = importlib.import_module(script)
    except ModuleNotFoundError as e:
        # ModuleNotFoundError is also raised when the check module exists but
        # one of *its* imports is missing. e.name tells us which module could
        # not be located, so only blame the check script when it (or one of
        # its parent packages) is the module that is actually absent.
        missing = e.name
        if missing is None or missing == script or script.startswith(missing + "."):
            return {
                "status": "error",
                "error_message": f"Module not found: {script}",
                "exception": str(e)
            }
        return {
            "status": "error",
            "error_message": (
                f"Failed to import module '{script}': "
                f"missing dependency '{missing}'"
            ),
            "exception": str(e)
        }
    except Exception as e:
        # Anything else raised at import time (syntax errors, failing
        # module-level code, invalid module name types, ...).
        return {
            "status": "error",
            "error_message": f"Failed to import module '{script}': {str(e)}",
            "exception": str(e)
        }

    if not hasattr(module, "run_check"):
        return {
            "status": "error",
            "error_message": f"Module '{script}' does not implement run_check(config)."
        }

    try:
        result = module.run_check(config)
    except Exception as e:
        # A failing check must not abort the caller; report it as a result.
        return {
            "status": "error",
            "error_message": f"Check '{script}' failed during execution: {str(e)}",
            "exception": str(e),
            "check_script": script
        }

    if not isinstance(result, dict):
        return {
            "status": "error",
            "error_message": f"run_check did not return a dictionary for script '{script}'."
        }

    # Ensure the result has a status field
    if "status" not in result:
        result["status"] = "unknown"
    return result
def run_qc_profile(profile_path: str, input_file: Optional[str] = None) -> Dict[str, Any]:
    """
    Run all QC checks defined in the given QC profile JSON file.

    The profile must be a JSON array of objects, each with a "script" value
    (passed to run_single_check) and an optional "config" object.

    :param profile_path: Path to the QC profile JSON file.
    :param input_file: Optional input file path that can be passed into all
        checks. When given, it is stored under the "input_file" key of every
        check's config.
    :return: A dictionary with the keys "profile" (base name of the profile
        file), "timestamp" (UTC, ISO 8601 with a trailing 'Z') and "checks"
        (one entry per check holding its index, script, config and result).
    :raises FileNotFoundError: If the profile file does not exist.
    :raises PermissionError: If the profile file cannot be accessed.
    :raises ValueError: If the profile is not valid JSON, is not a JSON
        array, or contains a check definition that is not an object, lacks a
        'script' value, or has a 'config' that is not an object.
    :raises RuntimeError: For any other failure while loading the profile.
    """
    # Load the QC profile. Each re-raise keeps the original exception as its
    # cause so the underlying OS/parser detail is not lost.
    try:
        with open(profile_path, 'r', encoding='utf-8') as f:
            profile = json.load(f)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"QC profile file not found: {profile_path}") from e
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in QC profile '{profile_path}': {str(e)}") from e
    except PermissionError as e:
        raise PermissionError(f"Permission denied accessing QC profile: {profile_path}") from e
    except Exception as e:
        raise RuntimeError(f"Failed to load QC profile '{profile_path}': {str(e)}") from e

    if not isinstance(profile, list):
        raise ValueError(f"QC profile must be a JSON array of check definitions, got: {type(profile).__name__}")

    # datetime.utcnow() is deprecated; take an aware UTC "now" and drop the
    # tzinfo so the rendered string keeps the established "<naive ISO>Z" form.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # Prepare the aggregated result structure
    aggregated_results = {
        "profile": os.path.basename(profile_path),
        "timestamp": now_utc.isoformat() + 'Z',
        "checks": []
    }

    # Process each QC check
    for idx, check_def in enumerate(profile, start=1):
        if not isinstance(check_def, dict):
            raise ValueError(f"Check #{idx} must be a dictionary object.")

        script = check_def.get("script")
        if not script:
            raise ValueError(f"Check #{idx} is missing a 'script' value.")

        # An explicit `"config": null` is treated like an absent config; any
        # other non-object value is rejected up front instead of failing
        # later with an opaque TypeError when input_file is injected.
        raw_config = check_def.get("config")
        if raw_config is None:
            raw_config = {}
        if not isinstance(raw_config, dict):
            raise ValueError(f"Check #{idx} 'config' must be a dictionary object.")

        # Work on a shallow copy so injecting input_file does not modify the
        # parsed profile entry.
        config = dict(raw_config)

        # Inject input_file into config if provided
        if input_file is not None:
            config["input_file"] = input_file

        # Run the single check
        result = run_single_check(script, config)

        # Append the result to the aggregated results
        aggregated_results["checks"].append({
            "index": idx,
            "script": script,
            "config": config,
            "result": result
        })

    return aggregated_results
def run_qc_checks(profile_path: str, input_file: Optional[str] = None) -> str:
    """
    Execute a QC profile and serialize the aggregated results to JSON.

    The profile path is first passed through update_profile_paths. When that
    call hands back a path different from the one supplied, the file at the
    returned path is removed once the run has finished (whether it succeeded
    or raised).

    :param profile_path: Path to the QC profile JSON file.
    :param input_file: Optional input file to pass to checks.
    :return: A JSON string (indented by 4 spaces) representing the QC results.
    """
    # Resolve the profile against the current working directory layout
    resolved_profile = update_profile_paths(profile_path)

    try:
        outcome = run_qc_profile(resolved_profile, input_file)
        return json.dumps(outcome, indent=4)
    finally:
        # Only delete the file when the resolver produced a separate profile;
        # the caller's own profile file is never removed.
        is_separate_copy = resolved_profile != profile_path
        if is_separate_copy and os.path.exists(resolved_profile):
            try:
                os.remove(resolved_profile)
            except Exception:
                # Best-effort cleanup: a failed delete must not mask the
                # result or the original exception.
                pass