master_adapt_detect/vector_detector.py
2025-10-01 14:32:55 -05:00

776 lines
No EOL
34 KiB
Python

#!/usr/bin/env python3
"""
Vector-based Image Detection Module
Extracted from image_detector.py - Contains VectorImageDetector class
Uses Google Vertex AI Multimodal Embeddings for image similarity detection
"""
import os
import json
import time
from pathlib import Path
from typing import List, Dict, Optional
import numpy as np
import pickle
from google.cloud import aiplatform
from vertexai.vision_models import MultiModalEmbeddingModel
import cv2
from panel_splitter import PanelSplitter
class VectorImageDetector:
def __init__(self, similarity_threshold=0.75, splitting_mode="none", min_crop_size=200, crop_padding=20, split_mode=False):
    """Initialize the vector-based image detector using Google Vertex AI Multimodal Embeddings.

    Args:
        similarity_threshold: Minimum cosine similarity for a master image
            to be reported as a match.
        splitting_mode: Splitting strategy name; the detection entry point
            recognises "none", "auto" and "grid".
        min_crop_size: Minimum crop width/height in pixels; smaller crops
            are discarded by the splitting routines.
        crop_padding: Pixels added around each grid crop.
        split_mode: When true, a ``PanelSplitter`` is created and stored
            on ``self.splitter``; otherwise ``self.splitter`` is ``None``.

    Side effects:
        Initialises Vertex AI, loads the embedding model and creates the
        results, embeddings-cache and debug-crop directories.
    """
    print("Initializing Vector Image Detector with Google Vertex AI...")
    # Initialize Vertex AI. Credentials already configured in the environment
    # are respected; the bundled key file is only a fallback.
    os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "service-account.json")
    aiplatform.init(project="optical-414516", location="us-central1")
    # Initialize multimodal embedding model
    self.model = MultiModalEmbeddingModel.from_pretrained("multimodalembedding@001")
    # Configuration
    self.similarity_threshold = similarity_threshold
    self.splitting_mode = splitting_mode
    self.min_crop_size = min_crop_size
    self.crop_padding = crop_padding
    self.split_mode = split_mode
    # Split mode configuration. The attribute always exists so that code
    # reading self.splitter never hits AttributeError when split mode is off.
    self.splitter = None
    if self.split_mode:
        self.splitter = PanelSplitter(debug=True)
        print("Split mode enabled: Will split multi-panel layouts before matching")
    # Paths (relative to the current working directory)
    self.master_images_path = Path("master_images")
    self.layouts_path = Path("layouts")
    self.results_path = Path("results")
    self.embeddings_cache_path = Path("embeddings_cache")
    self.crops_debug_path = Path("crops_debug")
    # Create output directories
    for directory in (self.results_path, self.embeddings_cache_path, self.crops_debug_path):
        directory.mkdir(exist_ok=True)
    # Master images data, keyed by master ID (the file stem)
    self.master_images = {}      # master ID -> path string
    self.master_files = {}       # master ID -> file name
    self.master_embeddings = {}  # master ID -> embedding vector
    print(f"✓ Vector detector initialized with similarity threshold: {similarity_threshold}")
    print(f"✓ Splitting mode: {splitting_mode}, Min crop size: {min_crop_size}px")
def load_master_images(self) -> Dict[str, str]:
    """Load all master images and create ID mapping using filenames.

    Scans ``self.master_images_path`` for ``*.jpg`` files and records each
    one under its file stem in ``self.master_images`` (path string) and
    ``self.master_files`` (file name).

    Returns:
        The ``self.master_images`` mapping of master ID to path string.
    """
    print("Loading master images...")
    # glob() yields entries in filesystem order; sort so the mapping (and
    # every loop over it) is deterministic, like the layout file listing.
    master_files = sorted(self.master_images_path.glob("*.jpg"))
    print(f"Found {len(master_files)} master images")
    for file_path in master_files:
        master_id = file_path.stem
        self.master_images[master_id] = str(file_path)
        self.master_files[master_id] = file_path.name
    return self.master_images
