776 lines
No EOL
34 KiB
Python
776 lines
No EOL
34 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Vector-based Image Detection Module
|
|
Extracted from image_detector.py - Contains VectorImageDetector class
|
|
Uses Google Vertex AI Multimodal Embeddings for image similarity detection
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
from typing import List, Dict, Optional
|
|
import numpy as np
|
|
import pickle
|
|
from google.cloud import aiplatform
|
|
from vertexai.vision_models import MultiModalEmbeddingModel
|
|
import cv2
|
|
from panel_splitter import PanelSplitter
|
|
|
|
|
|
class VectorImageDetector:
|
|
def __init__(self, similarity_threshold=0.75, splitting_mode="none", min_crop_size=200, crop_padding=20, split_mode=False):
    """Initialize the vector-based image detector using Google Vertex AI Multimodal Embeddings.

    Args:
        similarity_threshold: Minimum cosine similarity for a master image to
            be reported as detected.
        splitting_mode: "none", "auto" or "grid"; read by
            detect_masters_in_layout_vector (other values fall back to
            whole-image matching there).
        min_crop_size: Minimum crop width/height in pixels kept by the splitters.
        crop_padding: Padding in pixels added around each grid crop.
        split_mode: When true, a PanelSplitter is created as ``self.splitter``;
            otherwise ``self.splitter`` is None.

    Side effects:
        Initializes the Vertex AI SDK, loads the embedding model and creates
        the results, embeddings-cache and crop-debug directories.
    """
    print("Initializing Vector Image Detector with Google Vertex AI...")

    # Initialize Vertex AI.
    # setdefault: only fall back to the bundled key file when the caller has
    # not already exported credentials, instead of clobbering them.
    os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "service-account.json")
    aiplatform.init(project="optical-414516", location="us-central1")

    # Initialize multimodal embedding model
    self.model = MultiModalEmbeddingModel.from_pretrained("multimodalembedding@001")

    # Configuration
    self.similarity_threshold = similarity_threshold
    self.splitting_mode = splitting_mode
    self.min_crop_size = min_crop_size
    self.crop_padding = crop_padding
    self.split_mode = split_mode

    # Split mode configuration.
    # Always define the attribute so code that inspects self.splitter never
    # hits AttributeError when split mode is disabled.
    self.splitter = None
    if self.split_mode:
        self.splitter = PanelSplitter(debug=True)
        print("Split mode enabled: Will split multi-panel layouts before matching")

    # Paths
    self.master_images_path = Path("master_images")
    self.layouts_path = Path("layouts")
    self.results_path = Path("results")
    self.embeddings_cache_path = Path("embeddings_cache")
    self.crops_debug_path = Path("crops_debug")

    # Create directories
    self.results_path.mkdir(exist_ok=True)
    self.embeddings_cache_path.mkdir(exist_ok=True)
    self.crops_debug_path.mkdir(exist_ok=True)

    # Master images data
    self.master_images = {}      # master id (filename stem) -> image path
    self.master_files = {}       # master id -> filename
    self.master_embeddings = {}  # master id -> embedding vector

    print(f"✓ Vector detector initialized with similarity threshold: {similarity_threshold}")
    print(f"✓ Splitting mode: {splitting_mode}, Min crop size: {min_crop_size}px")
|
|
|
|
def load_master_images(self) -> Dict[str, str]:
    """Index every ``*.jpg`` file in the master images directory.

    The filename stem is used as the master id. Entries are added to
    ``self.master_images`` (id -> path string) and ``self.master_files``
    (id -> filename); existing entries are kept.

    Returns:
        ``self.master_images``.
    """
    print("Loading master images...")

    # glob() yields entries in arbitrary directory order; sort so ids are
    # registered (and later embedded/compared) in a reproducible order.
    master_files = sorted(self.master_images_path.glob("*.jpg"))
    print(f"Found {len(master_files)} master images")

    for file_path in master_files:
        master_id = file_path.stem
        self.master_images[master_id] = str(file_path)
        self.master_files[master_id] = file_path.name

    return self.master_images
|
|
|
|
def generate_image_embedding(self, image_path: str) -> Optional[np.ndarray]:
    """Generate an embedding for an image file using the Vertex AI model.

    Args:
        image_path: Path of the image file to embed.

    Returns:
        The embedding as a NumPy array (1408 dimensions according to the
        original comments), or None when loading the image or calling the
        model fails. Failures are printed, never raised.
    """
    try:
        from vertexai.vision_models import Image as VertexImage

        # Create Vertex AI Image object directly from file path
        vertex_image = VertexImage.load_from_file(image_path)

        # Get embedding from Vertex AI
        response = self.model.get_embeddings(image=vertex_image)

        # A missing embedding would otherwise become a 0-d object array that
        # passes the callers' `is not None` checks and corrupts comparisons.
        raw_embedding = response.image_embedding
        if raw_embedding is None or len(raw_embedding) == 0:
            raise ValueError("Vertex AI returned no image embedding")

        return np.array(raw_embedding)

    except Exception as e:
        print(f"Error generating embedding for {Path(image_path).name}: {e}")
        return None
