376 lines
12 KiB
Python
376 lines
12 KiB
Python
"""Parser for QC HTML reports."""
|
|
import re
|
|
import logging
|
|
from typing import Dict, List, Optional
|
|
from bs4 import BeautifulSoup
|
|
from datetime import datetime
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class QCReportParser:
    """Parser for extracting data from QC HTML reports.

    The HTML is parsed once in the constructor; every extraction method
    reads from the resulting ``self.soup`` tree. Extraction helpers are
    best-effort: they log a warning/error and return an empty or default
    value rather than raising.
    """

    # Badge CSS classes mapped to report statuses, in precedence order
    # (the first class present on the badge wins).
    _BADGE_STATUS = (
        ('bg-success', 'passed'),
        ('bg-danger', 'error'),
        ('bg-warning', 'warning'),
        ('bg-secondary', 'skipped'),
    )

    # Status counters reported by _generate_summary (besides 'total').
    _STATUS_KEYS = ('passed', 'error', 'warning', 'skipped', 'unknown')

    def __init__(self, html_content: str):
        """
        Initialize parser with HTML content.

        Args:
            html_content: HTML report content as string
        """
        self.html_content = html_content
        self.soup = BeautifulSoup(html_content, 'html.parser')

    def parse(self) -> Dict:
        """
        Parse HTML report and extract structured data.

        Returns:
            Dictionary with keys 'filename', 'timestamp', 'checks' and
            'summary'. If an unexpected exception escapes the helpers, a
            default dictionary is returned that additionally carries the
            exception text under 'error'.
        """
        try:
            # Extract the checks once and reuse them for the summary so the
            # accordion items are not walked twice.
            checks = self._extract_checks()
            return {
                'filename': self._extract_filename(),
                'timestamp': self._extract_timestamp(),
                'checks': checks,
                'summary': self._generate_summary(checks)
            }
        except Exception as e:
            logger.error(f"Error parsing report: {e}")
            # Same counter keys as _generate_summary so consumers see one shape.
            fallback_summary = {'total': 0}
            fallback_summary.update(dict.fromkeys(self._STATUS_KEYS, 0))
            return {
                'filename': 'Unknown',
                'timestamp': None,
                'checks': [],
                'summary': fallback_summary,
                'error': str(e)
            }

    @staticmethod
    def _search_line(pattern: str, text: str) -> Optional[str]:
        """Return the stripped first group of ``pattern`` in ``text``, or None.

        The search uses ``re.MULTILINE`` so a ``$`` anchor matches at the end
        of any line; element text surrounded by newlines or indentation can
        therefore still match. A match whose group is blank after stripping
        is treated as no match.
        """
        match = re.search(pattern, text, re.MULTILINE)
        if match:
            value = match.group(1).strip()
            if value:
                return value
        return None

    @staticmethod
    def _strings_outside(element, excluded):
        """Yield the strings under ``element`` that are not inside ``excluded``.

        ``excluded`` may be None, in which case every string is yielded.
        Identity (``is``) is used for the ancestor test so that only that
        exact node is excluded.
        """
        for text_node in element.strings:
            if excluded is not None and any(
                parent is excluded for parent in text_node.parents
            ):
                continue
            yield text_node

    def _extract_filename(self) -> str:
        """Extract filename from the report title, falling back to the h1.

        Returns:
            The filename, or 'Unknown' when neither element yields one.
        """
        try:
            title = self.soup.find('title')
            if title is not None:
                # Extract from "QC Report - filename.pdf"
                name = self._search_line(r'QC Report - (.+)$', title.text)
                if name:
                    return name

            # Fallback: look for h1 with filename
            h1 = self.soup.find('h1', class_='display-4')
            if h1 is not None:
                name = self._search_line(r'QC Report: (.+)$', h1.text)
                if name:
                    return name

            return 'Unknown'

        except Exception as e:
            logger.warning(f"Could not extract filename: {e}")
            return 'Unknown'

    def _extract_timestamp(self) -> Optional[str]:
        """Extract generation timestamp from report.

        Returns:
            The text following "Generated at: " in the first
            ``p.text-muted`` element that contains it, or None.
        """
        try:
            # Check every muted paragraph, not only the first one, in case
            # another muted paragraph precedes the timestamp.
            for candidate in self.soup.find_all('p', class_='text-muted'):
                # Extract from "Generated at: 2025-11-06 11:15:45"
                stamp = self._search_line(r'Generated at: (.+)$', candidate.text)
                if stamp:
                    return stamp

            return None

        except Exception as e:
            logger.warning(f"Could not extract timestamp: {e}")
            return None

    def _extract_checks(self) -> List[Dict]:
        """Extract all QC checks from the report.

        Returns:
            One dictionary per ``div.accordion-item`` that could be parsed,
            in document order; items that fail to parse are omitted.
        """
        checks = []

        try:
            # Each accordion item represents one check; indices are 1-based.
            accordion_items = self.soup.find_all('div', class_='accordion-item')
            for idx, item in enumerate(accordion_items, start=1):
                check = self._parse_check_item(item, idx)
                if check:
                    checks.append(check)

        except Exception as e:
            logger.error(f"Error extracting checks: {e}")

        return checks

    def _parse_check_item(self, item, index: int) -> Optional[Dict]:
        """
        Parse a single check accordion item.

        Args:
            item: BeautifulSoup element for accordion item
            index: Check index number

        Returns:
            Dictionary with keys 'index', 'name', 'description', 'status',
            'status_text', 'error_message', 'config' and 'results', or None
            if parsing the item raised.
        """
        try:
            # Status comes from the badge's CSS class; the badge's own text
            # is kept separately as 'status_text'.
            badge = item.find('span', class_='badge')
            status = 'unknown'
            status_text = None
            if badge is not None:
                badge_classes = badge.get('class', [])
                status = next(
                    (name for css, name in self._BADGE_STATUS
                     if css in badge_classes),
                    'unknown',
                )
                status_text = badge.text.strip()

            # Extract check name and description from button
            button = item.find('button', class_='accordion-button')
            check_name = 'Unknown'
            description = ''

            if button is not None:
                # Drop the badge by tree position rather than by text
                # replacement, so a description that happens to contain the
                # badge text (e.g. "PASSED") is left intact.
                label = ''.join(self._strings_outside(button, badge)).strip()

                # Parse "check_id: description"
                name_part, separator, description_part = label.partition(':')
                if separator:
                    check_name = name_part.strip()
                    description = description_part.strip()
                else:
                    check_name = label

            # Extract error message if present
            error_message = None
            error_section = item.find('div', class_='error-section')
            if error_section is not None:
                error_text = error_section.find('p')
                if error_text is not None:
                    error_message = error_text.text.strip()

            config = self._extract_section_data(item, 'Configuration')
            results = self._extract_section_data(item, 'Results')

            return {
                'index': index,
                'name': check_name,
                'description': description,
                'status': status,
                'status_text': (
                    status_text if status_text is not None else status.upper()
                ),
                'error_message': error_message,
                'config': config,
                'results': results
            }

        except Exception as e:
            logger.warning(f"Error parsing check item {index}: {e}")
            return None

    def _collect_pairs(self, container, allow_nested: bool) -> Dict:
        """Read ``<strong>key</strong> value`` pairs from direct ``li`` children.

        Args:
            container: Element whose direct ``li`` children are inspected
            allow_nested: When True, a ``div.nested-details`` inside an
                ``li`` is parsed into a dictionary value

        Returns:
            Dictionary mapping each key (``strong`` text with colons removed)
            to a string, or to a dictionary for nested details.
        """
        pairs = {}
        for li in container.find_all('li', recursive=False):
            strong = li.find('strong')
            if strong is None:
                continue

            key = strong.text.replace(':', '').strip()

            # Preferred source: the text node directly after the strong tag.
            sibling = strong.next_sibling
            inline = sibling.strip() if isinstance(sibling, str) else ''

            if inline:
                value = inline
            else:
                # A whitespace-only text node after the label must not hide
                # nested details that follow it.
                nested = (
                    li.find('div', class_='nested-details')
                    if allow_nested else None
                )
                if nested is not None:
                    value = self._extract_nested_data(nested)
                else:
                    # Remaining text of the item without the label element,
                    # so no stray ':' from the label is left in the value.
                    value = ''.join(
                        piece.strip()
                        for piece in self._strings_outside(li, strong)
                    )

            pairs[key] = value

        return pairs

    def _extract_section_data(self, item, section_name: str) -> Dict:
        """
        Extract data from Configuration or Results section.

        Args:
            item: BeautifulSoup element
            section_name: Name of section (e.g., 'Configuration', 'Results')

        Returns:
            Dictionary with section data; empty if no matching section or
            list is found, or if extraction raised.
        """
        data = {}

        try:
            for header in item.find_all('h5'):
                if section_name not in header.text:
                    continue

                # Get the next ul.details-list
                details_list = header.find_next('ul', class_='details-list')
                if details_list is None:
                    continue

                # find_next searches the rest of the document; ignore a list
                # that lies outside this item, since it belongs elsewhere.
                if not any(parent is item for parent in details_list.parents):
                    continue

                data.update(self._collect_pairs(details_list, allow_nested=True))

        except Exception as e:
            logger.warning(f"Error extracting section {section_name}: {e}")

        return data

    def _extract_nested_data(self, nested_elem) -> Dict:
        """Extract data from nested details sections.

        Only direct ``li`` children of ``nested_elem`` are read.
        NOTE(review): if the report wraps these items in a ``ul`` inside the
        nested-details div, nothing will be found here - confirm against a
        real report.
        """
        nested_data = {}

        try:
            nested_data = self._collect_pairs(nested_elem, allow_nested=False)

        except Exception as e:
            logger.warning(f"Error extracting nested data: {e}")

        return nested_data

    def _generate_summary(self, checks: Optional[List[Dict]] = None) -> Dict:
        """Generate summary statistics from checks.

        Args:
            checks: Already-extracted checks to summarize. When omitted, the
                checks are extracted from the report.

        Returns:
            Dictionary with 'total' plus one counter per status ('passed',
            'error', 'warning', 'skipped', 'unknown'). A check whose status
            is missing or not one of those is counted as 'unknown', so the
            counters always add up to 'total'.
        """
        if checks is None:
            checks = self._extract_checks()

        summary = {'total': len(checks)}
        summary.update(dict.fromkeys(self._STATUS_KEYS, 0))

        for check in checks:
            status = check.get('status', 'unknown')
            # Only real status counters may be incremented; in particular a
            # status of 'total' must not corrupt the total.
            if status not in self._STATUS_KEYS:
                status = 'unknown'
            summary[status] += 1

        return summary

    def get_job_number(self) -> Optional[str]:
        """
        Extract job/reference number from the report.

        Looks at the results of checks whose name contains 'filename_parse'
        for a 'Reference'/'reference' entry (top-level or under 'Parsed'),
        then falls back to a ``dddd-dd`` / ``ddddd-dd`` pattern in the
        report filename.

        Returns:
            Job number string or None
        """
        reference_keys = ('Reference', 'reference')

        try:
            for check in self._extract_checks():
                if 'filename_parse' not in check.get('name', '').lower():
                    continue

                results = check.get('results', {})

                # Look for Reference or reference field
                for key in reference_keys:
                    if key in results:
                        return results[key]

                # Check in parsed data
                parsed = results.get('Parsed')
                if isinstance(parsed, dict):
                    for key in reference_keys:
                        if key in parsed:
                            return parsed[key]

            # Fallback: try to extract from filename
            filename = self._extract_filename()
            match = re.search(r'(\d{4,5}-\d{2})', filename)
            if match:
                return match.group(1)

            return None

        except Exception as e:
            logger.warning(f"Could not extract job number: {e}")
            return None