def generate_image_embedding(self, image_path: str) -> Optional[np.ndarray]:
    """Generate an embedding for an image file using Vertex AI.

    Args:
        image_path: Path of the image file to embed.

    Returns:
        The image embedding as a NumPy array, or ``None`` when the image
        could not be loaded, the request failed, or the response carried
        no image embedding. Errors are printed, never raised.
    """
    try:
        from vertexai.vision_models import Image as VertexImage
        # Create Vertex AI Image object directly from file path
        vertex_image = VertexImage.load_from_file(image_path)
        # Get embedding from Vertex AI
        response = self.model.get_embeddings(image=vertex_image)
        raw_embedding = response.image_embedding
        # np.array(None) is a 0-d object array, not None, so a missing
        # embedding must be caught here or it slips past callers' checks.
        if raw_embedding is None or len(raw_embedding) == 0:
            print(f"Error generating embedding for {Path(image_path).name}: no image embedding returned")
            return None
        return np.array(raw_embedding)
    except Exception as e:
        print(f"Error generating embedding for {Path(image_path).name}: {e}")
        return None
def save_embedding_cache(self, embeddings: Dict, filename: str):
    """Pickle ``embeddings`` into ``<embeddings_cache_path>/<filename>.pkl``.

    Args:
        embeddings: Mapping to persist.
        filename: Cache name without the ``.pkl`` extension.
    """
    target = self.embeddings_cache_path / f"{filename}.pkl"
    with target.open('wb') as handle:
        pickle.dump(embeddings, handle)
    print(f"Embeddings cached to: {target}")
def load_embedding_cache(self, filename: str) -> Optional[Dict]:
    """Read a pickled embeddings mapping back from the cache directory.

    Args:
        filename: Cache name without the ``.pkl`` extension.

    Returns:
        The unpickled object, or ``None`` when the cache file is missing
        or cannot be read (the error is printed).
    """
    cache_file = self.embeddings_cache_path / f"{filename}.pkl"
    if not cache_file.exists():
        return None
    try:
        # NOTE(review): pickle executes arbitrary code on load; this is only
        # safe while the cache directory holds files this program wrote.
        with cache_file.open('rb') as handle:
            cached = pickle.load(handle)
        print(f"Loaded cached embeddings from: {cache_file}")
        return cached
    except Exception as e:
        print(f"Error loading cached embeddings: {e}")
        return None
def generate_master_embeddings(self, force_regenerate=False) -> Dict[str, np.ndarray]:
    """Build (or load from cache) the embedding of every master image.

    The cache is used only when its key set equals the key set of
    ``self.master_images``; otherwise every embedding is regenerated and,
    if at least one succeeded, the cache is rewritten.

    Args:
        force_regenerate: Skip the cache lookup and always regenerate.

    Returns:
        ``self.master_embeddings``, mapping master ID to embedding.
    """
    cache_filename = "master_embeddings"
    if not force_regenerate:
        cached = self.load_embedding_cache(cache_filename)
        if cached is not None:
            # The cache is valid only if it covers exactly the current masters
            if set(cached.keys()) != set(self.master_images.keys()):
                print("Cache incomplete, regenerating embeddings...")
            else:
                self.master_embeddings = cached
                print(f"✓ Using cached embeddings for {len(cached)} master images")
                return self.master_embeddings
    total = len(self.master_images)
    print(f"Generating embeddings for {total} master images...")
    self.master_embeddings = {}
    for position, (master_id, image_path) in enumerate(self.master_images.items(), start=1):
        print(f" {position}/{total}: Generating embedding for {master_id}")
        vector = self.generate_image_embedding(image_path)
        if vector is not None:
            self.master_embeddings[master_id] = vector
        # Pause between requests (not after the last) to avoid rate limiting
        if position < total:
            time.sleep(0.1)
    # Persist only when something was actually generated
    if self.master_embeddings:
        self.save_embedding_cache(self.master_embeddings, cache_filename)
    print(f"✓ Generated embeddings for {len(self.master_embeddings)} master images")
    return self.master_embeddings
def compute_cosine_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
    """Return the cosine similarity of two embedding vectors.

    Returns 0.0 when either vector has zero length, so the division is
    never performed with a zero denominator.
    """
    length_a = np.linalg.norm(embedding1)
    length_b = np.linalg.norm(embedding2)
    if not (length_a != 0 and length_b != 0):
        return 0.0
    return float(np.dot(embedding1, embedding2) / (length_a * length_b))
