153 lines
No EOL
5.1 KiB
Python
Executable file
153 lines
No EOL
5.1 KiB
Python
Executable file
import json
|
|
import os
|
|
import importlib
|
|
import datetime
|
|
from typing import Dict, Any, Optional
|
|
|
|
# Import the path resolver
|
|
from utils.path_resolver import update_profile_paths
|
|
|
|
# Import error reporting functionality
|
|
from checks.html_error_reporter import (
|
|
HTMLErrorReporter,
|
|
generate_json_error_report,
|
|
generate_file_access_error_report,
|
|
generate_qc_check_error_report
|
|
)
|
|
|
|
def run_single_check(script: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Import and run a single QC check module.

    The module specified by 'script' should implement a
    'run_check(config: dict) -> dict' function. Failures that derive from
    ``Exception`` (import errors, a missing or non-callable ``run_check``,
    an exception raised by the check, a non-dict return value) are not
    propagated; they are reported as a dictionary with ``"status": "error"``
    and an ``"error_message"``.

    :param script: The dotted path to the check script module
        (e.g. "checks.image_resolution_check").
    :param config: Configuration dictionary for the check; passed to
        ``run_check`` unchanged.
    :return: The dictionary returned by ``run_check`` (with ``"status"`` set
        to ``"unknown"`` if the check did not provide one), or an error
        dictionary as described above.
    """
    try:
        module = importlib.import_module(script)
    except ModuleNotFoundError as e:
        missing = e.name
        # e.name is the module that could not be located. When it is neither
        # the requested script nor one of its parent packages, the check
        # module itself was found and one of *its* imports is missing, so
        # blaming the script would send the user looking in the wrong place.
        if missing and missing != script and not script.startswith(missing + "."):
            return {
                "status": "error",
                "error_message": (
                    f"Failed to import module '{script}': "
                    f"missing dependency '{missing}'"
                ),
                "exception": str(e),
            }
        return {
            "status": "error",
            "error_message": f"Module not found: {script}",
            "exception": str(e),
        }
    except Exception as e:
        # Any other failure while executing the module's top-level code.
        return {
            "status": "error",
            "error_message": f"Failed to import module '{script}': {str(e)}",
            "exception": str(e),
        }

    # Require a callable entry point; a plain attribute named run_check would
    # otherwise be reported as a confusing "failed during execution" error.
    run_check = getattr(module, "run_check", None)
    if not callable(run_check):
        return {
            "status": "error",
            "error_message": f"Module '{script}' does not implement run_check(config).",
        }

    try:
        result = run_check(config)
    except Exception as e:
        return {
            "status": "error",
            "error_message": f"Check '{script}' failed during execution: {str(e)}",
            "exception": str(e),
            "check_script": script,
        }

    if not isinstance(result, dict):
        return {
            "status": "error",
            "error_message": f"run_check did not return a dictionary for script '{script}'.",
        }

    # Ensure the result has a status field
    result.setdefault("status", "unknown")

    return result
|
|
|
|
|
|
def run_qc_profile(profile_path: str, input_file: Optional[str] = None) -> Dict[str, Any]:
    """
    Run all QC checks defined in the given QC profile JSON file.

    The profile must be a JSON array; each element is an object with a
    required 'script' (dotted module path handed to ``run_single_check``)
    and an optional 'config' object.

    :param profile_path: Path to the QC profile JSON file.
    :param input_file: Optional input file path. When not None it is stored
        under the 'input_file' key of every check's config before the check
        runs.
    :return: A dictionary with 'profile' (base name of profile_path),
        'timestamp' (UTC, ISO 8601 with a trailing 'Z') and 'checks' (one
        entry per check holding its 1-based index, script, config and result).
    :raises FileNotFoundError: If the profile file does not exist.
    :raises PermissionError: If the profile file cannot be read.
    :raises ValueError: If the file is not valid JSON, is not a JSON array,
        or a check definition is not an object, lacks 'script', or has a
        'config' that is not an object.
    :raises RuntimeError: For any other failure while loading the profile.
    """
    # Load the QC profile
    try:
        with open(profile_path, 'r', encoding='utf-8') as f:
            profile = json.load(f)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"QC profile file not found: {profile_path}") from e
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in QC profile '{profile_path}': {str(e)}") from e
    except PermissionError as e:
        raise PermissionError(f"Permission denied accessing QC profile: {profile_path}") from e
    except Exception as e:
        raise RuntimeError(f"Failed to load QC profile '{profile_path}': {str(e)}") from e

    if not isinstance(profile, list):
        raise ValueError(f"QC profile must be a JSON array of check definitions, got: {type(profile).__name__}")

    # utcnow() is deprecated; take an aware UTC time and drop tzinfo so the
    # rendered string keeps the historical "<iso>Z" shape (no "+00:00").
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # Prepare the aggregated result structure
    aggregated_results = {
        "profile": os.path.basename(profile_path),
        "timestamp": now_utc.isoformat() + 'Z',
        "checks": [],
    }

    # Process each QC check
    for idx, check_def in enumerate(profile, start=1):
        if not isinstance(check_def, dict):
            raise ValueError(f"Check #{idx} must be a dictionary object.")

        script = check_def.get("script")
        if not script:
            raise ValueError(f"Check #{idx} is missing a 'script' value.")

        # A JSON null config is treated like a missing one; anything else
        # that is not an object is rejected up front instead of failing
        # later with an opaque TypeError when input_file is injected.
        raw_config = check_def.get("config")
        if raw_config is None:
            raw_config = {}
        if not isinstance(raw_config, dict):
            raise ValueError(f"Check #{idx} 'config' must be a dictionary object.")

        # Work on a copy so injecting input_file never mutates the loaded
        # profile data.
        config = dict(raw_config)

        # Inject input_file into config if provided
        if input_file is not None:
            config["input_file"] = input_file

        # Run the single check
        result = run_single_check(script, config)

        # Append the result to the aggregated results
        aggregated_results["checks"].append({
            "index": idx,
            "script": script,
            "config": config,
            "result": result,
        })

    return aggregated_results
|
|
|
|
|
|
def run_qc_checks(profile_path: str, input_file: Optional[str] = None) -> str:
    """
    Execute the checks of a QC profile and serialise the outcome as JSON.

    The profile is first passed through ``update_profile_paths``; the path it
    returns is the one actually executed. If that path differs from
    ``profile_path`` it is deleted afterwards, whether or not the run
    succeeded.

    :param profile_path: Path to the QC profile JSON file.
    :param input_file: Optional input file to pass to checks.
    :return: A JSON string (4-space indented) representing the QC results.
    """
    # Resolve working-directory-dependent paths; may yield a different file.
    resolved_path = update_profile_paths(profile_path)

    try:
        aggregated = run_qc_profile(resolved_path, input_file)
        return json.dumps(aggregated, indent=4)
    finally:
        # Only remove the file when the resolver handed back a path other
        # than the caller's own profile.
        is_separate_file = resolved_path != profile_path
        if is_separate_file and os.path.exists(resolved_path):
            try:
                os.remove(resolved_path)
            except Exception:
                # Best-effort cleanup: a leftover file must not mask the
                # result or the original error.
                pass