136 lines
4.6 KiB
Python
Executable file
136 lines
4.6 KiB
Python
Executable file
import json
|
|
import os
|
|
import importlib
|
|
import datetime
|
|
from typing import Dict, Any, Optional, List
|
|
import logging
|
|
|
|
import config
|
|
from checks.html_reporter import HTMLReporter
|
|
|
|
def run_single_check(script: str, config: Dict[str, Any], context: Dict[str, Any], check_id: str) -> Dict[str, Any]:
    """
    Import and run a single QC check module with context support.

    The module should implement a
    'run_check(config: dict, context: dict, check_id: str) -> dict' function.

    Failures are never raised to the caller: a module that cannot be
    imported, a missing or non-callable 'run_check', an exception raised by
    the check, or a non-dict return value are all reported as an error
    dictionary so that one broken check does not abort the remaining ones.

    :param script: The dotted path to the check script module
    :param config: Configuration dictionary for the check
    :param context: Shared context dictionary between checks
    :param check_id: Unique identifier for this check
    :return: The dictionary returned by the check, or an error dictionary
        with "status": "error", an "error_message" and, when an exception
        was caught, its text under "exception"
    """
    try:
        module = importlib.import_module(script)
    except ModuleNotFoundError as e:
        return {
            "status": "error",
            "error_message": f"Module not found: {script}",
            "exception": str(e)
        }
    except Exception as e:
        # The module was located but could not be loaded (ImportError from a
        # bad "from x import y", SyntaxError, an invalid/relative module
        # name, code failing at import time, ...). Report it the same way as
        # a failing check instead of crashing the whole run.
        return {
            "status": "error",
            "error_message": f"Failed to import module: {script}",
            "exception": str(e)
        }

    # A 'run_check' attribute that is not callable is as unusable as a
    # missing one, so both cases get the same explicit message.
    run_check = getattr(module, "run_check", None)
    if not callable(run_check):
        return {
            "status": "error",
            "error_message": f"Module '{script}' does not implement run_check(config, context, check_id)."
        }

    try:
        result = run_check(config=config, context=context, check_id=check_id)
    except Exception as e:
        # Boundary around third-party check code: any failure inside the
        # check becomes an error result for this check only.
        return {
            "status": "error",
            "error_message": f"Check '{check_id}' failed with exception",
            "exception": str(e)
        }

    if not isinstance(result, dict):
        return {
            "status": "error",
            "error_message": f"run_check did not return a dictionary for check '{check_id}'."
        }

    return result
|
|
|
|
|
|
def run_qc_profile(profile_path: str, input_file: Optional[str] = None) -> Dict[str, Any]:
    """
    Run all QC checks defined in the given profile with shared context.

    The profile is a JSON array of check definitions. Each definition is an
    object with a required 'script' (module path passed to
    run_single_check), a required unique 'id', and an optional 'config'
    object. Checks run in profile order and share one context dictionary.

    :param profile_path: Path to the QC profile JSON file
    :param input_file: Optional input file path for checks; when given it is
        injected into every check's config under the "input_file" key
    :return: Dictionary with the profile file name, a UTC timestamp, one
        entry per executed check under "checks", and an (always empty)
        "context_snapshot"
    :raises ValueError: If the profile is not a JSON array, or a check
        definition is not an object, lacks 'script' or 'id', reuses an 'id',
        or has a 'config' that is not an object
    """
    with open(profile_path, 'r', encoding='utf-8') as f:
        profile = json.load(f)

    if not isinstance(profile, list):
        raise ValueError("QC profile must be a JSON array of check definitions.")

    # datetime.utcnow() is deprecated; take an aware UTC time and drop the
    # tzinfo so the serialized form keeps the "...Z" format.
    utc_now = datetime.datetime.now(datetime.timezone.utc)

    aggregated_results = {
        "profile": os.path.basename(profile_path),
        "timestamp": utc_now.replace(tzinfo=None).isoformat() + 'Z',
        "checks": [],
        "context_snapshot": {}  # Final state of the context for debugging
    }

    context: Dict[str, Any] = {}  # Shared context between checks
    executed_ids = set()  # Track check IDs for uniqueness

    for idx, check_def in enumerate(profile, start=1):
        if not isinstance(check_def, dict):
            raise ValueError(f"Check #{idx} must be a dictionary object.")

        script = check_def.get("script")
        check_id = check_def.get("id")
        raw_config = check_def.get("config")

        if not script:
            raise ValueError(f"Check #{idx} is missing a 'script' value.")
        if not check_id:
            raise ValueError(f"Check #{idx} is missing required 'id' field.")
        if check_id in executed_ids:
            raise ValueError(f"Duplicate check ID detected: '{check_id}'")

        # A missing or null 'config' means "no configuration"; anything else
        # that is not an object is a profile error rather than a crash later.
        if raw_config is None:
            raw_config = {}
        if not isinstance(raw_config, dict):
            raise ValueError(f"Check #{idx} has a 'config' that is not a dictionary object.")

        executed_ids.add(check_id)

        # Work on a shallow copy so injecting common parameters does not
        # modify the parsed profile entry. (Named check_config to avoid
        # shadowing the module-level 'config' import.)
        check_config = dict(raw_config)

        # Inject common parameters
        if input_file is not None:
            check_config["input_file"] = input_file

        # Run the check with shared context
        result = run_single_check(script, check_config, context, check_id)

        # Store results with check ID
        aggregated_results["checks"].append({
            "index": idx,
            "id": check_id,
            "script": script,
            "config": check_config,
            "result": result
        })

    # NOTE: the final context is intentionally not copied into
    # "context_snapshot" (consider security implications before enabling
    # this for production use).

    return aggregated_results
|
|
|
|
|
|
def run_qc_checks(profile_path: str, input_file: str, report_path: str) -> str:
    """
    Run a QC profile against an input file and hand the results to the HTML reporter.

    :param profile_path: Path to the QC profile JSON file
    :param input_file: Path of the file being checked; a falsy value is
        reported under the placeholder name "unknown_file"
    :param report_path: Passed to the reporter as its reports directory
    :return: Whatever HTMLReporter.generate_report returns (presumably the
        location of the generated report -- confirm against the reporter)
    """
    # Execute every check in the profile first; the report is built from
    # the aggregated results.
    qc_output = run_qc_profile(profile_path, input_file)

    # Only the file name (not the full path) is given to the reporter.
    if input_file:
        display_name = os.path.basename(input_file)
    else:
        display_name = "unknown_file"

    report = HTMLReporter.generate_report(
        json_data=qc_output,
        reports_dir=report_path,
        input_filename=display_name,
    )
    return report
|