def detect_layout_type(self, image_path: str) -> str:
    """Classify a layout image as "single" or "composite".

    Runs Canny edge detection, applies a morphological close with a tall
    (1 x height//10) and a wide (width//10 x 1) rectangular kernel, and
    reports "composite" when either result's summed pixel values divided
    by the image area exceed 0.01. Any failure is printed and yields
    "single".
    """
    try:
        image = cv2.imread(image_path)
        rows, cols = image.shape[:2]
        grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        edge_map = cv2.Canny(grayscale, 50, 150, apertureSize=3)
        pixel_count = rows * cols
        # First kernel targets vertical structures (panel separators),
        # the second horizontal ones (row separators).
        # NOTE(review): np.sum adds raw pixel values of the closed edge map,
        # so this "density" is not a 0..1 fraction — confirm 0.01 is intended.
        densities = []
        for kernel_size in ((1, rows // 10), (cols // 10, 1)):
            kernel = cv2.getStructuringElement(cv2.MORPH_RECT, kernel_size)
            closed = cv2.morphologyEx(edge_map, cv2.MORPH_CLOSE, kernel)
            densities.append(np.sum(closed) / pixel_count)
        vertical_density, horizontal_density = densities
        if vertical_density > 0.01 or horizontal_density > 0.01:
            return "composite"
        return "single"
    except Exception as e:
        print(f"Error analyzing layout type for {Path(image_path).name}: {e}")
        return "single"  # Default to single if analysis fails
def split_image_by_grid(self, image_path: str) -> List[Dict]:
    """Split a composite image into crop descriptors using grid detection.

    Separator columns/rows are located from Canny edges closed with tall
    and wide kernels. The resulting grid cells are padded by
    ``self.crop_padding`` and kept only if both sides reach
    ``self.min_crop_size``. Wide images with more than 20 cells, and
    images that yield no usable cell, go through
    ``self.fallback_split_image`` instead.

    Returns:
        A list of dicts with keys ``bbox`` (x1, y1, x2, y2), ``width``,
        ``height``, ``area_ratio`` and ``crop_id``; an empty list when
        anything fails (the error is printed).
    """

    def separator_positions(projection):
        # Indices whose projection exceeds 60% of the peak, skipping any
        # index within 50 px of the previously accepted one.
        cutoff = np.max(projection) * 0.6
        accepted = []
        for position, value in enumerate(projection):
            if value > cutoff and (not accepted or position - accepted[-1] > 50):
                accepted.append(position)
        return accepted

    try:
        layout_name = Path(image_path).name
        print(f" Analyzing grid structure for {layout_name}")
        img = cv2.imread(image_path)
        height, width = img.shape[:2]
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Edge map shared by both separator searches
        edges = cv2.Canny(gray, 30, 100, apertureSize=3)
        # Vertical separators (side-by-side panels): column-wise projection
        tall_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, height // 8))
        tall_closed = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, tall_kernel)
        vertical_separators = separator_positions(np.sum(tall_closed, axis=0))
        # Horizontal separators (stacked rows): row-wise projection
        wide_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (width // 8, 1))
        wide_closed = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, wide_kernel)
        horizontal_separators = separator_positions(np.sum(wide_closed, axis=1))
        # Image borders join the separators; the set removes duplicates
        x_boundaries = sorted({0, *vertical_separators, width})
        y_boundaries = sorted({0, *horizontal_separators, height})
        column_count = len(x_boundaries) - 1
        row_count = len(y_boundaries) - 1
        print(f" Found {column_count} x {row_count} grid sections")
        total_sections = column_count * row_count
        crops = []
        if width > height * 1.5 and total_sections > 20:
            # Over-segmented wide layout: plain horizontal splitting instead
            print(f" Grid too complex ({total_sections} sections), using horizontal splitting instead")
            crops = self.fallback_split_image(img, width, height)
        else:
            total_area = width * height
            for i, (top, bottom) in enumerate(zip(y_boundaries, y_boundaries[1:])):
                for j, (left, right) in enumerate(zip(x_boundaries, x_boundaries[1:])):
                    # Pad each cell, clamped to the image
                    x1 = max(0, left - self.crop_padding)
                    y1 = max(0, top - self.crop_padding)
                    x2 = min(width, right + self.crop_padding)
                    y2 = min(height, bottom + self.crop_padding)
                    crop_width = x2 - x1
                    crop_height = y2 - y1
                    if crop_width < self.min_crop_size or crop_height < self.min_crop_size:
                        continue  # too small to be a real panel
                    crops.append({
                        'bbox': (x1, y1, x2, y2),
                        'width': crop_width,
                        'height': crop_height,
                        'area_ratio': (crop_width * crop_height) / total_area,
                        'crop_id': f"grid_{i}_{j}"
                    })
        if not crops:
            print(f" No grid detected, trying fallback splitting")
            crops = self.fallback_split_image(img, width, height)
        print(f" Generated {len(crops)} crops for analysis")
        return crops
    except Exception as e:
        print(f"Error splitting image {Path(image_path).name}: {e}")
        return []
def fallback_split_image(self, img, width: int, height: int) -> List[Dict]:
    """Split a wide image into vertical panels at major structural separators.

    Images with ``width <= height * 1.2`` are returned as one full-size
    crop. Otherwise separator columns are taken from peaks in two smoothed
    signals (closed Canny edges and inverted column intensity), filtered
    by a 10% border margin and a size-dependent minimum panel width, and
    capped to a size-dependent maximum number of panels.

    Args:
        img: Image array as returned by ``cv2.imread`` (unused for narrow
            images).
        width: Image width in pixels.
        height: Image height in pixels.

    Returns:
        Crop dicts with keys ``bbox``, ``width``, ``height``,
        ``area_ratio`` and ``crop_id``. Panels narrower than
        ``self.min_crop_size`` are dropped. Coordinates are plain ``int``.
    """
    crops = []
    # Only process wide images for horizontal splitting
    if width <= height * 1.2:
        print(f" Image not wide enough for horizontal splitting, treating as single panel")
        crops.append({
            'bbox': (0, 0, width, height),
            'width': width,
            'height': height,
            'area_ratio': 1.0,
            'crop_id': "single"
        })
        return crops
    print(f" Using improved horizontal splitting for {width}x{height} image")
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Method 1: structural edge detection for near full-height separators
    edges = cv2.Canny(gray, 30, 100)
    vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, height // 3))
    vertical_edges = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, vertical_kernel)
    edge_projection = np.sum(vertical_edges, axis=0)
    # Method 2: column intensity, inverted so dark gutters become peaks
    horizontal_hist = np.sum(gray, axis=0)
    inverted_hist = np.max(horizontal_hist) - horizontal_hist
    # Smooth both signals (SciPy is imported lazily, only on this path)
    from scipy.ndimage import gaussian_filter1d
    from scipy.signal import find_peaks
    smoothed_edges = gaussian_filter1d(edge_projection, sigma=15)
    smoothed_hist = gaussian_filter1d(inverted_hist, sigma=15)
    # Adaptive parameters based on image size
    if width < 2000:
        min_panel_width = width // 4  # At least 25% per panel
        max_panels = 3
    elif width < 5000:
        min_panel_width = width // 6  # At least 16% per panel
        max_panels = 6
    else:
        min_panel_width = width // 12  # At least 8% per panel
        max_panels = 15
    print(f" Min panel width: {min_panel_width}px, Max panels: {max_panels}")
    # Find separator candidates with conservative thresholds
    edge_threshold = np.max(smoothed_edges) * 0.5
    hist_threshold = np.max(smoothed_hist) * 0.4
    edge_peaks, _ = find_peaks(smoothed_edges,
                               distance=min_panel_width,
                               height=edge_threshold,
                               prominence=np.max(smoothed_edges) * 0.3)
    hist_peaks, _ = find_peaks(smoothed_hist,
                               distance=min_panel_width,
                               height=hist_threshold,
                               prominence=np.max(smoothed_hist) * 0.2)
    print(f" Edge peaks: {len(edge_peaks)}, Histogram peaks: {len(hist_peaks)}")
    # find_peaks returns NumPy integers; convert to plain ints so the crop
    # coordinates built from them stay JSON-serializable and print cleanly.
    all_separators = {int(p) for p in edge_peaks} | {int(p) for p in hist_peaks}
    boundary_margin = width * 0.1  # 10% margin from edges
    valid_separators = sorted(s for s in all_separators
                              if boundary_margin < s < width - boundary_margin)
    # Remove separators too close to the previously kept one
    final_separators = []
    for sep in valid_separators:
        if not final_separators or sep - final_separators[-1] >= min_panel_width:
            final_separators.append(sep)
    # N separators give N+1 panels: when over the cap keep the strongest
    # max_panels-1 separators by combined edge + histogram response.
    if len(final_separators) >= max_panels:
        separator_scores = []
        for sep in final_separators:
            edge_score = smoothed_edges[sep] if sep < len(smoothed_edges) else 0
            hist_score = smoothed_hist[sep] if sep < len(smoothed_hist) else 0
            separator_scores.append((sep, edge_score + hist_score))
        separator_scores.sort(key=lambda x: x[1], reverse=True)
        final_separators = sorted(s[0] for s in separator_scores[:max_panels - 1])
    print(f" Final separators: {final_separators}")
    # Create crops spanning the full height between consecutive boundaries
    x_boundaries = [0] + final_separators + [width]
    for i, (x1, x2) in enumerate(zip(x_boundaries, x_boundaries[1:])):
        if x2 - x1 >= self.min_crop_size:
            crops.append({
                'bbox': (x1, 0, x2, height),
                'width': x2 - x1,
                'height': height,
                'area_ratio': (x2 - x1) / width,
                'crop_id': f"panel_{i}"
            })
    print(f" Generated {len(crops)} improved horizontal crops")
    return crops
