From 5c0049197bfa573d4e7a02283c793309a2573a6c Mon Sep 17 00:00:00 2001 From: Vadym Samoilenko Date: Thu, 12 Mar 2026 18:34:43 +0000 Subject: [PATCH] Improve table parsing: scope attrs, captions, per-table diagnostics; speed: cap images at 10, 5 workers, 30s timeout Table check now: - Reports row count, TH cell count, TD cell count per table - Checks each TH cell for scope attribute (col/row/colgroup/rowgroup) - Warns on complex tables (>6 cells) missing Caption element - _analyze_table() returns bool so overall SUCCESS only shown when all tables pass Image analysis: - Skip images < 2048 bytes (decorative/icons) - Cap at 10 images per document - Increase ThreadPoolExecutor workers to 5 - 30s per-image timeout Co-Authored-By: Claude Sonnet 4.6 --- enterprise_pdf_checker.py | 230 ++++++++++++++++++++++++++++++++------ 1 file changed, 197 insertions(+), 33 deletions(-) diff --git a/enterprise_pdf_checker.py b/enterprise_pdf_checker.py index 1ce7c58..b6abfe1 100644 --- a/enterprise_pdf_checker.py +++ b/enterprise_pdf_checker.py @@ -693,6 +693,15 @@ class EnterprisePDFChecker: logger.info(f"Found {total_images} images to analyze...") + # Cap analysis: skip very small images (likely decorative/icons) + image_tasks = [t for t in image_tasks if self._image_data_size(t[0]) > 2048] + + # Limit to 10 images max — more would just waste API calls on brochure backgrounds + MAX_IMAGES = 10 + if len(image_tasks) > MAX_IMAGES: + logger.info(f"Capping image analysis at {MAX_IMAGES} (of {len(image_tasks)}) images") + image_tasks = image_tasks[:MAX_IMAGES] + # Skip AI analysis in quick mode if self.quick_mode: logger.info("Skipping AI image analysis (quick mode)") @@ -718,8 +727,13 @@ class EnterprisePDFChecker: analysis = cached_result result['cached'] = True else: - # Analyze with Claude - analysis = self._analyze_image_with_claude(image_data) + # Analyze with Claude (timeout via concurrent.futures) + with ThreadPoolExecutor(max_workers=1) as img_exec: + future = img_exec.submit(self._analyze_image_with_claude, image_data) + try: + analysis = future.result(timeout=30) + except Exception: + analysis = None if analysis and 'error' not in analysis: self.cache.set(cache_key, analysis) result['cached'] = False @@ -740,7 +754,7 @@ class EnterprisePDFChecker: return result # Use ThreadPoolExecutor for parallel processing - max_workers = 3 if not self.quick_mode else 1 + max_workers = 5 if not self.quick_mode else 1 with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(analyze_single_image, task): task for task in image_tasks} @@ -1261,41 +1275,187 @@ Respond in JSON format: ) def _check_tables(self): - """Check table accessibility""" - # Basic table detection - has_tables = False + """Check table accessibility using PDF structure tree (tagged tables).""" + catalog = self.pdf_reader.trailer.get("/Root", {}) + struct_tree = catalog.get("/StructTreeRoot") - for i, page in enumerate(self.pdf_plumber.pages): - # Use find_tables to get table objects with coordinates - table_objects = page.find_tables() - if table_objects: - has_tables = True - for table_idx, table in enumerate(table_objects): - # Get table bounding box - coords = { - 'x0': table.bbox[0], - 'y0': table.bbox[1], - 'x1': table.bbox[2], - 'y1': table.bbox[3] - } - self.add_issue( - Severity.WARNING, - "Tables", - f"Page {i+1}, Table {table_idx+1}: Verify table structure and headers", - wcag_criterion="1.3.1", - recommendation="Ensure tables have proper headers and structure tags", - page_number=i+1, - coordinates=coords - ) - - if not has_tables: + tables_found = 0 + tables_ok = 0 + + if struct_tree: + def walk(node, depth=0): + nonlocal tables_found, tables_ok + if depth > 50: + return + try: + obj = node.get_object() if hasattr(node, 'get_object') else node + if not isinstance(obj, dict): + return + role = obj.get("/S") or obj.get("/Type") + if role and str(role) == "/Table": + tables_found += 1 + ok = self._analyze_table(obj, tables_found) + if ok: + tables_ok += 1 + return # don't recurse into table internals + kids = obj.get("/K", []) + if not isinstance(kids, list): + kids = [kids] + for kid in kids: + if kid is not None: + walk(kid, depth + 1) + except Exception: + pass + + try: + walk(struct_tree) + except Exception as e: + logger.warning(f"Structure tree walk failed: {e}") + + if tables_found == 0: + # Fallback: visual detection via pdfplumber (for untagged docs) + visual_tables = 0 + for i, page in enumerate(self.pdf_plumber.pages): + try: + tbls = page.find_tables() + visual_tables += len(tbls) + except Exception: + pass + + if visual_tables > 0: + self.add_issue( + Severity.WARNING, + "Tables", + f"{visual_tables} visual table(s) detected but not tagged in structure tree", + wcag_criterion="1.3.1", + recommendation="Tag tables with proper Table/TR/TH/TD structure elements" + ) + else: + self.add_issue( + Severity.INFO, + "Tables", + "No tables detected in document", + wcag_criterion="1.3.1" + ) + elif tables_ok == tables_found: self.add_issue( - Severity.INFO, + Severity.SUCCESS, "Tables", - "No tables detected", + f"{tables_found} table(s) with proper header and scope structure", wcag_criterion="1.3.1" ) - + + def _analyze_table(self, table_obj: dict, table_num: int) -> bool: + """Analyse a single /Table structure element. Returns True if no issues found.""" + kids = table_obj.get("/K", []) + if not isinstance(kids, list): + kids = [kids] + + stats = { + 'rows': 0, 'th_cells': 0, 'td_cells': 0, + 'th_with_scope': 0, 'has_caption': False, + } + self._collect_table_stats(kids, stats) + + issues_added = False + total_cells = stats['th_cells'] + stats['td_cells'] + + if stats['rows'] == 0 and total_cells == 0: + self.add_issue( + Severity.WARNING, + "Tables", + f"Table {table_num}: empty — no TR/TH/TD elements found in structure tree", + wcag_criterion="1.3.1", + recommendation="Ensure the table is properly tagged with TR rows and TH/TD cells" + ) + return False + + if stats['th_cells'] == 0: + self.add_issue( + Severity.ERROR, + "Tables", + f"Table {table_num}: no header cells (TH) — {stats['rows']} row(s), {total_cells} data cell(s). " + f"Screen readers cannot identify column or row headers.", + wcag_criterion="1.3.1", + recommendation="Mark header cells as TH with scope='col' (column headers) or scope='row' (row headers)" + ) + issues_added = True + elif stats['th_with_scope'] < stats['th_cells']: + missing = stats['th_cells'] - stats['th_with_scope'] + self.add_issue( + Severity.WARNING, + "Tables", + f"Table {table_num}: {missing} of {stats['th_cells']} TH header cell(s) missing scope attribute", + wcag_criterion="1.3.1", + recommendation="Add scope='col' to column headers and scope='row' to row headers" + ) + issues_added = True + + if not stats['has_caption'] and total_cells > 6: + self.add_issue( + Severity.WARNING, + "Tables", + f"Table {table_num}: no Caption element ({stats['rows']} rows, ~{total_cells} cells). " + f"A visible caption helps all users understand the table's purpose.", + wcag_criterion="1.3.1", + recommendation="Add a Caption as the first child of the Table element" + ) + issues_added = True + + return not issues_added + + def _collect_table_stats(self, kids: list, stats: dict, depth: int = 0): + """Recursively collect structural stats from a table's children.""" + if depth > 15: + return + for kid in kids: + try: + obj = kid.get_object() if hasattr(kid, 'get_object') else kid + if not isinstance(obj, dict): + continue + role = str(obj.get("/S") or obj.get("/Type") or "") + + if role == "/TR": + stats['rows'] += 1 + elif role == "/TH": + stats['th_cells'] += 1 + if self._th_has_scope(obj): + stats['th_with_scope'] += 1 + elif role == "/TD": + stats['td_cells'] += 1 + elif role == "/Caption": + stats['has_caption'] = True + + sub_kids = obj.get("/K", []) + if not isinstance(sub_kids, list): + sub_kids = [sub_kids] + if sub_kids: + self._collect_table_stats(sub_kids, stats, depth + 1) + except Exception: + continue + + def _th_has_scope(self, th_obj: dict) -> bool: + """Return True if a TH element carries a Scope attribute.""" + attrs = th_obj.get("/A") + if not attrs: + return False + try: + # /A can be a single attribute dict or a list of dicts + a = attrs.get_object() if hasattr(attrs, 'get_object') else attrs + if isinstance(a, dict): + return "/Scope" in a + if isinstance(a, list): + for item in a: + try: + d = item.get_object() if hasattr(item, 'get_object') else item + if isinstance(d, dict) and "/Scope" in d: + return True + except Exception: + pass + except Exception: + pass + return False + def _check_reading_order(self): """Check reading order""" catalog = self.pdf_reader.trailer.get("/Root", {}) @@ -1474,6 +1634,10 @@ Respond in JSON format: except Exception as e: return None + def _image_data_size(self, image_data: bytes) -> int: + """Return byte size of image data — used to filter out tiny decorative images.""" + return len(image_data) if image_data else 0 + def _generate_page_images(self, output_dir: Path, dpi: int = 150): """Generate PNG images for each page for visual display""" if not self.generate_images: