2939 lines
No EOL
143 KiB
Python
2939 lines
No EOL
143 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Hybrid Image Detection Module
|
|
Combines OpenAI O3 panel counting with local inlier analysis for cost-efficient detection
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
import base64
|
|
import queue
|
|
from pathlib import Path
|
|
from typing import List, Dict, Optional
|
|
import numpy as np
|
|
import cv2
|
|
import concurrent.futures
|
|
import threading
|
|
import multiprocessing
|
|
import psutil
|
|
import pickle
|
|
from openai_detector import OpenAIImageDetector
|
|
from panel_splitter import PanelSplitter
|
|
from advanced_splitter import AdvancedPanelSplitter
|
|
from simple_splitter import SimplePanelSplitter
|
|
from memory_manager import MemoryManager, memory_safe_execution, reduce_feature_count
|
|
from cost_calculator import cost_calculator, extract_token_usage_from_response
|
|
|
|
# Vector mode imports
|
|
try:
|
|
from google.cloud import aiplatform
|
|
from vertexai.vision_models import MultiModalEmbeddingModel, Image as VertexImage
|
|
VERTEX_AI_AVAILABLE = True
|
|
except ImportError:
|
|
VERTEX_AI_AVAILABLE = False
|
|
|
|
|
|
def _keep_strongest_features(keypoints, descriptors, max_features):
    """
    Keep at most ``max_features`` keypoints, preferring those with the highest response.

    Returns ``(keypoints, descriptors)`` with descriptor rows still aligned to the
    surviving keypoints. ``max_features`` of None or <= 0 disables the cap, and
    inputs already within the cap are returned unchanged.
    """
    if max_features is None or max_features <= 0 or len(keypoints) <= max_features:
        return keypoints, descriptors
    responses = np.array([kp.response for kp in keypoints])
    # argsort is ascending, so the strongest responses sit at the end.
    keep = np.argsort(responses)[len(keypoints) - max_features:]
    return [keypoints[i] for i in keep], descriptors[keep]


def process_single_master_inlier_analysis(layout_path, master_id, master_path, min_good_matches=10, max_features=15000):
    """
    Count RANSAC homography inliers between a layout image and one master image.

    Standalone (module-level) so it can be submitted to a ProcessPoolExecutor.
    Memory-safe version: bails out when system memory usage exceeds 85% and caps
    the number of keypoints per image at ``max_features``.

    Args:
        layout_path: Path of the layout image.
        master_id: Identifier echoed back in the result dict.
        master_path: Path of the master image.
        min_good_matches: Minimum ratio-test matches required before RANSAC runs.
        max_features: Per-image keypoint cap; None or <= 0 disables the cap.

    Returns:
        A dict that always contains 'master_id', 'inliers' and 'confidence'
        ('high' / 'medium' / 'low'). Problems are reported via an 'error' key
        (or 'reason' when there are too few good matches) instead of raising.
    """
    try:
        # Imported inside the try so an import failure is reported in the
        # result dict rather than raised in the worker process.
        import cv2
        import numpy as np
        import psutil

        # Initialize OpenCV components in this process
        akaze = cv2.AKAZE_create()
        bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)

        # Load images in grayscale for feature detection
        layout_img = cv2.imread(layout_path, cv2.IMREAD_GRAYSCALE)
        master_img = cv2.imread(master_path, cv2.IMREAD_GRAYSCALE)
        if layout_img is None or master_img is None:
            return {
                'master_id': master_id,
                'inliers': 0,
                'confidence': 'low',
                'error': 'Could not read one or both images'
            }

        # Check memory before the (allocation-heavy) feature detection
        memory = psutil.virtual_memory()
        if memory.percent > 85:
            return {
                'master_id': master_id,
                'inliers': 0,
                'confidence': 'low',
                'error': f'Memory usage too high: {memory.percent:.1f}%'
            }

        kp1, des1 = akaze.detectAndCompute(layout_img, None)
        kp2, des2 = akaze.detectAndCompute(master_img, None)
        if des1 is None or des2 is None:
            return {
                'master_id': master_id,
                'inliers': 0,
                'confidence': 'low',
                'error': 'No features detected in one or both images'
            }

        # Limit features to prevent memory explosion in the matcher
        kp1, des1 = _keep_strongest_features(kp1, des1, max_features)
        kp2, des2 = _keep_strongest_features(kp2, des2, max_features)

        # k-NN matching followed by Lowe's ratio test
        matches = bf.knnMatch(des1, des2, k=2)
        good_matches = [
            pair[0] for pair in matches
            if len(pair) == 2 and pair[0].distance < 0.80 * pair[1].distance
        ]

        if len(good_matches) < min_good_matches:
            return {
                'master_id': master_id,
                'inliers': 0,
                'confidence': 'low',
                'good_matches': len(good_matches),
                'reason': f'Insufficient good matches: {len(good_matches)} < {min_good_matches}'
            }

        # cv2.findHomography needs at least four point correspondences; guard
        # explicitly in case min_good_matches was configured below that.
        if len(good_matches) < 4:
            return {
                'master_id': master_id,
                'inliers': 0,
                'confidence': 'low',
                'good_matches': len(good_matches),
                'error': f'Too few matches for homography: {len(good_matches)} < 4'
            }

        src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
        dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)

        # Find homography using RANSAC (7 px reprojection threshold)
        _, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 7.0)
        if mask is None:
            return {
                'master_id': master_id,
                'inliers': 0,
                'confidence': 'low',
                'good_matches': len(good_matches),
                'error': 'Homography estimation failed'
            }

        inliers = int(np.sum(mask))

        # Confidence combines the absolute inlier count with the inlier ratio
        inlier_ratio = inliers / len(good_matches)
        if inliers >= 30 and inlier_ratio >= 0.5:
            confidence = 'high'
        elif inliers >= 15 and inlier_ratio >= 0.3:
            confidence = 'medium'
        else:
            confidence = 'low'

        return {
            'master_id': master_id,
            'inliers': inliers,
            'confidence': confidence,
            'good_matches': len(good_matches),
            'inlier_ratio': round(inlier_ratio, 3),
            'total_features_layout': len(kp1),
            'total_features_master': len(kp2)
        }

    except Exception as e:
        return {
            'master_id': master_id,
            'inliers': 0,
            'confidence': 'low',
            'error': str(e)
        }