def save_crop_debug_images(self, image_path: str, crops: List[Dict]):
    """Write every crop of an image into ``self.crops_debug_path``.

    Files are named ``<layout stem>_crop_<index>_<crop_id>.jpg``. This is
    best-effort: any failure is printed as a warning and swallowed.
    """
    try:
        stem = Path(image_path).stem
        full_image = cv2.imread(image_path)
        for index, crop in enumerate(crops):
            left, top, right, bottom = crop['bbox']
            region = full_image[top:bottom, left:right]
            target = self.crops_debug_path / f"{stem}_crop_{index}_{crop['crop_id']}.jpg"
            cv2.imwrite(str(target), region)
    except Exception as e:
        print(f"Warning: Failed to save debug crops: {e}")
def generate_crop_embedding(self, image_path: str, crop_info: Dict) -> Optional[np.ndarray]:
    """Generate an embedding for one crop region of an image.

    The crop is written to a temporary JPEG in ``self.crops_debug_path``,
    embedded through ``self.generate_image_embedding`` and the temporary
    file is removed afterwards, whether or not embedding succeeded.

    Args:
        image_path: Path of the full image.
        crop_info: Crop descriptor; only its ``bbox`` (x1, y1, x2, y2) is read.

    Returns:
        The crop embedding, or ``None`` on any failure (printed).
    """
    temp_crop_path = None
    try:
        img = cv2.imread(image_path)
        if img is None:
            # imread signals failure by returning None, not by raising
            print(f"Error generating crop embedding: could not read image {image_path}")
            return None
        x1, y1, x2, y2 = crop_info['bbox']
        cropped_img = img[y1:y2, x1:x2]
        if cropped_img.size == 0:
            print(f"Error generating crop embedding: empty crop region {crop_info['bbox']}")
            return None
        # Save crop to a temporary file for embedding generation
        temp_crop_path = self.crops_debug_path / "temp_crop.jpg"
        if not cv2.imwrite(str(temp_crop_path), cropped_img):
            print(f"Error generating crop embedding: could not write {temp_crop_path}")
            return None
        return self.generate_image_embedding(str(temp_crop_path))
    except Exception as e:
        print(f"Error generating crop embedding: {e}")
        return None
    finally:
        # Always clean up so a failed run cannot leave a stale crop behind
        if temp_crop_path is not None and temp_crop_path.exists():
            temp_crop_path.unlink()