|
|
|
|
def save_embedding_cache(self, embeddings: Dict, filename: str):
    """Pickle *embeddings* into ``<embeddings_cache_path>/<filename>.pkl``."""
    target = self.embeddings_cache_path / f"{filename}.pkl"
    with target.open('wb') as handle:
        pickle.dump(embeddings, handle)
    print(f"Embeddings cached to: {target}")
|
|
|
|
def load_embedding_cache(self, filename: str) -> Optional[Dict]:
    """Load embeddings from ``<embeddings_cache_path>/<filename>.pkl``.

    Returns:
        The cached dict, or None when the file does not exist, cannot be
        unpickled, or does not contain a dict.
    """
    cache_file = self.embeddings_cache_path / f"{filename}.pkl"
    if not cache_file.exists():
        return None

    try:
        # NOTE: unpickling executes code embedded in the file; the cache
        # directory must only ever contain files this tool wrote itself.
        with open(cache_file, 'rb') as f:
            embeddings = pickle.load(f)
    except Exception as e:
        print(f"Error loading cached embeddings: {e}")
        return None

    # Callers immediately use dict methods (.keys()); reject anything else
    # here instead of letting them crash on a foreign/corrupt cache file.
    if not isinstance(embeddings, dict):
        print(f"Error loading cached embeddings: expected dict, got {type(embeddings).__name__}")
        return None

    print(f"Loaded cached embeddings from: {cache_file}")
    return embeddings
|
|
|
|
def generate_master_embeddings(self, force_regenerate=False) -> Dict[str, np.ndarray]:
    """Populate ``self.master_embeddings``, reusing the on-disk cache when valid.

    The cache is used only when ``force_regenerate`` is false and its keys
    are exactly the ids in ``self.master_images``. Otherwise every master
    image is embedded again (failed images are skipped) and the result is
    cached if non-empty.

    Returns:
        ``self.master_embeddings``.
    """
    cache_name = "master_embeddings"

    # Cache lookup is skipped entirely when regeneration is forced
    cached = None if force_regenerate else self.load_embedding_cache(cache_name)
    if cached is not None:
        if set(cached.keys()) != set(self.master_images.keys()):
            print("Cache incomplete, regenerating embeddings...")
        else:
            self.master_embeddings = cached
            print(f"✓ Using cached embeddings for {len(cached)} master images")
            return self.master_embeddings

    total = len(self.master_images)
    print(f"Generating embeddings for {total} master images...")
    self.master_embeddings = {}

    position = 0
    for master_id, image_path in self.master_images.items():
        position += 1
        print(f" {position}/{total}: Generating embedding for {master_id}")

        vector = self.generate_image_embedding(image_path)
        if vector is not None:
            self.master_embeddings[master_id] = vector

        # Small pause between requests to avoid rate limiting (not after the last one)
        if position < total:
            time.sleep(0.1)

    # Nothing is written when every image failed
    if self.master_embeddings:
        self.save_embedding_cache(self.master_embeddings, cache_name)

    print(f"✓ Generated embeddings for {len(self.master_embeddings)} master images")
    return self.master_embeddings
|
|
|
|
def compute_cosine_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
    """Return the cosine similarity of two vectors as a Python float.

    A zero-length (all-zero) vector on either side yields 0.0 instead of a
    division by zero.
    """
    magnitudes = (np.linalg.norm(embedding1), np.linalg.norm(embedding2))
    if any(magnitude == 0 for magnitude in magnitudes):
        return 0.0

    # dot(a, b) / (|a| * |b|)
    return float(np.dot(embedding1, embedding2) / (magnitudes[0] * magnitudes[1]))