|
|
|
|
|
|
class InlierAnalysisCoordinator:
    """
    Coordinates serial execution of inlier analysis tasks while allowing parallel layout processing.

    Layout workers call ``submit_analysis``; one background daemon thread takes
    tasks off a FIFO queue and runs them one at a time under
    ``active_analysis_lock``, so only one analysis (which may itself fan out over
    a process pool) is active at once. Each task's result or exception is
    delivered through the future supplied by the submitter.
    """

    def __init__(self, local_workers, memory_manager, min_good_matches=10):
        """
        Args:
            local_workers: Maximum number of worker processes used by one analysis.
            memory_manager: Object providing ``limit_concurrent_processes(n)`` used
                to throttle worker counts, or a falsy value to disable throttling.
            min_good_matches: Absolute inlier floor for a detection; also passed to
                each per-master comparison as its minimum ratio-test match count.
        """
        self.local_workers = local_workers
        self.memory_manager = memory_manager
        self.min_good_matches = min_good_matches

        # Task coordination
        self.task_queue = queue.Queue()
        self.worker_thread = None
        self.shutdown_event = threading.Event()
        self.active_analysis_lock = threading.Lock()

        # Statistics (guarded by stats_lock)
        self.total_tasks_processed = 0
        self.current_task_info = None
        self.stats_lock = threading.Lock()

    def start(self):
        """Start the inlier analysis worker thread (no-op if it is already running)."""
        if self.worker_thread is None or not self.worker_thread.is_alive():
            self.shutdown_event.clear()
            self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
            self.worker_thread.start()
            print(f"InlierAnalysisCoordinator started with {self.local_workers} workers")

    def stop(self):
        """
        Stop the worker thread and fail every task that is still queued.

        The worker only checks for shutdown between tasks, so a running task is
        allowed to finish (the join waits up to 5 s). Futures of tasks that never
        started receive a RuntimeError so callers blocked on ``result()`` are
        released instead of waiting forever.
        """
        if self.worker_thread and self.worker_thread.is_alive():
            self.shutdown_event.set()
            self.worker_thread.join(timeout=5.0)
            print("InlierAnalysisCoordinator stopped")
        self._fail_pending_tasks()

    def submit_analysis(self, layout_id, analysis_type, analysis_params, result_future):
        """
        Submit an inlier analysis task to the serial queue

        Args:
            layout_id: Unique identifier for the layout
            analysis_type: Type of analysis ('direct' or 'split')
            analysis_params: Parameters for the analysis
            result_future: Future object to signal completion
        """
        task = {
            'layout_id': layout_id,
            'analysis_type': analysis_type,
            'analysis_params': analysis_params,
            'result_future': result_future,
            'submitted_at': time.time()
        }

        self.task_queue.put(task)
        print(f" → Submitted {analysis_type} analysis for {layout_id} to inlier queue (queue size: {self.task_queue.qsize()})")

    def get_queue_size(self):
        """Get current (approximate) queue size"""
        return self.task_queue.qsize()

    def get_current_task_info(self):
        """Get a copy of the info about the currently processing task, or None when idle"""
        with self.stats_lock:
            return self.current_task_info.copy() if self.current_task_info else None

    @staticmethod
    def _resolve_future(future, result=None, error=None):
        """
        Deliver ``result`` (or ``error`` when given) to ``future``.

        Futures that are already done - e.g. cancelled by the submitter - are left
        untouched: setting a result on them raises and would otherwise abort the
        worker's bookkeeping for the task.
        """
        if future.done():
            return
        if error is not None:
            future.set_exception(error)
        else:
            future.set_result(result)

    def _fail_pending_tasks(self):
        """Resolve the futures of all queued-but-unstarted tasks with a RuntimeError."""
        while True:
            try:
                task = self.task_queue.get_nowait()
            except queue.Empty:
                return
            try:
                self._resolve_future(
                    task['result_future'],
                    error=RuntimeError(
                        f"InlierAnalysisCoordinator stopped before processing {task['layout_id']}"
                    ),
                )
            finally:
                self.task_queue.task_done()

    def _worker_loop(self):
        """Main worker loop - processes inlier analysis tasks serially until shutdown"""
        print("InlierAnalysisCoordinator worker loop started")

        while not self.shutdown_event.is_set():
            try:
                # Short timeout so the shutdown flag is re-checked regularly
                task = self.task_queue.get(timeout=1.0)
            except queue.Empty:
                continue

            try:
                # Process the task while holding the analysis lock
                with self.active_analysis_lock:
                    self._process_inlier_analysis_task(task)
            except Exception as e:
                print(f"Error in inlier analysis worker loop: {e}")
            finally:
                # Always balance get(); otherwise task_queue.join() could hang.
                self.task_queue.task_done()

    def _process_inlier_analysis_task(self, task):
        """Run one queued task and deliver its result or exception to the task's future"""
        layout_id = task['layout_id']
        analysis_type = task['analysis_type']
        analysis_params = task['analysis_params']
        result_future = task['result_future']

        started_at = time.time()
        queue_wait_time = started_at - task['submitted_at']
        with self.stats_lock:
            self.current_task_info = {
                'layout_id': layout_id,
                'analysis_type': analysis_type,
                'started_at': started_at,
                'queue_wait_time': queue_wait_time
            }

        print(f" → Processing {analysis_type} inlier analysis for {layout_id} (queue wait: {queue_wait_time:.1f}s)")

        try:
            if analysis_type == 'direct':
                result = self._perform_direct_inlier_analysis(layout_id, analysis_params)
            elif analysis_type == 'split':
                result = self._perform_split_inlier_analysis(layout_id, analysis_params)
            else:
                raise ValueError(f"Unknown analysis type: {analysis_type}")
        except Exception as e:
            print(f" → Error processing {analysis_type} analysis for {layout_id}: {e}")
            # Signal error to the waiting layout worker
            self._resolve_future(result_future, error=e)
        else:
            self._resolve_future(result_future, result=result)
            with self.stats_lock:
                self.total_tasks_processed += 1
            processing_time = time.time() - started_at
            print(f" → Completed {analysis_type} analysis for {layout_id} in {processing_time:.1f}s")
        finally:
            with self.stats_lock:
                self.current_task_info = None

    def _perform_direct_inlier_analysis(self, layout_id, params):
        """Perform direct (whole-layout) analysis, by vector similarity or OpenCV inliers"""
        layout_path = params['layout_path']
        if params.get('vector_mode', False):
            return self._perform_vector_similarity_analysis(layout_path, params)
        # 'master_images' is only required by the OpenCV path
        return self._perform_opencv_inlier_analysis(layout_path, params['master_images'], params)

    def _perform_split_inlier_analysis(self, layout_id, params):
        """Perform analysis on split panels, by vector similarity or OpenCV inliers"""
        split_panels = params['split_panels']
        if params.get('vector_mode', False):
            return self._perform_split_vector_analysis(split_panels, params)
        # 'master_images' is only required by the OpenCV path
        return self._perform_split_opencv_analysis(split_panels, params['master_images'], params)

    def _apply_inlier_thresholds(self, results, params):
        """
        Choose detected masters from match results sorted by inliers, highest first.

        The floor is the strictest of ``min_good_matches``,
        ``best * inlier_ratio_threshold`` and ``int(best * inlier_threshold)``,
        where ``best`` is the first result's inlier count. A master is detected
        when it reaches the floor with 'high' or 'medium' confidence. Each master
        is reported once and keeps its highest inlier count in the scores.

        Returns:
            (detected_masters, inlier_scores)
        """
        detected_masters = []
        inlier_scores = {}
        if not results:
            return detected_masters, inlier_scores

        best_inliers = results[0]['inliers']
        inlier_threshold = params.get('inlier_threshold', 0.65)
        inlier_ratio_threshold = params.get('inlier_ratio_threshold', 0.4)
        min_inliers = max(self.min_good_matches,
                          best_inliers * inlier_ratio_threshold,
                          int(best_inliers * inlier_threshold))

        for result in results:
            master_id = result['master_id']
            inliers = result['inliers']

            if master_id not in inlier_scores or inliers > inlier_scores[master_id]:
                inlier_scores[master_id] = inliers

            if (inliers >= min_inliers and
                    result['confidence'] in ('high', 'medium') and
                    master_id not in detected_masters):
                detected_masters.append(master_id)

        return detected_masters, inlier_scores

    @staticmethod
    def _write_split_panels(split_panels, directory):
        """
        Write each split panel's ``'image'`` to a JPEG inside ``directory``.

        Returns a list of ``(panel_index, path)`` for the panels that were written;
        panels ``cv2.imwrite`` reports as failed are skipped with a warning.
        """
        written = []
        for i, split_info in enumerate(split_panels):
            split_path = os.path.join(directory, f"split_panel_{i}.jpg")
            if cv2.imwrite(split_path, split_info['image']):
                written.append((i, split_path))
            else:
                print(f" → Could not write split panel {i} to {split_path}; skipping it")
        return written

    def _perform_opencv_inlier_analysis(self, layout_path, master_images, params):
        """
        Compare the layout against every master with AKAZE + RANSAC in a process pool.

        Returns a dict with 'detected_masters' (highest inlier count first),
        'inlier_scores' for each non-low-confidence match and run metadata; on
        failure returns an 'opencv_inlier_analysis_error' dict with the message.
        """
        try:
            akaze = cv2.AKAZE_create()

            layout_img = cv2.imread(layout_path, cv2.IMREAD_GRAYSCALE)
            if layout_img is None:
                raise Exception(f"Could not load layout image: {layout_path}")

            # Layout features are detected here only to size the worker pool and
            # report 'layout_features'; each worker re-detects them itself.
            layout_kp, layout_desc = akaze.detectAndCompute(layout_img, None)
            if layout_desc is None:
                raise Exception("No features detected in layout image")
            feature_count = len(layout_kp)
            # Drop the layout data before the workers start to lower peak memory.
            del layout_img, layout_kp, layout_desc

            # Dynamically adjust worker count based on feature count and memory
            safe_workers = self._calculate_safe_worker_count(feature_count)

            # Per-image keypoint cap applied inside each worker
            max_features = 10000

            master_results = []
            with concurrent.futures.ProcessPoolExecutor(max_workers=safe_workers) as executor:
                future_to_master = {
                    executor.submit(process_single_master_inlier_analysis, layout_path, master_id,
                                    master_path, self.min_good_matches, max_features): master_id
                    for master_id, master_path in master_images.items()
                }

                for future in concurrent.futures.as_completed(future_to_master):
                    master_id = future_to_master[future]
                    try:
                        result = future.result()
                    except Exception as e:
                        print(f" → Error processing master {master_id}: {e}")
                        continue
                    if result.get('inliers', 0) > 0 and result.get('confidence', 'low') != 'low':
                        master_results.append({
                            'master_id': master_id,
                            'inliers': result.get('inliers', 0),
                            'confidence': result.get('confidence', 'low'),
                            'details': result
                        })

            master_results.sort(key=lambda x: x['inliers'], reverse=True)
            detected_masters, inlier_scores = self._apply_inlier_thresholds(master_results, params)

            return {
                'detected_masters': detected_masters,
                'inlier_scores': inlier_scores,
                'total_masters_checked': len(master_images),
                'potential_matches_found': len(master_results),
                'analysis_mode': 'opencv_inlier_analysis',
                'layout_features': feature_count,
                'workers_used': safe_workers
            }

        except Exception as e:
            return {
                'detected_masters': [],
                'inlier_scores': {},
                'error': str(e),
                'analysis_mode': 'opencv_inlier_analysis_error'
            }

    def _perform_vector_similarity_analysis(self, layout_path, params):
        """
        Compare the layout's embedding with every master embedding.

        Masters whose cosine similarity reaches 'similarity_threshold' (default 0.75)
        are detected, ordered by similarity, highest first. All similarities are
        returned under 'inlier_scores' for compatibility with the OpenCV path.
        """
        try:
            embedding_model = params['embedding_model']
            master_embeddings = params['master_embeddings']
            similarity_threshold = params.get('similarity_threshold', 0.75)

            layout_embedding = self._generate_layout_embedding(layout_path, embedding_model)
            if layout_embedding is None:
                raise Exception("Failed to generate layout embedding")

            similarities = {}
            detected_masters = []
            for master_id, master_embedding in master_embeddings.items():
                similarity = self._compute_cosine_similarity(layout_embedding, master_embedding)
                similarities[master_id] = similarity
                if similarity >= similarity_threshold:
                    detected_masters.append(master_id)

            detected_masters.sort(key=lambda x: similarities[x], reverse=True)

            return {
                'detected_masters': detected_masters,
                'inlier_scores': similarities,  # Use similarities as scores for compatibility
                'total_masters_checked': len(master_embeddings),
                'analysis_mode': 'vector_similarity_analysis',
                'similarity_threshold': similarity_threshold
            }

        except Exception as e:
            return {
                'detected_masters': [],
                'inlier_scores': {},
                'error': str(e),
                'analysis_mode': 'vector_similarity_analysis_error'
            }

    def _perform_split_opencv_analysis(self, split_panels, master_images, params):
        """
        Run OpenCV inlier analysis for every (split panel, master) pair.

        Panels are written to a private temporary directory that is removed when
        the analysis finishes or fails. The detection thresholds are the same as
        in the whole-layout path; each master keeps its best inlier count.
        """
        import tempfile

        try:
            tasks = []
            all_results = []
            with tempfile.TemporaryDirectory(prefix="split_panels_") as tmp_dir:
                for _, split_path in self._write_split_panels(split_panels, tmp_dir):
                    for master_id, master_path in master_images.items():
                        tasks.append((split_path, master_id, master_path, self.min_good_matches, 15000))

                # The executor must finish before the directory is deleted.
                with concurrent.futures.ProcessPoolExecutor(max_workers=self.local_workers) as executor:
                    futures = [
                        executor.submit(process_single_master_inlier_analysis, *task_args)
                        for task_args in tasks
                    ]
                    for future in concurrent.futures.as_completed(futures):
                        try:
                            result = future.result()
                        except Exception as e:
                            print(f" → Error processing split task: {e}")
                            continue
                        if result.get('confidence') != 'low' and result.get('inliers', 0) > 0:
                            all_results.append(result)

            all_results.sort(key=lambda x: x['inliers'], reverse=True)
            detected_masters, inlier_scores = self._apply_inlier_thresholds(all_results, params)

            return {
                'detected_masters': detected_masters,
                'inlier_scores': inlier_scores,
                'total_combinations_processed': len(tasks),
                'potential_matches_found': len(all_results),
                'analysis_mode': 'split_opencv_analysis',
                'splits_processed': len(split_panels)
            }

        except Exception as e:
            return {
                'detected_masters': [],
                'inlier_scores': {},
                'error': str(e),
                'analysis_mode': 'split_opencv_analysis_error'
            }

    def _perform_split_vector_analysis(self, split_panels, params):
        """
        Run vector similarity analysis for every split panel against every master.

        A master is detected when any panel reaches 'similarity_threshold'. It is
        scored by its best panel similarity, and detected masters are ordered by
        that score (highest first), as in the whole-layout vector path.
        """
        import tempfile

        try:
            embedding_model = params['embedding_model']
            master_embeddings = params['master_embeddings']
            similarity_threshold = params.get('similarity_threshold', 0.75)

            all_results = []
            with tempfile.TemporaryDirectory(prefix="split_panels_") as tmp_dir:
                for i, split_path in self._write_split_panels(split_panels, tmp_dir):
                    split_embedding = self._generate_layout_embedding(split_path, embedding_model)
                    if split_embedding is None:
                        continue

                    for master_id, master_embedding in master_embeddings.items():
                        similarity = self._compute_cosine_similarity(split_embedding, master_embedding)
                        if similarity >= similarity_threshold:
                            all_results.append({
                                'master_id': master_id,
                                'similarity': similarity,
                                'split_panel': i
                            })

            # Keep highest similarity for each master
            similarity_scores = {}
            for result in all_results:
                master_id = result['master_id']
                similarity = result['similarity']
                if master_id not in similarity_scores or similarity > similarity_scores[master_id]:
                    similarity_scores[master_id] = similarity

            detected_masters = sorted(similarity_scores, key=similarity_scores.get, reverse=True)

            return {
                'detected_masters': detected_masters,
                'inlier_scores': similarity_scores,  # Use similarities as scores for compatibility
                'total_combinations_processed': len(split_panels) * len(master_embeddings),
                'potential_matches_found': len(all_results),
                'analysis_mode': 'split_vector_analysis',
                'splits_processed': len(split_panels),
                'similarity_threshold': similarity_threshold
            }

        except Exception as e:
            return {
                'detected_masters': [],
                'inlier_scores': {},
                'error': str(e),
                'analysis_mode': 'split_vector_analysis_error'
            }

    def _calculate_safe_worker_count(self, feature_count):
        """Scale the worker count down for feature-heavy layouts and memory pressure (never below 1)"""
        if feature_count > 50000:
            safe_workers = max(1, self.local_workers // 2)
        elif feature_count > 30000:
            safe_workers = max(1, int(self.local_workers * 0.75))
        else:
            safe_workers = self.local_workers

        # Further limit based on memory
        if self.memory_manager:
            safe_workers = min(safe_workers, self.memory_manager.limit_concurrent_processes(safe_workers))

        # ProcessPoolExecutor rejects max_workers < 1
        return max(1, safe_workers)

    def _generate_layout_embedding(self, layout_path, embedding_model):
        """Generate an embedding for the image file at layout_path; None on any failure"""
        try:
            from vertexai.vision_models import Image as VertexImage
            vertex_image = VertexImage.load_from_file(layout_path)
            response = embedding_model.get_embeddings(image=vertex_image)
            return np.array(response.image_embedding)
        except Exception as e:
            print(f" → Error generating embedding: {e}")
            return None

    def _compute_cosine_similarity(self, embedding1, embedding2):
        """Compute cosine similarity between two embeddings (0.0 if either has zero norm)"""
        norm1 = np.linalg.norm(embedding1)
        norm2 = np.linalg.norm(embedding2)

        if norm1 == 0 or norm2 == 0:
            return 0.0

        return float(np.dot(embedding1, embedding2) / (norm1 * norm2))
|
|
|
|
|
|
class ProgressTracker:
    """
    Thread-safe progress tracking for parallel layout processing.

    All counters are read and written while holding ``self.lock``.
    """

    def __init__(self, total_layouts):
        """
        Args:
            total_layouts: Number of layouts expected to be processed.
        """
        self.total_layouts = total_layouts
        self.completed_layouts = 0
        self.in_progress_layouts = 0
        self.failed_layouts = 0
        self.inlier_queue_size = 0
        self.lock = threading.Lock()
        self.start_time = time.time()

    def start_layout(self):
        """Mark a layout as started"""
        with self.lock:
            self.in_progress_layouts += 1

    def complete_layout(self, success=True):
        """Mark a started layout as finished, counting it as completed or failed"""
        with self.lock:
            self.in_progress_layouts -= 1
            if success:
                self.completed_layouts += 1
            else:
                self.failed_layouts += 1

    def update_queue_size(self, size):
        """Update inlier queue size"""
        with self.lock:
            self.inlier_queue_size = size

    def get_progress_info(self):
        """
        Get a snapshot of current progress.

        The ETA assumes the remaining layouts take the average time of the layouts
        finished so far. Failed layouts count as finished: they consumed time and
        will not be retried, so treating them as pending would inflate the ETA.
        'percentage' reports successfully completed layouts only.
        """
        with self.lock:
            elapsed = time.time() - self.start_time
            completed = self.completed_layouts
            finished = completed + self.failed_layouts

            if finished > 0:
                avg_time = elapsed / finished
                remaining = max(0, self.total_layouts - finished) * avg_time
                eta_mins = remaining / 60
            else:
                eta_mins = 0

            return {
                'total': self.total_layouts,
                'completed': completed,
                'in_progress': self.in_progress_layouts,
                'failed': self.failed_layouts,
                'queue_size': self.inlier_queue_size,
                'elapsed_mins': elapsed / 60,
                'eta_mins': eta_mins,
                'percentage': (completed / self.total_layouts * 100) if self.total_layouts > 0 else 0
            }

    def print_progress(self):
        """Print current progress"""
        info = self.get_progress_info()
        print(f"Progress: {info['completed']}/{info['total']} ({info['percentage']:.1f}%) "
              f"| In Progress: {info['in_progress']} | Failed: {info['failed']} "
              f"| Queue: {info['queue_size']} | ETA: {info['eta_mins']:.1f}min")
|
|
|
|
|
|
class HybridImageDetector(OpenAIImageDetector):
|
|
def __init__(self, panel_threshold=2, inlier_threshold=0.65,
             enable_greyscale=False, enable_contrast_enhancement=False,
             min_good_matches=10, inlier_ratio_threshold=0.4,
             openai_workers=None, local_workers=None, split_mode=False,
             split_advanced=False, split_simple=False, percentile=10, min_gap=5,
             vector_mode=False, similarity_threshold=0.75, fallback_one_at_a_time=False,
             parallel_layouts=False, layout_workers=None, max_concurrent_layouts=None,
             no_truncation=False, vertex_credentials_path=None,
             vertex_project="optical-414516", vertex_location="us-central1", **kwargs):
    """
    Initialize the hybrid image detector

    Args:
        panel_threshold: Maximum panels to use local analysis (default: 2)
        inlier_threshold: Minimum similarity threshold for local analysis (default: 0.65)
        enable_greyscale: Enable greyscale processing (default: False for hybrid mode)
        enable_contrast_enhancement: Enable contrast enhancement (default: False for hybrid mode)
        min_good_matches: Minimum good matches for RANSAC (default: 10)
        inlier_ratio_threshold: Minimum inlier ratio for confident matches (default: 0.4)
        openai_workers: Number of workers for OpenAI analysis (default: auto-detect)
        local_workers: Number of workers for local analysis (default: auto-detect)
        split_mode: Enable traditional panel splitting
        split_advanced: Enable advanced panel splitting with edge detection
        split_simple: Enable simple panel splitting with even division (hybrid mode only)
        percentile: Percentile threshold for gutter detection in advanced splitting (default: 10)
        min_gap: Minimum gap size for gutter detection in advanced splitting (default: 5)
        vector_mode: Enable vector similarity search instead of inlier analysis (default: False)
        similarity_threshold: Similarity threshold for vector mode (default: 0.75)
        fallback_one_at_a_time: Enable fallback to OpenAI one-at-a-time when matched masters < detected panels (default: False)
        parallel_layouts: Enable parallel layout processing with serial inlier analysis coordination (default: False)
        layout_workers: Number of concurrent layout workers for parallel processing (default: auto-detect)
        max_concurrent_layouts: Maximum layouts processing simultaneously (default: same as layout_workers)
        no_truncation: Disable truncation of match results (keeps all matches instead of limiting to panel count) (default: False)
        vertex_credentials_path: Service-account JSON for Vertex AI (vector mode only). When None,
            an already-set GOOGLE_APPLICATION_CREDENTIALS is kept and "service-account.json" is
            used only as a fallback.
        vertex_project: Google Cloud project passed to aiplatform.init (vector mode only)
        vertex_location: Google Cloud region passed to aiplatform.init (vector mode only)
        **kwargs: Forwarded to OpenAIImageDetector.__init__
    """
    # Initialize parent class with OpenAI workers
    super().__init__(
        enable_greyscale=enable_greyscale,
        enable_contrast_enhancement=enable_contrast_enhancement,
        max_concurrent_workers=openai_workers or 5,  # Temporary, will be updated after loading masters
        split_mode=split_mode,
        **kwargs
    )

    self.panel_threshold = panel_threshold
    self.inlier_threshold = inlier_threshold
    self.min_good_matches = min_good_matches
    self.inlier_ratio_threshold = inlier_ratio_threshold
    self.split_advanced = split_advanced
    self.split_simple = split_simple
    self.percentile = percentile

    # Vector mode configuration
    self.vector_mode = vector_mode
    self.similarity_threshold = similarity_threshold

    # Fallback configuration
    self.fallback_one_at_a_time = fallback_one_at_a_time

    # Parallel processing configuration
    self.parallel_layouts = parallel_layouts
    self.layout_workers = layout_workers
    self.max_concurrent_layouts = max_concurrent_layouts

    # Truncation configuration
    self.no_truncation = no_truncation

    # Initialize memory manager
    self.memory_manager = MemoryManager(max_memory_percent=75, max_swap_percent=80)
    self.min_gap = min_gap

    # Store worker configurations for later auto-detection (see _configure_worker_counts)
    self._openai_workers_config = openai_workers
    self._local_workers_config = local_workers
    self._layout_workers_config = layout_workers

    # Initialize worker attributes with temporary values for CLI display
    self.openai_workers = openai_workers or "auto"
    self.local_workers = local_workers or "auto"

    # Initialize parallel processing components
    self.inlier_coordinator = None
    self.progress_tracker = None

    # Initialize OpenCV components for local analysis (only if not using vector mode)
    if not self.vector_mode:
        self.akaze = cv2.AKAZE_create()
        self.bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)

    # Initialize vector mode components if enabled
    if self.vector_mode:
        if not VERTEX_AI_AVAILABLE:
            raise ImportError("Google Vertex AI libraries not available. Please install: pip install google-cloud-aiplatform")

        # Respect credentials the caller already configured; only fall back to
        # the local service-account file when nothing else is set.
        if vertex_credentials_path is not None:
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = vertex_credentials_path
        else:
            os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "service-account.json")
        aiplatform.init(project=vertex_project, location=vertex_location)

        # Initialize multimodal embedding model
        self.embedding_model = MultiModalEmbeddingModel.from_pretrained("multimodalembedding@001")

        # Initialize vector-specific attributes
        self.master_embeddings = {}
        self.embeddings_cache_path = Path("embeddings_cache")
        self.embeddings_cache_path.mkdir(exist_ok=True)

        print(f"Vector mode initialized: Using Google Vertex AI multimodalembedding@001")

    analysis_method = "vector similarity" if self.vector_mode else "local inlier analysis"
    print(f"Hybrid detector initialized:")
    print(f" Panel threshold: {panel_threshold} (≤{panel_threshold} panels → {analysis_method})")
    print(f" Inlier threshold: {inlier_threshold}")
    print(f" Vector mode: {self.vector_mode}")
    if self.vector_mode:
        print(f" Similarity threshold: {similarity_threshold}")
    print(f" Greyscale processing: {enable_greyscale}")
    print(f" Contrast enhancement: {enable_contrast_enhancement}")
    print(f" Worker configuration: OpenAI={openai_workers or 'auto'}, Local={local_workers or 'auto'}")