|
|
|
|
|
|
def aggregate_reports(parsed_reports: List[Dict]) -> Dict:
    """
    Aggregate multiple parsed reports into a summary.

    Args:
        parsed_reports: List of parsed report dictionaries. Each is expected
            to carry 'filename', 'timestamp' and a 'summary' dictionary of
            counters; missing entries are treated as 'Unknown', None and
            zero counts respectively instead of raising.

    Returns:
        Aggregated summary dictionary with:
            'total_files': number of reports given
            'total_checks': sum of the reports' 'total' counters
            'overall_status': 'error' if any error was counted, otherwise
                'warning' if any warning was counted, otherwise 'passed';
                'unknown' when no reports were given
            'files_with_errors': one entry (filename, error_count,
                timestamp) per report with at least one error
            'summary': per-status totals across all reports, including
                'unknown' so the counters account for every check counted
                in 'total_checks'
    """
    status_keys = ('passed', 'error', 'warning', 'skipped', 'unknown')
    overall_summary = dict.fromkeys(status_keys, 0)

    if not parsed_reports:
        return {
            'total_files': 0,
            'total_checks': 0,
            'overall_status': 'unknown',
            'files_with_errors': [],
            'summary': overall_summary
        }

    total_checks = 0
    files_with_errors = []

    for report in parsed_reports:
        # Tolerate a missing or None summary rather than raising KeyError.
        summary = report.get('summary') or {}
        total_checks += summary.get('total', 0)

        # Aggregate counts
        for status in status_keys:
            overall_summary[status] += summary.get(status, 0)

        # Track files with errors
        error_count = summary.get('error', 0)
        if error_count > 0:
            files_with_errors.append({
                'filename': report.get('filename', 'Unknown'),
                'error_count': error_count,
                'timestamp': report.get('timestamp')
            })

    # Determine overall status: errors outrank warnings.
    if overall_summary['error'] > 0:
        overall_status = 'error'
    elif overall_summary['warning'] > 0:
        overall_status = 'warning'
    else:
        overall_status = 'passed'

    return {
        'total_files': len(parsed_reports),
        'total_checks': total_checks,
        'overall_status': overall_status,
        'files_with_errors': files_with_errors,
        'summary': overall_summary
    }
|