|
|
|
|
def detect_layout_type(self, image_path: str) -> str:
    """Classify a layout image as "single" or "composite".

    Edges are detected with Canny, closed with a tall and a wide rectangular
    kernel, and the summed result is compared against a fixed threshold.

    Args:
        image_path: Path of the layout image.

    Returns:
        "composite" when either summed measure exceeds 0.01 per pixel,
        otherwise "single". Any failure (including an unreadable file) is
        printed and reported as "single".
    """
    try:
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread reports a missing/undecodable file by returning None
            # rather than raising; fail with a meaningful message.
            raise ValueError("image could not be read")
        height, width = img.shape[:2]

        # Convert to grayscale for analysis
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        # Detect edges to find potential separators
        edges = cv2.Canny(gray, 50, 150, apertureSize=3)

        # Look for strong vertical lines (panel separators)
        vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, height // 10))
        vertical_lines = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, vertical_kernel)

        # Look for strong horizontal lines (row separators)
        horizontal_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (width // 10, 1))
        horizontal_lines = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, horizontal_kernel)

        # Count significant vertical and horizontal structures.
        # NOTE(review): np.sum adds raw pixel values, so these "densities"
        # are presumably on a 0-255 scale rather than 0-1 - confirm the 0.01
        # threshold is intended.
        pixel_count = height * width
        vertical_density = np.sum(vertical_lines) / pixel_count
        horizontal_density = np.sum(horizontal_lines) / pixel_count

        # Determine layout type based on structure
        if vertical_density > 0.01 or horizontal_density > 0.01:
            return "composite"
        return "single"

    except Exception as e:
        print(f"Error analyzing layout type for {Path(image_path).name}: {e}")
        return "single"  # Default to single if analysis fails
|
|
|
|
def split_image_by_grid(self, image_path: str) -> List[Dict]:
    """Split a composite image into rectangular crops using detected grid lines.

    Separator columns/rows are located from Canny edges closed with long thin
    kernels. Every grid cell is padded by ``self.crop_padding`` (clamped to
    the image) and kept only if both sides reach ``self.min_crop_size``.
    Wide images whose grid has more than 20 cells, and images that yield no
    usable cell, are handed to ``fallback_split_image`` instead.

    Args:
        image_path: Path of the layout image.

    Returns:
        A list of crop dicts with keys 'bbox' (x1, y1, x2, y2), 'width',
        'height', 'area_ratio' and 'crop_id'. Empty when the image cannot be
        read or any step fails (the error is printed).
    """
    def _separator_positions(projection) -> List[int]:
        """Indices above 60% of the peak, each more than 50px past the previous pick."""
        threshold = np.max(projection) * 0.6  # More strict threshold
        positions = []
        for index in np.flatnonzero(projection > threshold):
            index = int(index)  # keep plain ints in the resulting bboxes
            # Check if this is a new separator (not adjacent to previous)
            if not positions or index - positions[-1] > 50:  # Larger gap requirement
                positions.append(index)
        return positions

    try:
        layout_name = Path(image_path).name
        print(f" Analyzing grid structure for {layout_name}")

        # Load image
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread returns None for missing/undecodable files
            raise ValueError("image could not be read")
        height, width = img.shape[:2]
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        # Edge detection for finding separators
        edges = cv2.Canny(gray, 30, 100, apertureSize=3)

        # Detect vertical separators (for horizontal panels)
        vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, height // 8))
        vertical_lines = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, vertical_kernel)
        vertical_separators = _separator_positions(np.sum(vertical_lines, axis=0))

        # Detect horizontal separators (for stacked layouts)
        horizontal_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (width // 8, 1))
        horizontal_lines = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, horizontal_kernel)
        horizontal_separators = _separator_positions(np.sum(horizontal_lines, axis=1))

        # Add image boundaries to separator lists, remove duplicates and sort
        x_boundaries = sorted(set([0] + vertical_separators + [width]))
        y_boundaries = sorted(set([0] + horizontal_separators + [height]))

        print(f" Found {len(x_boundaries)-1} x {len(y_boundaries)-1} grid sections")

        # For horizontal layouts, prefer fallback splitting if grid creates too many small sections
        total_sections = (len(x_boundaries) - 1) * (len(y_boundaries) - 1)
        is_wide_horizontal = width > height * 1.5

        crops = []
        if is_wide_horizontal and total_sections > 20:
            print(f" Grid too complex ({total_sections} sections), using horizontal splitting instead")
            crops = self.fallback_split_image(img, width, height)
        else:
            total_area = width * height
            # Generate all possible rectangular crops
            for i in range(len(y_boundaries) - 1):
                for j in range(len(x_boundaries) - 1):
                    # Add padding and ensure boundaries
                    x1 = max(0, x_boundaries[j] - self.crop_padding)
                    y1 = max(0, y_boundaries[i] - self.crop_padding)
                    x2 = min(width, x_boundaries[j + 1] + self.crop_padding)
                    y2 = min(height, y_boundaries[i + 1] + self.crop_padding)

                    crop_width = x2 - x1
                    crop_height = y2 - y1

                    # Filter out crops that are too small
                    if crop_width >= self.min_crop_size and crop_height >= self.min_crop_size:
                        crops.append({
                            'bbox': (x1, y1, x2, y2),
                            'width': crop_width,
                            'height': crop_height,
                            'area_ratio': (crop_width * crop_height) / total_area,
                            'crop_id': f"grid_{i}_{j}"
                        })

        # If no good crops found, try fallback splitting
        if not crops:
            print(f" No grid detected, trying fallback splitting")
            crops = self.fallback_split_image(img, width, height)

        print(f" Generated {len(crops)} crops for analysis")
        return crops

    except Exception as e:
        print(f"Error splitting image {Path(image_path).name}: {e}")
        return []