def detect_masters_in_layout_vector(self, layout_path: str, layout_index: int, total_layouts: int) -> Dict:
    """Detect master images in one layout, dispatching on ``self.splitting_mode``.

    "grid" always splits; "auto" splits unless ``detect_layout_type``
    reports "single"; "none" and any unrecognised mode compare the whole
    image. Exceptions from the detection step are printed and turned into
    a result dict with empty match lists and an ``error`` entry.

    Args:
        layout_path: Path of the layout image.
        layout_index: 1-based position of this layout, for progress output.
        total_layouts: Total number of layouts, for progress output.
    """
    layout_name = Path(layout_path).name
    print(f"Processing {layout_index}/{total_layouts}: {layout_name} (Vector mode: {self.splitting_mode})")
    try:
        mode = self.splitting_mode
        # The layout type is only analysed in "auto" mode (short-circuit)
        use_splitting = mode == "grid" or (
            mode == "auto" and self.detect_layout_type(layout_path) != "single"
        )
        if use_splitting:
            return self.detect_with_splitting(layout_path, layout_name)
        return self.detect_whole_image(layout_path, layout_name)
    except Exception as e:
        print(f"Error analyzing {layout_name} with vector embeddings: {e}")
        failure = {
            'detected_masters': [],
            'detected_master_ids': [],
            'detected_master_filenames': [],
            'analysis': 'Vector embedding analysis failed',
        }
        failure['error'] = str(e)
        failure['processing_mode'] = f'vector_embedding_{self.splitting_mode}'
        return failure
