master_adapt_detect/hybrid_detector.py
2025-10-01 14:32:55 -05:00

2939 lines
No EOL
143 KiB
Python

#!/usr/bin/env python3
"""
Hybrid Image Detection Module
Combines OpenAI O3 panel counting with local inlier analysis for cost-efficient detection
"""
import os
import json
import time
import base64
import queue
from pathlib import Path
from typing import List, Dict, Optional
import numpy as np
import cv2
import concurrent.futures
import threading
import multiprocessing
import psutil
import pickle
from openai_detector import OpenAIImageDetector
from panel_splitter import PanelSplitter
from advanced_splitter import AdvancedPanelSplitter
from simple_splitter import SimplePanelSplitter
from memory_manager import MemoryManager, memory_safe_execution, reduce_feature_count
from cost_calculator import cost_calculator, extract_token_usage_from_response
# Vector mode imports
try:
from google.cloud import aiplatform
from vertexai.vision_models import MultiModalEmbeddingModel, Image as VertexImage
VERTEX_AI_AVAILABLE = True
except ImportError:
VERTEX_AI_AVAILABLE = False
def _keep_strongest_features(keypoints, descriptors, max_features):
    """
    Keep at most ``max_features`` keypoints, preferring the highest detector responses.

    Args:
        keypoints: Sequence of keypoints exposing a ``.response`` attribute.
        descriptors: Descriptor array whose rows are aligned with ``keypoints``.
        max_features: Upper bound on kept features. ``None`` or a value <= 0 means
            "no limit" (with plain ``[-max_features:]`` slicing, 0 kept everything and
            negative values silently dropped the weakest few features).

    Returns:
        ``(keypoints, descriptors)`` unchanged when no trimming is needed, otherwise
        the selected keypoint list and the matching descriptor rows.
    """
    if max_features is None or max_features <= 0 or len(keypoints) <= max_features:
        return keypoints, descriptors
    responses = np.array([kp.response for kp in keypoints])
    # argsort is ascending, so the tail holds the strongest responses
    keep = np.argsort(responses)[-max_features:]
    return [keypoints[i] for i in keep], descriptors[keep]


def process_single_master_inlier_analysis(layout_path, master_id, master_path, min_good_matches=10, max_features=15000):
    """
    Standalone function for processing a single master inlier analysis in a separate process.
    Memory-safe version with feature limiting.

    Matches AKAZE features of the layout against one master image. Matches are first
    filtered with Lowe's ratio test (0.80), then a RANSAC homography is fitted. The
    match is graded by inlier count and inlier ratio. The function never raises: every
    failure is reported in the returned dict, so a process-pool worker always hands
    back a result.

    Args:
        layout_path: Path of the layout (or split-panel) image.
        master_id: Identifier echoed back in the result.
        master_path: Path of the master image.
        min_good_matches: Ratio-test matches required before fitting a homography.
            At least 4 are always required, the minimum ``cv2.findHomography`` accepts.
        max_features: Per-image keypoint cap; ``None`` or <= 0 disables the cap.

    Returns:
        Dict with ``master_id``, ``inliers`` and ``confidence`` ('high'/'medium'/'low').
        Depending on the outcome it also carries ``good_matches``, ``inlier_ratio``,
        feature totals, ``reason`` (too few matches) or ``error``.
    """
    try:
        # Imported here so the function is self-contained inside a worker process
        import cv2
        import numpy as np
        import psutil

        # Check memory before decoding images so we don't add to existing pressure
        memory = psutil.virtual_memory()
        if memory.percent > 85:
            return {
                'master_id': master_id,
                'inliers': 0,
                'confidence': 'low',
                'error': f'Memory usage too high: {memory.percent:.1f}%'
            }

        # Load images in grayscale for feature detection
        layout_img = cv2.imread(layout_path, cv2.IMREAD_GRAYSCALE)
        master_img = cv2.imread(master_path, cv2.IMREAD_GRAYSCALE)
        if layout_img is None or master_img is None:
            return {
                'master_id': master_id,
                'inliers': 0,
                'confidence': 'low',
                'error': 'Could not read one or both images'
            }

        # Detect keypoints and descriptors
        akaze = cv2.AKAZE_create()
        kp1, des1 = akaze.detectAndCompute(layout_img, None)
        kp2, des2 = akaze.detectAndCompute(master_img, None)
        if des1 is None or des2 is None:
            return {
                'master_id': master_id,
                'inliers': 0,
                'confidence': 'low',
                'error': 'No features detected in one or both images'
            }

        # Limit features to prevent memory explosion during brute-force matching
        kp1, des1 = _keep_strongest_features(kp1, des1, max_features)
        kp2, des2 = _keep_strongest_features(kp2, des2, max_features)

        # k-NN matching followed by Lowe's ratio test
        bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
        matches = bf.knnMatch(des1, des2, k=2)
        good_matches = [
            pair[0] for pair in matches
            if len(pair) == 2 and pair[0].distance < 0.80 * pair[1].distance
        ]

        # findHomography needs at least 4 correspondences regardless of configuration
        required_matches = max(min_good_matches, 4)
        if len(good_matches) < required_matches:
            return {
                'master_id': master_id,
                'inliers': 0,
                'confidence': 'low',
                'good_matches': len(good_matches),
                'reason': f'Insufficient good matches: {len(good_matches)} < {required_matches}'
            }

        # Extract matched points and fit a homography with RANSAC
        src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
        dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
        M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 7.0)
        if mask is None:
            return {
                'master_id': master_id,
                'inliers': 0,
                'confidence': 'low',
                'good_matches': len(good_matches),
                'error': 'Homography estimation failed'
            }

        # Grade the match by inlier count and inlier ratio
        inliers = int(np.sum(mask))
        inlier_ratio = inliers / len(good_matches)
        if inliers >= 30 and inlier_ratio >= 0.5:
            confidence = 'high'
        elif inliers >= 15 and inlier_ratio >= 0.3:
            confidence = 'medium'
        else:
            confidence = 'low'
        return {
            'master_id': master_id,
            'inliers': inliers,
            'confidence': confidence,
            'good_matches': len(good_matches),
            'inlier_ratio': round(inlier_ratio, 3),
            'total_features_layout': len(kp1),
            'total_features_master': len(kp2)
        }
    except Exception as e:
        return {
            'master_id': master_id,
            'inliers': 0,
            'confidence': 'low',
            'error': str(e)
        }
