409 lines
No EOL
16 KiB
Python
409 lines
No EOL
16 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Advanced Panel Splitter Module - Edge detection and gutter analysis for panel splitting
|
||
"""
|
||
|
||
import os
|
||
import cv2
|
||
import numpy as np
|
||
from typing import List, Dict, Tuple, Optional
|
||
from pathlib import Path
|
||
from PIL import Image
|
||
|
||
|
||
class AdvancedPanelSplitter:
|
||
"""
|
||
Advanced panel splitter using edge detection and gutter analysis
|
||
for more accurate splitting of horizontal multi-panel marketing layouts.
|
||
"""
|
||
|
||
def __init__(self, percentile: float = 10, min_gap: int = 5, debug: bool = False):
|
||
"""
|
||
Initialize the AdvancedPanelSplitter
|
||
|
||
Args:
|
||
percentile (float): Percentile threshold for detecting gutters (0-100)
|
||
min_gap (int): Minimum consecutive low-energy columns for gutter detection
|
||
debug (bool): Enable debug mode for visualization
|
||
"""
|
||
self.percentile = percentile
|
||
self.min_gap = min_gap
|
||
self.debug = debug
|
||
self.debug_dir = "debug_advanced_splitting"
|
||
if self.debug and not os.path.exists(self.debug_dir):
|
||
os.makedirs(self.debug_dir)
|
||
|
||
def find_boundaries_auto(self, img_gray: np.ndarray) -> List[int]:
|
||
"""
|
||
Locate column indices that represent gutters between panels.
|
||
|
||
Parameters
|
||
----------
|
||
img_gray : np.ndarray
|
||
Grayscale image (H, W).
|
||
|
||
Returns
|
||
-------
|
||
List[int]
|
||
Sorted list of boundary x‑coordinates (including 0 and width‑1).
|
||
"""
|
||
# Vertical Sobel to highlight vertical edges
|
||
sobelx = cv2.Sobel(img_gray, cv2.CV_64F, 1, 0, ksize=3)
|
||
col_energy = np.abs(sobelx).sum(axis=0) # 1‑D edge energy profile
|
||
|
||
thresh = np.percentile(col_energy, self.percentile)
|
||
low_energy_cols = np.where(col_energy < thresh)[0]
|
||
|
||
if low_energy_cols.size == 0: # fallback to equidistant split in worst case
|
||
return [0, img_gray.shape[1] - 1]
|
||
|
||
# Group consecutive columns
|
||
clusters, current = [], [low_energy_cols[0]]
|
||
for c in low_energy_cols[1:]:
|
||
if c == current[-1] + 1:
|
||
current.append(c)
|
||
else:
|
||
clusters.append(current)
|
||
current = [c]
|
||
clusters.append(current)
|
||
|
||
# Keep clusters that are wide enough (filter noise)
|
||
clusters = [cl for cl in clusters if len(cl) >= self.min_gap]
|
||
|
||
# Use the centre of each cluster as the boundary position
|
||
boundaries = [0] + [int(np.mean(cl)) for cl in clusters] + [img_gray.shape[1] - 1]
|
||
boundaries = sorted(list(set(boundaries))) # de‑duplicate & sort
|
||
return boundaries
|
||
|
||
def split_image(self, img: Image.Image, boundaries: List[int], out_dir: Path, stem: str) -> List[Dict]:
|
||
"""
|
||
Crop and save each panel, returning split information.
|
||
|
||
Parameters
|
||
----------
|
||
img : PIL.Image.Image
|
||
boundaries : List[int]
|
||
Sorted x positions of panel borders.
|
||
out_dir : Path
|
||
Where to write files.
|
||
stem : str
|
||
Base name for panel files.
|
||
|
||
Returns
|
||
-------
|
||
List[Dict]
|
||
List of split information with image data and metadata
|
||
"""
|
||
out_dir.mkdir(parents=True, exist_ok=True)
|
||
splits = []
|
||
|
||
for i in range(len(boundaries) - 1):
|
||
left = boundaries[i]
|
||
right = boundaries[i + 1]
|
||
if right - left < 5: # skip spurious zero‑width crops
|
||
continue
|
||
panel = img.crop((left, 0, right, img.height))
|
||
|
||
# Convert PIL image to OpenCV format for consistency
|
||
panel_cv = cv2.cvtColor(np.array(panel), cv2.COLOR_RGB2BGR)
|
||
|
||
# Save panel file if debug mode
|
||
if self.debug:
|
||
panel_path = out_dir / f"{stem}_panel_{i+1:02d}.png"
|
||
panel.save(panel_path, "PNG")
|
||
print(f"Saved panel {i+1} → {stem}_panel_{i+1:02d}.png")
|
||
|
||
splits.append({
|
||
'image': panel_cv,
|
||
'bounds': (left, 0, right - left, img.height),
|
||
'confidence': 0.9, # High confidence for advanced method
|
||
'method': 'advanced_edge_detection'
|
||
})
|
||
|
||
return splits
|
||
|
||
def split_layout_and_match(self, layout_path: str, master_images: List[str],
|
||
detector_instance=None, n_panels: Optional[int] = None) -> Dict:
|
||
"""
|
||
Main method to split a layout using advanced edge detection and match splits to master images
|
||
|
||
Args:
|
||
layout_path (str): Path to the layout image
|
||
master_images (List[str]): List of master image paths
|
||
detector_instance: The detector instance to use for matching
|
||
n_panels (int, optional): If provided, split into this many equal‑width panels
|
||
|
||
Returns:
|
||
Dict: Detection results with matches from all splits
|
||
"""
|
||
# Load image
|
||
img = Image.open(layout_path).convert("RGB")
|
||
img_gray = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2GRAY)
|
||
|
||
print(f"Processing {os.path.basename(layout_path)} with advanced splitting")
|
||
print(f"Image dimensions: {img.width}x{img.height}")
|
||
print(f"Percentile threshold: {self.percentile}, Min gap: {self.min_gap}")
|
||
|
||
# Determine split boundaries
|
||
if n_panels:
|
||
# Equally spaced boundaries
|
||
w = img.width
|
||
step = w / n_panels
|
||
boundaries = [0] + [int(round(step * k)) for k in range(1, n_panels)] + [w - 1]
|
||
print(f"Using fixed {n_panels} panels with equal spacing")
|
||
else:
|
||
boundaries = self.find_boundaries_auto(img_gray)
|
||
print(f"Auto-detected {len(boundaries) - 1} panels")
|
||
|
||
# Create output directory for splits if debug mode
|
||
out_dir = Path(self.debug_dir) if self.debug else Path("/tmp/advanced_splits")
|
||
stem = Path(layout_path).stem
|
||
|
||
# Split the image
|
||
splits = self.split_image(img, boundaries, out_dir, stem)
|
||
|
||
if not splits:
|
||
print("No splits detected, returning empty results")
|
||
return {
|
||
'layout_path': layout_path,
|
||
'detected_masters': [],
|
||
'panel_count': 0,
|
||
'split_mode': 'advanced',
|
||
'splits_generated': 0,
|
||
'percentile': self.percentile,
|
||
'min_gap': self.min_gap
|
||
}
|
||
|
||
print(f"Generated {len(splits)} splits using advanced method")
|
||
|
||
# Match each split to master images
|
||
all_matches = []
|
||
split_results = []
|
||
|
||
for i, split_info in enumerate(splits):
|
||
print(f"Processing split {i+1}/{len(splits)}")
|
||
|
||
# Save split image temporarily for matching
|
||
split_image = split_info['image']
|
||
temp_split_path = f"/tmp/advanced_split_{i}.jpg"
|
||
cv2.imwrite(temp_split_path, split_image)
|
||
|
||
# Match this split to master images using existing inlier analysis
|
||
if hasattr(detector_instance, 'match_split_to_masters'):
|
||
split_matches = detector_instance.match_split_to_masters(
|
||
temp_split_path, master_images
|
||
)
|
||
else:
|
||
# Use basic inlier analysis if method doesn't exist
|
||
split_matches = self._match_split_basic(temp_split_path, master_images)
|
||
|
||
# Add split metadata to matches
|
||
for match in split_matches:
|
||
match['split_index'] = i
|
||
match['split_bounds'] = split_info['bounds']
|
||
match['split_confidence'] = split_info['confidence']
|
||
match['split_method'] = 'advanced_edge_detection'
|
||
all_matches.append(match)
|
||
|
||
split_results.append({
|
||
'split_index': i,
|
||
'bounds': split_info['bounds'],
|
||
'confidence': split_info['confidence'],
|
||
'method': 'advanced_edge_detection',
|
||
'matches': split_matches
|
||
})
|
||
|
||
# Clean up temporary file
|
||
if os.path.exists(temp_split_path):
|
||
os.remove(temp_split_path)
|
||
|
||
# Aggregate results
|
||
result = {
|
||
'layout_path': layout_path,
|
||
'detected_masters': [match['master_id'] for match in all_matches],
|
||
'panel_count': len(splits),
|
||
'split_mode': 'advanced',
|
||
'splits_generated': len(splits),
|
||
'split_results': split_results,
|
||
'all_matches': all_matches,
|
||
'percentile': self.percentile,
|
||
'min_gap': self.min_gap,
|
||
'boundaries': boundaries
|
||
}
|
||
|
||
# Remove duplicates while preserving highest confidence matches
|
||
result = self._deduplicate_matches(result)
|
||
|
||
return result
|
||
|
||
def split_panels(self, image_path: str, target_panel_count: int) -> List[Dict]:
|
||
"""
|
||
Split a layout image into individual panels (compatibility method for hybrid detector)
|
||
|
||
Args:
|
||
image_path (str): Path to the layout image
|
||
target_panel_count (int): Target number of panels to split into
|
||
|
||
Returns:
|
||
List[Dict]: List of split information with image data and metadata
|
||
"""
|
||
# Load image
|
||
img = Image.open(image_path).convert("RGB")
|
||
img_gray = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2GRAY)
|
||
|
||
print(f"Advanced splitting: Processing {os.path.basename(image_path)}")
|
||
print(f"Image dimensions: {img.width}x{img.height}")
|
||
print(f"Target panels: {target_panel_count}, Percentile: {self.percentile}, Min gap: {self.min_gap}")
|
||
|
||
# Determine split boundaries
|
||
boundaries = self.find_boundaries_auto(img_gray)
|
||
print(f"Auto-detected {len(boundaries) - 1} panels using advanced method")
|
||
|
||
# Create output directory for splits if debug mode
|
||
out_dir = Path(self.debug_dir) if self.debug else Path("/tmp/advanced_splits")
|
||
stem = Path(image_path).stem
|
||
|
||
# Split the image
|
||
splits = self.split_image(img, boundaries, out_dir, stem)
|
||
|
||
if not splits:
|
||
print("No splits detected, falling back to equal division")
|
||
# Fallback to equal division if no splits detected
|
||
w = img.width
|
||
h = img.height
|
||
panel_width = w // target_panel_count
|
||
|
||
splits = []
|
||
for i in range(target_panel_count):
|
||
x = i * panel_width
|
||
width = panel_width if i < target_panel_count - 1 else w - x
|
||
|
||
panel_img = img.crop((x, 0, x + width, h))
|
||
panel_cv = cv2.cvtColor(np.array(panel_img), cv2.COLOR_RGB2BGR)
|
||
|
||
splits.append({
|
||
'image': panel_cv,
|
||
'bounds': (x, 0, width, h),
|
||
'confidence': 0.7,
|
||
'method': 'advanced_fallback_equal_division'
|
||
})
|
||
|
||
return splits
|
||
|
||
def _match_split_basic(self, split_path: str, master_images: List[str]) -> List[Dict]:
|
||
"""Basic matching using OpenCV features (fallback)"""
|
||
matches = []
|
||
|
||
try:
|
||
# Load the split image
|
||
split_img = cv2.imread(split_path, cv2.IMREAD_GRAYSCALE)
|
||
if split_img is None:
|
||
return matches
|
||
|
||
# Initialize feature detector
|
||
akaze = cv2.AKAZE_create()
|
||
bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
|
||
|
||
# Detect keypoints and descriptors for split image
|
||
kp_split, des_split = akaze.detectAndCompute(split_img, None)
|
||
|
||
if des_split is None:
|
||
return matches
|
||
|
||
# Load master images from the master_images directory
|
||
master_images_path = Path("master_images")
|
||
|
||
for master_id in master_images:
|
||
master_path = master_images_path / f"{master_id}.jpg"
|
||
if not master_path.exists():
|
||
continue
|
||
|
||
# Load master image
|
||
master_img = cv2.imread(str(master_path), cv2.IMREAD_GRAYSCALE)
|
||
if master_img is None:
|
||
continue
|
||
|
||
# Detect keypoints and descriptors for master image
|
||
kp_master, des_master = akaze.detectAndCompute(master_img, None)
|
||
|
||
if des_master is None:
|
||
continue
|
||
|
||
# Match features
|
||
matches_raw = bf.knnMatch(des_split, des_master, k=2)
|
||
|
||
# Apply Lowe's ratio test
|
||
good_matches = []
|
||
for match_pair in matches_raw:
|
||
if len(match_pair) == 2:
|
||
m, n = match_pair
|
||
if m.distance < 0.7 * n.distance:
|
||
good_matches.append(m)
|
||
|
||
# If we have enough good matches, try to find homography
|
||
if len(good_matches) >= 10:
|
||
src_pts = np.float32([kp_split[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
|
||
dst_pts = np.float32([kp_master[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
|
||
|
||
try:
|
||
M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
|
||
if M is not None:
|
||
inliers = int(np.sum(mask))
|
||
inlier_ratio = inliers / len(good_matches)
|
||
|
||
# Basic confidence scoring
|
||
if inliers >= 15 and inlier_ratio >= 0.6:
|
||
confidence = 'high'
|
||
elif inliers >= 8 and inlier_ratio >= 0.4:
|
||
confidence = 'medium'
|
||
else:
|
||
confidence = 'low'
|
||
|
||
# Only include medium and high confidence matches
|
||
if confidence in ['medium', 'high']:
|
||
matches.append({
|
||
'master_id': master_id,
|
||
'confidence': confidence,
|
||
'inliers': inliers,
|
||
'match_details': {
|
||
'inliers': inliers,
|
||
'good_matches': len(good_matches),
|
||
'inlier_ratio': round(inlier_ratio, 3)
|
||
}
|
||
})
|
||
except:
|
||
continue
|
||
|
||
except Exception as e:
|
||
print(f"Error in basic matching: {e}")
|
||
|
||
return matches
|
||
|
||
def _deduplicate_matches(self, result: Dict) -> Dict:
|
||
"""Remove duplicate matches, keeping highest confidence ones"""
|
||
if not result['all_matches']:
|
||
return result
|
||
|
||
# Group matches by master_id
|
||
master_groups = {}
|
||
for match in result['all_matches']:
|
||
master_id = match['master_id']
|
||
if master_id not in master_groups:
|
||
master_groups[master_id] = []
|
||
master_groups[master_id].append(match)
|
||
|
||
# Keep only the highest confidence match for each master
|
||
deduplicated_matches = []
|
||
for master_id, matches in master_groups.items():
|
||
# Sort by confidence (high > medium > low) and inliers
|
||
confidence_order = {'high': 3, 'medium': 2, 'low': 1}
|
||
best_match = max(matches, key=lambda x: (
|
||
confidence_order.get(x.get('confidence', 'low'), 0),
|
||
x.get('inliers', 0)
|
||
))
|
||
deduplicated_matches.append(best_match)
|
||
|
||
result['all_matches'] = deduplicated_matches
|
||
result['detected_masters'] = [match['master_id'] for match in deduplicated_matches]
|
||
|
||
return result |