def detect_whole_image(self, layout_path: str, layout_name: str) -> Dict:
    """Detect masters by comparing the whole layout image against every master.

    Args:
        layout_path: Path of the layout image to embed.
        layout_name: Display name used in progress output.

    Returns:
        Result dict with the matched master IDs (best match first), their
        file names, an analysis string and the five highest similarities.

    Raises:
        Exception: If no embedding could be generated for the layout.
    """
    print(f" Processing whole image: {layout_name}")
    layout_embedding = self.generate_image_embedding(layout_path)
    if layout_embedding is None:
        raise Exception("Failed to generate layout embedding")
    print(f" Comparing against {len(self.master_embeddings)} master images...")
    similarities = {
        master_id: self.compute_cosine_similarity(layout_embedding, master_embedding)
        for master_id, master_embedding in self.master_embeddings.items()
    }
    # Matches above threshold, highest similarity first (stable for ties)
    detected_masters = sorted(
        (mid for mid, sim in similarities.items() if sim >= self.similarity_threshold),
        key=similarities.__getitem__,
        reverse=True,
    )
    top_similarities = sorted(similarities.items(), key=lambda x: x[1], reverse=True)[:5]
    analysis_parts = [
        f"Whole image vector analysis using Google Vertex AI embeddings (1408 dimensions).",
        f"Similarity threshold: {self.similarity_threshold}",
        f"Found {len(detected_masters)} matches above threshold.",
        f"Top 5 similarities: " + ", ".join([f"{mid}({sim:.3f})" for mid, sim in top_similarities])
    ]
    analysis = " ".join(analysis_parts)
    print(f"✓ Completed {layout_name} - Found {len(detected_masters)} matches")
    if detected_masters:
        print(f" Matches: {', '.join(detected_masters)}")
    return {
        'detected_masters': detected_masters,
        'detected_master_ids': detected_masters,
        # Report the file name recorded when the master was loaded; fall
        # back to "<id>.jpg" for IDs known only from the embeddings cache.
        'detected_master_filenames': [self.master_files.get(mid, f"{mid}.jpg") for mid in detected_masters],
        'analysis': analysis,
        'similarities': dict(top_similarities),
        'processing_mode': 'vector_embedding_whole',
        'similarity_threshold': self.similarity_threshold,
        'embedding_dimensions': 1408
    }
