master_adapt_detect/advanced_splitter.py
2025-10-01 14:32:55 -05:00

409 lines
No EOL
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Advanced Panel Splitter Module - Edge detection and gutter analysis for panel splitting
"""
import os
import tempfile
from typing import List, Dict, Tuple, Optional
from pathlib import Path

import cv2
import numpy as np
from PIL import Image
class AdvancedPanelSplitter:
    """
    Advanced panel splitter using edge detection and gutter analysis
    for more accurate splitting of horizontal multi-panel marketing layouts.

    Gutters are located as runs of columns whose summed horizontal-gradient
    (Sobel) energy falls below a percentile of the per-column energy profile.
    """

    # Ranking used to pick the best of several matches for the same master.
    _CONFIDENCE_RANK = {'high': 3, 'medium': 2, 'low': 1}

    # Crops narrower than this many pixels are discarded as spurious.
    _MIN_PANEL_WIDTH = 5

    def __init__(self, percentile: float = 10, min_gap: int = 5, debug: bool = False):
        """
        Initialize the AdvancedPanelSplitter

        Args:
            percentile (float): Percentile threshold for detecting gutters (0-100)
            min_gap (int): Minimum consecutive low-energy columns for gutter detection
            debug (bool): Enable debug mode for visualization
        """
        self.percentile = percentile
        self.min_gap = min_gap
        self.debug = debug
        self.debug_dir = "debug_advanced_splitting"
        if self.debug:
            # exist_ok avoids the check-then-create race of exists() + makedirs().
            os.makedirs(self.debug_dir, exist_ok=True)

    def find_boundaries_auto(self, img_gray: np.ndarray) -> List[int]:
        """
        Locate column indices that represent gutters between panels.

        Parameters
        ----------
        img_gray : np.ndarray
            Grayscale image (H, W).

        Returns
        -------
        List[int]
            Sorted list of boundary x-coordinates (including 0 and width-1).

        Notes
        -----
        NOTE(review): the comparison against the percentile threshold is
        strict, so when the threshold equals the minimum column energy (for
        example many columns with identical, zero energy) no column is
        selected and the whole image is returned as a single panel.

        NOTE(review): the final boundary is ``width - 1``; ``split_image``
        passes it to PIL's crop as the (exclusive) right edge, so the last
        pixel column is not part of the last panel.
        """
        width = img_gray.shape[1]

        # Horizontal-derivative Sobel highlights vertical edges.
        sobelx = cv2.Sobel(img_gray, cv2.CV_64F, 1, 0, ksize=3)
        col_energy = np.abs(sobelx).sum(axis=0)  # 1-D edge energy profile

        thresh = np.percentile(col_energy, self.percentile)
        low_energy_cols = np.where(col_energy < thresh)[0]
        if low_energy_cols.size == 0:
            # Nothing below the threshold: treat the whole image as one panel.
            return [0, width - 1]

        # Break the sorted column indices wherever two neighbours are not
        # adjacent, yielding one array per run of consecutive columns.
        run_breaks = np.flatnonzero(np.diff(low_energy_cols) != 1) + 1
        clusters = np.split(low_energy_cols, run_breaks)

        # Keep runs that are wide enough (filters noise) and use the centre
        # of each run as the boundary position.
        centres = [int(np.mean(cl)) for cl in clusters if cl.size >= self.min_gap]

        # Deduplicate & sort (a centre may coincide with 0 or width-1).
        return sorted({0, width - 1, *centres})

    def split_image(self, img: "Image.Image", boundaries: List[int], out_dir: Path, stem: str) -> List[Dict]:
        """
        Crop each panel and return split information; save panels in debug mode.

        Parameters
        ----------
        img : PIL.Image.Image
        boundaries : List[int]
            Sorted x positions of panel borders.
        out_dir : Path
            Where to write files. Only created and written to in debug mode.
        stem : str
            Base name for panel files.

        Returns
        -------
        List[Dict]
            One dict per kept panel with keys ``image`` (BGR array),
            ``bounds`` as ``(x, y, width, height)``, ``confidence`` and
            ``method``.
        """
        if self.debug:
            # The directory is only needed when panels are actually saved.
            out_dir.mkdir(parents=True, exist_ok=True)

        splits = []
        for i, (left, right) in enumerate(zip(boundaries, boundaries[1:])):
            if right - left < self._MIN_PANEL_WIDTH:  # skip spurious zero-width crops
                continue

            panel = img.crop((left, 0, right, img.height))
            # Convert PIL image to OpenCV (BGR) format for consistency
            panel_cv = cv2.cvtColor(np.array(panel), cv2.COLOR_RGB2BGR)

            if self.debug:
                panel_name = f"{stem}_panel_{i+1:02d}.png"
                panel.save(out_dir / panel_name, "PNG")
                print(f"Saved panel {i+1} -> {panel_name}")

            splits.append({
                'image': panel_cv,
                'bounds': (left, 0, right - left, img.height),
                'confidence': 0.9,  # High confidence for advanced method
                'method': 'advanced_edge_detection'
            })
        return splits

    def split_layout_and_match(self, layout_path: str, master_images: List[str],
                               detector_instance=None, n_panels: Optional[int] = None) -> Dict:
        """
        Main method to split a layout using advanced edge detection and match splits to master images

        Args:
            layout_path (str): Path to the layout image
            master_images (List[str]): Master images to match against; passed
                unchanged to the matcher
            detector_instance: The detector instance to use for matching. If it
                has no ``match_split_to_masters`` method, the built-in basic
                matcher is used instead
            n_panels (int, optional): If provided, split into this many equal-width panels

        Returns:
            Dict: Detection results with matches from all splits
        """
        # Load image; the context manager closes the source file promptly.
        with Image.open(layout_path) as source:
            img = source.convert("RGB")
        img_gray = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2GRAY)

        print(f"Processing {os.path.basename(layout_path)} with advanced splitting")
        print(f"Image dimensions: {img.width}x{img.height}")
        print(f"Percentile threshold: {self.percentile}, Min gap: {self.min_gap}")

        # Determine split boundaries
        if n_panels:
            # Equally spaced boundaries
            w = img.width
            step = w / n_panels
            boundaries = [0] + [int(round(step * k)) for k in range(1, n_panels)] + [w - 1]
            print(f"Using fixed {n_panels} panels with equal spacing")
        else:
            boundaries = self.find_boundaries_auto(img_gray)
            print(f"Auto-detected {len(boundaries) - 1} panels")

        # Panels are only written to disk in debug mode.
        out_dir = Path(self.debug_dir) if self.debug else Path(tempfile.gettempdir()) / "advanced_splits"
        stem = Path(layout_path).stem

        # Split the image
        splits = self.split_image(img, boundaries, out_dir, stem)
        if not splits:
            print("No splits detected, returning empty results")
            return {
                'layout_path': layout_path,
                'detected_masters': [],
                'panel_count': 0,
                'split_mode': 'advanced',
                'splits_generated': 0,
                'percentile': self.percentile,
                'min_gap': self.min_gap
            }
        print(f"Generated {len(splits)} splits using advanced method")

        # Match each split to master images
        all_matches = []
        split_results = []
        # A private temporary directory avoids name clashes between concurrent
        # runs and is removed even if a matcher raises.
        with tempfile.TemporaryDirectory(prefix="advanced_split_") as tmp_dir:
            for i, split_info in enumerate(splits):
                print(f"Processing split {i+1}/{len(splits)}")

                # Save split image temporarily for matching
                temp_split_path = os.path.join(tmp_dir, f"advanced_split_{i}.jpg")
                if not cv2.imwrite(temp_split_path, split_info['image']):
                    print(f"Could not write temporary split image: {temp_split_path}")
                    split_matches = []
                elif hasattr(detector_instance, 'match_split_to_masters'):
                    # Match this split using the detector's own analysis
                    split_matches = detector_instance.match_split_to_masters(
                        temp_split_path, master_images
                    )
                else:
                    # Use basic inlier analysis if method doesn't exist
                    split_matches = self._match_split_basic(temp_split_path, master_images)

                # Add split metadata to matches
                for match in split_matches:
                    match['split_index'] = i
                    match['split_bounds'] = split_info['bounds']
                    match['split_confidence'] = split_info['confidence']
                    match['split_method'] = 'advanced_edge_detection'
                    all_matches.append(match)

                split_results.append({
                    'split_index': i,
                    'bounds': split_info['bounds'],
                    'confidence': split_info['confidence'],
                    'method': 'advanced_edge_detection',
                    'matches': split_matches
                })

        # Aggregate results
        result = {
            'layout_path': layout_path,
            'detected_masters': [match['master_id'] for match in all_matches],
            'panel_count': len(splits),
            'split_mode': 'advanced',
            'splits_generated': len(splits),
            'split_results': split_results,
            'all_matches': all_matches,
            'percentile': self.percentile,
            'min_gap': self.min_gap,
            'boundaries': boundaries
        }
        # Remove duplicates while preserving highest confidence matches
        return self._deduplicate_matches(result)

    def split_panels(self, image_path: str, target_panel_count: int) -> List[Dict]:
        """
        Split a layout image into individual panels (compatibility method for hybrid detector)

        Boundaries are auto-detected; ``target_panel_count`` is only used for
        the equal-division fallback when auto-detection yields no panels.

        Args:
            image_path (str): Path to the layout image
            target_panel_count (int): Target number of panels to split into

        Returns:
            List[Dict]: List of split information with image data and metadata
        """
        # Load image; the context manager closes the source file promptly.
        with Image.open(image_path) as source:
            img = source.convert("RGB")
        img_gray = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2GRAY)

        print(f"Advanced splitting: Processing {os.path.basename(image_path)}")
        print(f"Image dimensions: {img.width}x{img.height}")
        print(f"Target panels: {target_panel_count}, Percentile: {self.percentile}, Min gap: {self.min_gap}")

        # Determine split boundaries
        boundaries = self.find_boundaries_auto(img_gray)
        print(f"Auto-detected {len(boundaries) - 1} panels using advanced method")

        # Panels are only written to disk in debug mode.
        out_dir = Path(self.debug_dir) if self.debug else Path(tempfile.gettempdir()) / "advanced_splits"
        stem = Path(image_path).stem

        # Split the image
        splits = self.split_image(img, boundaries, out_dir, stem)
        if splits:
            return splits

        print("No splits detected, falling back to equal division")
        # Guard against a zero/negative target, which would otherwise divide
        # by zero or produce no panels at all.
        panel_total = max(1, target_panel_count)
        w, h = img.width, img.height
        panel_width = w // panel_total
        for i in range(panel_total):
            x = i * panel_width
            # The last panel absorbs the remainder of the integer division.
            width = panel_width if i < panel_total - 1 else w - x
            panel_img = img.crop((x, 0, x + width, h))
            panel_cv = cv2.cvtColor(np.array(panel_img), cv2.COLOR_RGB2BGR)
            splits.append({
                'image': panel_cv,
                'bounds': (x, 0, width, h),
                'confidence': 0.7,
                'method': 'advanced_fallback_equal_division'
            })
        return splits

    @staticmethod
    def _classify_confidence(inliers: int, inlier_ratio: float) -> str:
        """Map a homography inlier count and ratio to 'high', 'medium' or 'low'."""
        if inliers >= 15 and inlier_ratio >= 0.6:
            return 'high'
        if inliers >= 8 and inlier_ratio >= 0.4:
            return 'medium'
        return 'low'

    def _match_split_basic(self, split_path: str, master_images: List[str]) -> List[Dict]:
        """
        Basic matching using OpenCV features (fallback)

        Each entry of ``master_images`` is used as a master ID and resolved to
        ``master_images/<id>.jpg`` relative to the working directory; entries
        whose file is missing or unreadable are skipped.
        NOTE(review): ``split_layout_and_match`` documents this list as paths,
        not IDs - confirm which one callers actually pass.

        Args:
            split_path (str): Path to the split image on disk
            master_images (List[str]): Master IDs to compare against

        Returns:
            List[Dict]: Medium/high confidence matches with ``master_id``,
            ``confidence``, ``inliers`` and ``match_details``
        """
        matches = []
        try:
            # Load the split image
            split_img = cv2.imread(split_path, cv2.IMREAD_GRAYSCALE)
            if split_img is None:
                return matches

            # Initialize feature detector and matcher
            akaze = cv2.AKAZE_create()
            bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)

            # Detect keypoints and descriptors for split image
            kp_split, des_split = akaze.detectAndCompute(split_img, None)
            if des_split is None:
                return matches

            # Load master images from the master_images directory
            master_images_path = Path("master_images")
            for master_id in master_images:
                master_path = master_images_path / f"{master_id}.jpg"
                if not master_path.exists():
                    continue

                master_img = cv2.imread(str(master_path), cv2.IMREAD_GRAYSCALE)
                if master_img is None:
                    continue

                kp_master, des_master = akaze.detectAndCompute(master_img, None)
                if des_master is None:
                    continue

                # Match features and apply Lowe's ratio test; pairs with fewer
                # than two neighbours cannot be ratio-tested and are dropped.
                matches_raw = bf.knnMatch(des_split, des_master, k=2)
                good_matches = [
                    pair[0] for pair in matches_raw
                    if len(pair) == 2 and pair[0].distance < 0.7 * pair[1].distance
                ]

                # Need enough good matches to attempt a homography
                if len(good_matches) < 10:
                    continue

                src_pts = np.float32([kp_split[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
                dst_pts = np.float32([kp_master[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
                try:
                    M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
                except cv2.error as exc:
                    # Best effort: a failed estimate for one master must not
                    # stop the remaining masters from being tried.
                    print(f"Homography failed for {master_id}: {exc}")
                    continue
                if M is None:
                    continue

                inliers = int(np.sum(mask))
                inlier_ratio = inliers / len(good_matches)
                confidence = self._classify_confidence(inliers, inlier_ratio)

                # Only include medium and high confidence matches
                if confidence == 'low':
                    continue
                matches.append({
                    'master_id': master_id,
                    'confidence': confidence,
                    'inliers': inliers,
                    'match_details': {
                        'inliers': inliers,
                        'good_matches': len(good_matches),
                        'inlier_ratio': round(inlier_ratio, 3)
                    }
                })
        except Exception as e:
            # Fallback matcher boundary: report and return what was found so far.
            print(f"Error in basic matching: {e}")
        return matches

    def _deduplicate_matches(self, result: Dict) -> Dict:
        """
        Remove duplicate matches, keeping highest confidence ones

        Matches are grouped by ``master_id``; within a group the winner has the
        highest confidence rank (high > medium > low, anything else ranks
        lowest) and, on ties, the most inliers. ``result`` is updated in place
        (``all_matches`` and ``detected_masters``) and returned.
        """
        if not result['all_matches']:
            return result

        # Group matches by master_id (dict preserves first-seen order)
        master_groups = {}
        for match in result['all_matches']:
            master_groups.setdefault(match['master_id'], []).append(match)

        rank = self._CONFIDENCE_RANK

        def match_quality(match):
            return (rank.get(match.get('confidence', 'low'), 0), match.get('inliers', 0))

        # Keep only the best match for each master
        deduplicated_matches = [max(group, key=match_quality) for group in master_groups.values()]

        result['all_matches'] = deduplicated_matches
        result['detected_masters'] = [match['master_id'] for match in deduplicated_matches]
        return result