|
|
|
|
def _configure_worker_counts(self):
    """
    Configure worker counts from explicit settings or auto-detection.

    Auto-detection rules:
      * OpenAI workers: one per loaded master image (at least 1).
      * Local workers: CPU cores - 2 (at least 1).
      * Layout workers (parallel mode only): min(4, CPU cores / 2), at least 1.
    Sequential mode forces one layout worker and one concurrent layout. Finally,
    the parent's ``max_concurrent_workers`` is set to the OpenAI worker count.
    """
    # os.cpu_count() may return None when the core count cannot be determined.
    cpu_count = os.cpu_count() or 1

    if self._openai_workers_config is None:
        # Never auto-configure zero workers, even when no masters are loaded.
        self.openai_workers = max(1, len(self.master_images))
        print(f" OpenAI workers auto-detected: {self.openai_workers} (total masters)")
    else:
        self.openai_workers = self._openai_workers_config
        print(f" OpenAI workers configured: {self.openai_workers}")

    if self._local_workers_config is None:
        self.local_workers = max(1, cpu_count - 2)
        print(f" Local workers auto-detected: {self.local_workers} (CPU cores - 2)")
    else:
        self.local_workers = self._local_workers_config
        print(f" Local workers configured: {self.local_workers}")

    if self.parallel_layouts:
        if self._layout_workers_config is None:
            # Conservative default to avoid memory issues
            self.layout_workers = min(4, max(1, cpu_count // 2))
            print(f" Layout workers auto-detected: {self.layout_workers} (conservative: min(4, CPU cores / 2))")
        else:
            self.layout_workers = self._layout_workers_config
            print(f" Layout workers configured: {self.layout_workers}")

        if self.max_concurrent_layouts is None:
            self.max_concurrent_layouts = self.layout_workers
            print(f" Max concurrent layouts: {self.max_concurrent_layouts} (same as layout workers)")
    else:
        self.layout_workers = 1  # Sequential processing
        self.max_concurrent_layouts = 1

    # Update the parent class max_concurrent_workers for OpenAI operations
    self.max_concurrent_workers = self.openai_workers
|
|
|
|
def generate_image_embedding(self, image_path: str) -> Optional[np.ndarray]:
    """Embed a single image with the Vertex AI multimodal embedding model.

    The vector length is whatever ``self.embedding_model`` returns; the
    surrounding code expects 1408 dimensions (the model default) — TODO
    confirm if the model configuration changes.

    Args:
        image_path: Filesystem path of the image to embed.

    Returns:
        The image embedding as a NumPy array, or ``None`` when loading the
        image or calling the model raises (the error is printed).

    Raises:
        ValueError: If vector mode is not enabled on this detector.
    """
    if not self.vector_mode:
        raise ValueError("Vector mode must be enabled to generate embeddings")

    try:
        # Load straight from disk and ask Vertex AI for the image embedding.
        reply = self.embedding_model.get_embeddings(
            image=VertexImage.load_from_file(image_path)
        )
        return np.array(reply.image_embedding)
    except Exception as e:
        print(f"Error generating embedding for {Path(image_path).name}: {e}")
        return None
|
|
|
|
def compute_cosine_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
    """Return the cosine similarity of two embedding vectors.

    The result is ``dot(a, b) / (|a| * |b|)`` as a Python float. If either
    vector has zero length, ``0.0`` is returned instead of dividing by zero.
    """
    magnitude_a = np.linalg.norm(embedding1)
    magnitude_b = np.linalg.norm(embedding2)

    # A zero vector has no direction, so report no similarity.
    if not magnitude_a or not magnitude_b:
        return 0.0

    return float(np.dot(embedding1, embedding2) / (magnitude_a * magnitude_b))
|
|
|
|
def save_embedding_cache(self, embeddings: Dict, filename: str):
    """Pickle ``embeddings`` to ``<embeddings_cache_path>/<filename>.pkl``.

    The cache directory is created if missing. The data is first written to
    a ``.tmp`` sibling and then moved over the target with ``os.replace``.
    A failed or interrupted write therefore never leaves a truncated pickle
    where ``load_embedding_cache`` would read it; the previous cache, if
    any, stays intact.

    Args:
        embeddings: Mapping to persist (e.g. master id -> embedding array).
        filename: Cache file stem; ``.pkl`` is appended.

    Raises:
        Whatever ``pickle.dump`` or the filesystem raises, after the temp
        file has been removed.
    """
    cache_dir = Path(self.embeddings_cache_path)
    cache_dir.mkdir(parents=True, exist_ok=True)
    cache_file = cache_dir / f"{filename}.pkl"
    tmp_file = cache_file.with_name(cache_file.name + ".tmp")

    try:
        with open(tmp_file, 'wb') as f:
            pickle.dump(embeddings, f)
        # Atomic when source and target are on the same filesystem (they are siblings).
        os.replace(tmp_file, cache_file)
    except BaseException:
        # Best-effort removal of the partial temp file; re-raise the real error.
        try:
            tmp_file.unlink()
        except OSError:
            pass
        raise

    print(f"Embeddings cached to: {cache_file}")
|
|
|
|
def load_embedding_cache(self, filename: str) -> Optional[Dict]:
    """Load embeddings previously written by ``save_embedding_cache``.

    Reads ``<embeddings_cache_path>/<filename>.pkl``. ``pickle.load`` can
    execute arbitrary code, so this must only ever point at a cache this
    program wrote itself, never at externally supplied files.

    Args:
        filename: Cache file stem; ``.pkl`` is appended.

    Returns:
        The unpickled object. ``None`` is returned when the file does not
        exist or cannot be read or unpickled (the error is printed).
    """
    cache_file = self.embeddings_cache_path / f"{filename}.pkl"

    # Nothing cached yet.
    if not cache_file.exists():
        return None

    try:
        with cache_file.open('rb') as handle:
            cached = pickle.load(handle)
        print(f"Loaded cached embeddings from: {cache_file}")
        return cached
    except Exception as e:
        # A corrupt or unreadable cache is treated as a miss.
        print(f"Error loading cached embeddings: {e}")
        return None
|
|
|
|
def generate_master_embeddings(self, force_regenerate=False) -> Dict[str, np.ndarray]:
    """Build (or load from cache) the embedding of every master image.

    Unless ``force_regenerate`` is true, the ``master_embeddings`` cache is
    used when its key set exactly matches ``self.master_images``. Otherwise
    every master is embedded in turn, with a 0.1 s pause between requests.
    Masters whose embedding fails are left out. A non-empty result is
    written back to the cache.

    Args:
        force_regenerate: Ignore any existing cache.

    Returns:
        ``self.master_embeddings`` (master id -> embedding).

    Raises:
        ValueError: If vector mode is not enabled.
    """
    if not self.vector_mode:
        raise ValueError("Vector mode must be enabled to generate master embeddings")

    cache_filename = "master_embeddings"

    if not force_regenerate:
        cached = self.load_embedding_cache(cache_filename)
        if cached is not None:
            # Only trust the cache when it covers exactly the current masters.
            if set(cached.keys()) == set(self.master_images.keys()):
                self.master_embeddings = cached
                print(f"✓ Using cached embeddings for {len(cached)} master images")
                return self.master_embeddings
            print("Cache incomplete, regenerating embeddings...")

    total = len(self.master_images)
    print(f"Generating embeddings for {total} master images...")
    self.master_embeddings = {}

    for position, (master_id, image_path) in enumerate(self.master_images.items(), start=1):
        print(f" {position}/{total}: Generating embedding for {master_id}")

        vector = self.generate_image_embedding(image_path)
        if vector is not None:
            self.master_embeddings[master_id] = vector

        # Throttle between requests (but not after the last) to avoid rate limiting.
        if position != total:
            time.sleep(0.1)

    if self.master_embeddings:
        self.save_embedding_cache(self.master_embeddings, cache_filename)

    print(f"✓ Generated embeddings for {len(self.master_embeddings)} master images")
    return self.master_embeddings
|
|
|
|
def detect_images_in_layout_hybrid(self, layout_path: str, layout_index: int, total_layouts: int) -> Dict:
    """Detect master images in one layout with the hybrid OpenAI + local pipeline.

    Pipeline:
      1. One OpenAI call (``count_panels_and_detect_censorship``) yields the
         panel count and a censorship verdict.
      2. Layouts with at most ``self.panel_threshold`` panels are matched
         whole (``detect_with_local_inlier_analysis``). Larger layouts are
         split first (``detect_with_split_and_inlier_analysis``).
      3. With ``self.refinement_mode`` on and at least one CEN match, CEN
         refinement is applied using the stored censorship verdict.
      4. Matches are deduplicated. Unless ``self.no_truncation`` is set,
         they are cut down to the ``panel_count`` best by ``inlier_scores``.
      5. Panel and censorship metadata is attached.
      6. A confidence score is computed from matches vs. panels.
      7. With ``self.fallback_one_at_a_time`` on and fewer matches than
         panels, an OpenAI one-at-a-time pass is tried. Its result replaces
         the hybrid one whenever it detects any master.

    Args:
        layout_path: Path of the layout image.
        layout_index: 1-based position of this layout in the run (logging only).
        total_layouts: Number of layouts in the run (logging only).

    Returns:
        Result dict (``detected_masters``, ``detected_master_ids``,
        ``detected_master_filenames``, ``detection_method``,
        ``processing_mode``, ``api_calls_used``, ...). Any exception is
        caught and reported as a result with
        ``processing_mode == 'hybrid_error'``.
    """
    layout_name = Path(layout_path).name
    print(f"Processing {layout_index}/{total_layouts}: {layout_name} (Hybrid mode)")

    try:
        # Step 1: Count panels and detect censorship using OpenAI O3 (consolidated call)
        print(f" Step 1: Analyzing panels and censorship using OpenAI O3...")
        combined_result = self.count_panels_and_detect_censorship(layout_path)

        # The count comes from model output and may be a numeric string, None
        # or nonsense (assumption — the parser is not visible here). Coerce it
        # to a positive int so the comparisons and slicing below cannot crash
        # or truncate every match away.
        try:
            panel_count = max(1, int(combined_result.get('panel_count', 1)))
        except (TypeError, ValueError):
            panel_count = 1
        panel_confidence = combined_result.get('panel_confidence', 'unknown')
        # A missing verdict defaults to "censored".
        is_censored = combined_result.get('is_censored', True)
        censorship_confidence = combined_result.get('censorship_confidence', 'unknown')

        print(f" Panel analysis: {panel_count} panels detected (confidence: {panel_confidence})")
        print(f" Censorship analysis: {'CENSORED' if is_censored else 'UNCENSORED'} (confidence: {censorship_confidence})")

        # Step 2: Route to whole-layout or split-layout matching.
        analysis_method = "vector similarity" if self.vector_mode else "local inlier analysis"
        use_split = panel_count > self.panel_threshold
        if not use_split:
            print(f" Step 2: Using {analysis_method} (≤{self.panel_threshold} panels)")
            detection_result = self.detect_with_local_inlier_analysis(layout_path, layout_name)
            detection_result['detection_method'] = 'vector_similarity' if self.vector_mode else 'local_inlier_analysis'
        else:
            print(f" Step 2: Using split method + {analysis_method} (≥{self.panel_threshold + 1} panels)")
            detection_result = self.detect_with_split_and_inlier_analysis(layout_path, layout_name, panel_count)
            detection_result['detection_method'] = 'split_and_vector_similarity' if self.vector_mode else 'split_and_inlier_analysis'
        # Only the consolidated panel counting + censorship call hits the API here.
        detection_result['api_calls_used'] = 1

        # Step 3: Apply CEN refinement if enabled and we have CEN matches.
        current_masters = detection_result.get('detected_masters', [])
        if self.refinement_mode and any(self.is_cen_image(mid) for mid in current_masters):
            print(f" Step 3: Applying CEN refinement using stored censorship analysis...")
            cen_result = self.apply_cen_refinement_with_stored_analysis(detection_result, is_censored, censorship_confidence)
            detection_result.update(cen_result)
            print(f" CEN refinement completed")

        # Step 4: Deduplicate and truncate to the panel count.
        self._hybrid_dedupe_and_truncate(detection_result, panel_count)

        # Step 5: Add panel analysis and censorship information.
        detection_result['panel_analysis'] = {
            'panel_count': panel_count,
            'confidence': panel_confidence,
            'analysis': combined_result.get('panel_analysis', ''),
            'panel_descriptions': combined_result.get('panel_descriptions', [])
        }
        detection_result['censorship_analysis'] = {
            'is_censored': is_censored,
            'confidence': censorship_confidence,
            'analysis': combined_result.get('censorship_analysis', ''),
            'coverage_details': combined_result.get('coverage_details', '')
        }
        detection_result['panel_count'] = panel_count
        detection_result['panel_threshold'] = self.panel_threshold
        detection_result['processing_mode'] = 'hybrid'
        detection_result['layout_path'] = layout_path

        # Step 6: Calculate confidence score based on matches vs panels ratio.
        final_matches = len(detection_result.get('detected_masters', []))
        confidence_percentage = self.calculate_confidence_score(final_matches, panel_count)
        detection_result['confidence_score'] = confidence_percentage

        base_method = "vector similarity" if self.vector_mode else "local analysis"
        method_name = f"split + {base_method}" if use_split else base_method
        print(f" Step 6: Confidence score calculated: {confidence_percentage:.1f}% ({final_matches} matches / {panel_count} panels)")

        # Step 7: Fall back to OpenAI one-at-a-time when fewer matches than panels were found.
        if self.fallback_one_at_a_time and final_matches < panel_count:
            fallback_result = self._hybrid_one_at_a_time_fallback(
                layout_path, layout_index, total_layouts, detection_result,
                panel_count, is_censored, censorship_confidence)
            if fallback_result is not None:
                return fallback_result

        print(f"✓ Completed {layout_name} - Found {final_matches} matches using {method_name}")
        return detection_result

    except Exception as e:
        error_msg = f"Error in hybrid analysis for {layout_name}: {e}"
        print(error_msg)
        return {
            'detected_masters': [],
            'detected_master_ids': [],
            'detected_master_filenames': [],
            'analysis': f'Hybrid analysis failed: {error_msg}',
            'error': str(e),
            'processing_mode': 'hybrid_error',
            'detection_method': 'error',
            'api_calls_used': 0
        }


def _hybrid_dedupe_and_truncate(self, detection_result: Dict, panel_count: int) -> None:
    """Step 4 of the hybrid pipeline: dedupe matches, then keep the best ``panel_count``.

    Mutates ``detection_result`` in place. Masters are ranked by
    ``detection_result['inlier_scores']`` (inlier counts or similarity
    scores, depending on the analysis). Truncation only happens when scores
    are present and ``self.no_truncation`` is false.
    """
    current_masters = detection_result.get('detected_masters', [])
    inlier_scores = detection_result.get('inlier_scores', {})

    # First, remove exact duplicates.
    original_count = len(current_masters)
    current_masters = self.deduplicate_master_matches(current_masters)
    if len(current_masters) != original_count:
        duplicates_removed = original_count - len(current_masters)
        print(f" Step 4a: Removed {duplicates_removed} duplicate master(s)")
        detection_result['detected_masters'] = current_masters
        detection_result['detected_master_ids'] = current_masters
        detection_result['detected_master_filenames'] = [f"{mid}.jpg" for mid in current_masters]

    if len(current_masters) > panel_count and inlier_scores and not self.no_truncation:
        print(f" Step 4b: Truncating {len(current_masters)} matches to top {panel_count} by inlier score...")
        # Highest score first; unscored masters rank as 0.
        sorted_masters = sorted(current_masters, key=lambda mid: inlier_scores.get(mid, 0), reverse=True)
        truncated_masters = sorted_masters[:panel_count]
        removed_masters = sorted_masters[panel_count:]

        detection_result['detected_masters'] = truncated_masters
        detection_result['detected_master_ids'] = truncated_masters
        detection_result['detected_master_filenames'] = [f"{mid}.jpg" for mid in truncated_masters]
        detection_result['truncation_applied'] = True
        detection_result['original_match_count'] = len(current_masters)
        detection_result['final_match_count'] = len(truncated_masters)
        detection_result['removed_masters'] = removed_masters
        detection_result['removed_count'] = len(removed_masters)
        print(f" Truncated to {len(truncated_masters)} matches (removed {len(removed_masters)} lower-scoring matches)")
    elif len(current_masters) > panel_count and self.no_truncation:
        print(f" Step 4b: Truncation disabled - keeping all {len(current_masters)} matches")
        detection_result['truncation_applied'] = False
        detection_result['original_match_count'] = len(current_masters)
        detection_result['final_match_count'] = len(current_masters)


def _hybrid_one_at_a_time_fallback(self, layout_path: str, layout_index: int, total_layouts: int,
                                   detection_result: Dict, panel_count: int,
                                   is_censored, censorship_confidence) -> Optional[Dict]:
    """Step 7 of the hybrid pipeline: re-detect the layout with OpenAI one-at-a-time.

    Returns the fallback result, with the hybrid panel/censorship metadata
    copied over, when it detects at least one master. Returns ``None`` (after
    printing why) when the detector cannot be created, the call fails, or
    nothing is found, so the caller keeps ``detection_result``.

    NOTE(review): a non-empty fallback result replaces the hybrid result even
    if it has fewer matches — confirm that is intended.
    """
    layout_name = Path(layout_path).name
    final_matches = len(detection_result.get('detected_masters', []))

    print(f" Step 7: Fallback triggered - {final_matches} matched masters < {panel_count} detected panels")
    print(f" Running OpenAI one-at-a-time method with {len(self.master_images)} workers...")
    print(f" DEBUG: Available master images: {len(self.master_images)}")

    # Create a temporary OpenAI detector for one-at-a-time processing.
    try:
        temp_detector = OpenAIImageDetector(
            enable_greyscale=self.enable_greyscale,
            enable_contrast_enhancement=self.enable_contrast_enhancement,
            contrast_factor=self.contrast_factor,
            refinement_mode=self.refinement_mode,
            one_at_a_time_mode=True,
            max_concurrent_workers=len(self.master_images)  # Use number of masters as worker count
        )
        # Manually set the master images since the constructor might not load them.
        temp_detector.master_images = self.master_images.copy()
        print(f" DEBUG: Copied {len(self.master_images)} master images to temp detector")
    except Exception as e:
        print(f" ERROR: Failed to create temporary OpenAI detector: {e}")
        print(f" Fallback cancelled due to detector creation failure")
        return None

    print(f" DEBUG: Temporary detector created with {temp_detector.max_concurrent_workers} workers")
    print(f" DEBUG: Temp detector has {len(temp_detector.master_images)} master images")
    print(f" DEBUG: Temp detector API key present: {temp_detector.api_key is not None}")
    print(f" DEBUG: Temp detector one-at-a-time mode: {temp_detector.one_at_a_time_mode}")
    print(f" DEBUG: Calling detect_images_in_layout_one_at_a_time...")

    # Run OpenAI one-at-a-time detection, reusing the stored censorship verdict.
    try:
        stored_censorship_data = {
            'is_censored': is_censored,
            'confidence': censorship_confidence
        }
        fallback_result = temp_detector.detect_images_in_layout_one_at_a_time(layout_path, layout_index, total_layouts, stored_censorship_data)
        print(f" DEBUG: OpenAI one-at-a-time call completed successfully")
    except Exception as e:
        print(f" ERROR: OpenAI one-at-a-time call failed: {e}")
        print(f" Fallback cancelled due to OpenAI call failure")
        return None

    print(f" DEBUG: Fallback result type: {type(fallback_result)}")
    print(f" DEBUG: Fallback result keys: {list(fallback_result.keys()) if fallback_result else 'None'}")

    if not fallback_result:
        print(f" DEBUG: Fallback result is None or empty")
        print(f" Fallback failed - No additional matches found, keeping original results")
        return None

    detected_masters = fallback_result.get('detected_masters', [])
    print(f" DEBUG: Raw detected masters: {detected_masters}")
    print(f" DEBUG: Number of raw detected masters: {len(detected_masters)}")
    if not detected_masters:
        print(f" DEBUG: No detected masters in fallback result")
        print(f" Fallback failed - No additional matches found, keeping original results")
        return None

    print(f" Fallback successful - Found {len(detected_masters)} matches")

    # Apply CEN refinement if enabled. Merge the result the same way Step 3
    # does, so a partial refinement dict cannot drop the fallback's own keys.
    if self.refinement_mode:
        print(f" DEBUG: Applying CEN refinement...")
        fallback_result.update(self.apply_cen_refinement_with_stored_analysis(fallback_result, is_censored, censorship_confidence))
        print(f" DEBUG: After CEN refinement: {fallback_result.get('detected_masters', [])}")

    # Apply deduplication.
    print(f" DEBUG: Applying deduplication...")
    masters = fallback_result.get('detected_masters', [])
    original_count = len(masters)
    masters = self.deduplicate_master_matches(masters)
    fallback_result['detected_masters'] = masters
    fallback_result['detected_master_ids'] = masters
    fallback_result['detected_master_filenames'] = [f"{mid}.jpg" for mid in masters]
    print(f" DEBUG: Deduplication: {original_count} -> {len(masters)}")

    # Update metadata to reflect fallback usage.
    fallback_result['fallback_applied'] = True
    fallback_result['original_detection_method'] = detection_result.get('detection_method', 'unknown')
    fallback_result['original_match_count'] = final_matches
    fallback_result['fallback_match_count'] = len(masters)
    fallback_result['detection_method'] = 'openai_one_at_a_time_fallback'
    fallback_result['processing_mode'] = 'hybrid_with_fallback'

    # Preserve the hybrid analysis data.
    fallback_result['panel_analysis'] = detection_result['panel_analysis']
    fallback_result['censorship_analysis'] = detection_result['censorship_analysis']
    fallback_result['panel_count'] = panel_count
    fallback_result['panel_threshold'] = self.panel_threshold
    fallback_result['layout_path'] = layout_path

    fallback_matches = len(masters)
    fallback_result['confidence_score'] = self.calculate_confidence_score(fallback_matches, panel_count)

    # One hybrid analysis call plus whatever the fallback reports it made.
    fallback_result['api_calls_used'] = 1 + fallback_result.get('api_calls_made', 0)

    print(f"✓ Fallback completed {layout_name} - Found {fallback_matches} matches using OpenAI one-at-a-time")
    return fallback_result
|
|
|
|
def detect_with_local_inlier_analysis(self, layout_path: str, layout_name: str) -> Dict:
    """Match a whole (unsplit) layout against the master images.

    Uses ``detect_with_vector_similarity`` when ``self.vector_mode`` is
    enabled, otherwise the OpenCV-based ``detect_with_inlier_analysis``.
    The chosen analyzer's result dict is returned unchanged.
    """
    analyzer = self.detect_with_vector_similarity if self.vector_mode else self.detect_with_inlier_analysis
    return analyzer(layout_path, layout_name)
|
|
|
|
def detect_with_vector_similarity(self, layout_path: str, layout_name: str) -> Dict:
    """Match a whole layout against every master by embedding cosine similarity.

    A master is detected when its similarity to the layout embedding is at
    least ``self.similarity_threshold``. Detections are ordered best-first
    and then passed through ``deduplicate_master_matches``.

    Returns:
        Result dict including ``detected_masters``, the five best
        similarities (``top_similarities``), and ``inlier_scores`` holding
        the similarity of each detected master (the key name is shared with
        the inlier path so truncation works for both). On failure, an error
        dict with ``processing_mode == 'vector_similarity_error'`` is returned.
    """
    print(f" → Analyzing {layout_name} with vector similarity (cosine similarity on embeddings)...")

    try:
        layout_vector = self.generate_image_embedding(layout_path)
        if layout_vector is None:
            raise Exception("Failed to generate layout embedding")

        print(f" → Comparing against {len(self.master_embeddings)} master embeddings...")
        similarities = {
            master_id: self.compute_cosine_similarity(layout_vector, master_vector)
            for master_id, master_vector in self.master_embeddings.items()
        }

        # Masters clearing the threshold, best first (stable for ties).
        above_threshold = sorted(
            (mid for mid, score in similarities.items() if score >= self.similarity_threshold),
            key=lambda mid: similarities[mid],
            reverse=True,
        )
        top_similarities = sorted(similarities.items(), key=lambda item: item[1], reverse=True)[:5]

        summary = [
            f"Vector similarity analysis using Google Vertex AI embeddings (1408 dimensions).",
            f"Similarity threshold: {self.similarity_threshold}",
            f"Found {len(above_threshold)} matches above threshold.",
            f"Top 5 similarities: " + ", ".join([f"{mid}({sim:.3f})" for mid, sim in top_similarities]),
        ]

        # Deduplicate a copy so the pre-dedup list survives for reporting.
        detected_masters = self.deduplicate_master_matches(list(above_threshold))
        duplicates_removed = len(above_threshold) - len(detected_masters)
        if duplicates_removed != 0:
            summary.append(f"Removed {duplicates_removed} duplicate(s).")

        print(f" → Vector similarity analysis completed: {len(detected_masters)} matches found")

        return {
            'detected_masters': detected_masters,
            'detected_master_ids': detected_masters,
            'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters],
            'analysis': " ".join(summary),
            'processing_mode': 'vector_similarity',
            'total_masters_checked': len(self.master_embeddings),
            'confident_matches': len(detected_masters),
            'similarity_threshold': self.similarity_threshold,
            'deduplication_applied': duplicates_removed != 0,
            'duplicates_removed': duplicates_removed,
            'original_detected_masters': above_threshold,
            'inlier_scores': {mid: similarities[mid] for mid in detected_masters},
            'top_similarities': dict(top_similarities)
        }

    except Exception as e:
        error_msg = f"Error in vector similarity analysis for {layout_name}: {e}"
        print(f" → {error_msg}")
        return {
            'detected_masters': [],
            'detected_master_ids': [],
            'detected_master_filenames': [],
            'analysis': f'Vector similarity analysis failed: {error_msg}',
            'error': str(e),
            'processing_mode': 'vector_similarity_error'
        }
|
|
|
|
def detect_with_inlier_analysis(self, layout_path: str, layout_name: str) -> Dict:
    """Match a whole layout against every master with OpenCV AKAZE inlier counting.

    Each master is scored in its own process by
    ``process_single_master_inlier_analysis``. The process count starts at
    ``self.local_workers``, is reduced for feature-heavy layouts, is capped
    by ``self.memory_manager.limit_concurrent_processes``, and never drops
    below 1.

    A master is detected when its worker result has ``high``/``medium``
    confidence and its inlier count reaches
    ``max(min_good_matches, best * inlier_ratio_threshold,
    int(best * inlier_threshold))``, where ``best`` is the top master's
    inlier count.

    Returns:
        Result dict with ``detected_masters`` and related keys.
        ``inlier_scores`` (master id -> inliers) is used for truncation, and
        ``concurrent_workers`` is the process count actually used. On
        failure, an error dict with
        ``processing_mode == 'local_inlier_analysis_error'`` is returned.
    """
    import gc

    print(f" → Analyzing {layout_name} with local inlier analysis using {self.local_workers} processes...")

    try:
        # Check memory before starting.
        if not self.memory_manager.is_memory_safe():
            print(" → Memory usage high, waiting for safe levels...")
            if not self.memory_manager.wait_for_memory_safe():
                raise Exception("Memory usage too high to safely process")

        # The layout is analysed here only to size the worker pool and for
        # reporting; each worker process loads and analyses it again.
        layout_img = cv2.imread(layout_path, cv2.IMREAD_GRAYSCALE)
        if layout_img is None:
            raise Exception(f"Could not load layout image: {layout_path}")
        layout_kp, layout_desc = self.akaze.detectAndCompute(layout_img, None)
        if layout_desc is None:
            raise Exception("No features detected in layout image")

        feature_count = len(layout_kp)
        print(f" → Detected {feature_count} features in layout")
        # Only the count is needed from here on. Drop the image, keypoints and
        # descriptors so the parent holds less memory while workers run.
        del layout_img, layout_kp, layout_desc

        # Reduce parallelism for feature-heavy layouts.
        if feature_count > 50000:
            safe_workers = max(1, self.local_workers // 2)
            print(f" → High feature count ({feature_count}), reducing workers to {safe_workers}")
        elif feature_count > 30000:
            safe_workers = max(1, int(self.local_workers * 0.75))
            print(f" → Medium feature count ({feature_count}), reducing workers to {safe_workers}")
        else:
            safe_workers = self.local_workers

        # Further limit based on available memory, keeping at least one process
        # (ProcessPoolExecutor rejects max_workers < 1).
        safe_workers = max(1, min(safe_workers, self.memory_manager.limit_concurrent_processes(safe_workers)))

        total_masters = len(self.master_images)
        # Per-master feature cap passed to each worker to bound its memory.
        max_features = 10000
        tasks = [
            (layout_path, master_id, master_path, self.min_good_matches, max_features)
            for master_id, master_path in self.master_images.items()
        ]

        print(f" → Processing {total_masters} masters using {safe_workers} concurrent processes...")
        print(f" → Feature limit per master: {max_features}")

        master_results = []
        with concurrent.futures.ProcessPoolExecutor(max_workers=safe_workers) as executor:
            future_to_master = {
                executor.submit(process_single_master_inlier_analysis, *task_args): task_args[1]
                for task_args in tasks
            }

            completed_count = 0
            for future in concurrent.futures.as_completed(future_to_master):
                master_id = future_to_master[future]
                completed_count += 1

                # Progress update and memory monitoring every 10 completed masters.
                if completed_count % 10 == 0:
                    usage = self.memory_manager.get_memory_usage()
                    print(f" → Completed {completed_count}/{total_masters} masters... Memory: {usage['memory_percent']:.1f}%")
                    if usage['memory_percent'] > 80:
                        print(f" → WARNING: High memory usage {usage['memory_percent']:.1f}%, monitoring closely...")
                    if usage['swap_percent'] > 20:
                        print(f" → WARNING: Swap usage {usage['swap_percent']:.1f}%, may slow down...")

                try:
                    result = future.result()
                    inliers = result.get('inliers', 0)
                    confidence = result.get('confidence', 'low')
                    # Keep only candidates with some inliers and non-low confidence.
                    if inliers > 0 and confidence != 'low':
                        master_results.append({
                            'master_id': master_id,
                            'inliers': inliers,
                            'confidence': confidence,
                            'details': result
                        })
                except Exception as e:
                    print(f" → Error processing {master_id}: {e}")

        # Best candidates first.
        master_results.sort(key=lambda x: x['inliers'], reverse=True)

        detected_masters = []
        inlier_scores = {}
        min_inliers = self.min_good_matches

        if master_results:
            best_match = master_results[0]
            best_inliers = best_match['inliers']

            # Absolute floor plus thresholds relative to the best master.
            min_inliers = max(self.min_good_matches, best_inliers * self.inlier_ratio_threshold, int(best_inliers * self.inlier_threshold))

            for result in master_results:
                # Record every candidate's score for potential truncation.
                inlier_scores[result['master_id']] = result['inliers']
                # min_inliers already includes the min_good_matches floor.
                if result['inliers'] >= min_inliers and result['confidence'] in ('high', 'medium'):
                    detected_masters.append(result['master_id'])

            analysis_parts = [
                f"Multiprocessing local inlier analysis using OpenCV AKAZE features ({safe_workers} processes).",
                f"Processed {total_masters} master images against {feature_count} layout features.",
                f"Found {len(master_results)} potential matches, {len(detected_masters)} above threshold.",
                f"Best match: {best_match['master_id']} ({best_inliers} inliers, {best_match['confidence']} confidence)"
            ]
        else:
            analysis_parts = [
                f"Multiprocessing local inlier analysis using OpenCV AKAZE features ({safe_workers} processes).",
                f"Processed {total_masters} master images against {feature_count} layout features.",
                f"No confident matches found above threshold."
            ]

        # Apply deduplication (shouldn't be needed for local analysis, but for safety).
        original_detected = detected_masters[:]
        detected_masters = self.deduplicate_master_matches(detected_masters)
        duplicates_removed = len(original_detected) - len(detected_masters)
        if duplicates_removed != 0:
            analysis_parts.append(f"Removed {duplicates_removed} duplicate(s).")

        analysis = " ".join(analysis_parts)
        print(f" → Local analysis completed: {len(detected_masters)} matches found using {safe_workers} processes")

        # Force garbage collection to free memory.
        gc.collect()

        return {
            'detected_masters': detected_masters,
            'detected_master_ids': detected_masters,
            'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters],
            'analysis': analysis,
            'processing_mode': 'local_inlier_analysis_multiprocess',
            'total_masters_checked': total_masters,
            'potential_matches_found': len(master_results),
            'confident_matches': len(detected_masters),
            'inlier_threshold': min_inliers,
            'master_analysis_results': master_results[:10],  # Top 10 for debugging
            'concurrent_workers': safe_workers,
            'deduplication_applied': duplicates_removed != 0,
            'duplicates_removed': duplicates_removed,
            'original_detected_masters': original_detected,
            'inlier_scores': inlier_scores  # Track inlier scores for truncation
        }

    except Exception as e:
        error_msg = f"Error in local inlier analysis for {layout_name}: {e}"
        print(f" → {error_msg}")
        return {
            'detected_masters': [],
            'detected_master_ids': [],
            'detected_master_filenames': [],
            'analysis': f'Local inlier analysis failed: {error_msg}',
            'error': str(e),
            'processing_mode': 'local_inlier_analysis_error'
        }
|
|
|
|
def detect_with_split_and_inlier_analysis(self, layout_path: str, layout_name: str, panel_count: int) -> Dict:
    """Split a multi-panel layout and match each panel against the masters.

    Delegates to ``detect_with_split_and_inlier_analysis_opencv`` (AKAZE
    inliers) unless ``self.vector_mode`` is enabled, in which case
    ``detect_with_split_and_vector_similarity`` is used. The delegate's
    result dict is returned unchanged.
    """
    if not self.vector_mode:
        return self.detect_with_split_and_inlier_analysis_opencv(layout_path, layout_name, panel_count)
    return self.detect_with_split_and_vector_similarity(layout_path, layout_name, panel_count)
|
|
|
|
def detect_with_split_and_vector_similarity(self, layout_path: str, layout_name: str, panel_count: int) -> Dict:
    """Split a layout into panels and match each panel by embedding similarity.

    The layout is split into ``panel_count`` panels with the configured
    splitter: ``SimplePanelSplitter`` when ``self.split_simple`` is set,
    ``AdvancedPanelSplitter`` when ``self.split_advanced`` is set, otherwise
    ``PanelSplitter``. The splitter is created once and cached on
    ``self.splitter``.

    Each panel is written to a private temporary directory, embedded, and
    compared with every master embedding. Masters at or above
    ``self.similarity_threshold`` on any panel are detected, ordered by
    their best similarity. The temporary directory is unique per call and
    is always removed, even on error.

    Returns:
        Result dict with ``detected_masters`` and ``inlier_scores`` (best
        similarity per detected master, named for truncation compatibility).
        On failure, an error dict with
        ``processing_mode == 'split_and_vector_similarity_error'`` is returned.
    """
    import tempfile

    print(f" → Analyzing {layout_name} with split method + vector similarity...")

    try:
        # Initialize panel splitter if not already done.
        if not hasattr(self, 'splitter'):
            if self.split_simple:
                self.splitter = SimplePanelSplitter(debug=False)
            elif self.split_advanced:
                self.splitter = AdvancedPanelSplitter(
                    percentile=self.percentile,
                    min_gap=self.min_gap,
                    debug=False
                )
            else:
                self.splitter = PanelSplitter()

        # Step 1: Split layout into individual panels using the already-known panel count.
        print(f" → Splitting layout into {panel_count} panels (using pre-analyzed count)...")
        split_panels = self.splitter.split_panels(layout_path, panel_count)
        if not split_panels:
            raise Exception("Panel splitting failed: No panels generated")

        splits_generated = len(split_panels)
        print(f" → Successfully split into {splits_generated} individual panel images")

        # Step 2: Apply vector similarity analysis to each split panel.
        print(f" → Analyzing {splits_generated} split panels using vector similarity...")
        all_results = []

        # A per-call temp directory avoids clashes between concurrently processed
        # layouts that share a basename, and is cleaned up even if a panel fails.
        with tempfile.TemporaryDirectory(prefix="split_panels_") as tmp_dir:
            for i, split_info in enumerate(split_panels):
                split_path = os.path.join(tmp_dir, f"split_panel_{i}_{os.path.basename(layout_path)}")
                cv2.imwrite(split_path, split_info['image'])

                print(f" → Processing split panel {i+1}/{splits_generated}...")

                split_embedding = self.generate_image_embedding(split_path)
                if split_embedding is None:
                    print(f" → Failed to generate embedding for split panel {i+1}")
                    continue

                # Compare this panel against all master embeddings.
                for master_id, master_embedding in self.master_embeddings.items():
                    similarity = self.compute_cosine_similarity(split_embedding, master_embedding)
                    if similarity >= self.similarity_threshold:
                        all_results.append({
                            'master_id': master_id,
                            'similarity': similarity,
                            'confidence': 'high' if similarity >= 0.9 else 'medium',
                            'split_panel': i,
                            # Informational only: the file is deleted when this call finishes.
                            'split_path': split_path
                        })

        # Step 3: Keep each master's best similarity, ordering masters best-first.
        # Because all_results is sorted descending, the first sighting of a
        # master carries its maximum score.
        all_results.sort(key=lambda x: x['similarity'], reverse=True)
        similarity_scores = {}
        for result in all_results:
            similarity_scores.setdefault(result['master_id'], result['similarity'])
        detected_masters = list(similarity_scores)

        # Apply deduplication.
        original_detected = detected_masters[:]
        detected_masters = self.deduplicate_master_matches(detected_masters)

        if self.split_simple:
            splitter_type = "SimplePanelSplitter (even division)"
        elif self.split_advanced:
            splitter_type = "AdvancedPanelSplitter (edge detection)"
        else:
            splitter_type = "PanelSplitter (multiple CV methods)"

        analysis_parts = [
            f"Multi-panel layout processed using split method + vector similarity analysis.",
            f"Layout split into {splits_generated} individual panels using {splitter_type} (no additional API calls).",
            f"Each panel analyzed against {len(self.master_embeddings)} master embeddings using Google Vertex AI (1408 dimensions).",
            f"Similarity threshold: {self.similarity_threshold}",
            f"Found {len(detected_masters)} matches after deduplication."
        ]

        duplicates_removed = len(original_detected) - len(detected_masters)
        if duplicates_removed != 0:
            analysis_parts.append(f"Removed {duplicates_removed} duplicate(s).")

        analysis = " ".join(analysis_parts)
        print(f" → Split + vector similarity analysis completed: {len(detected_masters)} matches found")

        return {
            'detected_masters': detected_masters,
            'detected_master_ids': detected_masters,
            'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters],
            'analysis': analysis,
            'processing_mode': 'split_and_vector_similarity',
            'splits_generated': splits_generated,
            'panel_count': panel_count,
            'deduplication_applied': duplicates_removed != 0,
            'duplicates_removed': duplicates_removed,
            'original_detected_masters': original_detected,
            'total_combinations_processed': len(all_results),
            'potential_matches_found': len(all_results),
            'inlier_scores': similarity_scores,  # Using similarity scores for truncation compatibility
            'similarity_threshold': self.similarity_threshold
        }

    except Exception as e:
        error_msg = f"Error in split + vector similarity analysis for {layout_name}: {e}"
        print(f" → {error_msg}")
        return {
            'detected_masters': [],
            'detected_master_ids': [],
            'detected_master_filenames': [],
            'analysis': f'Split + vector similarity analysis failed: {error_msg}',
            'error': str(e),
            'processing_mode': 'split_and_vector_similarity_error'
        }
|
|
|
|
def detect_with_split_and_inlier_analysis_opencv(self, layout_path: str, layout_name: str, panel_count: int) -> Dict:
    """
    Detect masters by splitting a layout into panels and running local inlier analysis on each panel.

    Every split panel is written to a private temporary directory and matched
    against every entry of ``self.master_images`` with
    ``process_single_master_inlier_analysis`` in a process pool of
    ``self.local_workers`` processes. A master is kept when its inlier count
    clears ``self.min_good_matches`` and a threshold relative to the best match
    (``self.inlier_ratio_threshold`` / ``self.inlier_threshold``), and its
    confidence is 'high' or 'medium'. The kept list is then passed through
    ``self.deduplicate_master_matches``.

    Args:
        layout_path: Path of the layout image.
        layout_name: Name used in log and error messages.
        panel_count: Panel count from the earlier panel analysis, given to the splitter.

    Returns:
        Result dict with ``detected_masters`` / ``detected_master_ids`` /
        ``detected_master_filenames``, an ``analysis`` summary,
        ``inlier_scores`` (best inlier count per master) and bookkeeping fields.
        Any exception is caught and reported as a dict with empty detections,
        an ``error`` key and ``processing_mode='split_and_inlier_analysis_error'``.
    """
    import shutil
    import tempfile

    print(f" → Analyzing {layout_name} with split method + inlier analysis...")

    try:
        # Step 1: split via the shared helper (it lazily creates self.splitter)
        print(f" → Splitting layout into {panel_count} panels (using pre-analyzed count)...")
        split_panels = self._split_layout(layout_path, panel_count)

        if not split_panels:
            raise Exception("Panel splitting failed: No panels generated")

        splits_generated = len(split_panels)
        print(f" → Successfully split into {splits_generated} individual panel images")

        # Step 2: match every split panel against every master in parallel
        print(f" → Analyzing {splits_generated} split panels using {self.local_workers} concurrent processes...")

        tasks = []
        all_results = []
        # A private directory avoids file-name collisions between layouts and
        # guarantees the panel files are removed even if writing or matching fails.
        temp_dir = tempfile.mkdtemp(prefix="split_panels_")
        try:
            base_name = os.path.basename(layout_path)
            for i, split_info in enumerate(split_panels):
                split_path = os.path.join(temp_dir, f"split_panel_{i}_{base_name}")
                # cv2.imwrite reports failure by returning False rather than raising
                if not cv2.imwrite(split_path, split_info['image']):
                    print(f" → Could not write split panel {i}; skipping it")
                    continue
                for master_id, master_path in self.master_images.items():
                    tasks.append((split_path, master_id, master_path, self.min_good_matches))

            total_tasks = len(tasks)
            with concurrent.futures.ProcessPoolExecutor(max_workers=self.local_workers) as executor:
                futures = [
                    executor.submit(process_single_master_inlier_analysis, *task_args)
                    for task_args in tasks
                ]
                for completed_count, future in enumerate(concurrent.futures.as_completed(futures), 1):
                    if completed_count % 50 == 0:
                        print(f" → Completed {completed_count}/{total_tasks} split-master combinations...")
                    try:
                        result = future.result()
                        if result.get('confidence') != 'low' and result.get('inliers', 0) > 0:
                            all_results.append(result)
                    except Exception as e:
                        print(f" → Error processing split task: {e}")
        finally:
            # The executor has shut down (all workers finished) before this runs
            shutil.rmtree(temp_dir, ignore_errors=True)

        # Step 3: apply thresholds relative to the best match
        detected_masters = []
        inlier_scores = {}
        if all_results:
            all_results.sort(key=lambda x: x['inliers'], reverse=True)
            best_inliers = all_results[0]['inliers']
            min_inliers = max(
                self.min_good_matches,
                best_inliers * self.inlier_ratio_threshold,
                int(best_inliers * self.inlier_threshold),
            )

            for result in all_results:
                master_id = result['master_id']
                inliers = result['inliers']

                # Keep the highest score per master (a master may match several panels);
                # used later for truncation.
                if master_id not in inlier_scores or inliers > inlier_scores[master_id]:
                    inlier_scores[master_id] = inliers

                if (inliers >= min_inliers and
                        result['confidence'] in ['high', 'medium'] and
                        inliers >= self.min_good_matches):
                    detected_masters.append(master_id)

        original_detected = detected_masters[:]
        detected_masters = self.deduplicate_master_matches(detected_masters)

        if self.split_simple:
            splitter_type = "SimplePanelSplitter (even division)"
        elif self.split_advanced:
            splitter_type = "AdvancedPanelSplitter (edge detection)"
        else:
            splitter_type = "PanelSplitter (multiple CV methods)"

        analysis_parts = [
            "Multi-panel layout processed using split method + local inlier analysis.",
            f"Layout split into {splits_generated} individual panels using {splitter_type} (no additional API calls).",
            f"Each panel analyzed against {len(self.master_images)} masters using OpenCV AKAZE features with multiprocessing ({self.local_workers} processes).",
            f"Processed {len(tasks)} split-master combinations in parallel.",
            f"Found {len(detected_masters)} matches after deduplication."
        ]

        duplicates_removed = len(original_detected) - len(detected_masters)
        if duplicates_removed:
            analysis_parts.append(f"Removed {duplicates_removed} duplicate(s).")

        analysis = " ".join(analysis_parts)

        print(f" → Split + inlier analysis completed: {len(detected_masters)} matches found using {self.local_workers} processes")

        return {
            'detected_masters': detected_masters,
            'detected_master_ids': detected_masters,
            'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters],
            'analysis': analysis,
            'processing_mode': 'split_and_inlier_analysis',
            'splits_generated': splits_generated,
            'panel_count': panel_count,
            'concurrent_workers': self.local_workers,
            'deduplication_applied': len(detected_masters) != len(original_detected),
            'duplicates_removed': duplicates_removed if duplicates_removed else 0,
            'original_detected_masters': original_detected,
            'total_combinations_processed': len(tasks),
            'potential_matches_found': len(all_results),
            'inlier_scores': inlier_scores  # Track inlier scores for truncation
        }

    except Exception as e:
        error_msg = f"Error in split + inlier analysis for {layout_name}: {e}"
        print(f" → {error_msg}")
        return {
            'detected_masters': [],
            'detected_master_ids': [],
            'detected_master_filenames': [],
            'analysis': f'Split + inlier analysis failed: {error_msg}',
            'error': str(e),
            'processing_mode': 'split_and_inlier_analysis_error'
        }
|
|
|
|
def process_all_layouts_hybrid(self, limit: Optional[int] = None, specific_file: Optional[str] = None) -> Dict:
    """
    Process layout images one after another using the hybrid detection pipeline.

    Loads master images (plus master embeddings in vector mode), configures
    worker counts, then runs ``detect_images_in_layout_hybrid`` on each layout
    returned by ``_prepare_layout_files``. Partial results are saved every 20
    layouts and summary statistics are printed at the end.

    Args:
        limit: If truthy, process only the first ``limit`` layouts.
        specific_file: If given, process only this file inside ``self.layouts_path``.

    Returns:
        Dict mapping layout id (file stem) to its result record; empty when
        there are no layouts to process.

    Raises:
        Exception: in vector mode when no master embeddings are available.
        FileNotFoundError: when ``specific_file`` does not exist.
    """
    print("Starting hybrid batch processing...")
    analysis_method = "vector similarity" if self.vector_mode else "local analysis"
    print(f"Panel threshold: ≤{self.panel_threshold} panels → {analysis_method}, ≥{self.panel_threshold + 1} panels → split + {analysis_method}")

    self.load_master_images()

    if self.vector_mode:
        self.generate_master_embeddings()
        if not self.master_embeddings:
            raise Exception("No master embeddings available for vector mode")

    # Configure worker counts now that we know the number of masters
    self._configure_worker_counts()

    # Same selection logic as the parallel path
    layout_files = self._prepare_layout_files(limit, specific_file)

    total_layouts = len(layout_files)
    print(f"Processing {total_layouts} layout images in hybrid mode")
    print("=" * 60)

    results = {}
    if total_layouts == 0:
        # Nothing to do; also avoids dividing by zero in the statistics below
        print("No layout images found - nothing to process")
        return results

    start_time = time.time()

    # Track hybrid statistics
    local_analysis_count = 0
    split_analysis_count = 0
    total_api_calls = 0
    truncation_count = 0
    total_truncated_matches = 0

    for i, layout_path in enumerate(layout_files, 1):
        layout_id = layout_path.stem

        result = self.detect_images_in_layout_hybrid(str(layout_path), i, total_layouts)

        detection_method = result.get('detection_method')
        if detection_method in ['local_inlier_analysis', 'vector_similarity']:
            local_analysis_count += 1
        elif detection_method in ['split_and_inlier_analysis', 'split_and_vector_similarity']:
            split_analysis_count += 1

        total_api_calls += result.get('api_calls_used', 0)

        if result.get('truncation_applied'):
            truncation_count += 1
            total_truncated_matches += result.get('removed_count', 0)

        layout_result = {
            'layout_filename': layout_path.name,
            'detected_master_ids': result.get('detected_master_ids', []),
            'detected_master_filenames': result.get('detected_master_filenames', []),
            'analysis': result.get('analysis', ''),
            'detection_method': result.get('detection_method', 'unknown'),
            'panel_count': result.get('panel_count', 1),
            'panel_threshold': self.panel_threshold,
            'processing_mode': 'hybrid'
        }

        # Method-specific fields
        if 'panel_analysis' in result:
            layout_result['panel_analysis'] = result['panel_analysis']

        if 'total_masters_checked' in result:
            layout_result['total_masters_checked'] = result['total_masters_checked']

        if 'splits_generated' in result:
            layout_result['splits_generated'] = result['splits_generated']
            layout_result['split_results'] = result.get('split_results', [])

        if result.get('deduplication_applied'):
            layout_result['deduplication_applied'] = result['deduplication_applied']
            layout_result['duplicates_removed'] = result['duplicates_removed']
            layout_result['original_detected_masters'] = result['original_detected_masters']

        if result.get('refinement_applied'):
            layout_result['refinement_applied'] = result['refinement_applied']
            layout_result['refinement_details'] = result['refinement_details']
            layout_result['censorship_analysis'] = result['censorship_analysis']

        if result.get('truncation_applied'):
            layout_result['truncation_applied'] = result['truncation_applied']
            layout_result['original_match_count'] = result['original_match_count']
            layout_result['final_match_count'] = result['final_match_count']
            layout_result['removed_count'] = result['removed_count']

        if 'error' in result:
            layout_result['error'] = result['error']

        cost_breakdown = cost_calculator.get_layout_cost_breakdown(layout_path.name)
        if cost_breakdown:
            layout_result['cost_breakdown'] = cost_breakdown

        results[layout_id] = layout_result

        # Progress update with time estimate
        elapsed = time.time() - start_time
        avg_time = elapsed / i
        remaining = (total_layouts - i) * avg_time
        print(f"Progress: {i}/{total_layouts} ({i/total_layouts*100:.1f}%) - Est. remaining: {remaining/60:.1f} min")

        # Save progress periodically
        if i % 20 == 0:
            self.save_results(results, f"hybrid_progress_{i}")

    total_time = time.time() - start_time

    print(f"\n{'='*60}")
    print("HYBRID PROCESSING STATISTICS")
    print(f"{'='*60}")
    print(f"Total layouts processed: {total_layouts}")
    print(f"Local analysis used: {local_analysis_count} ({local_analysis_count/total_layouts*100:.1f}%)")
    print(f"Split + inlier analysis used: {split_analysis_count} ({split_analysis_count/total_layouts*100:.1f}%)")
    print(f"Truncation applied: {truncation_count} layouts ({truncation_count/total_layouts*100:.1f}%)")
    print(f"Total matches truncated: {total_truncated_matches}")
    print(f"Total API calls made: {total_api_calls}")
    print(f"Average API calls per layout: {total_api_calls/total_layouts:.1f}")
    print(f"Estimated cost savings vs one-at-a-time: {(1 - total_api_calls/(total_layouts * (len(self.master_images) + 1)))*100:.1f}%")
    print(f"Total processing time: {total_time/60:.1f} minutes")
    print(f"Average time per layout: {total_time/total_layouts:.1f} seconds")

    return results
|
|
|
|
def process_all_layouts_hybrid_parallel(self, limit: Optional[int] = None, specific_file: Optional[str] = None) -> Dict:
    """
    Process layout images with the hybrid pipeline, several layouts at a time.

    Layouts are dispatched to a thread pool of ``self.layout_workers`` threads
    running ``_process_single_layout_parallel``. Their inlier/vector analysis is
    submitted to a single ``InlierAnalysisCoordinator``, which is always stopped
    before this method returns or raises. Partial results are saved every 20
    completed layouts and statistics are printed at the end.

    Args:
        limit: If truthy, process only the first ``limit`` layouts.
        specific_file: If given, process only this file inside ``self.layouts_path``.

    Returns:
        Dict mapping layout id (file stem) to the record produced by
        ``_create_layout_result`` (or ``_handle_worker_failure`` on failure).

    Raises:
        Exception: in vector mode when no master embeddings are available.
        FileNotFoundError: when ``specific_file`` does not exist.
    """
    print("Starting hybrid batch processing with parallel layout workers...")
    analysis_method = "vector similarity" if self.vector_mode else "local analysis"
    print(f"Panel threshold: ≤{self.panel_threshold} panels → {analysis_method}, ≥{self.panel_threshold + 1} panels → split + {analysis_method}")
    print(f"Parallel processing enabled: {self.layout_workers} layout workers, max {self.max_concurrent_layouts} concurrent layouts")

    self.load_master_images()

    if self.vector_mode:
        self.generate_master_embeddings()
        if not self.master_embeddings:
            raise Exception("No master embeddings available for vector mode")

    # Configure worker counts now that we know the number of masters
    self._configure_worker_counts()

    # Resolve inputs BEFORE starting the coordinator: a missing file used to
    # raise after start() but outside the try/finally, leaving it running.
    layout_files = self._prepare_layout_files(limit, specific_file)
    total_layouts = len(layout_files)
    self.progress_tracker = ProgressTracker(total_layouts)

    print(f"Processing {total_layouts} layout images in parallel hybrid mode")
    print("=" * 60)

    results = {}

    # Track statistics
    local_analysis_count = 0
    split_analysis_count = 0
    total_api_calls = 0
    truncation_count = 0
    total_truncated_matches = 0

    self.inlier_coordinator = InlierAnalysisCoordinator(
        self.local_workers,
        self.memory_manager,
        self.min_good_matches
    )
    self.inlier_coordinator.start()
    start_time = time.time()

    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.layout_workers) as executor:
            future_to_layout = {
                executor.submit(self._process_single_layout_parallel, layout_path, i, total_layouts): layout_path
                for i, layout_path in enumerate(layout_files, 1)
            }

            progress_thread = threading.Thread(target=self._monitor_progress, daemon=True)
            progress_thread.start()

            for future in concurrent.futures.as_completed(future_to_layout):
                layout_path = future_to_layout[future]
                layout_id = layout_path.stem

                try:
                    result = future.result()

                    detection_method = result.get('detection_method', 'unknown')
                    if detection_method in ['local_inlier_analysis', 'vector_similarity']:
                        local_analysis_count += 1
                    elif detection_method in ['split_and_inlier_analysis', 'split_and_vector_similarity']:
                        split_analysis_count += 1

                    total_api_calls += result.get('api_calls_used', 0)

                    if result.get('truncation_applied'):
                        truncation_count += 1
                        total_truncated_matches += result.get('removed_count', 0)

                    results[layout_id] = self._create_layout_result(result, layout_path)

                    # _process_single_layout_parallel reports failures by returning
                    # an error record (with an 'error' key) rather than raising.
                    self.progress_tracker.complete_layout(success='error' not in result)

                except Exception as e:
                    print(f"Error processing layout {layout_path.name}: {e}")
                    error_result = self._handle_worker_failure(layout_path.name, e)
                    results[layout_id] = error_result
                    self.progress_tracker.complete_layout(success=False)

                # Monitor memory and adjust workers if needed
                self._monitor_memory_and_adjust_workers()

                # Save progress periodically
                if len(results) % 20 == 0:
                    self.save_results(results, f"hybrid_parallel_progress_{len(results)}")

    finally:
        if self.inlier_coordinator:
            self.inlier_coordinator.stop()

    total_time = time.time() - start_time

    self._print_parallel_statistics(
        total_layouts, local_analysis_count, split_analysis_count,
        truncation_count, total_truncated_matches, total_api_calls, total_time
    )

    return results
|
|
|
|
def _prepare_layout_files(self, limit: Optional[int], specific_file: Optional[str]) -> List:
|
|
"""Prepare list of layout files to process"""
|
|
if specific_file:
|
|
# Process only the specific file
|
|
layout_files = [self.layouts_path / specific_file]
|
|
if not layout_files[0].exists():
|
|
raise FileNotFoundError(f"Layout file {specific_file} not found in {self.layouts_path}")
|
|
print(f"Processing specific file: {specific_file}")
|
|
else:
|
|
layout_files = list(self.layouts_path.glob("*.jpg"))
|
|
|
|
if limit:
|
|
layout_files = layout_files[:limit]
|
|
print(f"Processing first {limit} layouts only")
|
|
|
|
return layout_files
|
|
|
|
def _process_single_layout_parallel(self, layout_path, layout_index, total_layouts):
|
|
"""Process a single layout in parallel mode"""
|
|
layout_name = layout_path.name
|
|
layout_id = layout_path.stem
|
|
|
|
# Mark layout as started
|
|
self.progress_tracker.start_layout()
|
|
|
|
print(f"Processing {layout_index}/{total_layouts}: {layout_name} (Parallel hybrid mode)")
|
|
|
|
try:
|
|
# Phase 1: OpenAI API calls (can be parallel across layouts)
|
|
print(f" Step 1: Analyzing panels and censorship using OpenAI O3...")
|
|
combined_result = self.count_panels_and_detect_censorship(str(layout_path))
|
|
panel_count = combined_result.get('panel_count', 1)
|
|
panel_confidence = combined_result.get('panel_confidence', 'unknown')
|
|
is_censored = combined_result.get('is_censored', True)
|
|
censorship_confidence = combined_result.get('censorship_confidence', 'unknown')
|
|
|
|
print(f" Panel analysis: {panel_count} panels detected (confidence: {panel_confidence})")
|
|
print(f" Censorship analysis: {'CENSORED' if is_censored else 'UNCENSORED'} (confidence: {censorship_confidence})")
|
|
|
|
# Phase 2: Determine analysis method and coordinate with serial inlier analysis
|
|
analysis_method = "vector similarity" if self.vector_mode else "local inlier analysis"
|
|
inlier_future = concurrent.futures.Future()
|
|
|
|
if panel_count <= self.panel_threshold:
|
|
print(f" Step 2: Submitting {analysis_method} (≤{self.panel_threshold} panels) to inlier queue...")
|
|
|
|
# Prepare analysis parameters
|
|
analysis_params = {
|
|
'layout_path': str(layout_path),
|
|
'master_images': self.master_images,
|
|
'vector_mode': self.vector_mode,
|
|
'inlier_threshold': self.inlier_threshold,
|
|
'inlier_ratio_threshold': self.inlier_ratio_threshold
|
|
}
|
|
|
|
# Add vector mode specific parameters
|
|
if self.vector_mode:
|
|
analysis_params['embedding_model'] = self.embedding_model
|
|
analysis_params['master_embeddings'] = self.master_embeddings
|
|
analysis_params['similarity_threshold'] = self.similarity_threshold
|
|
|
|
# Submit to inlier analysis queue
|
|
self.inlier_coordinator.submit_analysis(
|
|
layout_id, 'direct', analysis_params, inlier_future
|
|
)
|
|
|
|
detection_method = 'vector_similarity' if self.vector_mode else 'local_inlier_analysis'
|
|
|
|
else:
|
|
print(f" Step 2: Splitting layout and submitting {analysis_method} (≥{self.panel_threshold + 1} panels) to inlier queue...")
|
|
|
|
# Split the layout first
|
|
split_panels = self._split_layout(layout_path, panel_count)
|
|
|
|
# Prepare analysis parameters
|
|
analysis_params = {
|
|
'split_panels': split_panels,
|
|
'master_images': self.master_images,
|
|
'vector_mode': self.vector_mode,
|
|
'inlier_threshold': self.inlier_threshold,
|
|
'inlier_ratio_threshold': self.inlier_ratio_threshold
|
|
}
|
|
|
|
# Add vector mode specific parameters
|
|
if self.vector_mode:
|
|
analysis_params['embedding_model'] = self.embedding_model
|
|
analysis_params['master_embeddings'] = self.master_embeddings
|
|
analysis_params['similarity_threshold'] = self.similarity_threshold
|
|
|
|
# Submit to inlier analysis queue
|
|
self.inlier_coordinator.submit_analysis(
|
|
layout_id, 'split', analysis_params, inlier_future
|
|
)
|
|
|
|
detection_method = 'split_and_vector_similarity' if self.vector_mode else 'split_and_inlier_analysis'
|
|
|
|
# Wait for inlier analysis to complete
|
|
print(f" Step 3: Waiting for inlier analysis completion...")
|
|
inlier_result = inlier_future.result() # No timeout - wait indefinitely for completion
|
|
|
|
# Phase 3: Post-process results (can be parallel across layouts)
|
|
detection_result = self._post_process_parallel_results(
|
|
inlier_result, combined_result, panel_count, detection_method, layout_path
|
|
)
|
|
|
|
detected_count = len(detection_result.get('detected_masters', []))
|
|
base_method = "vector similarity" if self.vector_mode else "local analysis"
|
|
method_name = base_method if panel_count <= self.panel_threshold else f"split + {base_method}"
|
|
|
|
print(f"✓ Completed {layout_name} - Found {detected_count} matches using {method_name}")
|
|
return detection_result
|
|
|
|
except Exception as e:
|
|
error_msg = f"Error in parallel hybrid analysis for {layout_name}: {e}"
|
|
print(error_msg)
|
|
return self._create_error_result(layout_path, e)
|
|
|
|
def _split_layout(self, layout_path, panel_count):
|
|
"""Split layout into panels"""
|
|
# Initialize panel splitter if not already done
|
|
if not hasattr(self, 'splitter'):
|
|
if self.split_simple:
|
|
self.splitter = SimplePanelSplitter(debug=False)
|
|
elif self.split_advanced:
|
|
self.splitter = AdvancedPanelSplitter(
|
|
percentile=self.percentile,
|
|
min_gap=self.min_gap,
|
|
debug=False
|
|
)
|
|
else:
|
|
self.splitter = PanelSplitter()
|
|
|
|
# Split layout into panels
|
|
return self.splitter.split_panels(str(layout_path), panel_count)
|
|
|
|
def _post_process_parallel_results(self, inlier_result, combined_result, panel_count, detection_method, layout_path):
|
|
"""Post-process results from parallel inlier analysis"""
|
|
# Extract basic results
|
|
detected_masters = inlier_result.get('detected_masters', [])
|
|
inlier_scores = inlier_result.get('inlier_scores', {})
|
|
|
|
# Apply CEN refinement if enabled
|
|
if self.refinement_mode and detected_masters:
|
|
cen_images = [mid for mid in detected_masters if self.is_cen_image(mid)]
|
|
if cen_images:
|
|
print(f" Step 4: Applying CEN refinement...")
|
|
is_censored = combined_result.get('is_censored', True)
|
|
censorship_confidence = combined_result.get('censorship_confidence', 'unknown')
|
|
|
|
refined_result = self.apply_cen_refinement_with_stored_analysis(
|
|
{'detected_masters': detected_masters, 'layout_path': str(layout_path)},
|
|
is_censored, censorship_confidence
|
|
)
|
|
detected_masters = refined_result.get('detected_masters', [])
|
|
|
|
# Apply deduplication
|
|
original_count = len(detected_masters)
|
|
detected_masters = self.deduplicate_master_matches(detected_masters)
|
|
duplicates_removed = original_count - len(detected_masters)
|
|
|
|
if duplicates_removed > 0:
|
|
print(f" Step 5: Removed {duplicates_removed} duplicate master(s)")
|
|
|
|
# Apply truncation if needed
|
|
if len(detected_masters) > panel_count and inlier_scores and not self.no_truncation:
|
|
print(f" Step 6: Truncating {len(detected_masters)} matches to top {panel_count} by score...")
|
|
|
|
# Sort masters by score (descending)
|
|
sorted_masters = sorted(detected_masters,
|
|
key=lambda mid: inlier_scores.get(mid, 0),
|
|
reverse=True)
|
|
|
|
# Keep only top N matches
|
|
truncated_masters = sorted_masters[:panel_count]
|
|
removed_masters = sorted_masters[panel_count:]
|
|
|
|
detected_masters = truncated_masters
|
|
truncation_applied = True
|
|
original_match_count = len(sorted_masters)
|
|
removed_count = len(removed_masters)
|
|
elif len(detected_masters) > panel_count and self.no_truncation:
|
|
print(f" Step 6: Truncation disabled - keeping all {len(detected_masters)} matches")
|
|
truncation_applied = False
|
|
original_match_count = len(detected_masters)
|
|
removed_count = 0
|
|
else:
|
|
truncation_applied = False
|
|
original_match_count = len(detected_masters)
|
|
removed_count = 0
|
|
|
|
# Calculate confidence score
|
|
final_matches = len(detected_masters)
|
|
confidence_score = self.calculate_confidence_score(final_matches, panel_count)
|
|
|
|
# Apply fallback to OpenAI one-at-a-time if enabled and needed
|
|
if self.fallback_one_at_a_time and final_matches < panel_count:
|
|
print(f" Step 7: Fallback triggered - {final_matches} matched masters < {panel_count} detected panels")
|
|
print(f" Running OpenAI one-at-a-time method with {len(self.master_images)} workers...")
|
|
|
|
# Check resource usage before starting fallback
|
|
self._check_resource_usage()
|
|
|
|
# Create a temporary OpenAI detector for one-at-a-time processing
|
|
try:
|
|
# Force garbage collection before creating detector
|
|
import gc
|
|
gc.collect()
|
|
|
|
# Limit concurrent workers to prevent file descriptor exhaustion
|
|
max_workers = min(len(self.master_images), 20) # Cap at 20 to prevent resource exhaustion
|
|
|
|
temp_detector = OpenAIImageDetector(
|
|
enable_greyscale=self.enable_greyscale,
|
|
enable_contrast_enhancement=self.enable_contrast_enhancement,
|
|
contrast_factor=self.contrast_factor,
|
|
refinement_mode=self.refinement_mode,
|
|
one_at_a_time_mode=True,
|
|
max_concurrent_workers=max_workers
|
|
)
|
|
|
|
# Set the master images
|
|
temp_detector.master_images = self.master_images.copy()
|
|
|
|
print(f" Using {max_workers} workers for fallback (limited to prevent resource exhaustion)")
|
|
|
|
# Run OpenAI one-at-a-time detection with stored censorship data
|
|
stored_censorship_data = {
|
|
'is_censored': combined_result.get('is_censored', True),
|
|
'confidence': combined_result.get('censorship_confidence', 'unknown')
|
|
}
|
|
|
|
fallback_result = temp_detector.detect_images_in_layout_one_at_a_time(
|
|
str(layout_path), 0, 1, stored_censorship_data
|
|
)
|
|
|
|
# Clean up the temporary detector
|
|
if hasattr(temp_detector, 'cleanup_temp_files'):
|
|
temp_detector.cleanup_temp_files()
|
|
|
|
# Force garbage collection after fallback
|
|
del temp_detector
|
|
gc.collect()
|
|
|
|
if fallback_result and fallback_result.get('detected_masters'):
|
|
fallback_masters = fallback_result['detected_masters']
|
|
print(f" Fallback successful - Found {len(fallback_masters)} matches")
|
|
|
|
# Apply CEN refinement if enabled
|
|
if self.refinement_mode:
|
|
fallback_result = self.apply_cen_refinement_with_stored_analysis(
|
|
fallback_result,
|
|
combined_result.get('is_censored', True),
|
|
combined_result.get('censorship_confidence', 'unknown')
|
|
)
|
|
|
|
# Apply deduplication
|
|
original_count = len(fallback_result['detected_masters'])
|
|
fallback_result['detected_masters'] = self.deduplicate_master_matches(fallback_result['detected_masters'])
|
|
|
|
# Use fallback results
|
|
detected_masters = fallback_result['detected_masters']
|
|
final_matches = len(detected_masters)
|
|
confidence_score = self.calculate_confidence_score(final_matches, panel_count)
|
|
|
|
print(f" Fallback completed - Using {final_matches} matches from OpenAI one-at-a-time")
|
|
|
|
# Mark that fallback was used
|
|
fallback_applied = True
|
|
original_detection_method = detection_method
|
|
detection_method = 'openai_one_at_a_time_fallback'
|
|
fallback_api_calls = fallback_result.get('api_calls_made', 0)
|
|
else:
|
|
print(f" Fallback failed - No additional matches found, keeping original results")
|
|
fallback_applied = False
|
|
original_detection_method = detection_method
|
|
fallback_api_calls = 0
|
|
|
|
except Exception as e:
|
|
print(f" ERROR: Fallback failed with error: {e}")
|
|
|
|
# Handle specific "Too many open files" error
|
|
if "Too many open files" in str(e) or "Errno 24" in str(e):
|
|
print(f" → This is a resource exhaustion issue - forcing cleanup...")
|
|
import gc
|
|
gc.collect()
|
|
|
|
# Try to clean up any temp detector that might exist
|
|
if 'temp_detector' in locals():
|
|
try:
|
|
if hasattr(temp_detector, 'cleanup_temp_files'):
|
|
temp_detector.cleanup_temp_files()
|
|
del temp_detector
|
|
except:
|
|
pass
|
|
|
|
print(f" → Cleanup completed, continuing with original results...")
|
|
|
|
fallback_applied = False
|
|
original_detection_method = detection_method
|
|
fallback_api_calls = 0
|
|
else:
|
|
fallback_applied = False
|
|
original_detection_method = detection_method
|
|
fallback_api_calls = 0
|
|
|
|
# Build result dictionary
|
|
result = {
|
|
'detected_masters': detected_masters,
|
|
'detected_master_ids': detected_masters,
|
|
'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters],
|
|
'analysis': f"Parallel hybrid processing: {detection_method}",
|
|
'detection_method': detection_method,
|
|
'panel_count': panel_count,
|
|
'panel_threshold': self.panel_threshold,
|
|
'processing_mode': 'hybrid_parallel',
|
|
'layout_path': str(layout_path),
|
|
'confidence_score': confidence_score,
|
|
'api_calls_used': 1 + fallback_api_calls, # Consolidated panel counting + censorship + fallback calls
|
|
'truncation_applied': truncation_applied,
|
|
'original_match_count': original_match_count,
|
|
'final_match_count': final_matches,
|
|
'removed_count': removed_count,
|
|
'deduplication_applied': duplicates_removed > 0,
|
|
'duplicates_removed': duplicates_removed,
|
|
'fallback_applied': fallback_applied
|
|
}
|
|
|
|
# Add fallback-specific fields if fallback was used
|
|
if fallback_applied:
|
|
result['original_detection_method'] = original_detection_method
|
|
result['processing_mode'] = 'hybrid_parallel_with_fallback'
|
|
|
|
# Add panel and censorship analysis
|
|
result['panel_analysis'] = {
|
|
'panel_count': panel_count,
|
|
'confidence': combined_result.get('panel_confidence', 'unknown'),
|
|
'analysis': combined_result.get('panel_analysis', ''),
|
|
'panel_descriptions': combined_result.get('panel_descriptions', [])
|
|
}
|
|
result['censorship_analysis'] = {
|
|
'is_censored': combined_result.get('is_censored', True),
|
|
'confidence': combined_result.get('censorship_confidence', 'unknown'),
|
|
'analysis': combined_result.get('censorship_analysis', ''),
|
|
'coverage_details': combined_result.get('coverage_details', '')
|
|
}
|
|
|
|
# Add inlier analysis specific results
|
|
if 'analysis_mode' in inlier_result:
|
|
result['inlier_analysis_mode'] = inlier_result['analysis_mode']
|
|
if 'total_masters_checked' in inlier_result:
|
|
result['total_masters_checked'] = inlier_result['total_masters_checked']
|
|
if 'potential_matches_found' in inlier_result:
|
|
result['potential_matches_found'] = inlier_result['potential_matches_found']
|
|
if 'workers_used' in inlier_result:
|
|
result['inlier_workers_used'] = inlier_result['workers_used']
|
|
|
|
return result
|
|
|
|
def _create_layout_result(self, result, layout_path):
    """Project a raw per-layout detection result onto the standard output record.

    Args:
        result: Raw result dict for one layout. Missing core keys fall back to
            defaults (empty lists, '' analysis, 'unknown' method, 1 panel,
            'hybrid_parallel' mode, 0.0 confidence).
        layout_path: Path of the layout image; only its ``.name`` is used.

    Returns:
        dict: The core fields, plus any optional diagnostic fields present in
        ``result``, plus ``cost_breakdown`` when the cost calculator reports a
        non-empty breakdown for this layout.
    """
    layout_name = layout_path.name

    record = {
        'layout_filename': layout_name,
        'detected_master_ids': result.get('detected_master_ids', []),
        'detected_master_filenames': result.get('detected_master_filenames', []),
        'analysis': result.get('analysis', ''),
        'detection_method': result.get('detection_method', 'unknown'),
        'panel_count': result.get('panel_count', 1),
        'panel_threshold': self.panel_threshold,
        'processing_mode': result.get('processing_mode', 'hybrid_parallel'),
        'confidence_score': result.get('confidence_score', 0.0),
    }

    # Diagnostic keys copied through verbatim only when the raw result has them.
    passthrough_keys = (
        'panel_analysis', 'censorship_analysis', 'total_masters_checked',
        'truncation_applied', 'original_match_count', 'final_match_count',
        'removed_count', 'deduplication_applied', 'duplicates_removed',
        'inlier_analysis_mode', 'potential_matches_found', 'inlier_workers_used',
    )
    record.update({key: result[key] for key in passthrough_keys if key in result})

    breakdown = cost_calculator.get_layout_cost_breakdown(layout_name)
    if breakdown:
        record['cost_breakdown'] = breakdown

    return record
|
|
|
|
def _create_error_result(self, layout_path, error):
|
|
"""Create error result dictionary"""
|
|
return {
|
|
'layout_filename': layout_path.name,
|
|
'detected_master_ids': [],
|
|
'detected_master_filenames': [],
|
|
'analysis': f'Error in parallel hybrid processing: {error}',
|
|
'detection_method': 'error',
|
|
'panel_count': 0,
|
|
'panel_threshold': self.panel_threshold,
|
|
'processing_mode': 'hybrid_parallel_error',
|
|
'confidence_score': 0.0,
|
|
'error': str(error)
|
|
}
|
|
|
|
def _monitor_progress(self):
|
|
"""Monitor and display progress periodically"""
|
|
monitor_cycles = 0
|
|
last_completed = 0
|
|
stall_count = 0
|
|
|
|
while True:
|
|
time.sleep(10) # Update every 10 seconds
|
|
monitor_cycles += 1
|
|
|
|
# Update queue size
|
|
if self.inlier_coordinator:
|
|
queue_size = self.inlier_coordinator.get_queue_size()
|
|
self.progress_tracker.update_queue_size(queue_size)
|
|
|
|
# Check for potential stalls
|
|
info = self.progress_tracker.get_progress_info()
|
|
current_completed = info['completed']
|
|
|
|
# Detect stall condition
|
|
if current_completed == last_completed and queue_size > 0:
|
|
stall_count += 1
|
|
if stall_count >= 6: # 60 seconds of no progress
|
|
print(f" → STALL DETECTED: No progress for 60s with {queue_size} items in queue")
|
|
current_task = self.inlier_coordinator.get_current_task_info()
|
|
if current_task:
|
|
processing_time = time.time() - current_task['started_at']
|
|
print(f" → Current task: {current_task['layout_id']} ({current_task['analysis_type']}) - {processing_time:.1f}s")
|
|
|
|
# If a task is taking too long, provide information but don't timeout
|
|
if current_task and processing_time > 300: # 5 minutes
|
|
print(f" → INFO: Long-running task detected ({processing_time:.1f}s) - continuing to wait for completion")
|
|
|
|
# Automatic queue pressure relief
|
|
if queue_size >= 3 and stall_count >= 6:
|
|
print(f" → QUEUE PRESSURE RELIEF: Reducing layout workers to help with bottleneck")
|
|
if hasattr(self, 'layout_workers') and isinstance(self.layout_workers, int) and self.layout_workers > 1:
|
|
original_workers = self.layout_workers
|
|
self.layout_workers = max(1, self.layout_workers - 1)
|
|
print(f" → Reduced layout workers: {original_workers} → {self.layout_workers}")
|
|
else:
|
|
print(f" → Cannot reduce layout workers further (current: {getattr(self, 'layout_workers', 'unknown')})")
|
|
|
|
stall_count = 0 # Reset after taking action
|
|
else:
|
|
stall_count = 0
|
|
last_completed = current_completed
|
|
|
|
# Print progress
|
|
self.progress_tracker.print_progress()
|
|
|
|
# Monitor memory and adjust workers every 30 seconds (3 cycles)
|
|
if monitor_cycles % 3 == 0:
|
|
self._monitor_memory_and_adjust_workers()
|
|
|
|
# Check if we're done
|
|
info = self.progress_tracker.get_progress_info()
|
|
if info['completed'] + info['failed'] >= info['total']:
|
|
break
|
|
|
|
def _print_parallel_statistics(self, total_layouts, local_analysis_count, split_analysis_count,
|
|
truncation_count, total_truncated_matches, total_api_calls, total_time):
|
|
"""Print parallel processing statistics"""
|
|
print(f"\n{'='*60}")
|
|
print("PARALLEL HYBRID PROCESSING STATISTICS")
|
|
print(f"{'='*60}")
|
|
print(f"Total layouts processed: {total_layouts}")
|
|
print(f"Layout workers used: {self.layout_workers}")
|
|
print(f"Local analysis used: {local_analysis_count} ({local_analysis_count/total_layouts*100:.1f}%)")
|
|
print(f"Split + analysis used: {split_analysis_count} ({split_analysis_count/total_layouts*100:.1f}%)")
|
|
print(f"Truncation applied: {truncation_count} layouts ({truncation_count/total_layouts*100:.1f}%)")
|
|
print(f"Total matches truncated: {total_truncated_matches}")
|
|
print(f"Total API calls made: {total_api_calls}")
|
|
print(f"Average API calls per layout: {total_api_calls/total_layouts:.1f}")
|
|
print(f"Estimated cost savings vs one-at-a-time: {(1 - total_api_calls/(total_layouts * (len(self.master_images) + 1)))*100:.1f}%")
|
|
print(f"Total processing time: {total_time/60:.1f} minutes")
|
|
print(f"Average time per layout: {total_time/total_layouts:.1f} seconds")
|
|
|
|
# Estimate speedup
|
|
if hasattr(self, 'layout_workers') and self.layout_workers > 1:
|
|
estimated_sequential_time = total_time * self.layout_workers
|
|
print(f"Estimated sequential time: {estimated_sequential_time/60:.1f} minutes")
|
|
print(f"Parallel speedup: {estimated_sequential_time/total_time:.1f}x")
|
|
|
|
def _monitor_memory_and_adjust_workers(self):
    """Inspect memory and inlier-queue pressure and nudge worker counts.

    Pressure is declared when RAM usage > 85%, or swap > 95% with RAM > 80%
    (memory pressure), or when the inlier queue holds 3+ items (queue
    pressure). Memory pressure lowers both ``layout_workers`` and
    ``local_workers`` by one. Queue pressure lowers only ``layout_workers``,
    because lowering consumers would worsen the backlog.
    NOTE(review): with both pressures in one call, layout_workers drops by two.

    Without pressure, if RAM < 75% and swap < 80%, one auto-detected worker
    count (its ``_*_config`` is None) is raised by one step: layout workers
    up to min(4, cpu_count // 2), otherwise local workers up to cpu_count - 2.

    Returns:
        bool: True if a worker count changed, False otherwise, including when
        monitoring itself raised (the error is printed).
    """
    try:
        memory_usage = psutil.virtual_memory().percent
        swap_usage = psutil.swap_memory().percent

        # Get queue information for bottleneck detection
        queue_size = 0
        if hasattr(self, 'inlier_coordinator') and self.inlier_coordinator:
            queue_size = self.inlier_coordinator.get_queue_size()

        # Swap threshold is deliberately lenient: a full swap alone is acceptable.
        memory_pressure = memory_usage > 85 or (swap_usage > 95 and memory_usage > 80)
        queue_pressure = queue_size >= 3

        if memory_pressure or queue_pressure:
            adjustments_made = False

            if memory_pressure:
                print(f" → Memory pressure detected: {memory_usage:.1f}% RAM, {swap_usage:.1f}% swap")

                # For memory pressure, reduce both types of workers
                if hasattr(self, 'layout_workers') and isinstance(self.layout_workers, int) and self.layout_workers > 1:
                    original_workers = self.layout_workers
                    self.layout_workers = max(1, self.layout_workers - 1)
                    print(f" → Reduced layout workers: {original_workers} → {self.layout_workers}")
                    adjustments_made = True

                if hasattr(self, 'local_workers') and isinstance(self.local_workers, int) and self.local_workers > 1:
                    original_local = self.local_workers
                    self.local_workers = max(1, self.local_workers - 1)
                    print(f" → Reduced local workers: {original_local} → {self.local_workers}")
                    # Keep the coordinator's consumer count in sync.
                    if hasattr(self, 'inlier_coordinator') and self.inlier_coordinator:
                        self.inlier_coordinator.local_workers = self.local_workers
                    adjustments_made = True

            if queue_pressure:
                print(f" → Queue pressure detected: {queue_size} items in inlier queue")

                # Only reduce producers (layout workers); reducing consumers
                # (local workers) would make the backlog worse.
                if hasattr(self, 'layout_workers') and isinstance(self.layout_workers, int) and self.layout_workers > 1:
                    original_workers = self.layout_workers
                    self.layout_workers = max(1, self.layout_workers - 1)
                    print(f" → Reduced layout workers to reduce queue pressure: {original_workers} → {self.layout_workers}")
                    adjustments_made = True
                else:
                    print(f" → Cannot reduce layout workers further (current: {getattr(self, 'layout_workers', 'unknown')})")

            return adjustments_made

        elif memory_usage < 75 and swap_usage < 80:
            # os.cpu_count() returns None when the count is undeterminable;
            # fall back to 1 so the arithmetic below cannot raise TypeError.
            cpu_count = os.cpu_count() or 1

            # Can we increase layout workers? (only when auto-detected)
            if (hasattr(self, 'layout_workers') and
                    hasattr(self, '_layout_workers_config') and
                    self._layout_workers_config is None and
                    isinstance(self.layout_workers, int) and
                    self.layout_workers < min(4, cpu_count // 2)):
                original_workers = self.layout_workers
                self.layout_workers = min(4, self.layout_workers + 1)
                print(f" → Increased layout workers: {original_workers} → {self.layout_workers}")
                return True

            # Can we increase local workers? (only when auto-detected)
            if (hasattr(self, 'local_workers') and
                    hasattr(self, '_local_workers_config') and
                    self._local_workers_config is None and
                    isinstance(self.local_workers, int) and
                    self.local_workers < max(1, cpu_count - 2)):
                original_local = self.local_workers
                self.local_workers = min(cpu_count - 2, self.local_workers + 1)
                print(f" → Increased local workers: {original_local} → {self.local_workers}")
                if hasattr(self, 'inlier_coordinator') and self.inlier_coordinator:
                    self.inlier_coordinator.local_workers = self.local_workers
                return True

        return False  # No adjustments needed

    except Exception as e:
        # Monitoring is best-effort; never let it take down the caller.
        print(f" → Error monitoring memory: {e}")
        return False
|
|
|
|
def _handle_worker_failure(self, failed_layout, exception):
|
|
"""Handle individual layout worker failures gracefully"""
|
|
print(f" → Worker failure detected for {failed_layout}: {exception}")
|
|
|
|
# Check if it's a memory-related error
|
|
error_str = str(exception).lower()
|
|
if any(keyword in error_str for keyword in ['memory', 'out of memory', 'memoryerror', 'killed']):
|
|
print(f" → Memory-related failure detected, reducing worker counts")
|
|
|
|
# Emergency reduction of all workers
|
|
if hasattr(self, 'layout_workers') and isinstance(self.layout_workers, int) and self.layout_workers > 1:
|
|
self.layout_workers = max(1, self.layout_workers // 2)
|
|
print(f" → Emergency: Reduced layout workers to {self.layout_workers}")
|
|
|
|
if hasattr(self, 'local_workers') and isinstance(self.local_workers, int) and self.local_workers > 1:
|
|
self.local_workers = max(1, self.local_workers // 2)
|
|
print(f" → Emergency: Reduced local workers to {self.local_workers}")
|
|
|
|
# Update inlier coordinator
|
|
if hasattr(self, 'inlier_coordinator') and self.inlier_coordinator:
|
|
self.inlier_coordinator.local_workers = self.local_workers
|
|
|
|
# Return an error result
|
|
return {
|
|
'layout_filename': failed_layout,
|
|
'detected_master_ids': [],
|
|
'detected_master_filenames': [],
|
|
'analysis': f'Worker failure: {exception}',
|
|
'detection_method': 'worker_failure',
|
|
'panel_count': 0,
|
|
'panel_threshold': self.panel_threshold,
|
|
'processing_mode': 'hybrid_parallel_worker_failure',
|
|
'confidence_score': 0.0,
|
|
'error': str(exception),
|
|
'worker_failure': True
|
|
}
|
|
|
|
def _check_resource_usage(self):
|
|
"""Check and log current resource usage"""
|
|
try:
|
|
import resource
|
|
import gc
|
|
|
|
# Get current file descriptor usage
|
|
soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
|
|
|
|
# Get current process info
|
|
import os
|
|
pid = os.getpid()
|
|
|
|
# Count open file descriptors (macOS specific)
|
|
try:
|
|
import subprocess
|
|
result = subprocess.run(['lsof', '-p', str(pid)], capture_output=True, text=True)
|
|
open_files = len(result.stdout.strip().split('\n')) - 1 # -1 for header
|
|
except:
|
|
open_files = "unknown"
|
|
|
|
# Force garbage collection
|
|
gc.collect()
|
|
|
|
print(f" → Resource check: {open_files} open files, limit: {soft_limit}/{hard_limit}")
|
|
|
|
# Warn if getting close to limit
|
|
if isinstance(open_files, int) and open_files > soft_limit * 0.8:
|
|
print(f" → WARNING: High file descriptor usage ({open_files}/{soft_limit})")
|
|
print(f" → Consider reducing concurrent workers or increasing system limits")
|
|
|
|
except Exception as e:
|
|
print(f" → Could not check resource usage: {e}")
|
|
|
|
def save_results(self, results: Dict, filename: str = "hybrid_detection_results") -> str:
    """Write detection results plus run metadata to ``<results_path>/<filename>.json``.

    The metadata records the detector configuration (thresholds, greyscale and
    contrast flags, vector mode). In vector mode it also records the
    similarity/embedding details, and when cost tracking is enabled and
    active, the cost session summary.

    Args:
        results: Per-layout result mapping to store under ``results``.
        filename: Output file stem (``.json`` is appended).

    Returns:
        str: Path of the written JSON file.
    """
    target = self.results_path / f"{filename}.json"
    is_vector = self.vector_mode

    metadata = {
        'total_layouts_processed': len(results),
        'total_master_images': len(self.master_images),
        'master_images_available': list(self.master_files.keys()),
        'provider': 'hybrid',
        'model': 'openai_o3_plus_vector_similarity' if is_vector else 'openai_o3_plus_local_analysis',
        'panel_threshold': self.panel_threshold,
        'inlier_threshold': self.inlier_threshold,
        'greyscale_enabled': self.enable_greyscale,
        'contrast_enhancement_enabled': self.enable_contrast_enhancement,
        'processing_mode': 'hybrid_vector' if is_vector else 'hybrid',
        'vector_mode': is_vector,
    }
    payload = {'metadata': metadata, 'results': results}

    if is_vector:
        metadata['similarity_threshold'] = self.similarity_threshold
        metadata['embedding_model'] = 'Google Vertex AI multimodalembedding@001'
        metadata['embedding_dimensions'] = 1408

    if cost_calculator.enable_tracking:
        session = cost_calculator.get_session_summary()
        if session['tracking_enabled']:
            metadata['cost_tracking'] = session

    with open(target, 'w') as handle:
        json.dump(payload, handle, indent=2)

    print(f"Results saved to: {target}")
    return str(target)
|
|
|
|
def generate_summary(self, results: Dict) -> Dict:
    """Aggregate per-layout hybrid detection results into run-level statistics.

    Args:
        results: Mapping of layout key -> per-layout result dict. Every value
            must contain ``detected_master_ids``. The keys ``detection_method``,
            ``deduplication_applied``, ``duplicates_removed``,
            ``truncation_applied`` and ``removed_count`` are optional.

    Returns:
        Dict: Counts and percentages for detection methods, master usage
        (plus the top 10 masters), deduplication and truncation, and the
        detector configuration. Vector-embedding details are included in
        vector mode.
    """
    records = list(results.values())
    layout_total = len(results)

    def share_of_layouts(count):
        # Percentage of all layouts, one decimal place; 0 when nothing ran.
        if layout_total > 0:
            return round(count / layout_total * 100, 1)
        return 0

    matched = len([rec for rec in records if rec['detected_master_ids']])

    local_methods = ('local_inlier_analysis', 'vector_similarity')
    split_methods = ('split_and_inlier_analysis', 'split_and_vector_similarity')
    local_used = len([rec for rec in records if rec.get('detection_method') in local_methods])
    split_used = len([rec for rec in records if rec.get('detection_method') in split_methods])

    # Occurrences of each master across all layouts, in first-seen order.
    usage = {}
    for rec in records:
        for master_id in rec['detected_master_ids']:
            usage[master_id] = usage.setdefault(master_id, 0) + 1
    # Stable sort on negated count == descending count with ties in first-seen order.
    top_masters = sorted(usage.items(), key=lambda item: -item[1])[:10]

    dedup_layouts = len([rec for rec in records if rec.get('deduplication_applied', False)])
    dupes_removed = sum(rec.get('duplicates_removed', 0) for rec in records)
    trunc_layouts = len([rec for rec in records if rec.get('truncation_applied', False)])
    trunc_removed = sum(rec.get('removed_count', 0) for rec in records)

    is_vector = self.vector_mode
    summary = {
        'total_layouts_processed': layout_total,
        'layouts_with_matches': matched,
        'layouts_without_matches': layout_total - matched,
        'local_analysis_used': local_used,
        'split_analysis_used': split_used,
        'local_analysis_percentage': share_of_layouts(local_used),
        'split_analysis_percentage': share_of_layouts(split_used),
        'master_image_usage': usage,
        'most_used_masters': top_masters,
        # Deduplication stats
        'layouts_with_deduplication': dedup_layouts,
        'total_duplicates_removed': dupes_removed,
        'deduplication_rate': share_of_layouts(dedup_layouts),
        # Truncation stats
        'layouts_with_truncation': trunc_layouts,
        'total_matches_removed_by_truncation': trunc_removed,
        'truncation_rate': share_of_layouts(trunc_layouts),
        'provider': 'hybrid',
        'model': 'openai_o3_plus_vector_similarity' if is_vector else 'openai_o3_plus_local_analysis_plus_split',
        'panel_threshold': self.panel_threshold,
        'inlier_threshold': self.inlier_threshold,
        'vector_mode': is_vector,
    }

    if is_vector:
        summary.update(
            similarity_threshold=self.similarity_threshold,
            embedding_model='Google Vertex AI multimodalembedding@001',
            embedding_dimensions=1408,
        )

    return summary
|
|
|
|
def apply_cen_refinement_with_stored_analysis(self, initial_results: Dict, is_layout_censored: bool, censorship_confidence: str) -> Dict:
    """
    Apply CEN refinement using stored censorship analysis from the consolidated API call.

    Each detected master for which ``self.is_cen_image`` is true is swapped for
    its non-CEN counterpart when the layout is uncensored and a counterpart
    exists. Otherwise the CEN version is kept. Non-CEN masters pass through
    unchanged.

    Args:
        initial_results: Result dict; reads ``detected_masters`` and
            ``layout_path`` (used only for logging).
        is_layout_censored: Stored censorship verdict for the layout.
        censorship_confidence: Confidence label recorded in each refinement entry.

    Returns:
        Dict: ``initial_results`` itself when no CEN masters were detected.
        Otherwise a shallow copy with updated ``detected_masters``,
        ``detected_master_ids`` and ``detected_master_filenames`` (``<id>.jpg``),
        plus ``cen_refinement_*`` bookkeeping fields.
    """
    layout_name = Path(initial_results.get('layout_path', 'unknown')).name
    detected_masters = initial_results.get('detected_masters', [])

    # Find CEN images in the results
    cen_images = [mid for mid in detected_masters if self.is_cen_image(mid)]
    if not cen_images:
        # No CEN images found, return original results
        return initial_results

    print(f" Refining {len(cen_images)} CEN matches for {layout_name}")
    print(f" Using stored censorship analysis: {'CENSORED' if is_layout_censored else 'UNCENSORED'} (confidence: {censorship_confidence})")

    refined_masters = []
    refinement_details = []
    changes_made = 0

    for master_id in detected_masters:
        if not self.is_cen_image(master_id):
            # Not a CEN image, keep as is
            refined_masters.append(master_id)
            continue

        non_cen_id = self.find_corresponding_non_cen_image(master_id)

        if not is_layout_censored and non_cen_id:
            # Layout is uncensored, switch to non-CEN version
            refined_masters.append(non_cen_id)
            refinement_details.append({
                'original_cen_match': master_id,
                'non_cen_alternative': non_cen_id,
                'final_choice': non_cen_id,
                'confidence': censorship_confidence,
                'analysis': f"Layout determined to be uncensored, switched from {master_id} to {non_cen_id}",
                'changed': True,
                'reason': 'layout_uncensored'
            })
            changes_made += 1
            print(f" → Changed {master_id} to {non_cen_id} (layout is uncensored)")
            continue

        # Keep the CEN version. The explanation must reflect the actual reason:
        # a censored layout, or an uncensored layout with no counterpart.
        refined_masters.append(master_id)
        if is_layout_censored:
            reason = 'layout_censored'
            analysis = f"Layout is censored, keeping CEN version: {master_id}"
            log_note = "layout is censored"
        else:
            reason = 'no_non_cen_alternative'
            analysis = f"Layout is uncensored but no non-CEN alternative exists, keeping CEN version: {master_id}"
            log_note = "no non-CEN alternative"
        refinement_details.append({
            'original_cen_match': master_id,
            'non_cen_alternative': non_cen_id,
            'final_choice': master_id,
            'confidence': censorship_confidence,
            'analysis': analysis,
            'changed': False,
            'reason': reason
        })
        print(f" → Kept {master_id} ({log_note})")

    # Update the results (shallow copy; the caller's dict is not mutated)
    refined_results = initial_results.copy()
    refined_results['detected_masters'] = refined_masters
    refined_results['detected_master_ids'] = refined_masters
    refined_results['detected_master_filenames'] = [f"{mid}.jpg" for mid in refined_masters]

    # Store refinement information
    refined_results['cen_refinement_applied'] = True
    refined_results['cen_refinement_details'] = refinement_details
    refined_results['cen_refinement_changes'] = changes_made
    refined_results['censorship_used_stored_analysis'] = True

    print(f" → CEN refinement completed: {changes_made} changes made")

    return refined_results
|
|
|
|
def calculate_confidence_score(self, final_matches: int, panel_count: int) -> float:
    """
    Calculate confidence score as percentage ratio of final matches to detected panels

    Args:
        final_matches: Number of final detected matches after deduplication and refinement
        panel_count: Number of panels detected by OpenAI O3

    Returns:
        float: Confidence percentage, clamped to 0.0..100.0. Returns 0.0 when
        ``panel_count`` is missing (None), zero or negative, since the ratio
        is undefined.
    """
    if not panel_count or panel_count <= 0:
        return 0.0

    raw_percentage = (final_matches / panel_count) * 100

    # Clamp both ends: more matches than panels caps at 100%, and a negative
    # match count must not yield a negative confidence.
    return float(max(0.0, min(raw_percentage, 100.0)))