|
|
|
|
def fallback_split_image(self, img, width: int, height: int) -> List[Dict]:
    """Split a wide image into full-height panels at detected vertical separators.

    Images that are not wider than 1.2x their height are returned as one
    crop covering the whole image. Otherwise separator candidates come from
    two smoothed column signals (closed Canny edges and inverted column
    intensity), are filtered to the central 80% of the width, spaced at
    least ``min_panel_width`` apart and capped by a size-dependent panel
    limit, keeping the highest-scoring separators.

    Args:
        img: Image array in the layout cv2 expects for ``COLOR_BGR2GRAY``
            (only used on the wide-image path).
        width: Image width in pixels.
        height: Image height in pixels.

    Returns:
        A list of crop dicts ('bbox', 'width', 'height', 'area_ratio',
        'crop_id') containing only built-in int/float values. Panels narrower
        than ``self.min_crop_size`` are dropped.
    """
    crops = []

    # Only process wide images for horizontal splitting
    if width <= height * 1.2:
        print(f" Image not wide enough for horizontal splitting, treating as single panel")
        crops.append({
            'bbox': (0, 0, width, height),
            'width': width,
            'height': height,
            'area_ratio': 1.0,
            'crop_id': "single"
        })
        return crops

    print(f" Using improved horizontal splitting for {width}x{height} image")

    from scipy.ndimage import gaussian_filter1d
    from scipy.signal import find_peaks

    # Convert to grayscale
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Method 1: Structural edge detection for full-height separators
    edges = cv2.Canny(gray, 30, 100)
    vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, height // 3))
    vertical_edges = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, vertical_kernel)
    edge_projection = np.sum(vertical_edges, axis=0)

    # Method 2: Intensity histogram analysis (dark columns become peaks)
    horizontal_hist = np.sum(gray, axis=0)
    inverted_hist = np.max(horizontal_hist) - horizontal_hist

    # Smooth both signals
    smoothed_edges = gaussian_filter1d(edge_projection, sigma=15)
    smoothed_hist = gaussian_filter1d(inverted_hist, sigma=15)

    # Adaptive parameters based on image size
    if width < 2000:
        min_panel_width = width // 4  # At least 25% per panel
        max_panels = 3
    elif width < 5000:
        min_panel_width = width // 6  # At least 16% per panel
        max_panels = 6
    else:
        min_panel_width = width // 12  # At least 8% per panel
        max_panels = 15
    # find_peaks requires distance >= 1; tiny widths would otherwise give 0
    min_panel_width = max(1, min_panel_width)

    print(f" Min panel width: {min_panel_width}px, Max panels: {max_panels}")

    # Find separator candidates with conservative thresholds
    edge_threshold = np.max(smoothed_edges) * 0.5  # Higher threshold for stronger edges
    hist_threshold = np.max(smoothed_hist) * 0.4  # Higher threshold for deeper valleys

    # Edge-based separators
    edge_peaks, _ = find_peaks(smoothed_edges,
                               distance=min_panel_width,
                               height=edge_threshold,
                               prominence=np.max(smoothed_edges) * 0.3)

    # Histogram-based separators
    hist_peaks, _ = find_peaks(smoothed_hist,
                               distance=min_panel_width,
                               height=hist_threshold,
                               prominence=np.max(smoothed_hist) * 0.2)

    print(f" Edge peaks: {len(edge_peaks)}, Histogram peaks: {len(hist_peaks)}")

    # Combine separators and filter boundary areas.
    # Convert to built-in int so NumPy scalars do not leak into the crop
    # dicts (they are not JSON-serializable and print as np.int64(...)).
    all_separators = {int(p) for p in edge_peaks} | {int(p) for p in hist_peaks}
    boundary_margin = width * 0.1  # 10% margin from edges
    valid_separators = sorted(s for s in all_separators
                              if boundary_margin < s < width - boundary_margin)

    # Remove separators too close to each other
    final_separators = []
    for sep in valid_separators:
        if not final_separators or sep - final_separators[-1] >= min_panel_width:
            final_separators.append(sep)

    # Limit to reasonable number of panels and keep strongest separators
    if len(final_separators) >= max_panels:
        separator_scores = []
        for sep in final_separators:
            edge_score = smoothed_edges[sep] if sep < len(smoothed_edges) else 0
            hist_score = smoothed_hist[sep] if sep < len(smoothed_hist) else 0
            separator_scores.append((sep, edge_score + hist_score))

        separator_scores.sort(key=lambda x: x[1], reverse=True)
        final_separators = sorted(s[0] for s in separator_scores[:max_panels - 1])

    print(f" Final separators: {final_separators}")

    # Create crops
    x_boundaries = [0] + final_separators + [width]

    for i in range(len(x_boundaries) - 1):
        x1, x2 = x_boundaries[i], x_boundaries[i + 1]
        panel_width = x2 - x1

        if panel_width >= self.min_crop_size:
            crops.append({
                'bbox': (x1, 0, x2, height),
                'width': panel_width,
                'height': height,
                'area_ratio': panel_width / width,
                'crop_id': f"panel_{i}"
            })

    print(f" Generated {len(crops)} improved horizontal crops")

    return crops