def detect_with_splitting(self, layout_path: str, layout_name: str) -> Dict:
    """Detect masters by splitting the layout into crops and matching each crop.

    Falls back to ``detect_whole_image`` when splitting yields no crops.
    Crops whose embedding cannot be generated are skipped.

    Args:
        layout_path: Path of the layout image.
        layout_name: Display name used in progress output.

    Returns:
        Result dict with the unique matched master IDs ordered by their
        best per-crop similarity, their file names, an analysis string,
        the five highest best-per-master similarities, crop counts and the
        per-crop results.
    """
    print(f" Processing with grid splitting: {layout_name}")
    # Step 1: Split the image into crops
    crops = self.split_image_by_grid(layout_path)
    if not crops:
        print(f" No valid crops found, falling back to whole image")
        return self.detect_whole_image(layout_path, layout_name)
    # Step 2: Save debug crops
    self.save_crop_debug_images(layout_path, crops)
    # Step 3: Compare each crop against all masters
    all_crop_results = []
    crop_similarities = {}  # best similarity per master over all crops
    for i, crop in enumerate(crops):
        print(f" Processing crop {i+1}/{len(crops)} ({crop['crop_id']})")
        crop_embedding = self.generate_crop_embedding(layout_path, crop)
        if crop_embedding is None:
            continue
        similarities = {
            master_id: self.compute_cosine_similarity(crop_embedding, master_embedding)
            for master_id, master_embedding in self.master_embeddings.items()
        }
        matches = sorted(
            (mid for mid, sim in similarities.items() if sim >= self.similarity_threshold),
            key=similarities.__getitem__,
            reverse=True,
        )
        all_crop_results.append({
            'crop_id': crop['crop_id'],
            'crop_info': crop,
            'similarities': similarities,
            'matches': matches
        })
        for master_id, sim in similarities.items():
            if master_id not in crop_similarities or sim > crop_similarities[master_id]:
                crop_similarities[master_id] = sim
    # Step 4: Aggregate matches. The dict keeps first-appearance order and
    # gives O(1) membership checks, replacing a list scan per match.
    final_similarities = {}
    for crop_result in all_crop_results:
        for match in crop_result['matches']:
            sim = crop_result['similarities'][match]
            if match not in final_similarities or sim > final_similarities[match]:
                final_similarities[match] = sim
    # Best similarity first; ties keep first-appearance order (stable sort)
    detected_masters = sorted(final_similarities, key=final_similarities.__getitem__, reverse=True)
    top_similarities = sorted(crop_similarities.items(), key=lambda x: x[1], reverse=True)[:5]
    analysis_parts = [
        f"Grid-based splitting analysis using Google Vertex AI embeddings (1408 dimensions).",
        f"Split into {len(crops)} crops, processed {len(all_crop_results)} successfully.",
        f"Similarity threshold: {self.similarity_threshold}",
        f"Found {len(detected_masters)} unique matches across all crops.",
        f"Top 5 similarities: " + ", ".join([f"{mid}({sim:.3f})" for mid, sim in top_similarities])
    ]
    analysis = " ".join(analysis_parts)
    print(f"✓ Completed {layout_name} - Found {len(detected_masters)} matches across {len(crops)} crops")
    if detected_masters:
        print(f" Matches: {', '.join(detected_masters)}")
    return {
        'detected_masters': detected_masters,
        'detected_master_ids': detected_masters,
        # Report the file name recorded when the master was loaded; fall
        # back to "<id>.jpg" for IDs known only from the embeddings cache.
        'detected_master_filenames': [self.master_files.get(mid, f"{mid}.jpg") for mid in detected_masters],
        'analysis': analysis,
        'similarities': dict(top_similarities),
        'processing_mode': 'vector_embedding_grid',
        'similarity_threshold': self.similarity_threshold,
        'embedding_dimensions': 1408,
        'crops_processed': len(all_crop_results),
        'total_crops': len(crops),
        'crop_results': all_crop_results  # Detailed crop-by-crop results
    }
