import json
import os
import importlib
import datetime
from typing import Dict, Any, Optional, List
import logging

import config
from checks.html_reporter import HTMLReporter


def run_single_check(script: str, config: Dict[str, Any], context: Dict[str, Any], check_id: str) -> Dict[str, Any]:
    """
    Import and run a single QC check module with context support.

    The module should implement a
    'run_check(config: dict, context: dict, check_id: str) -> dict' function.

    This function never raises for problems inside the check module: import
    failures, a missing 'run_check', exceptions raised by the check and
    non-dictionary return values are all reported as a result dictionary with
    ``"status": "error"``.

    :param script: The dotted path to the check script module
    :param config: Configuration dictionary for the check
    :param context: Shared context dictionary between checks
    :param check_id: Unique identifier for this check
    :return: QC check result dictionary
    """
    try:
        module = importlib.import_module(script)
    except ModuleNotFoundError as e:
        return {
            "status": "error",
            "error_message": f"Module not found: {script}",
            "exception": str(e)
        }
    except Exception as e:
        # Any other import-time failure (ImportError, SyntaxError, an error in
        # the module's top-level code, ...) is reported the same way as a
        # failing check, so one broken module does not abort the whole run.
        return {
            "status": "error",
            "error_message": f"Failed to import module: {script}",
            "exception": str(e)
        }

    # The attribute must exist *and* be callable to count as an implementation.
    if not callable(getattr(module, "run_check", None)):
        return {
            "status": "error",
            "error_message": f"Module '{script}' does not implement run_check(config, context, check_id)."
        }

    try:
        result = module.run_check(config=config, context=context, check_id=check_id)
    except Exception as e:
        return {
            "status": "error",
            "error_message": f"Check '{check_id}' failed with exception",
            "exception": str(e)
        }

    if not isinstance(result, dict):
        return {
            "status": "error",
            "error_message": f"run_check did not return a dictionary for check '{check_id}'."
        }

    return result


def run_qc_profile(profile_path: str, input_file: Optional[str] = None) -> Dict[str, Any]:
    """
    Run all QC checks defined in the given profile with shared context.

    The profile is a JSON array of check definitions. Each definition is an
    object with a 'script' (dotted module path), a unique 'id' and an optional
    'config' object. Checks run in profile order and share one context
    dictionary.

    :param profile_path: Path to the QC profile JSON file
    :param input_file: Optional input file path for checks; when not None it is
        injected into every check's config under the "input_file" key
    :return: Aggregated results with context-aware checks
    :raises ValueError: If the profile is not a JSON array, or a check
        definition is malformed (not an object, missing 'script' or 'id',
        duplicate 'id', or a 'config' that is not an object)
    """
    with open(profile_path, 'r', encoding='utf-8') as f:
        profile = json.load(f)

    if not isinstance(profile, list):
        raise ValueError("QC profile must be a JSON array of check definitions.")

    # datetime.utcnow() is deprecated; take an aware UTC "now" and drop the
    # tzinfo so the serialized form keeps the same '<naive ISO 8601>Z' shape.
    utc_now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    aggregated_results = {
        "profile": os.path.basename(profile_path),
        "timestamp": utc_now.isoformat() + 'Z',
        "checks": [],
        "context_snapshot": {}  # Final state of the context for debugging
    }

    context: Dict[str, Any] = {}  # Shared context between checks
    executed_ids = set()  # Track check IDs for uniqueness

    for idx, check_def in enumerate(profile, start=1):
        if not isinstance(check_def, dict):
            raise ValueError(f"Check #{idx} must be a dictionary object.")

        script = check_def.get("script")
        check_id = check_def.get("id")
        # Named 'check_config' so the imported 'config' module is not shadowed.
        check_config = check_def.get("config")

        if not script:
            raise ValueError(f"Check #{idx} is missing a 'script' value.")

        if not check_id:
            raise ValueError(f"Check #{idx} is missing required 'id' field.")

        if check_id in executed_ids:
            raise ValueError(f"Duplicate check ID detected: '{check_id}'")
        executed_ids.add(check_id)

        # A missing or null 'config' means "no configuration"; anything else
        # that is not an object is a profile error.
        if check_config is None:
            check_config = {}
        elif not isinstance(check_config, dict):
            raise ValueError(f"Check #{idx} 'config' must be a dictionary object.")

        # Inject common parameters
        if input_file is not None:
            check_config["input_file"] = input_file

        # Run the check with shared context
        result = run_single_check(script, check_config, context, check_id)

        # Store results with check ID
        aggregated_results["checks"].append({
            "index": idx,
            "id": check_id,
            "script": script,
            "config": check_config,
            "result": result
        })

    # Store final context state (consider security implications for production use)
    # aggregated_results["context_snapshot"] = context

    return aggregated_results


def run_qc_checks(profile_path: str, input_file: Optional[str], report_path: str) -> str:
    """
    Run a QC profile against an input file and generate an HTML report.

    :param profile_path: Path to the QC profile JSON file
    :param input_file: Input file path passed to the checks; when empty or
        None the report uses "unknown_file" as the input file name
    :param report_path: Directory handed to the reporter as 'reports_dir'
    :return: Whatever HTMLReporter.generate_report returns
    """
    json_results = run_qc_profile(profile_path, input_file)

    # Generate report path components
    reports_dir = report_path
    input_filename = os.path.basename(input_file) if input_file else "unknown_file"
    # NOTE: deriving the name from the shared check context (the "short_name"
    # of its "HM_filename_parse" entry) is not possible here, because the
    # context is not available in this function.

    # Call reporter with correct arguments
    return HTMLReporter.generate_report(
        json_data=json_results,
        reports_dir=reports_dir,
        input_filename=input_filename
    )