|
|
|
|
def save_crop_debug_images(self, image_path: str, crops: List[Dict]):
    """Write each crop of an image to ``self.crops_debug_path`` as a JPEG.

    Files are named ``<layout stem>_crop_<index>_<crop_id>.jpg``. This is a
    best-effort debugging aid: failures are printed as a warning and never
    raised.

    Args:
        image_path: Path of the source layout image.
        crops: Crop dicts providing 'bbox' (x1, y1, x2, y2) and 'crop_id'.
    """
    try:
        layout_name = Path(image_path).stem
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread returns None for missing/undecodable files
            raise ValueError(f"could not read {Path(image_path).name}")

        for i, crop in enumerate(crops):
            x1, y1, x2, y2 = crop['bbox']
            cropped = img[y1:y2, x1:x2]
            if cropped.size == 0:
                # An empty slice cannot be encoded; skip it so the remaining
                # crops are still written.
                continue

            debug_filename = f"{layout_name}_crop_{i}_{crop['crop_id']}.jpg"
            debug_path = self.crops_debug_path / debug_filename
            cv2.imwrite(str(debug_path), cropped)

    except Exception as e:
        print(f"Warning: Failed to save debug crops: {e}")
|
|
|
|
def generate_crop_embedding(self, image_path: str, crop_info: Dict) -> Optional[np.ndarray]:
    """Generate an embedding for one rectangular region of an image.

    The region is written to a temporary JPEG inside ``self.crops_debug_path``
    because ``generate_image_embedding`` works on file paths; the file is
    removed again whether or not embedding succeeds.

    Args:
        image_path: Path of the full layout image.
        crop_info: Crop dict providing 'bbox' as (x1, y1, x2, y2).

    Returns:
        The crop embedding, or None on any failure (printed, not raised).
    """
    temp_crop_path = None
    try:
        # Load full image
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread returns None for missing/undecodable files
            raise ValueError(f"could not read {Path(image_path).name}")

        # Extract crop region
        x1, y1, x2, y2 = crop_info['bbox']
        cropped_img = img[y1:y2, x1:x2]

        # Save crop to temporary file for embedding generation
        temp_crop_path = self.crops_debug_path / "temp_crop.jpg"
        if not cv2.imwrite(str(temp_crop_path), cropped_img):
            # imwrite signals an unsuccessful write through its return value
            raise IOError(f"could not write temporary crop {temp_crop_path}")

        # Generate embedding for crop
        return self.generate_image_embedding(str(temp_crop_path))

    except Exception as e:
        print(f"Error generating crop embedding: {e}")
        return None

    finally:
        # Clean up temp file on every path so a stale crop is never left behind
        if temp_crop_path is not None and temp_crop_path.exists():
            try:
                temp_crop_path.unlink()
            except OSError as e:
                print(f"Warning: could not remove temporary crop {temp_crop_path}: {e}")