def process_all_layouts_vector(self, limit: Optional[int] = None, specific_file: Optional[str] = None) -> Dict:
    """Process layout images using vector embeddings.

    Loads the master images and their embeddings, then runs detection on
    either one named layout or every ``*.jpg`` in ``self.layouts_path``
    (sorted by path). Progress is saved every 20 layouts.

    Args:
        limit: When truthy, only the first ``limit`` layouts are
            processed (ignored when ``specific_file`` is given).
        specific_file: File name inside ``self.layouts_path`` to process
            on its own.

    Returns:
        Mapping of layout ID (file stem) to its result dict; empty when no
        layout files were found.

    Raises:
        Exception: If no master embeddings are available.
        FileNotFoundError: If ``specific_file`` does not exist.
    """
    print("Starting vector-based batch processing...")
    self.load_master_images()
    # Generate master embeddings (with caching)
    self.generate_master_embeddings()
    if not self.master_embeddings:
        raise Exception("No master embeddings available")
    if specific_file:
        layout_files = [self.layouts_path / specific_file]
        if not layout_files[0].exists():
            raise FileNotFoundError(f"Layout file {specific_file} not found in {self.layouts_path}")
        print(f"Processing specific file: {specific_file}")
    else:
        # Sorted for consistent alphabetical ordering across runs
        layout_files = sorted(self.layouts_path.glob("*.jpg"))
        print(f"Found {len(layout_files)} layout files")
        if layout_files:
            print(f"First file will be: {layout_files[0].name}")
        if limit:
            layout_files = layout_files[:limit]
            print(f"Processing first {limit} layouts only")
    total_layouts = len(layout_files)
    print(f"Processing {total_layouts} layout images using vector embeddings")
    print("=" * 60)
    results = {}
    start_time = time.time()
    for i, layout_path in enumerate(layout_files, 1):
        layout_id = layout_path.stem
        result = self.detect_masters_in_layout_vector(str(layout_path), i, total_layouts)
        layout_result = {
            'layout_filename': layout_path.name,
            'detected_master_ids': result['detected_master_ids'],
            'detected_master_filenames': result['detected_master_filenames'],
            'analysis': result['analysis'],
            'processing_mode': 'vector_embedding',
            'similarity_threshold': self.similarity_threshold,
            'embedding_dimensions': 1408
        }
        # Optional entries are copied only when the detector produced them
        for optional_key in ('similarities', 'error'):
            if optional_key in result:
                layout_result[optional_key] = result[optional_key]
        results[layout_id] = layout_result
        # Progress update with a linear estimate of the remaining time
        elapsed = time.time() - start_time
        avg_time = elapsed / i
        remaining = (total_layouts - i) * avg_time
        print(f"Progress: {i}/{total_layouts} ({i/total_layouts*100:.1f}%) - Est. remaining: {remaining/60:.1f} min")
        # Save progress periodically
        if i % 20 == 0:
            self.save_results(results, f"vector_progress_{i}")
    total_time = time.time() - start_time
    print(f"\n✓ Completed vector processing of {total_layouts} layouts in {total_time/60:.1f} minutes")
    # An empty layouts directory must not crash with ZeroDivisionError
    if total_layouts:
        print(f"Average time per layout: {total_time/total_layouts:.1f} seconds")
    return results
def save_results(self, results: Dict, filename: str = "vector_detection_results") -> str:
    """Write ``results`` plus run metadata to ``<results_path>/<filename>.json``.

    Args:
        results: Per-layout results to store under the ``results`` key.
        filename: Output name without the ``.json`` extension.

    Returns:
        The output path as a string.
    """
    metadata = {
        'total_layouts_processed': len(results),
        'total_master_images': len(self.master_images),
        'master_images_available': list(self.master_files.keys()),
        'processing_mode': 'vector_embedding',
        'similarity_threshold': self.similarity_threshold,
        'embedding_dimensions': 1408,
        'embedding_model': 'Google Vertex AI multimodalembedding@001'
    }
    payload = {'metadata': metadata, 'results': results}
    output_path = self.results_path / f"{filename}.json"
    with output_path.open('w') as handle:
        json.dump(payload, handle, indent=2)
    print(f"Results saved to: {output_path}")
    return str(output_path)
def generate_summary(self, results: Dict) -> Dict:
    """Summarise detection results: match counts and per-master usage.

    Args:
        results: Mapping of layout ID to a result dict carrying a
            ``detected_master_ids`` list.

    Returns:
        Dict with layout totals, ``master_image_usage`` (master ID to
        number of detections) and ``most_used_masters`` (up to ten
        ``(master_id, count)`` pairs, most frequent first).
    """
    master_counts = {}
    layouts_with_matches = 0
    for entry in results.values():
        detected_ids = entry['detected_master_ids']
        if detected_ids:
            layouts_with_matches += 1
        for master_id in detected_ids:
            master_counts[master_id] = master_counts.get(master_id, 0) + 1
    total_layouts = len(results)
    # Stable sort: equally used masters keep first-seen order
    ranked_usage = sorted(master_counts.items(), key=lambda pair: pair[1], reverse=True)
    return {
        'total_layouts_processed': total_layouts,
        'layouts_with_matches': layouts_with_matches,
        'layouts_without_matches': total_layouts - layouts_with_matches,
        'master_image_usage': master_counts,
        'most_used_masters': ranked_usage[:10],
        'processing_mode': 'vector_embedding',
        'similarity_threshold': self.similarity_threshold,
        'embedding_dimensions': 1408
    }