class InlierAnalysisCoordinator:
    """
    Coordinates serial execution of inlier analysis tasks while allowing parallel layout processing.
    Ensures that only one inlier analysis runs at a time to avoid overwhelming the system.

    Layout workers enqueue work with ``submit_analysis`` and wait on the future they
    supplied. One daemon thread (``start``/``stop``) takes tasks in FIFO order and runs
    each under ``active_analysis_lock``. It then resolves the task's future with the
    result dict or with the exception the analysis raised.
    """

    def __init__(self, local_workers, memory_manager, min_good_matches=10):
        """
        Args:
            local_workers: Maximum number of processes one OpenCV analysis may use.
            memory_manager: Object whose ``limit_concurrent_processes`` may lower the
                worker count further, or ``None`` to skip that check.
            min_good_matches: Minimum matches (and inliers) for a master to be reported.
        """
        self.local_workers = local_workers
        self.memory_manager = memory_manager
        self.min_good_matches = min_good_matches
        # Task coordination
        self.task_queue = queue.Queue()
        self.worker_thread = None
        self.shutdown_event = threading.Event()
        self.active_analysis_lock = threading.Lock()
        # Statistics
        self.total_tasks_processed = 0
        self.current_task_info = None
        self.stats_lock = threading.Lock()

    def start(self):
        """Start the inlier analysis worker thread (no-op if one is already running)."""
        if self.worker_thread is None or not self.worker_thread.is_alive():
            self.shutdown_event.clear()
            self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
            self.worker_thread.start()
            print(f"InlierAnalysisCoordinator started with {self.local_workers} workers")

    def stop(self):
        """
        Stop the worker thread and fail every task that will no longer be processed.

        The worker only checks the shutdown flag between tasks, so a task that is
        already running may finish; ``join`` waits for at most 5 seconds. Tasks still
        queued get a ``RuntimeError`` on their future, so layout workers blocked on
        ``result()`` do not wait forever.
        """
        if self.worker_thread and self.worker_thread.is_alive():
            self.shutdown_event.set()
            self.worker_thread.join(timeout=5.0)
            print("InlierAnalysisCoordinator stopped")
        self._fail_pending_tasks()

    def submit_analysis(self, layout_id, analysis_type, analysis_params, result_future):
        """
        Submit an inlier analysis task to the serial queue
        Args:
            layout_id: Unique identifier for the layout
            analysis_type: Type of analysis ('direct' or 'split')
            analysis_params: Parameters for the analysis
            result_future: Future object to signal completion. It receives the result
                dict, the exception raised by the analysis, or a RuntimeError if the
                coordinator is stopped before the task runs.
        """
        task = {
            'layout_id': layout_id,
            'analysis_type': analysis_type,
            'analysis_params': analysis_params,
            'result_future': result_future,
            'submitted_at': time.time()
        }
        self.task_queue.put(task)
        print(f" → Submitted {analysis_type} analysis for {layout_id} to inlier queue (queue size: {self.task_queue.qsize()})")

    def get_queue_size(self):
        """Get current queue size"""
        return self.task_queue.qsize()

    def get_current_task_info(self):
        """Get a copy of the info dict of the task being processed, or None when idle"""
        with self.stats_lock:
            return self.current_task_info.copy() if self.current_task_info else None

    @staticmethod
    def _resolve_future(future, result=None, exception=None):
        """
        Deliver ``exception`` (if given) or ``result`` to ``future``.

        Returns False without raising when the future is already done or cancelled.
        Calling set_result/set_exception on such a future raises InvalidStateError,
        which previously escaped the task handler.
        """
        if future.done():
            return False
        try:
            if exception is not None:
                future.set_exception(exception)
            else:
                future.set_result(result)
        except concurrent.futures.InvalidStateError:
            return False
        return True

    def _fail_pending_tasks(self):
        """Drain the queue, failing each pending task's future with a RuntimeError."""
        while True:
            try:
                task = self.task_queue.get_nowait()
            except queue.Empty:
                return
            try:
                self._resolve_future(
                    task['result_future'],
                    exception=RuntimeError(
                        f"InlierAnalysisCoordinator stopped before {task['analysis_type']} "
                        f"analysis for {task['layout_id']} was processed"
                    ),
                )
            finally:
                self.task_queue.task_done()

    def _worker_loop(self):
        """Main worker loop - processes inlier analysis tasks serially until shutdown"""
        print("InlierAnalysisCoordinator worker loop started")
        while not self.shutdown_event.is_set():
            try:
                # Poll with a timeout so the shutdown flag is re-checked regularly
                task = self.task_queue.get(timeout=1.0)
            except queue.Empty:
                continue
            try:
                # Only one analysis may run at a time
                with self.active_analysis_lock:
                    self._process_inlier_analysis_task(task)
            except Exception as e:
                print(f"Error in inlier analysis worker loop: {e}")
            finally:
                # Always balance get(); otherwise Queue.join() would hang after a failure
                self.task_queue.task_done()

    def _process_inlier_analysis_task(self, task):
        """Run one queued task and resolve its future with the result or the raised exception"""
        layout_id = task['layout_id']
        analysis_type = task['analysis_type']
        analysis_params = task['analysis_params']
        result_future = task['result_future']
        started_at = time.time()
        queue_wait_time = started_at - task['submitted_at']
        with self.stats_lock:
            self.current_task_info = {
                'layout_id': layout_id,
                'analysis_type': analysis_type,
                'started_at': started_at,
                'queue_wait_time': queue_wait_time
            }
        print(f" → Processing {analysis_type} inlier analysis for {layout_id} (queue wait: {queue_wait_time:.1f}s)")
        try:
            if analysis_type == 'direct':
                result = self._perform_direct_inlier_analysis(layout_id, analysis_params)
            elif analysis_type == 'split':
                result = self._perform_split_inlier_analysis(layout_id, analysis_params)
            else:
                raise ValueError(f"Unknown analysis type: {analysis_type}")
        except Exception as e:
            print(f" → Error processing {analysis_type} analysis for {layout_id}: {e}")
            # Signal error to the waiting layout worker
            self._resolve_future(result_future, exception=e)
            with self.stats_lock:
                self.current_task_info = None
            return
        self._resolve_future(result_future, result=result)
        with self.stats_lock:
            self.total_tasks_processed += 1
            processing_time = time.time() - started_at
            self.current_task_info = None
        print(f" → Completed {analysis_type} analysis for {layout_id} in {processing_time:.1f}s")

    def _perform_direct_inlier_analysis(self, layout_id, params):
        """Analyse the whole layout image (``params['layout_path']``) against every master"""
        layout_path = params['layout_path']
        if params.get('vector_mode', False):
            return self._perform_vector_similarity_analysis(layout_path, params)
        # master_images is only needed (and only required) for OpenCV matching
        return self._perform_opencv_inlier_analysis(layout_path, params['master_images'], params)

    def _perform_split_inlier_analysis(self, layout_id, params):
        """Analyse every split panel (``params['split_panels']``) against every master"""
        split_panels = params['split_panels']
        if params.get('vector_mode', False):
            return self._perform_split_vector_analysis(split_panels, params)
        return self._perform_split_opencv_analysis(split_panels, params['master_images'], params)

    def _min_inliers_threshold(self, best_inliers, params):
        """Minimum inlier count a master needs, relative to the best match found"""
        inlier_threshold = params.get('inlier_threshold', 0.65)
        inlier_ratio_threshold = params.get('inlier_ratio_threshold', 0.4)
        return max(self.min_good_matches,
                   best_inliers * inlier_ratio_threshold,
                   int(best_inliers * inlier_threshold))

    @staticmethod
    def _write_split_panels(split_panels, directory):
        """Write each panel's ``'image'`` to ``directory``; return the paths in panel order"""
        paths = []
        for i, split_info in enumerate(split_panels):
            split_path = os.path.join(directory, f"split_panel_{i}.jpg")
            cv2.imwrite(split_path, split_info['image'])
            paths.append(split_path)
        return paths

    def _perform_opencv_inlier_analysis(self, layout_path, master_images, params):
        """
        Match the layout against every master in a process pool and keep strong matches.

        Returns ``detected_masters`` (best first), ``inlier_scores`` and diagnostics.
        On failure it returns an ``error`` entry instead of raising.
        """
        try:
            # Layout features are detected here only to size the worker pool; each
            # worker process recomputes them independently.
            akaze = cv2.AKAZE_create()
            layout_img = cv2.imread(layout_path, cv2.IMREAD_GRAYSCALE)
            if layout_img is None:
                raise Exception(f"Could not load layout image: {layout_path}")
            layout_kp, layout_desc = akaze.detectAndCompute(layout_img, None)
            if layout_desc is None:
                raise Exception("No features detected in layout image")
            feature_count = len(layout_kp)
            # Dynamically adjust worker count based on feature count and memory
            safe_workers = self._calculate_safe_worker_count(feature_count)
            # NOTE: previously min(10000, feature_count // 2) when feature_count > 20000,
            # which can never drop below 10000, so the cap was effectively constant.
            max_features = 10000
            tasks = [
                (layout_path, master_id, master_path, self.min_good_matches, max_features)
                for master_id, master_path in master_images.items()
            ]
            master_results = []
            with concurrent.futures.ProcessPoolExecutor(max_workers=safe_workers) as executor:
                future_to_master = {
                    executor.submit(process_single_master_inlier_analysis, *task_args): task_args[1]
                    for task_args in tasks
                }
                for future in concurrent.futures.as_completed(future_to_master):
                    master_id = future_to_master[future]
                    try:
                        result = future.result()
                    except Exception as e:
                        print(f" → Error processing master {master_id}: {e}")
                        continue
                    if result.get('inliers', 0) > 0 and result.get('confidence', 'low') != 'low':
                        master_results.append({
                            'master_id': master_id,
                            'inliers': result.get('inliers', 0),
                            'confidence': result.get('confidence', 'low'),
                            'details': result
                        })
            master_results.sort(key=lambda x: x['inliers'], reverse=True)
            detected_masters = []
            inlier_scores = {}
            if master_results:
                # min_inliers already includes the min_good_matches floor
                min_inliers = self._min_inliers_threshold(master_results[0]['inliers'], params)
                for result in master_results:
                    master_id = result['master_id']
                    inliers = result['inliers']
                    inlier_scores[master_id] = inliers
                    if inliers >= min_inliers and result['confidence'] in ('high', 'medium'):
                        detected_masters.append(master_id)
            return {
                'detected_masters': detected_masters,
                'inlier_scores': inlier_scores,
                'total_masters_checked': len(master_images),
                'potential_matches_found': len(master_results),
                'analysis_mode': 'opencv_inlier_analysis',
                'layout_features': feature_count,
                'workers_used': safe_workers
            }
        except Exception as e:
            return {
                'detected_masters': [],
                'inlier_scores': {},
                'error': str(e),
                'analysis_mode': 'opencv_inlier_analysis_error'
            }

    def _perform_vector_similarity_analysis(self, layout_path, params):
        """Compare the layout's embedding to each master embedding (cosine similarity)"""
        try:
            embedding_model = params['embedding_model']
            master_embeddings = params['master_embeddings']
            similarity_threshold = params.get('similarity_threshold', 0.75)
            layout_embedding = self._generate_layout_embedding(layout_path, embedding_model)
            if layout_embedding is None:
                raise Exception("Failed to generate layout embedding")
            similarities = {}
            detected_masters = []
            for master_id, master_embedding in master_embeddings.items():
                similarity = self._compute_cosine_similarity(layout_embedding, master_embedding)
                similarities[master_id] = similarity
                if similarity >= similarity_threshold:
                    detected_masters.append(master_id)
            detected_masters.sort(key=lambda x: similarities[x], reverse=True)
            return {
                'detected_masters': detected_masters,
                'inlier_scores': similarities,  # Use similarities as scores for compatibility
                'total_masters_checked': len(master_embeddings),
                'analysis_mode': 'vector_similarity_analysis',
                'similarity_threshold': similarity_threshold
            }
        except Exception as e:
            return {
                'detected_masters': [],
                'inlier_scores': {},
                'error': str(e),
                'analysis_mode': 'vector_similarity_analysis_error'
            }

    def _perform_split_opencv_analysis(self, split_panels, master_images, params):
        """Run OpenCV inlier analysis for every (split panel, master) pair"""
        try:
            import shutil
            import tempfile
            tasks = []
            all_results = []
            # A private temp dir gives collision-free file names and is removed even
            # when the analysis fails part-way through.
            tmp_dir = tempfile.mkdtemp(prefix="split_panels_")
            try:
                split_panel_paths = self._write_split_panels(split_panels, tmp_dir)
                tasks = [
                    (split_path, master_id, master_path, self.min_good_matches, 15000)
                    for split_path in split_panel_paths
                    for master_id, master_path in master_images.items()
                ]
                with concurrent.futures.ProcessPoolExecutor(max_workers=self.local_workers) as executor:
                    futures = [
                        executor.submit(process_single_master_inlier_analysis, *task_args)
                        for task_args in tasks
                    ]
                    for future in concurrent.futures.as_completed(futures):
                        try:
                            result = future.result()
                        except Exception as e:
                            print(f" → Error processing split task: {e}")
                            continue
                        if result.get('confidence') != 'low' and result.get('inliers', 0) > 0:
                            all_results.append(result)
            finally:
                # Best-effort cleanup, as before
                shutil.rmtree(tmp_dir, ignore_errors=True)
            detected_masters = []
            inlier_scores = {}
            if all_results:
                all_results.sort(key=lambda x: x['inliers'], reverse=True)
                min_inliers = self._min_inliers_threshold(all_results[0]['inliers'], params)
                for result in all_results:
                    master_id = result['master_id']
                    inliers = result['inliers']
                    # Keep highest score for each master
                    if master_id not in inlier_scores or inliers > inlier_scores[master_id]:
                        inlier_scores[master_id] = inliers
                    if (inliers >= min_inliers
                            and result['confidence'] in ('high', 'medium')
                            and master_id not in detected_masters):
                        detected_masters.append(master_id)
            return {
                'detected_masters': detected_masters,
                'inlier_scores': inlier_scores,
                'total_combinations_processed': len(tasks),
                'potential_matches_found': len(all_results),
                'analysis_mode': 'split_opencv_analysis',
                'splits_processed': len(split_panels)
            }
        except Exception as e:
            return {
                'detected_masters': [],
                'inlier_scores': {},
                'error': str(e),
                'analysis_mode': 'split_opencv_analysis_error'
            }

    def _perform_split_vector_analysis(self, split_panels, params):
        """Run vector similarity analysis for every (split panel, master) pair"""
        try:
            import shutil
            import tempfile
            embedding_model = params['embedding_model']
            master_embeddings = params['master_embeddings']
            similarity_threshold = params.get('similarity_threshold', 0.75)
            all_results = []
            tmp_dir = tempfile.mkdtemp(prefix="split_panels_")
            try:
                split_panel_paths = self._write_split_panels(split_panels, tmp_dir)
                for i, split_path in enumerate(split_panel_paths):
                    split_embedding = self._generate_layout_embedding(split_path, embedding_model)
                    if split_embedding is None:
                        continue
                    for master_id, master_embedding in master_embeddings.items():
                        similarity = self._compute_cosine_similarity(split_embedding, master_embedding)
                        if similarity >= similarity_threshold:
                            all_results.append({
                                'master_id': master_id,
                                'similarity': similarity,
                                'split_panel': i
                            })
            finally:
                shutil.rmtree(tmp_dir, ignore_errors=True)
            detected_masters = []
            similarity_scores = {}
            for result in all_results:
                master_id = result['master_id']
                similarity = result['similarity']
                # Keep highest similarity for each master
                if master_id not in similarity_scores or similarity > similarity_scores[master_id]:
                    similarity_scores[master_id] = similarity
                if master_id not in detected_masters:
                    detected_masters.append(master_id)
            return {
                'detected_masters': detected_masters,
                'inlier_scores': similarity_scores,  # Use similarities as scores for compatibility
                'total_combinations_processed': len(split_panels) * len(master_embeddings),
                'potential_matches_found': len(all_results),
                'analysis_mode': 'split_vector_analysis',
                'splits_processed': len(split_panels),
                'similarity_threshold': similarity_threshold
            }
        except Exception as e:
            return {
                'detected_masters': [],
                'inlier_scores': {},
                'error': str(e),
                'analysis_mode': 'split_vector_analysis_error'
            }

    def _calculate_safe_worker_count(self, feature_count):
        """Scale the process count down for feature-heavy layouts, then apply the memory limit"""
        if feature_count > 50000:
            safe_workers = max(1, self.local_workers // 2)
        elif feature_count > 30000:
            safe_workers = max(1, int(self.local_workers * 0.75))
        else:
            safe_workers = self.local_workers
        if self.memory_manager:
            safe_workers = min(safe_workers, self.memory_manager.limit_concurrent_processes(safe_workers))
        return safe_workers

    def _generate_layout_embedding(self, layout_path, embedding_model):
        """Embed the image at ``layout_path``; returns None (after printing) on failure"""
        try:
            from vertexai.vision_models import Image as VertexImage
            vertex_image = VertexImage.load_from_file(layout_path)
            response = embedding_model.get_embeddings(image=vertex_image)
            return np.array(response.image_embedding)
        except Exception as e:
            print(f" → Error generating embedding: {e}")
            return None

    def _compute_cosine_similarity(self, embedding1, embedding2):
        """Cosine similarity of two vectors; 0.0 when either has zero norm"""
        norm1 = np.linalg.norm(embedding1)
        norm2 = np.linalg.norm(embedding2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return float(np.dot(embedding1, embedding2) / (norm1 * norm2))
class ProgressTracker:
    """
    Thread-safe progress tracking for parallel layout processing.

    Counters are only mutated while holding ``self.lock``; ``get_progress_info``
    returns a consistent snapshot.
    """

    def __init__(self, total_layouts):
        self.total_layouts = total_layouts
        self.completed_layouts = 0
        self.in_progress_layouts = 0
        self.failed_layouts = 0
        self.inlier_queue_size = 0
        self.lock = threading.Lock()
        self.start_time = time.time()

    def start_layout(self):
        """Mark a layout as started"""
        with self.lock:
            self.in_progress_layouts += 1

    def complete_layout(self, success=True):
        """Mark a layout as finished, counting it as completed or failed"""
        with self.lock:
            self.in_progress_layouts -= 1
            if success:
                self.completed_layouts += 1
            else:
                self.failed_layouts += 1

    def update_queue_size(self, size):
        """Update inlier queue size"""
        with self.lock:
            self.inlier_queue_size = size

    def get_progress_info(self):
        """
        Get current progress information.

        ``eta_mins`` extrapolates the average time per finished layout (completed or
        failed) over the layouts not yet finished. ``percentage`` counts only
        successfully completed layouts.
        """
        with self.lock:
            elapsed = time.time() - self.start_time
            completed = self.completed_layouts
            # Failed layouts are finished work too. Counting only successes made the
            # ETA overestimate and never reach zero once any layout failed.
            finished = completed + self.failed_layouts
            if finished > 0:
                avg_time = elapsed / finished
                remaining = max(0, self.total_layouts - finished) * avg_time
                eta_mins = remaining / 60
            else:
                eta_mins = 0
            return {
                'total': self.total_layouts,
                'completed': completed,
                'in_progress': self.in_progress_layouts,
                'failed': self.failed_layouts,
                'queue_size': self.inlier_queue_size,
                'elapsed_mins': elapsed / 60,
                'eta_mins': eta_mins,
                'percentage': (completed / self.total_layouts * 100) if self.total_layouts > 0 else 0
            }

    def print_progress(self):
        """Print current progress"""
        info = self.get_progress_info()
        print(f"Progress: {info['completed']}/{info['total']} ({info['percentage']:.1f}%) "
              f"| In Progress: {info['in_progress']} | Failed: {info['failed']} "
              f"| Queue: {info['queue_size']} | ETA: {info['eta_mins']:.1f}min")
class HybridImageDetector(OpenAIImageDetector):
def __init__(self, panel_threshold=2, inlier_threshold=0.65,
             enable_greyscale=False, enable_contrast_enhancement=False,
             min_good_matches=10, inlier_ratio_threshold=0.4,
             openai_workers=None, local_workers=None, split_mode=False,
             split_advanced=False, split_simple=False, percentile=10, min_gap=5,
             vector_mode=False, similarity_threshold=0.75, fallback_one_at_a_time=False,
             parallel_layouts=False, layout_workers=None, max_concurrent_layouts=None,
             no_truncation=False, **kwargs):
    """
    Initialize the hybrid image detector
    Args:
        panel_threshold: Maximum panels to use local analysis (default: 2)
        inlier_threshold: Minimum similarity threshold for local analysis (default: 0.65)
        enable_greyscale: Enable greyscale processing (default: False for hybrid mode)
        enable_contrast_enhancement: Enable contrast enhancement (default: False for hybrid mode)
        min_good_matches: Minimum good matches for RANSAC (default: 10)
        inlier_ratio_threshold: Minimum inlier ratio for confident matches (default: 0.4)
        openai_workers: Number of workers for OpenAI analysis (default: auto-detect)
        local_workers: Number of workers for local analysis (default: auto-detect)
        split_mode: Enable traditional panel splitting
        split_advanced: Enable advanced panel splitting with edge detection
        split_simple: Enable simple panel splitting with even division (hybrid mode only)
        percentile: Percentile threshold for gutter detection in advanced splitting (default: 10)
        min_gap: Minimum gap size for gutter detection in advanced splitting (default: 5)
        vector_mode: Enable vector similarity search instead of inlier analysis (default: False)
        similarity_threshold: Similarity threshold for vector mode (default: 0.75)
        fallback_one_at_a_time: Enable fallback to OpenAI one-at-a-time when matched masters < detected panels (default: False)
        parallel_layouts: Enable parallel layout processing with serial inlier analysis coordination (default: False)
        layout_workers: Number of concurrent layout workers for parallel processing (default: auto-detect)
        max_concurrent_layouts: Maximum layouts processing simultaneously (default: same as layout_workers)
        no_truncation: Disable truncation of match results (keeps all matches instead of limiting to panel count) (default: False)
        **kwargs: Forwarded unchanged to ``OpenAIImageDetector.__init__``.
    Raises:
        ImportError: if ``vector_mode`` is requested but the Vertex AI libraries are missing.
    Note:
        In vector mode, ``GOOGLE_APPLICATION_CREDENTIALS`` is set to
        ``service-account.json`` only when the variable is not already defined.
    """
    # Initialize parent class with OpenAI workers
    super().__init__(
        enable_greyscale=enable_greyscale,
        enable_contrast_enhancement=enable_contrast_enhancement,
        max_concurrent_workers=openai_workers or 5,  # Temporary, will be updated after loading masters
        split_mode=split_mode,
        **kwargs
    )
    self.panel_threshold = panel_threshold
    self.inlier_threshold = inlier_threshold
    self.min_good_matches = min_good_matches
    self.inlier_ratio_threshold = inlier_ratio_threshold
    self.split_advanced = split_advanced
    self.split_simple = split_simple
    self.percentile = percentile
    # Vector mode configuration
    self.vector_mode = vector_mode
    self.similarity_threshold = similarity_threshold
    # Fallback configuration
    self.fallback_one_at_a_time = fallback_one_at_a_time
    # Parallel processing configuration
    self.parallel_layouts = parallel_layouts
    self.layout_workers = layout_workers
    self.max_concurrent_layouts = max_concurrent_layouts
    # Truncation configuration
    self.no_truncation = no_truncation
    # Initialize memory manager
    self.memory_manager = MemoryManager(max_memory_percent=75, max_swap_percent=80)
    self.min_gap = min_gap
    # Store worker configurations for later auto-detection (_configure_worker_counts)
    self._openai_workers_config = openai_workers
    self._local_workers_config = local_workers
    self._layout_workers_config = layout_workers
    # Initialize worker attributes with temporary values for CLI display
    self.openai_workers = openai_workers or "auto"
    self.local_workers = local_workers or "auto"
    # Initialize parallel processing components
    self.inlier_coordinator = None
    self.progress_tracker = None
    if not self.vector_mode:
        # OpenCV components are only needed for local inlier analysis
        self.akaze = cv2.AKAZE_create()
        self.bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
    else:
        if not VERTEX_AI_AVAILABLE:
            raise ImportError("Google Vertex AI libraries not available. Please install: pip install google-cloud-aiplatform")
        # Fall back to the bundled key file only when credentials are not already
        # configured; overwriting the variable would silently clobber the user's
        # setting for every Google client in this process.
        os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "service-account.json")
        aiplatform.init(project="optical-414516", location="us-central1")
        # Initialize multimodal embedding model
        self.embedding_model = MultiModalEmbeddingModel.from_pretrained("multimodalembedding@001")
        # Initialize vector-specific attributes
        self.master_embeddings = {}
        self.embeddings_cache_path = Path("embeddings_cache")
        self.embeddings_cache_path.mkdir(exist_ok=True)
        print(f"Vector mode initialized: Using Google Vertex AI multimodalembedding@001")
    analysis_method = "vector similarity" if self.vector_mode else "local inlier analysis"
    print(f"Hybrid detector initialized:")
    print(f" Panel threshold: {panel_threshold} (≤{panel_threshold} panels → {analysis_method})")
    print(f" Inlier threshold: {inlier_threshold}")
    print(f" Vector mode: {self.vector_mode}")
    if self.vector_mode:
        print(f" Similarity threshold: {similarity_threshold}")
    print(f" Greyscale processing: {enable_greyscale}")
    print(f" Contrast enhancement: {enable_contrast_enhancement}")
    print(f" Worker configuration: OpenAI={openai_workers or 'auto'}, Local={local_workers or 'auto'}")
def _configure_worker_counts(self):
    """
    Configure optimal worker counts based on system and dataset.

    - OpenAI workers: the explicit setting, else one per master image (at least 1, so
      an empty master set cannot produce a zero-sized pool).
    - Local workers: the explicit setting, else CPU cores - 2 (at least 1).
    - Layout workers (only with ``parallel_layouts``): the explicit setting, else
      min(4, CPU cores // 2). ``max_concurrent_layouts`` defaults to that value.
      Without parallel layouts both are forced to 1.
    Finally copies the OpenAI worker count into ``max_concurrent_workers``.
    """
    # os.cpu_count() returns None when the core count cannot be determined
    cpu_count = os.cpu_count() or 1
    if self._openai_workers_config is None:
        self.openai_workers = max(1, len(self.master_images))
        print(f" OpenAI workers auto-detected: {self.openai_workers} (total masters)")
    else:
        self.openai_workers = self._openai_workers_config
        print(f" OpenAI workers configured: {self.openai_workers}")
    if self._local_workers_config is None:
        self.local_workers = max(1, cpu_count - 2)
        print(f" Local workers auto-detected: {self.local_workers} (CPU cores - 2)")
    else:
        self.local_workers = self._local_workers_config
        print(f" Local workers configured: {self.local_workers}")
    if self.parallel_layouts:
        if self._layout_workers_config is None:
            # Conservative default to avoid memory issues
            self.layout_workers = min(4, max(1, cpu_count // 2))
            print(f" Layout workers auto-detected: {self.layout_workers} (conservative: min(4, CPU cores / 2))")
        else:
            self.layout_workers = self._layout_workers_config
            print(f" Layout workers configured: {self.layout_workers}")
        if self.max_concurrent_layouts is None:
            self.max_concurrent_layouts = self.layout_workers
            print(f" Max concurrent layouts: {self.max_concurrent_layouts} (same as layout workers)")
    else:
        self.layout_workers = 1  # Sequential processing
        self.max_concurrent_layouts = 1
    # Update the parent class max_concurrent_workers for OpenAI operations
    self.max_concurrent_workers = self.openai_workers
def generate_image_embedding(self, image_path: str) -> Optional[np.ndarray]:
    """
    Embed one image file with the Vertex AI multimodal embedding model.

    Returns the embedding (1408 dimensions per the model) as a NumPy array. Returns
    ``None`` after printing the problem if loading the file or the API call fails.

    Raises:
        ValueError: if the detector was not created with ``vector_mode=True``.
    """
    if not self.vector_mode:
        raise ValueError("Vector mode must be enabled to generate embeddings")
    try:
        response = self.embedding_model.get_embeddings(
            image=VertexImage.load_from_file(image_path)
        )
        return np.array(response.image_embedding)
    except Exception as exc:
        print(f"Error generating embedding for {Path(image_path).name}: {exc}")
        return None
def compute_cosine_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
    """Return the cosine of the angle between two vectors, or 0.0 if either has zero length."""
    norm_a = np.linalg.norm(embedding1)
    norm_b = np.linalg.norm(embedding2)
    # A zero vector has no direction, so similarity is defined as 0 here
    if not (norm_a and norm_b):
        return 0.0
    return float(np.dot(embedding1, embedding2) / (norm_a * norm_b))
def save_embedding_cache(self, embeddings: Dict, filename: str):
    """Pickle ``embeddings`` to ``<embeddings_cache_path>/<filename>.pkl`` and print where it went."""
    target = self.embeddings_cache_path.joinpath(f"{filename}.pkl")
    with target.open('wb') as handle:
        pickle.dump(embeddings, handle)
    print(f"Embeddings cached to: {target}")
def load_embedding_cache(self, filename: str) -> Optional[Dict]:
    """
    Unpickle ``<embeddings_cache_path>/<filename>.pkl``.

    Returns the stored object, or ``None`` when the file is absent or cannot be
    unpickled (the error is printed). NOTE: ``pickle.load`` executes arbitrary code
    from the file; only point this at a cache directory you control.
    """
    cache_file = self.embeddings_cache_path / f"{filename}.pkl"
    if not cache_file.exists():
        return None
    try:
        with open(cache_file, 'rb') as handle:
            cached = pickle.load(handle)
    except Exception as exc:
        print(f"Error loading cached embeddings: {exc}")
        return None
    print(f"Loaded cached embeddings from: {cache_file}")
    return cached
def generate_master_embeddings(self, force_regenerate=False) -> Dict[str, np.ndarray]:
    """
    Build (or reuse) embeddings for every master image and store them on ``self``.

    The cached ``master_embeddings`` file is reused only when its keys exactly match
    ``self.master_images``; otherwise every master is re-embedded. Masters whose
    embedding fails are left out. A non-empty result is written back to the cache.

    Raises:
        ValueError: if the detector was not created with ``vector_mode=True``.
    """
    if not self.vector_mode:
        raise ValueError("Vector mode must be enabled to generate master embeddings")
    cache_filename = "master_embeddings"
    cached = None if force_regenerate else self.load_embedding_cache(cache_filename)
    if cached is not None:
        if set(cached.keys()) == set(self.master_images.keys()):
            self.master_embeddings = cached
            print(f"✓ Using cached embeddings for {len(cached)} master images")
            return self.master_embeddings
        print("Cache incomplete, regenerating embeddings...")
    total = len(self.master_images)
    print(f"Generating embeddings for {total} master images...")
    self.master_embeddings = {}
    for position, (master_id, image_path) in enumerate(self.master_images.items(), start=1):
        print(f" {position}/{total}: Generating embedding for {master_id}")
        vector = self.generate_image_embedding(image_path)
        if vector is not None:
            self.master_embeddings[master_id] = vector
        # Brief pause between API calls to stay clear of rate limits
        if position < total:
            time.sleep(0.1)
    if self.master_embeddings:
        self.save_embedding_cache(self.master_embeddings, cache_filename)
    print(f"✓ Generated embeddings for {len(self.master_embeddings)} master images")
    return self.master_embeddings
def detect_images_in_layout_hybrid(self, layout_path: str, layout_index: int, total_layouts: int) -> Dict:
    """
    Hybrid detection method that routes to appropriate detection based on panel count.

    Pipeline:
      1. One consolidated OpenAI call (``self.count_panels_and_detect_censorship``)
         yields the panel count and the censorship verdict.
      2. Layouts with ``panel_count <= self.panel_threshold`` are analysed whole;
         larger ones are split into panels first. Either route uses vector
         similarity when ``self.vector_mode`` is set, otherwise OpenCV inlier
         analysis.
      3. CEN refinement, when ``self.refinement_mode`` is on and a detected
         master satisfies ``self.is_cen_image``.
      4. Deduplication, then truncation to the ``panel_count`` best-scoring
         masters (by ``inlier_scores``) unless ``self.no_truncation`` is set.
      5-6. Panel/censorship metadata and a confidence score are attached.
      7. When ``self.fallback_one_at_a_time`` is set and fewer masters than
         panels were found, an OpenAI one-at-a-time pass is attempted. Any
         failure inside that pass keeps the step 1-6 result.

    Args:
        layout_path: Path to the layout image.
        layout_index: 1-based position of this layout (progress output only).
        total_layouts: Total number of layouts (progress output only).

    Returns:
        A result dict containing at least ``detected_masters``,
        ``detected_master_ids`` and ``detected_master_filenames``. On an
        unexpected error before the fallback stage, an error dict with
        ``processing_mode='hybrid_error'`` is returned instead of raising.
    """
    layout_name = Path(layout_path).name
    print(f"Processing {layout_index}/{total_layouts}: {layout_name} (Hybrid mode)")
    try:
        # Step 1: Count panels and detect censorship using OpenAI O3 (consolidated call)
        print(f" Step 1: Analyzing panels and censorship using OpenAI O3...")
        combined_result = self.count_panels_and_detect_censorship(layout_path)
        panel_count = combined_result.get('panel_count', 1)
        panel_confidence = combined_result.get('panel_confidence', 'unknown')
        # A missing censorship verdict is treated as "censored".
        is_censored = combined_result.get('is_censored', True)
        censorship_confidence = combined_result.get('censorship_confidence', 'unknown')
        print(f" Panel analysis: {panel_count} panels detected (confidence: {panel_confidence})")
        print(f" Censorship analysis: {'CENSORED' if is_censored else 'UNCENSORED'} (confidence: {censorship_confidence})")

        # Step 2: Route to appropriate detection method
        analysis_method = "vector similarity" if self.vector_mode else "local inlier analysis"
        if panel_count <= self.panel_threshold:
            print(f" Step 2: Using {analysis_method} (≤{self.panel_threshold} panels)")
            detection_result = self.detect_with_local_inlier_analysis(layout_path, layout_name)
            detection_result['detection_method'] = 'vector_similarity' if self.vector_mode else 'local_inlier_analysis'
        else:
            print(f" Step 2: Using split method + {analysis_method} (≥{self.panel_threshold + 1} panels)")
            detection_result = self.detect_with_split_and_inlier_analysis(layout_path, layout_name, panel_count)
            detection_result['detection_method'] = 'split_and_vector_similarity' if self.vector_mode else 'split_and_inlier_analysis'
        # Both routes cost only the consolidated panel counting + censorship call.
        detection_result['api_calls_used'] = 1

        # Step 3: Apply CEN refinement if enabled and we have CEN matches
        current_masters = detection_result.get('detected_masters', [])
        if self.refinement_mode and current_masters:
            cen_images = [mid for mid in current_masters if self.is_cen_image(mid)]
            if cen_images:
                print(f" Step 3: Applying CEN refinement using stored censorship analysis...")
                # The refinement result is merged in as a set of updates.
                cen_result = self.apply_cen_refinement_with_stored_analysis(detection_result, is_censored, censorship_confidence)
                detection_result.update(cen_result)
                print(f" CEN refinement completed")

        # Step 4: Deduplicate and truncate matches if more than detected panels (using inlier scores)
        current_masters = detection_result.get('detected_masters', [])
        inlier_scores = detection_result.get('inlier_scores', {})
        original_count = len(current_masters)
        current_masters = self.deduplicate_master_matches(current_masters)
        if len(current_masters) != original_count:
            duplicates_removed = original_count - len(current_masters)
            print(f" Step 4a: Removed {duplicates_removed} duplicate master(s)")
            detection_result['detected_masters'] = current_masters
            detection_result['detected_master_ids'] = current_masters
            detection_result['detected_master_filenames'] = [f"{mid}.jpg" for mid in current_masters]
        if len(current_masters) > panel_count and inlier_scores and not self.no_truncation:
            print(f" Step 4b: Truncating {len(current_masters)} matches to top {panel_count} by inlier score...")
            # Masters without a score (e.g. added by refinement) rank last.
            sorted_masters = sorted(current_masters,
                                    key=lambda mid: inlier_scores.get(mid, 0),
                                    reverse=True)
            truncated_masters = sorted_masters[:panel_count]
            removed_masters = sorted_masters[panel_count:]
            detection_result['detected_masters'] = truncated_masters
            detection_result['detected_master_ids'] = truncated_masters
            detection_result['detected_master_filenames'] = [f"{mid}.jpg" for mid in truncated_masters]
            detection_result['truncation_applied'] = True
            detection_result['original_match_count'] = len(current_masters)
            detection_result['final_match_count'] = len(truncated_masters)
            detection_result['removed_masters'] = removed_masters
            detection_result['removed_count'] = len(removed_masters)
            print(f" Truncated to {len(truncated_masters)} matches (removed {len(removed_masters)} lower-scoring matches)")
        elif len(current_masters) > panel_count and self.no_truncation:
            print(f" Step 4b: Truncation disabled - keeping all {len(current_masters)} matches")
            detection_result['truncation_applied'] = False
            detection_result['original_match_count'] = len(current_masters)
            detection_result['final_match_count'] = len(current_masters)

        # Step 5: Add panel analysis and censorship information
        detection_result['panel_analysis'] = {
            'panel_count': panel_count,
            'confidence': panel_confidence,
            'analysis': combined_result.get('panel_analysis', ''),
            'panel_descriptions': combined_result.get('panel_descriptions', [])
        }
        detection_result['censorship_analysis'] = {
            'is_censored': is_censored,
            'confidence': censorship_confidence,
            'analysis': combined_result.get('censorship_analysis', ''),
            'coverage_details': combined_result.get('coverage_details', '')
        }
        detection_result['panel_count'] = panel_count
        detection_result['panel_threshold'] = self.panel_threshold
        detection_result['processing_mode'] = 'hybrid'
        detection_result['layout_path'] = layout_path

        # Step 6: Calculate confidence score based on matches vs panels ratio
        final_matches = len(detection_result.get('detected_masters', []))
        confidence_percentage = self.calculate_confidence_score(final_matches, panel_count)
        detection_result['confidence_score'] = confidence_percentage
        detected_count = final_matches
        base_method = "vector similarity" if self.vector_mode else "local analysis"
        method_name = base_method if panel_count <= self.panel_threshold else f"split + {base_method}"
        print(f" Step 6: Confidence score calculated: {confidence_percentage:.1f}% ({final_matches} matches / {panel_count} panels)")

        # Step 7: Apply fallback to OpenAI one-at-a-time if enabled and needed
        if self.fallback_one_at_a_time and final_matches < panel_count:
            print(f" Step 7: Fallback triggered - {final_matches} matched masters < {panel_count} detected panels")
            print(f" Running OpenAI one-at-a-time method with {len(self.master_images)} workers...")
            print(f" DEBUG: Available master images: {len(self.master_images)}")
            # Create a temporary OpenAI detector for one-at-a-time processing
            try:
                temp_detector = OpenAIImageDetector(
                    enable_greyscale=self.enable_greyscale,
                    enable_contrast_enhancement=self.enable_contrast_enhancement,
                    contrast_factor=self.contrast_factor,
                    refinement_mode=self.refinement_mode,
                    one_at_a_time_mode=True,
                    max_concurrent_workers=len(self.master_images)  # Use number of masters as worker count
                )
                # Manually set the master images since the constructor might not load them
                temp_detector.master_images = self.master_images.copy()
                print(f" DEBUG: Copied {len(self.master_images)} master images to temp detector")
            except Exception as e:
                print(f" ERROR: Failed to create temporary OpenAI detector: {e}")
                print(f" Fallback cancelled due to detector creation failure")
                print(f"✓ Completed {layout_name} - Found {detected_count} matches using {method_name}")
                return detection_result
            # getattr: a missing attribute in a debug line must not abort the whole layout.
            print(f" DEBUG: Temporary detector created with {getattr(temp_detector, 'max_concurrent_workers', 'unknown')} workers")
            print(f" DEBUG: Temp detector has {len(temp_detector.master_images)} master images")
            print(f" DEBUG: Temp detector API key present: {getattr(temp_detector, 'api_key', None) is not None}")
            print(f" DEBUG: Temp detector one-at-a-time mode: {getattr(temp_detector, 'one_at_a_time_mode', 'unknown')}")
            print(f" DEBUG: Calling detect_images_in_layout_one_at_a_time...")
            # Run OpenAI one-at-a-time detection with stored censorship data
            try:
                stored_censorship_data = {
                    'is_censored': is_censored,
                    'confidence': censorship_confidence
                }
                fallback_result = temp_detector.detect_images_in_layout_one_at_a_time(layout_path, layout_index, total_layouts, stored_censorship_data)
                print(f" DEBUG: OpenAI one-at-a-time call completed successfully")
            except Exception as e:
                print(f" ERROR: OpenAI one-at-a-time call failed: {e}")
                print(f" Fallback cancelled due to OpenAI call failure")
                print(f"✓ Completed {layout_name} - Found {detected_count} matches using {method_name}")
                return detection_result
            # Post-process the fallback result. As with the two failure points
            # above, any error here keeps the already-computed local result
            # instead of turning the whole layout into an error.
            try:
                print(f" DEBUG: Fallback result type: {type(fallback_result)}")
                print(f" DEBUG: Fallback result keys: {list(fallback_result.keys()) if fallback_result else 'None'}")
                if fallback_result:
                    detected_masters = fallback_result.get('detected_masters', [])
                    print(f" DEBUG: Raw detected masters: {detected_masters}")
                    print(f" DEBUG: Number of raw detected masters: {len(detected_masters)}")
                    if detected_masters:
                        print(f" Fallback successful - Found {len(detected_masters)} matches")
                        # Apply CEN refinement if enabled
                        if self.refinement_mode:
                            print(f" DEBUG: Applying CEN refinement...")
                            # Merge exactly as Step 3 does. Rebinding fallback_result to the
                            # return value would drop keys (e.g. 'api_calls_made') whenever the
                            # refinement returns only the fields it changed.
                            fallback_result.update(
                                self.apply_cen_refinement_with_stored_analysis(fallback_result, is_censored, censorship_confidence)
                            )
                            print(f" DEBUG: After CEN refinement: {fallback_result.get('detected_masters', [])}")
                        # Apply deduplication
                        print(f" DEBUG: Applying deduplication...")
                        raw_masters = fallback_result.get('detected_masters', [])
                        original_count = len(raw_masters)
                        deduped_masters = self.deduplicate_master_matches(raw_masters)
                        fallback_result['detected_masters'] = deduped_masters
                        fallback_result['detected_master_ids'] = deduped_masters
                        fallback_result['detected_master_filenames'] = [f"{mid}.jpg" for mid in deduped_masters]
                        print(f" DEBUG: Deduplication: {original_count} -> {len(deduped_masters)}")
                        # Update metadata to reflect fallback usage
                        fallback_result['fallback_applied'] = True
                        fallback_result['original_detection_method'] = detection_result.get('detection_method', 'unknown')
                        fallback_result['original_match_count'] = final_matches
                        fallback_result['fallback_match_count'] = len(deduped_masters)
                        fallback_result['detection_method'] = 'openai_one_at_a_time_fallback'
                        fallback_result['processing_mode'] = 'hybrid_with_fallback'
                        # Preserve original analysis data
                        fallback_result['panel_analysis'] = detection_result['panel_analysis']
                        fallback_result['censorship_analysis'] = detection_result['censorship_analysis']
                        fallback_result['panel_count'] = panel_count
                        fallback_result['panel_threshold'] = self.panel_threshold
                        fallback_result['layout_path'] = layout_path
                        # Recalculate confidence score
                        fallback_matches = len(deduped_masters)
                        fallback_result['confidence_score'] = self.calculate_confidence_score(fallback_matches, panel_count)
                        # Initial hybrid analysis call + whatever the fallback reports making.
                        base_api_calls = 1
                        fallback_api_calls = fallback_result.get('api_calls_made', 0)
                        fallback_result['api_calls_used'] = base_api_calls + fallback_api_calls
                        print(f"✓ Fallback completed {layout_name} - Found {fallback_matches} matches using OpenAI one-at-a-time")
                        return fallback_result
                    else:
                        print(f" DEBUG: No detected masters in fallback result")
                else:
                    print(f" DEBUG: Fallback result is None or empty")
            except Exception as e:
                print(f" ERROR: Fallback post-processing failed: {e}")
                print(f" Fallback cancelled due to post-processing failure")
                print(f"✓ Completed {layout_name} - Found {detected_count} matches using {method_name}")
                return detection_result
            print(f" Fallback failed - No additional matches found, keeping original results")

        print(f"✓ Completed {layout_name} - Found {detected_count} matches using {method_name}")
        return detection_result
    except Exception as e:
        error_msg = f"Error in hybrid analysis for {layout_name}: {e}"
        print(error_msg)
        return {
            'detected_masters': [],
            'detected_master_ids': [],
            'detected_master_filenames': [],
            'analysis': f'Hybrid analysis failed: {error_msg}',
            'error': str(e),
            'processing_mode': 'hybrid_error',
            'detection_method': 'error',
            'api_calls_used': 0
        }
def detect_with_local_inlier_analysis(self, layout_path: str, layout_name: str) -> Dict:
    """
    Run whole-layout local detection with the backend chosen by ``self.vector_mode``.

    Vector mode delegates to ``detect_with_vector_similarity``; otherwise the
    OpenCV path ``detect_with_inlier_analysis`` is used. The delegate's result
    dict is returned unchanged.
    """
    handler = self.detect_with_vector_similarity if self.vector_mode else self.detect_with_inlier_analysis
    return handler(layout_path, layout_name)
def detect_with_vector_similarity(self, layout_path: str, layout_name: str) -> Dict:
    """
    Match a whole layout against every master by cosine similarity of embeddings.

    The layout is embedded with ``self.generate_image_embedding`` and compared
    with each entry of ``self.master_embeddings`` through
    ``self.compute_cosine_similarity``. Masters scoring at or above
    ``self.similarity_threshold`` are reported best-first, after passing
    through ``self.deduplicate_master_matches``.

    Returns:
        A result dict. ``inlier_scores`` holds the similarity of each reported
        master so the caller's truncation step can rank them, and
        ``top_similarities`` holds the five best scores overall. On any
        failure an error dict (``processing_mode='vector_similarity_error'``)
        is returned instead of raising.
    """
    print(f" → Analyzing {layout_name} with vector similarity (cosine similarity on embeddings)...")
    try:
        layout_vector = self.generate_image_embedding(layout_path)
        if layout_vector is None:
            raise Exception("Failed to generate layout embedding")

        print(f" → Comparing against {len(self.master_embeddings)} master embeddings...")
        scores = {
            mid: self.compute_cosine_similarity(layout_vector, vec)
            for mid, vec in self.master_embeddings.items()
        }
        above = [mid for mid, score in scores.items() if score >= self.similarity_threshold]
        # Best match first; the sort is stable, so ties keep embedding order.
        above.sort(key=scores.__getitem__, reverse=True)
        top_five = sorted(scores.items(), key=lambda item: item[1], reverse=True)[:5]

        notes = [
            f"Vector similarity analysis using Google Vertex AI embeddings (1408 dimensions).",
            f"Similarity threshold: {self.similarity_threshold}",
            f"Found {len(above)} matches above threshold.",
            f"Top 5 similarities: " + ", ".join(f"{mid}({sim:.3f})" for mid, sim in top_five),
        ]

        # Deduplicate for consistency with the inlier-analysis path.
        pre_dedup = list(above)
        unique = self.deduplicate_master_matches(above)
        removed = len(pre_dedup) - len(unique)
        if removed:
            notes.append(f"Removed {removed} duplicate(s).")

        print(f" → Vector similarity analysis completed: {len(unique)} matches found")
        return {
            'detected_masters': unique,
            'detected_master_ids': unique,
            'detected_master_filenames': [f"{mid}.jpg" for mid in unique],
            'analysis': " ".join(notes),
            'processing_mode': 'vector_similarity',
            'total_masters_checked': len(self.master_embeddings),
            'confident_matches': len(unique),
            'similarity_threshold': self.similarity_threshold,
            'deduplication_applied': removed != 0,
            'duplicates_removed': removed,
            'original_detected_masters': pre_dedup,
            'inlier_scores': {mid: scores[mid] for mid in unique},  # Using similarity scores for truncation compatibility
            'top_similarities': dict(top_five)
        }
    except Exception as e:
        error_msg = f"Error in vector similarity analysis for {layout_name}: {e}"
        print(f"{error_msg}")
        return {
            'detected_masters': [],
            'detected_master_ids': [],
            'detected_master_filenames': [],
            'analysis': f'Vector similarity analysis failed: {error_msg}',
            'error': str(e),
            'processing_mode': 'vector_similarity_error'
        }
def detect_with_inlier_analysis(self, layout_path: str, layout_name: str) -> Dict:
    """
    Detect masters using local OpenCV-based inlier analysis with multiprocessing.

    Memory-safe version with dynamic worker adjustment. The layout's AKAZE
    feature count, computed once in this process, and
    ``self.memory_manager.limit_concurrent_processes`` cap how many worker
    processes are started. Each master is then scored in its own process by
    ``process_single_master_inlier_analysis``, which loads the layout from
    ``layout_path`` itself.

    A master is reported when its confidence is 'high' or 'medium' and its
    inlier count reaches
    ``max(self.min_good_matches, best * self.inlier_ratio_threshold,
    int(best * self.inlier_threshold))``, where ``best`` is the top inlier count.

    Returns:
        A result dict. ``inlier_scores`` maps every candidate master to its
        inlier count for the caller's truncation step. ``concurrent_workers``
        is the number of worker processes actually used. On failure an error
        dict (``processing_mode='local_inlier_analysis_error'``) is returned
        instead of raising.
    """
    print(f" → Analyzing {layout_name} with local inlier analysis using {self.local_workers} processes...")
    try:
        # Check memory before starting
        if not self.memory_manager.is_memory_safe():
            print(" → Memory usage high, waiting for safe levels...")
            if not self.memory_manager.wait_for_memory_safe():
                raise Exception("Memory usage too high to safely process")
        # Load layout image in grayscale for feature detection (for initial check)
        layout_img = cv2.imread(layout_path, cv2.IMREAD_GRAYSCALE)
        if layout_img is None:
            raise Exception(f"Could not load layout image: {layout_path}")
        # Detect features in layout image; only the count is used here
        # (reporting and worker sizing).
        layout_kp, layout_desc = self.akaze.detectAndCompute(layout_img, None)
        if layout_desc is None:
            raise Exception("No features detected in layout image")
        feature_count = len(layout_kp)
        print(f" → Detected {feature_count} features in layout")

        # Dynamically adjust worker count based on feature count and memory
        if feature_count > 50000:
            # High feature count - reduce parallelism
            safe_workers = max(1, self.local_workers // 2)
            print(f" → High feature count ({feature_count}), reducing workers to {safe_workers}")
        elif feature_count > 30000:
            safe_workers = max(1, int(self.local_workers * 0.75))
            print(f" → Medium feature count ({feature_count}), reducing workers to {safe_workers}")
        else:
            safe_workers = self.local_workers
        # Further limit based on available memory
        safe_workers = min(safe_workers, self.memory_manager.limit_concurrent_processes(safe_workers))

        total_masters = len(self.master_images)
        # Per-master feature cap handed to each worker to bound memory.
        # NOTE(review): the previous expression
        # `min(10000, feature_count // 2) if feature_count > 20000 else 10000`
        # always evaluated to 10000 (feature_count // 2 >= 10000 whenever
        # feature_count > 20000). If a lower cap for feature-heavy layouts was
        # intended, it needs a different formula.
        max_features = 10000
        tasks = [
            (layout_path, master_id, master_path, self.min_good_matches, max_features)
            for master_id, master_path in self.master_images.items()
        ]
        print(f" → Processing {total_masters} masters using {safe_workers} concurrent processes...")
        print(f" → Feature limit per master: {max_features}")

        # Process masters in parallel
        master_results = []
        with concurrent.futures.ProcessPoolExecutor(max_workers=safe_workers) as executor:
            future_to_master = {
                executor.submit(process_single_master_inlier_analysis, *task_args): task_args[1]
                for task_args in tasks
            }
            completed_count = 0
            # Collect results as they complete
            for future in concurrent.futures.as_completed(future_to_master):
                master_id = future_to_master[future]
                completed_count += 1
                # Progress update and memory monitoring every 10 completed masters
                if completed_count % 10 == 0:
                    usage = self.memory_manager.get_memory_usage()
                    print(f" → Completed {completed_count}/{total_masters} masters... Memory: {usage['memory_percent']:.1f}%")
                    if usage['memory_percent'] > 80:
                        print(f" → WARNING: High memory usage {usage['memory_percent']:.1f}%, monitoring closely...")
                    if usage['swap_percent'] > 20:
                        print(f" → WARNING: Swap usage {usage['swap_percent']:.1f}%, may slow down...")
                try:
                    result = future.result()
                    # Worker-side failures come back as a dict with an 'error' key
                    # (and 0 inliers); report them instead of dropping them silently.
                    if result.get('error'):
                        print(f" → Error processing {master_id}: {result['error']}")
                    inliers = result.get('inliers', 0)
                    confidence = result.get('confidence', 'low')
                    if inliers > 0 and confidence != 'low':
                        master_results.append({
                            'master_id': master_id,
                            'inliers': inliers,
                            'confidence': confidence,
                            'details': result
                        })
                except Exception as e:
                    print(f" → Error processing {master_id}: {e}")

        # Sort by inlier count (descending)
        master_results.sort(key=lambda x: x['inliers'], reverse=True)

        # Apply thresholds and select confident matches
        detected_masters = []
        inlier_scores = {}
        if master_results:
            best_match = master_results[0]
            best_inliers = best_match['inliers']
            # Absolute floor plus thresholds relative to the best match
            min_inliers = max(self.min_good_matches, best_inliers * self.inlier_ratio_threshold, int(best_inliers * self.inlier_threshold))
            for result in master_results:
                inliers = result['inliers']
                master_id = result['master_id']
                # Store inlier score for all results (for potential truncation)
                inlier_scores[master_id] = inliers
                if (inliers >= min_inliers and
                        result['confidence'] in ['high', 'medium'] and
                        inliers >= self.min_good_matches):
                    detected_masters.append(master_id)
            analysis_parts = [
                f"Multiprocessing local inlier analysis using OpenCV AKAZE features ({safe_workers} processes).",
                f"Processed {total_masters} master images against {len(layout_kp)} layout features.",
                f"Found {len(master_results)} potential matches, {len(detected_masters)} above threshold.",
                f"Best match: {best_match['master_id']} ({best_inliers} inliers, {best_match['confidence']} confidence)"
            ]
        else:
            analysis_parts = [
                f"Multiprocessing local inlier analysis using OpenCV AKAZE features ({safe_workers} processes).",
                f"Processed {total_masters} master images against {len(layout_kp)} layout features.",
                f"No confident matches found above threshold."
            ]

        # Apply deduplication (shouldn't be needed for local analysis, but for safety)
        original_detected = detected_masters[:]
        detected_masters = self.deduplicate_master_matches(detected_masters)
        duplicates_removed = len(original_detected) - len(detected_masters)
        if duplicates_removed:
            analysis_parts.append(f"Removed {duplicates_removed} duplicate(s).")
        analysis = " ".join(analysis_parts)
        print(f" → Local analysis completed: {len(detected_masters)} matches found using {safe_workers} processes")

        # Force garbage collection to free memory
        import gc
        gc.collect()
        return {
            'detected_masters': detected_masters,
            'detected_master_ids': detected_masters,
            'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters],
            'analysis': analysis,
            'processing_mode': 'local_inlier_analysis_multiprocess',
            'total_masters_checked': total_masters,
            'potential_matches_found': len(master_results),
            'confident_matches': len(detected_masters),
            'inlier_threshold': min_inliers if master_results else self.min_good_matches,
            'master_analysis_results': master_results[:10],  # Top 10 for debugging
            'concurrent_workers': safe_workers,  # worker count actually used
            'deduplication_applied': duplicates_removed != 0,
            'duplicates_removed': duplicates_removed,
            'original_detected_masters': original_detected,
            'inlier_scores': inlier_scores  # Track inlier scores for truncation
        }
    except Exception as e:
        error_msg = f"Error in local inlier analysis for {layout_name}: {e}"
        print(f"{error_msg}")
        return {
            'detected_masters': [],
            'detected_master_ids': [],
            'detected_master_filenames': [],
            'analysis': f'Local inlier analysis failed: {error_msg}',
            'error': str(e),
            'processing_mode': 'local_inlier_analysis_error'
        }
def detect_with_split_and_inlier_analysis(self, layout_path: str, layout_name: str, panel_count: int) -> Dict:
    """
    Split the layout into ``panel_count`` panels, then match each panel locally.

    Uses ``detect_with_split_and_vector_similarity`` when ``self.vector_mode``
    is set, otherwise ``detect_with_split_and_inlier_analysis_opencv``. The
    delegate's result dict is returned unchanged.
    """
    if not self.vector_mode:
        return self.detect_with_split_and_inlier_analysis_opencv(layout_path, layout_name, panel_count)
    return self.detect_with_split_and_vector_similarity(layout_path, layout_name, panel_count)
def detect_with_split_and_vector_similarity(self, layout_path: str, layout_name: str, panel_count: int) -> Dict:
    """
    Detect masters using split method followed by vector similarity analysis on individual panels.

    The layout is cut into ``panel_count`` panels by ``self.splitter``, which is
    created lazily from the ``split_simple`` / ``split_advanced`` flags. Each
    panel is embedded and compared with every entry of
    ``self.master_embeddings``. A master is reported once, in order of its best
    panel similarity, if any panel scores at or above
    ``self.similarity_threshold``.

    Returns:
        A result dict. ``inlier_scores`` maps each reported master to its best
        similarity for the caller's truncation step. On failure an error dict
        (``processing_mode='split_and_vector_similarity_error'``) is returned
        instead of raising.
    """
    import tempfile  # scratch directory for split panel images

    print(f" → Analyzing {layout_name} with split method + vector similarity...")
    try:
        # Initialize panel splitter if not already done
        if not hasattr(self, 'splitter'):
            if self.split_simple:
                self.splitter = SimplePanelSplitter(debug=False)
            elif self.split_advanced:
                self.splitter = AdvancedPanelSplitter(
                    percentile=self.percentile,
                    min_gap=self.min_gap,
                    debug=False
                )
            else:
                self.splitter = PanelSplitter()

        # Step 1: Split layout into individual panels using already-known panel count
        print(f" → Splitting layout into {panel_count} panels (using pre-analyzed count)...")
        split_panels = self.splitter.split_panels(layout_path, panel_count)
        if not split_panels:
            raise Exception("Panel splitting failed: No panels generated")
        splits_generated = len(split_panels)
        print(f" → Successfully split into {splits_generated} individual panel images")

        # Step 2: Apply vector similarity analysis to each split panel
        print(f" → Analyzing {splits_generated} split panels using vector similarity...")
        all_results = []
        # Panels are written to a private scratch directory that is removed
        # automatically, even if embedding raises part-way through. The previous
        # fixed /tmp names could also collide between concurrent runs.
        with tempfile.TemporaryDirectory(prefix="split_panels_") as scratch_dir:
            for i, split_info in enumerate(split_panels):
                split_path = os.path.join(scratch_dir, f"split_panel_{i}_{os.path.basename(layout_path)}")
                if not cv2.imwrite(split_path, split_info['image']):
                    print(f" → Failed to write split panel {i+1}, skipping")
                    continue
                print(f" → Processing split panel {i+1}/{splits_generated}...")
                split_embedding = self.generate_image_embedding(split_path)
                if split_embedding is None:
                    print(f" → Failed to generate embedding for split panel {i+1}")
                    continue
                # Compare split panel against all master embeddings
                for master_id, master_embedding in self.master_embeddings.items():
                    similarity = self.compute_cosine_similarity(split_embedding, master_embedding)
                    if similarity >= self.similarity_threshold:
                        all_results.append({
                            'master_id': master_id,
                            'similarity': similarity,
                            'confidence': 'high' if similarity >= 0.9 else 'medium',
                            'split_panel': i,
                            'split_path': split_path
                        })

        # Step 3: Process results and find best matches
        detected_masters = []
        similarity_scores = {}
        if all_results:
            # Sort by similarity (descending) so each master's first hit is its best
            all_results.sort(key=lambda x: x['similarity'], reverse=True)
            for result in all_results:
                master_id = result['master_id']
                similarity = result['similarity']
                # Keep the highest similarity score for each master (in case of multiple panel matches)
                if master_id not in similarity_scores or similarity > similarity_scores[master_id]:
                    similarity_scores[master_id] = similarity
                if master_id not in detected_masters:
                    detected_masters.append(master_id)

        # Apply deduplication (outside the branch so original_detected always exists)
        original_detected = detected_masters[:]
        detected_masters = self.deduplicate_master_matches(detected_masters)
        duplicates_removed = len(original_detected) - len(detected_masters)

        if self.split_simple:
            splitter_type = "SimplePanelSplitter (even division)"
        elif self.split_advanced:
            splitter_type = "AdvancedPanelSplitter (edge detection)"
        else:
            splitter_type = "PanelSplitter (multiple CV methods)"
        analysis_parts = [
            f"Multi-panel layout processed using split method + vector similarity analysis.",
            f"Layout split into {splits_generated} individual panels using {splitter_type} (no additional API calls).",
            f"Each panel analyzed against {len(self.master_embeddings)} master embeddings using Google Vertex AI (1408 dimensions).",
            f"Similarity threshold: {self.similarity_threshold}",
            f"Found {len(detected_masters)} matches after deduplication."
        ]
        if duplicates_removed:
            analysis_parts.append(f"Removed {duplicates_removed} duplicate(s).")
        analysis = " ".join(analysis_parts)
        print(f" → Split + vector similarity analysis completed: {len(detected_masters)} matches found")
        return {
            'detected_masters': detected_masters,
            'detected_master_ids': detected_masters,
            'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters],
            'analysis': analysis,
            'processing_mode': 'split_and_vector_similarity',
            'splits_generated': splits_generated,
            'panel_count': panel_count,
            'deduplication_applied': duplicates_removed != 0,
            'duplicates_removed': duplicates_removed,
            'original_detected_masters': original_detected,
            'total_combinations_processed': len(all_results),
            'potential_matches_found': len(all_results),
            'inlier_scores': similarity_scores,  # Using similarity scores for truncation compatibility
            'similarity_threshold': self.similarity_threshold
        }
    except Exception as e:
        error_msg = f"Error in split + vector similarity analysis for {layout_name}: {e}"
        print(f"{error_msg}")
        return {
            'detected_masters': [],
            'detected_master_ids': [],
            'detected_master_filenames': [],
            'analysis': f'Split + vector similarity analysis failed: {error_msg}',
            'error': str(e),
            'processing_mode': 'split_and_vector_similarity_error'
        }
def detect_with_split_and_inlier_analysis_opencv(self, layout_path: str, layout_name: str, panel_count: int) -> Dict:
    """
    Detect masters using split method followed by local inlier analysis on individual panels.

    The layout is cut into ``panel_count`` panels by ``self.splitter``, which is
    created lazily from the ``split_simple`` / ``split_advanced`` flags. Every
    (panel, master) pair is scored in a worker process by
    ``process_single_master_inlier_analysis``, using ``self.local_workers``
    processes. A master is reported once if any panel reaches
    ``max(self.min_good_matches, best * self.inlier_ratio_threshold,
    int(best * self.inlier_threshold))`` inliers with 'high' or 'medium'
    confidence, where ``best`` is the top inlier count over all pairs.

    Returns:
        A result dict. ``inlier_scores`` maps each candidate master to its best
        inlier count for the caller's truncation step. On failure an error
        dict (``processing_mode='split_and_inlier_analysis_error'``) is
        returned instead of raising.
    """
    import tempfile  # scratch directory for split panel images

    print(f" → Analyzing {layout_name} with split method + inlier analysis...")
    try:
        # Initialize panel splitter if not already done
        if not hasattr(self, 'splitter'):
            if self.split_simple:
                self.splitter = SimplePanelSplitter(debug=False)
            elif self.split_advanced:
                self.splitter = AdvancedPanelSplitter(
                    percentile=self.percentile,
                    min_gap=self.min_gap,
                    debug=False
                )
            else:
                self.splitter = PanelSplitter()

        # Step 1: Split layout into individual panels using already-known panel count
        print(f" → Splitting layout into {panel_count} panels (using pre-analyzed count)...")
        split_panels = self.splitter.split_panels(layout_path, panel_count)
        if not split_panels:
            raise Exception("Panel splitting failed: No panels generated")
        splits_generated = len(split_panels)
        print(f" → Successfully split into {splits_generated} individual panel images")

        # Step 2: Apply multiprocessing inlier analysis to each split panel
        print(f" → Analyzing {splits_generated} split panels using {self.local_workers} concurrent processes...")
        tasks = []
        all_results = []
        # Worker processes read the panels from disk, so the scratch directory must
        # outlive the executor. TemporaryDirectory removes it afterwards, even when
        # an exception escapes, and a per-call directory avoids the collisions the
        # previous fixed /tmp paths had between concurrent runs.
        with tempfile.TemporaryDirectory(prefix="split_panels_") as scratch_dir:
            for i, split_info in enumerate(split_panels):
                split_path = os.path.join(scratch_dir, f"split_panel_{i}_{os.path.basename(layout_path)}")
                if not cv2.imwrite(split_path, split_info['image']):
                    print(f" → Failed to write split panel {i+1}, skipping")
                    continue
                # One task per (split panel, master) pair
                for master_id, master_path in self.master_images.items():
                    tasks.append((split_path, master_id, master_path, self.min_good_matches))

            total_tasks = len(tasks)
            with concurrent.futures.ProcessPoolExecutor(max_workers=self.local_workers) as executor:
                futures = [
                    executor.submit(process_single_master_inlier_analysis, *task_args)
                    for task_args in tasks
                ]
                completed_count = 0
                for future in concurrent.futures.as_completed(futures):
                    completed_count += 1
                    if completed_count % 50 == 0:
                        print(f" → Completed {completed_count}/{total_tasks} split-master combinations...")
                    try:
                        result = future.result()
                        if result.get('confidence') != 'low' and result.get('inliers', 0) > 0:
                            all_results.append(result)
                    except Exception as e:
                        print(f" → Error processing split task: {e}")

        # Step 3: Process results and find best matches
        detected_masters = []
        inlier_scores = {}
        if all_results:
            # Sort by inlier count (descending) so each master's first hit is its best
            all_results.sort(key=lambda x: x['inliers'], reverse=True)
            best_inliers = all_results[0]['inliers']
            min_inliers = max(self.min_good_matches, best_inliers * self.inlier_ratio_threshold, int(best_inliers * self.inlier_threshold))
            for result in all_results:
                master_id = result['master_id']
                inliers = result['inliers']
                # Keep the highest inlier score for each master (in case of multiple panel matches)
                if master_id not in inlier_scores or inliers > inlier_scores[master_id]:
                    inlier_scores[master_id] = inliers
                # Record each master once even if several panels match it, as the
                # vector split path does; otherwise the dedup statistics below count
                # cross-panel repeats as duplicates.
                if (master_id not in detected_masters and
                        inliers >= min_inliers and
                        result['confidence'] in ['high', 'medium'] and
                        inliers >= self.min_good_matches):
                    detected_masters.append(master_id)

        # Apply deduplication (outside the branch so original_detected always exists)
        original_detected = detected_masters[:]
        detected_masters = self.deduplicate_master_matches(detected_masters)
        duplicates_removed = len(original_detected) - len(detected_masters)

        if self.split_simple:
            splitter_type = "SimplePanelSplitter (even division)"
        elif self.split_advanced:
            splitter_type = "AdvancedPanelSplitter (edge detection)"
        else:
            splitter_type = "PanelSplitter (multiple CV methods)"
        analysis_parts = [
            f"Multi-panel layout processed using split method + local inlier analysis.",
            f"Layout split into {splits_generated} individual panels using {splitter_type} (no additional API calls).",
            f"Each panel analyzed against {len(self.master_images)} masters using OpenCV AKAZE features with multiprocessing ({self.local_workers} processes).",
            f"Processed {len(tasks)} split-master combinations in parallel.",
            f"Found {len(detected_masters)} matches after deduplication."
        ]
        if duplicates_removed:
            analysis_parts.append(f"Removed {duplicates_removed} duplicate(s).")
        analysis = " ".join(analysis_parts)
        print(f" → Split + inlier analysis completed: {len(detected_masters)} matches found using {self.local_workers} processes")
        return {
            'detected_masters': detected_masters,
            'detected_master_ids': detected_masters,
            'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters],
            'analysis': analysis,
            'processing_mode': 'split_and_inlier_analysis',
            'splits_generated': splits_generated,
            'panel_count': panel_count,
            'concurrent_workers': self.local_workers,
            'deduplication_applied': duplicates_removed != 0,
            'duplicates_removed': duplicates_removed,
            'original_detected_masters': original_detected,
            'total_combinations_processed': len(tasks),
            'potential_matches_found': len(all_results),
            'inlier_scores': inlier_scores  # Track inlier scores for truncation
        }
    except Exception as e:
        error_msg = f"Error in split + inlier analysis for {layout_name}: {e}"
        print(f"{error_msg}")
        return {
            'detected_masters': [],
            'detected_master_ids': [],
            'detected_master_filenames': [],
            'analysis': f'Split + inlier analysis failed: {error_msg}',
            'error': str(e),
            'processing_mode': 'split_and_inlier_analysis_error'
        }
def process_all_layouts_hybrid(self, limit: Optional[int] = None, specific_file: Optional[str] = None) -> Dict:
    """
    Process layout images one at a time using hybrid detection.

    Each layout goes through ``detect_images_in_layout_hybrid``; its result is
    flattened into a summary dict (plus any method-specific, deduplication,
    refinement, truncation, error and cost fields present). A progress snapshot
    is written via ``save_results`` every 20 layouts and aggregate statistics
    are printed at the end.

    Args:
        limit: If truthy, process only the first ``limit`` layouts.
        specific_file: If given, process only this file from ``layouts_path``.

    Returns:
        Dict mapping layout id (file stem) to its summary dict.

    Raises:
        FileNotFoundError: ``specific_file`` does not exist.
        Exception: vector mode is on but no master embeddings were generated.
    """
    print("Starting hybrid batch processing...")
    analysis_method = "vector similarity" if self.vector_mode else "local analysis"
    print(f"Panel threshold: ≤{self.panel_threshold} panels → {analysis_method}, ≥{self.panel_threshold + 1} panels → split + {analysis_method}")
    # Load master images
    self.load_master_images()
    # Generate master embeddings if vector mode is enabled
    if self.vector_mode:
        self.generate_master_embeddings()
        if not self.master_embeddings:
            raise Exception("No master embeddings available for vector mode")
    # Configure worker counts now that we know the number of masters
    self._configure_worker_counts()
    # Same selection rules as the parallel path (specific file / limit)
    layout_files = self._prepare_layout_files(limit, specific_file)
    total_layouts = len(layout_files)
    print(f"Processing {total_layouts} layout images in hybrid mode")
    print("=" * 60)
    results = {}
    start_time = time.time()
    # Track hybrid statistics
    local_analysis_count = 0
    split_analysis_count = 0
    total_api_calls = 0
    truncation_count = 0
    total_truncated_matches = 0
    for i, layout_path in enumerate(layout_files, 1):
        layout_id = layout_path.stem
        # Detect images using hybrid method
        result = self.detect_images_in_layout_hybrid(str(layout_path), i, total_layouts)
        # Track statistics
        detection_method = result.get('detection_method')
        if detection_method in ('local_inlier_analysis', 'vector_similarity'):
            local_analysis_count += 1
        elif detection_method in ('split_and_inlier_analysis', 'split_and_vector_similarity'):
            split_analysis_count += 1
        total_api_calls += result.get('api_calls_used', 0)
        if result.get('truncation_applied'):
            truncation_count += 1
            total_truncated_matches += result.get('removed_count', 0)
        layout_result = {
            'layout_filename': layout_path.name,
            'detected_master_ids': result.get('detected_master_ids', []),
            'detected_master_filenames': result.get('detected_master_filenames', []),
            'analysis': result.get('analysis', ''),
            'detection_method': result.get('detection_method', 'unknown'),
            'panel_count': result.get('panel_count', 1),
            'panel_threshold': self.panel_threshold,
            'processing_mode': 'hybrid'
        }
        # Add method-specific fields
        if 'panel_analysis' in result:
            layout_result['panel_analysis'] = result['panel_analysis']
        if 'total_masters_checked' in result:
            layout_result['total_masters_checked'] = result['total_masters_checked']
        if 'splits_generated' in result:
            layout_result['splits_generated'] = result['splits_generated']
            layout_result['split_results'] = result.get('split_results', [])
        # Add deduplication fields if applied
        if result.get('deduplication_applied'):
            layout_result['deduplication_applied'] = result['deduplication_applied']
            layout_result['duplicates_removed'] = result['duplicates_removed']
            layout_result['original_detected_masters'] = result['original_detected_masters']
        # Add refinement fields if applied
        if result.get('refinement_applied'):
            layout_result['refinement_applied'] = result['refinement_applied']
            layout_result['refinement_details'] = result['refinement_details']
            layout_result['censorship_analysis'] = result['censorship_analysis']
        # Add truncation fields if applied
        if result.get('truncation_applied'):
            layout_result['truncation_applied'] = result['truncation_applied']
            layout_result['original_match_count'] = result['original_match_count']
            layout_result['final_match_count'] = result['final_match_count']
            layout_result['removed_count'] = result['removed_count']
        if 'error' in result:
            layout_result['error'] = result['error']
        # Add cost breakdown for this layout
        cost_breakdown = cost_calculator.get_layout_cost_breakdown(layout_path.name)
        if cost_breakdown:
            layout_result['cost_breakdown'] = cost_breakdown
        results[layout_id] = layout_result
        # Progress update with time estimate (i >= 1 here, so no division by zero)
        elapsed = time.time() - start_time
        remaining = (total_layouts - i) * (elapsed / i)
        print(f"Progress: {i}/{total_layouts} ({i/total_layouts*100:.1f}%) - Est. remaining: {remaining/60:.1f} min")
        # Save progress periodically
        if i % 20 == 0:
            self.save_results(results, f"hybrid_progress_{i}")
    total_time = time.time() - start_time

    def _share(count):
        """Percentage of processed layouts; 0.0 when nothing was processed."""
        return count / total_layouts * 100 if total_layouts else 0.0

    # Guard every ratio: with no layouts (empty dir / limit) these used to raise ZeroDivisionError
    baseline_calls = total_layouts * (len(self.master_images) + 1)
    savings = (1 - total_api_calls / baseline_calls) * 100 if baseline_calls else 0.0
    avg_calls = total_api_calls / total_layouts if total_layouts else 0.0
    avg_seconds = total_time / total_layouts if total_layouts else 0.0
    # Print hybrid statistics
    print(f"\n{'='*60}")
    print("HYBRID PROCESSING STATISTICS")
    print(f"{'='*60}")
    print(f"Total layouts processed: {total_layouts}")
    print(f"Local analysis used: {local_analysis_count} ({_share(local_analysis_count):.1f}%)")
    print(f"Split + inlier analysis used: {split_analysis_count} ({_share(split_analysis_count):.1f}%)")
    print(f"Truncation applied: {truncation_count} layouts ({_share(truncation_count):.1f}%)")
    print(f"Total matches truncated: {total_truncated_matches}")
    print(f"Total API calls made: {total_api_calls}")
    print(f"Average API calls per layout: {avg_calls:.1f}")
    print(f"Estimated cost savings vs one-at-a-time: {savings:.1f}%")
    print(f"Total processing time: {total_time/60:.1f} minutes")
    print(f"Average time per layout: {avg_seconds:.1f} seconds")
    return results
def process_all_layouts_hybrid_parallel(self, limit: Optional[int] = None, specific_file: Optional[str] = None) -> Dict:
    """
    Process layout images concurrently using hybrid detection.

    Layouts are dispatched to a ``ThreadPoolExecutor`` with ``self.layout_workers``
    threads. Each worker runs ``_process_single_layout_parallel``, which queues
    its inlier/vector analysis on a shared ``InlierAnalysisCoordinator``. A daemon
    thread reports progress while results are collected, a snapshot is saved
    every 20 collected layouts, and the coordinator is always stopped on exit.

    Args:
        limit: If truthy, process only the first ``limit`` layouts.
        specific_file: If given, process only this file from ``layouts_path``.

    Returns:
        Dict mapping layout id (file stem) to its summary dict.

    Raises:
        FileNotFoundError: ``specific_file`` does not exist.
        Exception: vector mode is on but no master embeddings were generated.
    """
    print("Starting hybrid batch processing with parallel layout workers...")
    analysis_method = "vector similarity" if self.vector_mode else "local analysis"
    print(f"Panel threshold: ≤{self.panel_threshold} panels → {analysis_method}, ≥{self.panel_threshold + 1} panels → split + {analysis_method}")
    print(f"Parallel processing enabled: {self.layout_workers} layout workers, max {self.max_concurrent_layouts} concurrent layouts")
    # Load master images
    self.load_master_images()
    # Generate master embeddings if vector mode is enabled
    if self.vector_mode:
        self.generate_master_embeddings()
        if not self.master_embeddings:
            raise Exception("No master embeddings available for vector mode")
    # Configure worker counts now that we know the number of masters
    self._configure_worker_counts()
    # Resolve the file list BEFORE starting the coordinator: a missing
    # specific_file raises FileNotFoundError, which previously happened after
    # start() and outside the try/finally, leaving the coordinator running.
    layout_files = self._prepare_layout_files(limit, specific_file)
    total_layouts = len(layout_files)
    # Progress tracker is shared with the layout workers and the monitor thread
    self.progress_tracker = ProgressTracker(total_layouts)
    print(f"Processing {total_layouts} layout images in parallel hybrid mode")
    print("=" * 60)
    results = {}
    # Track statistics
    local_analysis_count = 0
    split_analysis_count = 0
    total_api_calls = 0
    truncation_count = 0
    total_truncated_matches = 0
    # Initialize inlier analysis coordinator; from here on it is always stopped in `finally`
    self.inlier_coordinator = InlierAnalysisCoordinator(
        self.local_workers,
        self.memory_manager,
        self.min_good_matches
    )
    self.inlier_coordinator.start()
    start_time = time.time()
    try:
        # NOTE: max_workers is fixed when the executor is created; later changes to
        # self.layout_workers (memory/queue-pressure adjustments) do not resize this pool.
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.layout_workers) as executor:
            # Submit all layout processing tasks
            future_to_layout = {
                executor.submit(self._process_single_layout_parallel, layout_path, i, total_layouts): layout_path
                for i, layout_path in enumerate(layout_files, 1)
            }
            # Progress monitoring thread
            progress_thread = threading.Thread(target=self._monitor_progress, daemon=True)
            progress_thread.start()
            # Collect results as they complete
            for future in concurrent.futures.as_completed(future_to_layout):
                layout_path = future_to_layout[future]
                layout_id = layout_path.stem
                try:
                    result = future.result()
                    # Track statistics
                    detection_method = result.get('detection_method', 'unknown')
                    if detection_method in ('local_inlier_analysis', 'vector_similarity'):
                        local_analysis_count += 1
                    elif detection_method in ('split_and_inlier_analysis', 'split_and_vector_similarity'):
                        split_analysis_count += 1
                    total_api_calls += result.get('api_calls_used', 0)
                    if result.get('truncation_applied'):
                        truncation_count += 1
                        total_truncated_matches += result.get('removed_count', 0)
                    results[layout_id] = self._create_layout_result(result, layout_path)
                    # _process_single_layout_parallel converts exceptions into an error
                    # dict (with an 'error' key) instead of raising; count those as failures.
                    self.progress_tracker.complete_layout(success='error' not in result)
                except Exception as e:
                    print(f"Error processing layout {layout_path.name}: {e}")
                    # Handle worker failure with potential adjustments
                    results[layout_id] = self._handle_worker_failure(layout_path.name, e)
                    self.progress_tracker.complete_layout(success=False)
                # Monitor memory and adjust workers if needed
                self._monitor_memory_and_adjust_workers()
                # Save progress periodically
                if len(results) % 20 == 0:
                    self.save_results(results, f"hybrid_parallel_progress_{len(results)}")
    finally:
        # Stop coordinator
        if self.inlier_coordinator:
            self.inlier_coordinator.stop()
    total_time = time.time() - start_time
    # Print statistics
    self._print_parallel_statistics(
        total_layouts, local_analysis_count, split_analysis_count,
        truncation_count, total_truncated_matches, total_api_calls, total_time
    )
    return results
def _prepare_layout_files(self, limit: Optional[int], specific_file: Optional[str]) -> List:
"""Prepare list of layout files to process"""
if specific_file:
# Process only the specific file
layout_files = [self.layouts_path / specific_file]
if not layout_files[0].exists():
raise FileNotFoundError(f"Layout file {specific_file} not found in {self.layouts_path}")
print(f"Processing specific file: {specific_file}")
else:
layout_files = list(self.layouts_path.glob("*.jpg"))
if limit:
layout_files = layout_files[:limit]
print(f"Processing first {limit} layouts only")
return layout_files
def _process_single_layout_parallel(self, layout_path, layout_index, total_layouts):
"""Process a single layout in parallel mode"""
layout_name = layout_path.name
layout_id = layout_path.stem
# Mark layout as started
self.progress_tracker.start_layout()
print(f"Processing {layout_index}/{total_layouts}: {layout_name} (Parallel hybrid mode)")
try:
# Phase 1: OpenAI API calls (can be parallel across layouts)
print(f" Step 1: Analyzing panels and censorship using OpenAI O3...")
combined_result = self.count_panels_and_detect_censorship(str(layout_path))
panel_count = combined_result.get('panel_count', 1)
panel_confidence = combined_result.get('panel_confidence', 'unknown')
is_censored = combined_result.get('is_censored', True)
censorship_confidence = combined_result.get('censorship_confidence', 'unknown')
print(f" Panel analysis: {panel_count} panels detected (confidence: {panel_confidence})")
print(f" Censorship analysis: {'CENSORED' if is_censored else 'UNCENSORED'} (confidence: {censorship_confidence})")
# Phase 2: Determine analysis method and coordinate with serial inlier analysis
analysis_method = "vector similarity" if self.vector_mode else "local inlier analysis"
inlier_future = concurrent.futures.Future()
if panel_count <= self.panel_threshold:
print(f" Step 2: Submitting {analysis_method} (≤{self.panel_threshold} panels) to inlier queue...")
# Prepare analysis parameters
analysis_params = {
'layout_path': str(layout_path),
'master_images': self.master_images,
'vector_mode': self.vector_mode,
'inlier_threshold': self.inlier_threshold,
'inlier_ratio_threshold': self.inlier_ratio_threshold
}
# Add vector mode specific parameters
if self.vector_mode:
analysis_params['embedding_model'] = self.embedding_model
analysis_params['master_embeddings'] = self.master_embeddings
analysis_params['similarity_threshold'] = self.similarity_threshold
# Submit to inlier analysis queue
self.inlier_coordinator.submit_analysis(
layout_id, 'direct', analysis_params, inlier_future
)
detection_method = 'vector_similarity' if self.vector_mode else 'local_inlier_analysis'
else:
print(f" Step 2: Splitting layout and submitting {analysis_method} (≥{self.panel_threshold + 1} panels) to inlier queue...")
# Split the layout first
split_panels = self._split_layout(layout_path, panel_count)
# Prepare analysis parameters
analysis_params = {
'split_panels': split_panels,
'master_images': self.master_images,
'vector_mode': self.vector_mode,
'inlier_threshold': self.inlier_threshold,
'inlier_ratio_threshold': self.inlier_ratio_threshold
}
# Add vector mode specific parameters
if self.vector_mode:
analysis_params['embedding_model'] = self.embedding_model
analysis_params['master_embeddings'] = self.master_embeddings
analysis_params['similarity_threshold'] = self.similarity_threshold
# Submit to inlier analysis queue
self.inlier_coordinator.submit_analysis(
layout_id, 'split', analysis_params, inlier_future
)
detection_method = 'split_and_vector_similarity' if self.vector_mode else 'split_and_inlier_analysis'
# Wait for inlier analysis to complete
print(f" Step 3: Waiting for inlier analysis completion...")
inlier_result = inlier_future.result() # No timeout - wait indefinitely for completion
# Phase 3: Post-process results (can be parallel across layouts)
detection_result = self._post_process_parallel_results(
inlier_result, combined_result, panel_count, detection_method, layout_path
)
detected_count = len(detection_result.get('detected_masters', []))
base_method = "vector similarity" if self.vector_mode else "local analysis"
method_name = base_method if panel_count <= self.panel_threshold else f"split + {base_method}"
print(f"✓ Completed {layout_name} - Found {detected_count} matches using {method_name}")
return detection_result
except Exception as e:
error_msg = f"Error in parallel hybrid analysis for {layout_name}: {e}"
print(error_msg)
return self._create_error_result(layout_path, e)
def _split_layout(self, layout_path, panel_count):
"""Split layout into panels"""
# Initialize panel splitter if not already done
if not hasattr(self, 'splitter'):
if self.split_simple:
self.splitter = SimplePanelSplitter(debug=False)
elif self.split_advanced:
self.splitter = AdvancedPanelSplitter(
percentile=self.percentile,
min_gap=self.min_gap,
debug=False
)
else:
self.splitter = PanelSplitter()
# Split layout into panels
return self.splitter.split_panels(str(layout_path), panel_count)
def _post_process_parallel_results(self, inlier_result, combined_result, panel_count, detection_method, layout_path):
"""Post-process results from parallel inlier analysis"""
# Extract basic results
detected_masters = inlier_result.get('detected_masters', [])
inlier_scores = inlier_result.get('inlier_scores', {})
# Apply CEN refinement if enabled
if self.refinement_mode and detected_masters:
cen_images = [mid for mid in detected_masters if self.is_cen_image(mid)]
if cen_images:
print(f" Step 4: Applying CEN refinement...")
is_censored = combined_result.get('is_censored', True)
censorship_confidence = combined_result.get('censorship_confidence', 'unknown')
refined_result = self.apply_cen_refinement_with_stored_analysis(
{'detected_masters': detected_masters, 'layout_path': str(layout_path)},
is_censored, censorship_confidence
)
detected_masters = refined_result.get('detected_masters', [])
# Apply deduplication
original_count = len(detected_masters)
detected_masters = self.deduplicate_master_matches(detected_masters)
duplicates_removed = original_count - len(detected_masters)
if duplicates_removed > 0:
print(f" Step 5: Removed {duplicates_removed} duplicate master(s)")
# Apply truncation if needed
if len(detected_masters) > panel_count and inlier_scores and not self.no_truncation:
print(f" Step 6: Truncating {len(detected_masters)} matches to top {panel_count} by score...")
# Sort masters by score (descending)
sorted_masters = sorted(detected_masters,
key=lambda mid: inlier_scores.get(mid, 0),
reverse=True)
# Keep only top N matches
truncated_masters = sorted_masters[:panel_count]
removed_masters = sorted_masters[panel_count:]
detected_masters = truncated_masters
truncation_applied = True
original_match_count = len(sorted_masters)
removed_count = len(removed_masters)
elif len(detected_masters) > panel_count and self.no_truncation:
print(f" Step 6: Truncation disabled - keeping all {len(detected_masters)} matches")
truncation_applied = False
original_match_count = len(detected_masters)
removed_count = 0
else:
truncation_applied = False
original_match_count = len(detected_masters)
removed_count = 0
# Calculate confidence score
final_matches = len(detected_masters)
confidence_score = self.calculate_confidence_score(final_matches, panel_count)
# Apply fallback to OpenAI one-at-a-time if enabled and needed
if self.fallback_one_at_a_time and final_matches < panel_count:
print(f" Step 7: Fallback triggered - {final_matches} matched masters < {panel_count} detected panels")
print(f" Running OpenAI one-at-a-time method with {len(self.master_images)} workers...")
# Check resource usage before starting fallback
self._check_resource_usage()
# Create a temporary OpenAI detector for one-at-a-time processing
try:
# Force garbage collection before creating detector
import gc
gc.collect()
# Limit concurrent workers to prevent file descriptor exhaustion
max_workers = min(len(self.master_images), 20) # Cap at 20 to prevent resource exhaustion
temp_detector = OpenAIImageDetector(
enable_greyscale=self.enable_greyscale,
enable_contrast_enhancement=self.enable_contrast_enhancement,
contrast_factor=self.contrast_factor,
refinement_mode=self.refinement_mode,
one_at_a_time_mode=True,
max_concurrent_workers=max_workers
)
# Set the master images
temp_detector.master_images = self.master_images.copy()
print(f" Using {max_workers} workers for fallback (limited to prevent resource exhaustion)")
# Run OpenAI one-at-a-time detection with stored censorship data
stored_censorship_data = {
'is_censored': combined_result.get('is_censored', True),
'confidence': combined_result.get('censorship_confidence', 'unknown')
}
fallback_result = temp_detector.detect_images_in_layout_one_at_a_time(
str(layout_path), 0, 1, stored_censorship_data
)
# Clean up the temporary detector
if hasattr(temp_detector, 'cleanup_temp_files'):
temp_detector.cleanup_temp_files()
# Force garbage collection after fallback
del temp_detector
gc.collect()
if fallback_result and fallback_result.get('detected_masters'):
fallback_masters = fallback_result['detected_masters']
print(f" Fallback successful - Found {len(fallback_masters)} matches")
# Apply CEN refinement if enabled
if self.refinement_mode:
fallback_result = self.apply_cen_refinement_with_stored_analysis(
fallback_result,
combined_result.get('is_censored', True),
combined_result.get('censorship_confidence', 'unknown')
)
# Apply deduplication
original_count = len(fallback_result['detected_masters'])
fallback_result['detected_masters'] = self.deduplicate_master_matches(fallback_result['detected_masters'])
# Use fallback results
detected_masters = fallback_result['detected_masters']
final_matches = len(detected_masters)
confidence_score = self.calculate_confidence_score(final_matches, panel_count)
print(f" Fallback completed - Using {final_matches} matches from OpenAI one-at-a-time")
# Mark that fallback was used
fallback_applied = True
original_detection_method = detection_method
detection_method = 'openai_one_at_a_time_fallback'
fallback_api_calls = fallback_result.get('api_calls_made', 0)
else:
print(f" Fallback failed - No additional matches found, keeping original results")
fallback_applied = False
original_detection_method = detection_method
fallback_api_calls = 0
except Exception as e:
print(f" ERROR: Fallback failed with error: {e}")
# Handle specific "Too many open files" error
if "Too many open files" in str(e) or "Errno 24" in str(e):
print(f" → This is a resource exhaustion issue - forcing cleanup...")
import gc
gc.collect()
# Try to clean up any temp detector that might exist
if 'temp_detector' in locals():
try:
if hasattr(temp_detector, 'cleanup_temp_files'):
temp_detector.cleanup_temp_files()
del temp_detector
except:
pass
print(f" → Cleanup completed, continuing with original results...")
fallback_applied = False
original_detection_method = detection_method
fallback_api_calls = 0
else:
fallback_applied = False
original_detection_method = detection_method
fallback_api_calls = 0
# Build result dictionary
result = {
'detected_masters': detected_masters,
'detected_master_ids': detected_masters,
'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters],
'analysis': f"Parallel hybrid processing: {detection_method}",
'detection_method': detection_method,
'panel_count': panel_count,
'panel_threshold': self.panel_threshold,
'processing_mode': 'hybrid_parallel',
'layout_path': str(layout_path),
'confidence_score': confidence_score,
'api_calls_used': 1 + fallback_api_calls, # Consolidated panel counting + censorship + fallback calls
'truncation_applied': truncation_applied,
'original_match_count': original_match_count,
'final_match_count': final_matches,
'removed_count': removed_count,
'deduplication_applied': duplicates_removed > 0,
'duplicates_removed': duplicates_removed,
'fallback_applied': fallback_applied
}
# Add fallback-specific fields if fallback was used
if fallback_applied:
result['original_detection_method'] = original_detection_method
result['processing_mode'] = 'hybrid_parallel_with_fallback'
# Add panel and censorship analysis
result['panel_analysis'] = {
'panel_count': panel_count,
'confidence': combined_result.get('panel_confidence', 'unknown'),
'analysis': combined_result.get('panel_analysis', ''),
'panel_descriptions': combined_result.get('panel_descriptions', [])
}
result['censorship_analysis'] = {
'is_censored': combined_result.get('is_censored', True),
'confidence': combined_result.get('censorship_confidence', 'unknown'),
'analysis': combined_result.get('censorship_analysis', ''),
'coverage_details': combined_result.get('coverage_details', '')
}
# Add inlier analysis specific results
if 'analysis_mode' in inlier_result:
result['inlier_analysis_mode'] = inlier_result['analysis_mode']
if 'total_masters_checked' in inlier_result:
result['total_masters_checked'] = inlier_result['total_masters_checked']
if 'potential_matches_found' in inlier_result:
result['potential_matches_found'] = inlier_result['potential_matches_found']
if 'workers_used' in inlier_result:
result['inlier_workers_used'] = inlier_result['workers_used']
return result
def _create_layout_result(self, result, layout_path):
    """
    Build the standardized summary dict stored in the parallel batch results.

    Core identification/detection fields are always present. Optional
    diagnostic fields are copied only when they exist in ``result``. The
    layout's cost breakdown is attached when ``cost_calculator`` returns one.

    Args:
        result: Per-layout dict from ``_process_single_layout_parallel``
            (either a detection result or an error result).
        layout_path: ``Path`` of the layout image.

    Returns:
        The summary dict.
    """
    layout_result = {
        'layout_filename': layout_path.name,
        'detected_master_ids': result.get('detected_master_ids', []),
        'detected_master_filenames': result.get('detected_master_filenames', []),
        'analysis': result.get('analysis', ''),
        'detection_method': result.get('detection_method', 'unknown'),
        'panel_count': result.get('panel_count', 1),
        'panel_threshold': self.panel_threshold,
        'processing_mode': result.get('processing_mode', 'hybrid_parallel'),
        'confidence_score': result.get('confidence_score', 0.0)
    }
    optional_fields = (
        'panel_analysis', 'censorship_analysis', 'total_masters_checked',
        'truncation_applied', 'original_match_count', 'final_match_count',
        'removed_count', 'deduplication_applied', 'duplicates_removed',
        'inlier_analysis_mode', 'potential_matches_found', 'inlier_workers_used',
        # Previously dropped: fallback provenance, and the error text that error
        # results carry (the sequential path already records 'error').
        'fallback_applied', 'original_detection_method', 'error',
    )
    for field in optional_fields:
        if field in result:
            layout_result[field] = result[field]
    # Add cost breakdown if available
    cost_breakdown = cost_calculator.get_layout_cost_breakdown(layout_path.name)
    if cost_breakdown:
        layout_result['cost_breakdown'] = cost_breakdown
    return layout_result
def _create_error_result(self, layout_path, error):
"""Create error result dictionary"""
return {
'layout_filename': layout_path.name,
'detected_master_ids': [],
'detected_master_filenames': [],
'analysis': f'Error in parallel hybrid processing: {error}',
'detection_method': 'error',
'panel_count': 0,
'panel_threshold': self.panel_threshold,
'processing_mode': 'hybrid_parallel_error',
'confidence_score': 0.0,
'error': str(error)
}
def _monitor_progress(self):
"""Monitor and display progress periodically"""
monitor_cycles = 0
last_completed = 0
stall_count = 0
while True:
time.sleep(10) # Update every 10 seconds
monitor_cycles += 1
# Update queue size
if self.inlier_coordinator:
queue_size = self.inlier_coordinator.get_queue_size()
self.progress_tracker.update_queue_size(queue_size)
# Check for potential stalls
info = self.progress_tracker.get_progress_info()
current_completed = info['completed']
# Detect stall condition
if current_completed == last_completed and queue_size > 0:
stall_count += 1
if stall_count >= 6: # 60 seconds of no progress
print(f" → STALL DETECTED: No progress for 60s with {queue_size} items in queue")
current_task = self.inlier_coordinator.get_current_task_info()
if current_task:
processing_time = time.time() - current_task['started_at']
print(f" → Current task: {current_task['layout_id']} ({current_task['analysis_type']}) - {processing_time:.1f}s")
# If a task is taking too long, provide information but don't timeout
if current_task and processing_time > 300: # 5 minutes
print(f" → INFO: Long-running task detected ({processing_time:.1f}s) - continuing to wait for completion")
# Automatic queue pressure relief
if queue_size >= 3 and stall_count >= 6:
print(f" → QUEUE PRESSURE RELIEF: Reducing layout workers to help with bottleneck")
if hasattr(self, 'layout_workers') and isinstance(self.layout_workers, int) and self.layout_workers > 1:
original_workers = self.layout_workers
self.layout_workers = max(1, self.layout_workers - 1)
print(f" → Reduced layout workers: {original_workers}{self.layout_workers}")
else:
print(f" → Cannot reduce layout workers further (current: {getattr(self, 'layout_workers', 'unknown')})")
stall_count = 0 # Reset after taking action
else:
stall_count = 0
last_completed = current_completed
# Print progress
self.progress_tracker.print_progress()
# Monitor memory and adjust workers every 30 seconds (3 cycles)
if monitor_cycles % 3 == 0:
self._monitor_memory_and_adjust_workers()
# Check if we're done
info = self.progress_tracker.get_progress_info()
if info['completed'] + info['failed'] >= info['total']:
break
def _print_parallel_statistics(self, total_layouts, local_analysis_count, split_analysis_count,
truncation_count, total_truncated_matches, total_api_calls, total_time):
"""Print parallel processing statistics"""
print(f"\n{'='*60}")
print("PARALLEL HYBRID PROCESSING STATISTICS")
print(f"{'='*60}")
print(f"Total layouts processed: {total_layouts}")
print(f"Layout workers used: {self.layout_workers}")
print(f"Local analysis used: {local_analysis_count} ({local_analysis_count/total_layouts*100:.1f}%)")
print(f"Split + analysis used: {split_analysis_count} ({split_analysis_count/total_layouts*100:.1f}%)")
print(f"Truncation applied: {truncation_count} layouts ({truncation_count/total_layouts*100:.1f}%)")
print(f"Total matches truncated: {total_truncated_matches}")
print(f"Total API calls made: {total_api_calls}")
print(f"Average API calls per layout: {total_api_calls/total_layouts:.1f}")
print(f"Estimated cost savings vs one-at-a-time: {(1 - total_api_calls/(total_layouts * (len(self.master_images) + 1)))*100:.1f}%")
print(f"Total processing time: {total_time/60:.1f} minutes")
print(f"Average time per layout: {total_time/total_layouts:.1f} seconds")
# Estimate speedup
if hasattr(self, 'layout_workers') and self.layout_workers > 1:
estimated_sequential_time = total_time * self.layout_workers
print(f"Estimated sequential time: {estimated_sequential_time/60:.1f} minutes")
print(f"Parallel speedup: {estimated_sequential_time/total_time:.1f}x")
def _monitor_memory_and_adjust_workers(self):
    """
    Adjust ``layout_workers`` / ``local_workers`` from memory and queue pressure.

    Reduction (checked first):
      * memory pressure (RAM > 85%, or swap > 95% with RAM > 80%): decrement
        both layout and local workers, never below 1;
      * queue pressure (>= 3 items in the inlier queue): decrement only layout
        workers, since they produce the queued work.
    Otherwise, when RAM < 75% and swap < 80%, grow by one worker, but only for
    counts that were auto-detected (``_layout_workers_config`` /
    ``_local_workers_config`` is None). Layout workers can grow up to
    min(4, cpu_count // 2); if they do not grow, local workers can grow up to
    max(1, cpu_count - 2). At most one increase happens per call.
    Any new ``local_workers`` value is mirrored onto ``inlier_coordinator``.

    NOTE(review): called both from the result-collection loop and from the
    monitor thread without a lock. When memory and queue pressure coincide,
    layout workers are decremented twice in one call.

    Returns:
        True if a count changed, else False (also False if monitoring raised).
    """
    try:
        memory_usage = psutil.virtual_memory().percent
        swap_usage = psutil.swap_memory().percent
        # Get queue information for bottleneck detection
        queue_size = 0
        if hasattr(self, 'inlier_coordinator') and self.inlier_coordinator:
            queue_size = self.inlier_coordinator.get_queue_size()
        # Very lenient swap threshold: a full swap alone is acceptable
        memory_pressure = memory_usage > 85 or (swap_usage > 95 and memory_usage > 80)
        queue_pressure = queue_size >= 3
        if memory_pressure or queue_pressure:
            adjustments_made = False
            if memory_pressure:
                print(f" → Memory pressure detected: {memory_usage:.1f}% RAM, {swap_usage:.1f}% swap")
                # For memory pressure, reduce both types of workers
                if hasattr(self, 'layout_workers') and isinstance(self.layout_workers, int) and self.layout_workers > 1:
                    original_workers = self.layout_workers
                    self.layout_workers = max(1, self.layout_workers - 1)
                    print(f" → Reduced layout workers: {original_workers}{self.layout_workers}")
                    adjustments_made = True
                if hasattr(self, 'local_workers') and isinstance(self.local_workers, int) and self.local_workers > 1:
                    original_local = self.local_workers
                    self.local_workers = max(1, self.local_workers - 1)
                    print(f" → Reduced local workers: {original_local}{self.local_workers}")
                    if hasattr(self, 'inlier_coordinator') and self.inlier_coordinator:
                        self.inlier_coordinator.local_workers = self.local_workers
                    adjustments_made = True
            if queue_pressure:
                print(f" → Queue pressure detected: {queue_size} items in inlier queue")
                # Only reduce producers (layout workers); reducing consumers would grow the queue
                if hasattr(self, 'layout_workers') and isinstance(self.layout_workers, int) and self.layout_workers > 1:
                    original_workers = self.layout_workers
                    self.layout_workers = max(1, self.layout_workers - 1)
                    print(f" → Reduced layout workers to reduce queue pressure: {original_workers}{self.layout_workers}")
                    adjustments_made = True
                else:
                    print(f" → Cannot reduce layout workers further (current: {getattr(self, 'layout_workers', 'unknown')})")
            return adjustments_made
        if memory_usage < 75 and swap_usage < 80:
            # os.cpu_count() returns None when the count cannot be determined
            cpu_count = os.cpu_count() or 1
            # Can we increase layout workers?
            if (hasattr(self, 'layout_workers') and
                    hasattr(self, '_layout_workers_config') and
                    self._layout_workers_config is None and  # Only if auto-detected
                    isinstance(self.layout_workers, int) and
                    self.layout_workers < min(4, cpu_count // 2)):
                original_workers = self.layout_workers
                self.layout_workers = min(4, self.layout_workers + 1)
                print(f" → Increased layout workers: {original_workers}{self.layout_workers}")
                return True
            # Can we increase local workers? Use the same cap in the check and the
            # assignment so the count can never be set below 1.
            local_cap = max(1, cpu_count - 2)
            if (hasattr(self, 'local_workers') and
                    hasattr(self, '_local_workers_config') and
                    self._local_workers_config is None and  # Only if auto-detected
                    isinstance(self.local_workers, int) and
                    self.local_workers < local_cap):
                original_local = self.local_workers
                self.local_workers = min(local_cap, self.local_workers + 1)
                print(f" → Increased local workers: {original_local}{self.local_workers}")
                if hasattr(self, 'inlier_coordinator') and self.inlier_coordinator:
                    self.inlier_coordinator.local_workers = self.local_workers
                return True
        return False  # No adjustments needed
    except Exception as e:
        print(f" → Error monitoring memory: {e}")
        return False
def _handle_worker_failure(self, failed_layout, exception):
    """Handle an individual layout worker failure gracefully.

    If the failure looks memory-related, the layout and local worker counts
    are halved (never below 1), and the inlier coordinator, if present, is
    updated with the new local worker count.

    Args:
        failed_layout: Identifier of the layout whose worker failed. It is
            echoed back as ``layout_filename`` in the returned result.
        exception: The exception raised by the worker.

    Returns:
        dict: A placeholder result that marks the layout as a worker failure.
        It has no detected masters and a confidence score of 0.0.
    """
    print(f" → Worker failure detected for {failed_layout}: {exception}")
    # Check whether it is a memory-related error. The exception type name is
    # included because a bare ``MemoryError()`` has an empty message, so
    # matching on ``str(exception)`` alone would miss the most common OOM case.
    # The keyword 'memory' also covers 'out of memory' and 'memoryerror'.
    error_text = f"{type(exception).__name__} {exception}".lower()
    is_memory_failure = isinstance(exception, MemoryError) or any(
        keyword in error_text for keyword in ('memory', 'killed')
    )
    if is_memory_failure:
        print(" → Memory-related failure detected, reducing worker counts")
        # Emergency reduction: halve both worker pools, never dropping below 1.
        if hasattr(self, 'layout_workers') and isinstance(self.layout_workers, int) and self.layout_workers > 1:
            self.layout_workers = max(1, self.layout_workers // 2)
            print(f" → Emergency: Reduced layout workers to {self.layout_workers}")
        if hasattr(self, 'local_workers') and isinstance(self.local_workers, int) and self.local_workers > 1:
            self.local_workers = max(1, self.local_workers // 2)
            print(f" → Emergency: Reduced local workers to {self.local_workers}")
        # Keep the inlier coordinator's worker count in sync with ours. Only
        # do this when we actually have a local worker count to propagate.
        if getattr(self, 'inlier_coordinator', None) and hasattr(self, 'local_workers'):
            self.inlier_coordinator.local_workers = self.local_workers
    # Return an error result
    return {
        'layout_filename': failed_layout,
        'detected_master_ids': [],
        'detected_master_filenames': [],
        'analysis': f'Worker failure: {exception}',
        'detection_method': 'worker_failure',
        'panel_count': 0,
        'panel_threshold': self.panel_threshold,
        'processing_mode': 'hybrid_parallel_worker_failure',
        'confidence_score': 0.0,
        'error': str(exception),
        'worker_failure': True
    }
def _check_resource_usage(self):
    """Log the process's open-file count against its RLIMIT_NOFILE limits.

    This is a best-effort diagnostic. It counts open files with
    ``lsof -p <pid>``, forces a garbage collection, prints the count next to
    the soft/hard limits, and warns when the count exceeds 80% of the soft
    limit.

    Any failure is printed and swallowed, never raised. For example, the
    ``resource`` module is unavailable on Windows.
    """
    try:
        import gc
        import os
        import resource
        import subprocess

        # Get current file descriptor limits
        soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
        pid = os.getpid()

        # Count open files via lsof: one header row, then one row per entry.
        open_files = "unknown"
        try:
            result = subprocess.run(
                ['lsof', '-p', str(pid)],
                capture_output=True,
                text=True,
                timeout=10,  # a diagnostic must never stall the pipeline
            )
            rows = [row for row in result.stdout.splitlines() if row.strip()]
            # Empty output means lsof could not report anything: keep "unknown"
            # rather than claiming zero open files.
            if rows:
                open_files = len(rows) - 1  # -1 for header
        except (OSError, subprocess.SubprocessError):
            # lsof is missing, not executable, or timed out. Keep "unknown".
            pass

        # Force garbage collection
        gc.collect()

        print(f" → Resource check: {open_files} open files, limit: {soft_limit}/{hard_limit}")

        # Warn if getting close to limit
        if isinstance(open_files, int) and open_files > soft_limit * 0.8:
            print(f" → WARNING: High file descriptor usage ({open_files}/{soft_limit})")
            print(f" → Consider reducing concurrent workers or increasing system limits")
    except Exception as e:
        print(f" → Could not check resource usage: {e}")
def save_results(self, results: Dict, filename: str = "hybrid_detection_results") -> str:
    """Persist hybrid detection results and run metadata as JSON.

    The file is written to ``<self.results_path>/<filename>.json``. It holds
    two top-level keys:

    - ``metadata``: detector settings and master image inventory. In vector
      mode it also includes embedding settings, and it includes the
      cost-tracking session summary when tracking is enabled.
    - ``results``: the ``results`` mapping, unchanged.

    Args:
        results: Per-layout detection results to persist.
        filename: Output file name, without the ``.json`` extension.

    Returns:
        str: The path of the JSON file that was written.
    """
    target = self.results_path / f"{filename}.json"
    vector = self.vector_mode

    # Key insertion order matters: it defines the key order in the JSON file.
    metadata = {
        'total_layouts_processed': len(results),
        'total_master_images': len(self.master_images),
        'master_images_available': list(self.master_files.keys()),
        'provider': 'hybrid',
        'model': 'openai_o3_plus_vector_similarity' if vector else 'openai_o3_plus_local_analysis',
        'panel_threshold': self.panel_threshold,
        'inlier_threshold': self.inlier_threshold,
        'greyscale_enabled': self.enable_greyscale,
        'contrast_enhancement_enabled': self.enable_contrast_enhancement,
        'processing_mode': 'hybrid_vector' if vector else 'hybrid',
        'vector_mode': vector,
    }

    if vector:
        metadata.update({
            'similarity_threshold': self.similarity_threshold,
            'embedding_model': 'Google Vertex AI multimodalembedding@001',
            'embedding_dimensions': 1408,
        })

    # The cost summary is attached only when the calculator is tracking AND
    # the session summary itself reports tracking as enabled.
    if cost_calculator.enable_tracking:
        session = cost_calculator.get_session_summary()
        if session['tracking_enabled']:
            metadata['cost_tracking'] = session

    payload = {'metadata': metadata, 'results': results}
    with open(target, 'w') as handle:
        json.dump(payload, handle, indent=2)

    print(f"Results saved to: {target}")
    return str(target)
def generate_summary(self, results: Dict) -> Dict:
    """Aggregate per-layout hybrid detection results into summary statistics.

    Every value in ``results`` must provide ``detected_master_ids``. The keys
    ``detection_method``, ``deduplication_applied``, ``duplicates_removed``,
    ``truncation_applied`` and ``removed_count`` are optional.

    Percentages are relative to the number of layouts and are rounded to one
    decimal place. They are 0 when ``results`` is empty.

    Returns:
        Dict: Counts, percentages, master usage (including the ten most used
        masters), deduplication/truncation stats and detector settings. In
        vector mode it also carries the embedding settings.
    """
    layout_total = len(results)

    def pct(count):
        # Share of all layouts as a percentage; 0 when there are no layouts.
        return round(count / layout_total * 100, 1) if layout_total > 0 else 0

    matched = local_used = split_used = 0
    dedup_layouts = dup_removed = 0
    trunc_layouts = trunc_removed = 0
    usage = {}

    # Single pass over all layouts, accumulating every statistic at once.
    for entry in results.values():
        master_ids = entry['detected_master_ids']
        if master_ids:
            matched += 1

        method = entry.get('detection_method')
        if method in ('local_inlier_analysis', 'vector_similarity'):
            local_used += 1
        if method in ('split_and_inlier_analysis', 'split_and_vector_similarity'):
            split_used += 1

        for mid in master_ids:
            usage[mid] = usage.get(mid, 0) + 1

        if entry.get('deduplication_applied', False):
            dedup_layouts += 1
        dup_removed += entry.get('duplicates_removed', 0)

        if entry.get('truncation_applied', False):
            trunc_layouts += 1
        trunc_removed += entry.get('removed_count', 0)

    # Stable sort: masters with equal counts keep their first-seen order.
    top_masters = sorted(usage.items(), key=lambda pair: pair[1], reverse=True)[:10]

    summary = {
        'total_layouts_processed': layout_total,
        'layouts_with_matches': matched,
        'layouts_without_matches': layout_total - matched,
        'local_analysis_used': local_used,
        'split_analysis_used': split_used,
        'local_analysis_percentage': pct(local_used),
        'split_analysis_percentage': pct(split_used),
        'master_image_usage': usage,
        'most_used_masters': top_masters,
        'layouts_with_deduplication': dedup_layouts,
        'total_duplicates_removed': dup_removed,
        'deduplication_rate': pct(dedup_layouts),
        'layouts_with_truncation': trunc_layouts,
        'total_matches_removed_by_truncation': trunc_removed,
        'truncation_rate': pct(trunc_layouts),
        'provider': 'hybrid',
        'model': 'openai_o3_plus_vector_similarity' if self.vector_mode else 'openai_o3_plus_local_analysis_plus_split',
        'panel_threshold': self.panel_threshold,
        'inlier_threshold': self.inlier_threshold,
        'vector_mode': self.vector_mode,
    }

    if self.vector_mode:
        summary.update({
            'similarity_threshold': self.similarity_threshold,
            'embedding_model': 'Google Vertex AI multimodalembedding@001',
            'embedding_dimensions': 1408,
        })

    return summary
def apply_cen_refinement_with_stored_analysis(self, initial_results: Dict, is_layout_censored: bool, censorship_confidence: str) -> Dict:
    """
    Apply CEN refinement using stored censorship analysis from the consolidated API call.

    Each detected master for which ``self.is_cen_image`` is true is handled as
    follows:

    - If the layout is uncensored and
      ``self.find_corresponding_non_cen_image(master_id)`` returns an ID, the
      master is replaced by that non-CEN ID.
    - Otherwise the CEN master is kept.

    Non-CEN matches pass through unchanged, and the original order is
    preserved.

    Args:
        initial_results: Detection result dict. ``detected_masters`` is read;
            ``layout_path`` is used only for logging.
        is_layout_censored: Stored verdict on whether the layout is censored.
        censorship_confidence: Confidence label copied into every refinement
            detail entry.

    Returns:
        Dict: ``initial_results`` itself (not a copy) when no CEN match is
        present. Otherwise, a shallow copy in which ``detected_masters``,
        ``detected_master_ids`` and ``detected_master_filenames`` are rewritten,
        and ``cen_refinement_applied``, ``cen_refinement_details``,
        ``cen_refinement_changes`` and ``censorship_used_stored_analysis`` are
        added.
    """
    layout_name = Path(initial_results.get('layout_path', 'unknown')).name
    detected_masters = initial_results.get('detected_masters', [])

    # Find CEN images in the results
    cen_images = [mid for mid in detected_masters if self.is_cen_image(mid)]
    if not cen_images:
        # No CEN images found, return original results
        return initial_results

    print(f" Refining {len(cen_images)} CEN matches for {layout_name}")
    print(f" Using stored censorship analysis: {'CENSORED' if is_layout_censored else 'UNCENSORED'} (confidence: {censorship_confidence})")

    refined_masters = []
    refinement_details = []
    changes_made = 0

    for master_id in detected_masters:
        if not self.is_cen_image(master_id):
            # Not a CEN image, keep as is
            refined_masters.append(master_id)
            continue

        non_cen_id = self.find_corresponding_non_cen_image(master_id)
        if not is_layout_censored and non_cen_id:
            # Layout is uncensored, switch to non-CEN version
            refined_masters.append(non_cen_id)
            refinement_details.append({
                'original_cen_match': master_id,
                'non_cen_alternative': non_cen_id,
                'final_choice': non_cen_id,
                'confidence': censorship_confidence,
                'analysis': f"Layout determined to be uncensored, switched from {master_id} to {non_cen_id}",
                'changed': True,
                'reason': 'layout_uncensored'
            })
            changes_made += 1
            print(f" → Changed {master_id} to {non_cen_id} (layout is uncensored)")
            continue

        # The CEN version is kept. The recorded analysis and the log line must
        # state the real reason: either the layout is censored, or it is
        # uncensored but no non-CEN counterpart exists.
        if is_layout_censored:
            reason = 'layout_censored'
            analysis = f"Layout is censored, keeping CEN version: {master_id}"
            log_note = "layout is censored"
        else:
            reason = 'no_non_cen_alternative'
            analysis = f"No non-CEN alternative found, keeping CEN version: {master_id}"
            log_note = "no non-CEN alternative"
        refined_masters.append(master_id)
        refinement_details.append({
            'original_cen_match': master_id,
            'non_cen_alternative': non_cen_id,
            'final_choice': master_id,
            'confidence': censorship_confidence,
            'analysis': analysis,
            'changed': False,
            'reason': reason
        })
        print(f" → Kept {master_id} ({log_note})")

    # Update the results. A shallow copy is enough because every rewritten
    # field gets a brand-new list.
    refined_results = initial_results.copy()
    refined_results['detected_masters'] = refined_masters
    refined_results['detected_master_ids'] = refined_masters
    refined_results['detected_master_filenames'] = [f"{mid}.jpg" for mid in refined_masters]

    # Store refinement information
    refined_results['cen_refinement_applied'] = True
    refined_results['cen_refinement_details'] = refinement_details
    refined_results['cen_refinement_changes'] = changes_made
    refined_results['censorship_used_stored_analysis'] = True

    print(f" → CEN refinement completed: {changes_made} changes made")
    return refined_results
def calculate_confidence_score(self, final_matches: int, panel_count: int) -> float:
    """
    Calculate confidence score as percentage ratio of final matches to detected panels

    Args:
        final_matches: Number of final detected matches after deduplication and refinement
        panel_count: Number of panels detected by OpenAI O3

    Returns:
        float: Confidence percentage, clamped to the range 0.0 to 100.0.
        Returns 0.0 when panel_count is missing (None/0) or negative.
    """
    # No meaningful ratio exists without a positive panel count.
    if not panel_count or panel_count < 0:
        return 0.0

    # Calculate raw percentage
    raw_percentage = (final_matches / panel_count) * 100

    # Clamp into [0, 100]. More matches than panels cannot exceed full
    # confidence, and a negative match count must not yield a negative score.
    return min(max(raw_percentage, 0.0), 100.0)