|
|
|
|
def detect_masters_in_layout_vector(self, layout_path: str, layout_index: int, total_layouts: int) -> Dict:
    """Run master-image detection on one layout, honouring ``self.splitting_mode``.

    "grid" always uses crop-based matching; "auto" uses it only when
    ``detect_layout_type`` does not report "single"; "none" and any
    unrecognised mode use whole-image matching.

    Args:
        layout_path: Path of the layout image.
        layout_index: 1-based position of this layout, used for progress output.
        total_layouts: Total number of layouts, used for progress output.

    Returns:
        The detection result dict. If detection raises, a result with empty
        detections plus 'error' and 'processing_mode' entries is returned
        instead of propagating the exception.
    """
    layout_name = Path(layout_path).name
    print(f"Processing {layout_index}/{total_layouts}: {layout_name} (Vector mode: {self.splitting_mode})")

    try:
        mode = self.splitting_mode
        if mode == "grid":
            use_splitting = True
        elif mode == "auto":
            # Only composite-looking layouts are worth splitting
            use_splitting = self.detect_layout_type(layout_path) != "single"
        else:
            # "none" and unknown modes: whole image
            use_splitting = False

        if use_splitting:
            return self.detect_with_splitting(layout_path, layout_name)
        return self.detect_whole_image(layout_path, layout_name)

    except Exception as e:
        error_msg = f"Error analyzing {layout_name} with vector embeddings: {e}"
        print(error_msg)
        failure = {
            'detected_masters': [],
            'detected_master_ids': [],
            'detected_master_filenames': [],
            'analysis': 'Vector embedding analysis failed',
        }
        failure['error'] = str(e)
        failure['processing_mode'] = f'vector_embedding_{self.splitting_mode}'
        return failure
|
|
|
|
def detect_whole_image(self, layout_path: str, layout_name: str) -> Dict:
    """Detect masters by comparing the whole layout image against every master.

    Args:
        layout_path: Path of the layout image to embed.
        layout_name: Display name used in progress output.

    Returns:
        A result dict with the detected master ids (best match first), their
        filenames, a human-readable analysis string, the top five
        similarities and processing metadata.

    Raises:
        RuntimeError: If no embedding could be generated for the layout.
    """
    print(f" Processing whole image: {layout_name}")

    # Generate embedding for layout image
    layout_embedding = self.generate_image_embedding(layout_path)
    if layout_embedding is None:
        # RuntimeError is still an Exception, so callers catching Exception are unaffected
        raise RuntimeError("Failed to generate layout embedding")

    # Compare with all master embeddings
    print(f" Comparing against {len(self.master_embeddings)} master images...")
    similarities = {
        master_id: self.compute_cosine_similarity(layout_embedding, master_embedding)
        for master_id, master_embedding in self.master_embeddings.items()
    }

    # Matches at or above the threshold, highest similarity first
    # (sorted() is stable, so ties keep master registration order).
    detected_masters = sorted(
        (mid for mid, sim in similarities.items() if sim >= self.similarity_threshold),
        key=similarities.get,
        reverse=True,
    )

    # Create analysis text
    top_similarities = sorted(similarities.items(), key=lambda x: x[1], reverse=True)[:5]
    analysis_parts = [
        f"Whole image vector analysis using Google Vertex AI embeddings (1408 dimensions).",
        f"Similarity threshold: {self.similarity_threshold}",
        f"Found {len(detected_masters)} matches above threshold.",
        f"Top 5 similarities: " + ", ".join([f"{mid}({sim:.3f})" for mid, sim in top_similarities])
    ]
    analysis = " ".join(analysis_parts)

    print(f"✓ Completed {layout_name} - Found {len(detected_masters)} matches")
    if detected_masters:
        print(f" Matches: {', '.join(detected_masters)}")

    return {
        'detected_masters': detected_masters,
        'detected_master_ids': detected_masters,
        # Prefer the filename recorded when the master was loaded; fall back
        # to the "<id>.jpg" convention for ids that were never registered.
        'detected_master_filenames': [self.master_files.get(mid, f"{mid}.jpg") for mid in detected_masters],
        'analysis': analysis,
        'similarities': dict(top_similarities),
        'processing_mode': 'vector_embedding_whole',
        'similarity_threshold': self.similarity_threshold,
        'embedding_dimensions': 1408
    }
|
|
|
|
def detect_with_splitting(self, layout_path: str, layout_name: str) -> Dict:
    """Detect masters by splitting the layout into crops and matching each crop.

    Each crop is embedded and compared against every master. A master is
    detected when at least one crop reaches ``self.similarity_threshold``;
    its score is the best similarity among the crops that matched it.
    Falls back to ``detect_whole_image`` when splitting yields no crops or
    when no crop could be embedded, so a failing crop pipeline is not
    reported as "no matches".

    Args:
        layout_path: Path of the layout image.
        layout_name: Display name used in progress output.

    Returns:
        A result dict like ``detect_whole_image`` returns, extended with
        'crops_processed', 'total_crops' and per-crop 'crop_results'.
    """
    print(f" Processing with grid splitting: {layout_name}")

    # Step 1: Split the image into crops
    crops = self.split_image_by_grid(layout_path)

    if not crops:
        print(f" No valid crops found, falling back to whole image")
        return self.detect_whole_image(layout_path, layout_name)

    # Step 2: Save debug crops if needed
    self.save_crop_debug_images(layout_path, crops)

    # Step 3: Process each crop
    all_crop_results = []
    crop_similarities = {}  # master id -> best similarity over all crops

    for i, crop in enumerate(crops):
        print(f" Processing crop {i+1}/{len(crops)} ({crop['crop_id']})")

        # Generate embedding for this crop
        crop_embedding = self.generate_crop_embedding(layout_path, crop)
        if crop_embedding is None:
            continue

        # Compare crop against all masters
        scores = {
            master_id: self.compute_cosine_similarity(crop_embedding, master_embedding)
            for master_id, master_embedding in self.master_embeddings.items()
        }
        # Matches sorted by similarity (stable: ties keep master order)
        matches = sorted(
            (mid for mid, sim in scores.items() if sim >= self.similarity_threshold),
            key=scores.get,
            reverse=True,
        )
        all_crop_results.append({
            'crop_id': crop['crop_id'],
            'crop_info': crop,
            'similarities': scores,
            'matches': matches
        })

        # Track all similarities for global analysis
        for master_id, sim in scores.items():
            if master_id not in crop_similarities or sim > crop_similarities[master_id]:
                crop_similarities[master_id] = sim

    if not all_crop_results:
        # Every crop embedding failed: there is nothing to aggregate, and
        # returning an empty match list would hide the failure.
        print(f" No crop could be embedded, falling back to whole image")
        return self.detect_whole_image(layout_path, layout_name)

    # Step 4: Aggregate results across all crops.
    # A dict keeps first-seen order (needed for stable tie-breaking) while
    # giving O(1) membership tests.
    final_similarities = {}
    for crop_result in all_crop_results:
        for match in crop_result['matches']:
            score = crop_result['similarities'][match]
            if match not in final_similarities or score > final_similarities[match]:
                final_similarities[match] = score

    # Sort by best similarity
    detected_masters = sorted(final_similarities, key=final_similarities.get, reverse=True)

    # Get top overall similarities for analysis
    top_similarities = sorted(crop_similarities.items(), key=lambda x: x[1], reverse=True)[:5]

    # Create analysis
    analysis_parts = [
        f"Grid-based splitting analysis using Google Vertex AI embeddings (1408 dimensions).",
        f"Split into {len(crops)} crops, processed {len(all_crop_results)} successfully.",
        f"Similarity threshold: {self.similarity_threshold}",
        f"Found {len(detected_masters)} unique matches across all crops.",
        f"Top 5 similarities: " + ", ".join([f"{mid}({sim:.3f})" for mid, sim in top_similarities])
    ]
    analysis = " ".join(analysis_parts)

    print(f"✓ Completed {layout_name} - Found {len(detected_masters)} matches across {len(crops)} crops")
    if detected_masters:
        print(f" Matches: {', '.join(detected_masters)}")

    return {
        'detected_masters': detected_masters,
        'detected_master_ids': detected_masters,
        # Prefer the filename recorded when the master was loaded; fall back
        # to the "<id>.jpg" convention for ids that were never registered.
        'detected_master_filenames': [self.master_files.get(mid, f"{mid}.jpg") for mid in detected_masters],
        'analysis': analysis,
        'similarities': dict(top_similarities),
        'processing_mode': 'vector_embedding_grid',
        'similarity_threshold': self.similarity_threshold,
        'embedding_dimensions': 1408,
        'crops_processed': len(all_crop_results),
        'total_crops': len(crops),
        'crop_results': all_crop_results  # Detailed crop-by-crop results
    }
|
|
|
|
def process_all_layouts_vector(self, limit: Optional[int] = None, specific_file: Optional[str] = None) -> Dict:
    """Run detection over the layout images and collect per-layout results.

    Master images and their embeddings are loaded first. Layouts are either
    the single ``specific_file`` or every ``*.jpg`` in ``self.layouts_path``
    in sorted order, optionally truncated to the first ``limit`` files.
    Intermediate results are saved after every 20th layout.

    NOTE(review): the original indentation of this block was lost; the
    "Found ..." output is placed in the directory-scan branch and the limit
    is applied afterwards - confirm against the original nesting.

    Args:
        limit: Maximum number of layouts to process; falsy means no limit.
        specific_file: Filename inside the layouts directory to process alone.

    Returns:
        Dict mapping layout id (filename stem) to its result entry. Empty
        when no layout files are found.

    Raises:
        Exception: If no master embeddings are available.
        FileNotFoundError: If ``specific_file`` does not exist.
    """
    print("Starting vector-based batch processing...")

    # Load master images
    self.load_master_images()

    # Generate master embeddings (with caching)
    self.generate_master_embeddings()

    if not self.master_embeddings:
        raise Exception("No master embeddings available")

    # Get layout files
    if specific_file:
        # Process only the specific file
        layout_files = [self.layouts_path / specific_file]
        if not layout_files[0].exists():
            raise FileNotFoundError(f"Layout file {specific_file} not found in {self.layouts_path}")
        print(f"Processing specific file: {specific_file}")
    else:
        # Sorted to ensure consistent alphabetical ordering
        layout_files = sorted(self.layouts_path.glob("*.jpg"))

        print(f"Found {len(layout_files)} layout files")
        if layout_files:
            print(f"First file will be: {layout_files[0].name}")

    if limit:
        layout_files = layout_files[:limit]
        print(f"Processing first {limit} layouts only")

    total_layouts = len(layout_files)
    print(f"Processing {total_layouts} layout images using vector embeddings")
    print("=" * 60)

    results = {}
    start_time = time.time()

    for i, layout_path in enumerate(layout_files, 1):
        layout_id = layout_path.stem

        # Detect images using vector similarity
        result = self.detect_masters_in_layout_vector(str(layout_path), i, total_layouts)

        layout_result = {
            'layout_filename': layout_path.name,
            'detected_master_ids': result['detected_master_ids'],
            'detected_master_filenames': result['detected_master_filenames'],
            'analysis': result['analysis'],
            'processing_mode': 'vector_embedding',
            'similarity_threshold': self.similarity_threshold,
            'embedding_dimensions': 1408
        }

        # Optional entries are copied only when the detector produced them
        if 'similarities' in result:
            layout_result['similarities'] = result['similarities']

        if 'error' in result:
            layout_result['error'] = result['error']

        results[layout_id] = layout_result

        # Progress update
        elapsed = time.time() - start_time
        avg_time = elapsed / i
        remaining = (total_layouts - i) * avg_time

        print(f"Progress: {i}/{total_layouts} ({i/total_layouts*100:.1f}%) - Est. remaining: {remaining/60:.1f} min")

        # Save progress periodically
        if i % 20 == 0:
            self.save_results(results, f"vector_progress_{i}")

    total_time = time.time() - start_time
    print(f"\n✓ Completed vector processing of {total_layouts} layouts in {total_time/60:.1f} minutes")
    # An empty layouts directory must not crash the run with ZeroDivisionError
    if total_layouts:
        print(f"Average time per layout: {total_time/total_layouts:.1f} seconds")
    return results
|
|
|
|
def save_results(self, results: Dict, filename: str = "vector_detection_results") -> str:
    """Write *results* plus run metadata to ``<results_path>/<filename>.json``.

    Args:
        results: Per-layout result entries; must be JSON-serializable.
        filename: Output filename without the ``.json`` extension.

    Returns:
        The path of the written file as a string.
    """
    destination = self.results_path / f"{filename}.json"

    # Run-level metadata stored alongside the per-layout results
    metadata = {
        'total_layouts_processed': len(results),
        'total_master_images': len(self.master_images),
        'master_images_available': list(self.master_files.keys()),
        'processing_mode': 'vector_embedding',
        'similarity_threshold': self.similarity_threshold,
        'embedding_dimensions': 1408,
        'embedding_model': 'Google Vertex AI multimodalembedding@001'
    }
    payload = {'metadata': metadata, 'results': results}

    with destination.open('w') as handle:
        json.dump(payload, handle, indent=2)

    print(f"Results saved to: {destination}")
    return str(destination)
|
|
|
|
def generate_summary(self, results: Dict) -> Dict:
    """Summarise detection results: match counts and per-master usage.

    Args:
        results: Per-layout entries, each providing 'detected_master_ids'.

    Returns:
        Dict with layout totals, how often each master id was detected, the
        ten most frequently detected masters as (id, count) pairs, and the
        processing metadata.
    """
    detections = [entry['detected_master_ids'] for entry in results.values()]

    # Tally how many layouts each master id was detected in
    usage = {}
    for master_ids in detections:
        for master_id in master_ids:
            usage[master_id] = usage.get(master_id, 0) + 1

    layout_count = len(results)
    matched_count = len([master_ids for master_ids in detections if master_ids])
    ranking = sorted(usage.items(), key=lambda pair: pair[1], reverse=True)

    return {
        'total_layouts_processed': layout_count,
        'layouts_with_matches': matched_count,
        'layouts_without_matches': layout_count - matched_count,
        'master_image_usage': usage,
        'most_used_masters': ranking[:10],
        'processing_mode': 'vector_embedding',
        'similarity_threshold': self.similarity_threshold,
        'embedding_dimensions': 1408
    }