master_adapt_detect/openai_detector.py
2025-10-01 14:32:55 -05:00

1605 lines
No EOL
76 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
OpenAI Image Detector - Alternative to Gemini detector
Uses OpenAI o3 model to detect which master images appear in layout images
"""
import os
import json
import time
import base64
from pathlib import Path
from typing import List, Dict, Optional
from openai import OpenAI
from dotenv import load_dotenv
from PIL import Image, ImageEnhance
import tempfile
import numpy as np
import pickle
import cv2
import concurrent.futures
import threading
import uuid
import multiprocessing
from functools import partial
from panel_splitter import PanelSplitter
from cost_calculator import cost_calculator, extract_token_usage_from_response
def process_single_master_detection_openai(layout_path, master_id, master_path, enable_greyscale, enable_contrast_enhancement, contrast_factor, api_key):
"""
Standalone function for processing a single master detection using OpenAI in a separate process.
"""
try:
# Import and configure in each process to avoid shared state
import os
import json
import time
import base64
from pathlib import Path
from PIL import Image, ImageEnhance
from openai import OpenAI
from dotenv import load_dotenv
import uuid
import threading
# Note: cost_calculator import removed from multiprocessing function
# Initialize OpenAI client in this process
client = OpenAI(api_key=api_key)
# Create temp directory for this process
temp_path = Path("temp_processed")
temp_path.mkdir(exist_ok=True)
def preprocess_image_local(image_path: str) -> str:
"""Local preprocessing function for this process"""
if not enable_greyscale and not enable_contrast_enhancement:
return image_path
try:
with Image.open(image_path) as img:
processed_img = img.copy()
if enable_greyscale:
processed_img = processed_img.convert('L')
processed_img = processed_img.convert('RGB')
if enable_contrast_enhancement:
contrast_enhancer = ImageEnhance.Contrast(processed_img)
processed_img = contrast_enhancer.enhance(contrast_factor)
sharpness_enhancer = ImageEnhance.Sharpness(processed_img)
processed_img = sharpness_enhancer.enhance(1.3)
# Thread-safe filename
thread_id = threading.current_thread().ident
unique_id = str(uuid.uuid4())[:8]
original_name = Path(image_path).stem
processed_path = temp_path / f"{original_name}_processed_{thread_id}_{unique_id}.jpg"
processed_img.save(processed_path, 'JPEG', quality=95)
return str(processed_path)
except Exception as e:
return image_path
def encode_image_to_base64(image_path: str) -> str:
"""Encode image to base64 for OpenAI API"""
processed_path = preprocess_image_local(image_path)
with open(processed_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
def create_single_master_prompt_local(master_id: str) -> str:
"""Local prompt creation function"""
prompt = f"""Analyze the layout image (the second image) and determine if the master image (the first image) appears in it.
INSTRUCTIONS:
1. Compare the master image (first image) with the layout image (second image)
2. Look for EXACT matches where the model, clothing, and pose are IDENTICAL
3. The layout image may contain the master image in various forms:
- Complete/exact match
- Cropped version
- Scaled or resized version
- Rotated version
- Partially obscured
4. Focus on visual similarity in terms of:
- Person/model appearance and pose (must be EXACTLY the same)
- Clothing details (colors, patterns, styles - must be EXACTLY the same)
- Background and composition
- Overall visual elements
5. CRITICAL: Only return a positive result if the models, pose, and clothing are EXACTLY the same.
If there is ANY difference in clothing, model, or pose then return a negative result.
Master Image ID: {master_id}
Return your response as a JSON object with this exact format:
{{
"match_found": true/false,
"master_id": "{master_id}",
"confidence": "high/medium/low",
"analysis": "Detailed explanation of your findings and reasoning"
}}
IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign.
"""
return prompt
# Encode both images to base64
master_base64 = encode_image_to_base64(master_path)
layout_base64 = encode_image_to_base64(layout_path)
# Create prompt and make API call
prompt = create_single_master_prompt_local(master_id)
max_retries = 3
for attempt in range(max_retries):
try:
response = client.chat.completions.create(
model="o3",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{master_base64}"
}
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{layout_base64}"
}
}
]
}
],
max_completion_tokens=10000
)
# Extract token usage for cost tracking
token_usage_data = None
if hasattr(response, 'usage') and response.usage:
token_usage_data = {
'prompt_tokens': response.usage.prompt_tokens,
'completion_tokens': response.usage.completion_tokens,
'total_tokens': response.usage.total_tokens,
'cached_tokens': getattr(response.usage, 'cached_tokens', 0)
}
# Parse response
response_text = response.choices[0].message.content.strip()
start_idx = response_text.find('{')
end_idx = response_text.rfind('}') + 1
if start_idx == -1 or end_idx == 0:
return {
'match_found': False,
'master_id': master_id,
'confidence': 'unknown',
'analysis': response_text,
'error': 'No JSON found in response'
}
json_str = response_text[start_idx:end_idx]
result = json.loads(json_str)
# Validate result format
if 'match_found' not in result:
result['match_found'] = False
if 'master_id' not in result:
result['master_id'] = master_id
if 'confidence' not in result:
result['confidence'] = 'unknown'
if 'analysis' not in result:
result['analysis'] = response_text
# Include token usage data for cost tracking
if token_usage_data:
result['token_usage'] = token_usage_data
return result
except Exception as e:
if attempt == max_retries - 1:
return {
'match_found': False,
'master_id': master_id,
'confidence': 'unknown',
'analysis': '',
'error': str(e)
}
time.sleep((2 ** attempt) * 0.5)
except Exception as e:
return {
'match_found': False,
'master_id': master_id,
'confidence': 'unknown',
'analysis': '',
'error': str(e)
}
class OpenAIImageDetector:
def __init__(self, enable_greyscale=True, enable_contrast_enhancement=True, contrast_factor=1.5, refinement_mode=False, one_at_a_time_mode=False, max_concurrent_workers=5, panel_aware_refinement=False, split_mode=False):
"""Initialize the image detector with OpenAI API configuration"""
load_dotenv()
api_key = os.getenv('OPENAI_API_KEY')
if not api_key:
raise ValueError("OPENAI_API_KEY not found in environment variables")
self.client = OpenAI(api_key=api_key)
self.api_key = api_key
# Concurrency settings
self.max_concurrent_workers = max_concurrent_workers
self._progress_lock = threading.Lock()
print("Initialized OpenAI detector with o3 model.")
# Image processing settings
self.enable_greyscale = enable_greyscale
self.enable_contrast_enhancement = enable_contrast_enhancement
self.contrast_factor = contrast_factor
self.refinement_mode = refinement_mode
self.one_at_a_time_mode = one_at_a_time_mode
self.panel_aware_refinement = panel_aware_refinement
# Split mode configuration
self.split_mode = split_mode
if self.split_mode:
self.splitter = PanelSplitter(debug=True)
print("Split mode enabled: Will split multi-panel layouts before matching")
# Paths
self.master_images_path = Path("master_images")
self.layouts_path = Path("layouts")
self.results_path = Path("results")
self.temp_path = Path("temp_processed")
# Create directories
self.results_path.mkdir(exist_ok=True)
self.temp_path.mkdir(exist_ok=True)
# Master images cache
self.master_images = {}
self.master_files = {}
def load_master_images(self) -> Dict[str, str]:
"""Load all master images and create ID mapping using filenames"""
print("Loading master images...")
master_files = list(self.master_images_path.glob("*.jpg"))
print(f"Found {len(master_files)} master images")
for file_path in master_files:
# Use filename (without extension) as the master ID
master_id = file_path.stem
self.master_images[master_id] = str(file_path)
self.master_files[master_id] = file_path.name
return self.master_images
def match_split_to_masters(self, split_path: str, master_images: List[str]) -> List[Dict]:
"""Match a split image to master images using inlier analysis"""
matches = []
for master_id in master_images:
if master_id in self.master_images:
master_path = self.master_images[master_id]
# Use existing inlier analysis
inlier_result = self.calculate_inliers_for_match(split_path, master_path, master_id)
# Only include matches with reasonable confidence
if inlier_result.get('confidence') in ['high', 'medium']:
matches.append({
'master_id': master_id,
'confidence': inlier_result.get('confidence', 'unknown'),
'inliers': inlier_result.get('inliers', 0),
'match_details': inlier_result
})
return matches
def preprocess_image(self, image_path: str) -> str:
"""Preprocess image: convert to greyscale and enhance contrast - THREAD-SAFE VERSION"""
if not self.enable_greyscale and not self.enable_contrast_enhancement:
return image_path
try:
# Open the image
with Image.open(image_path) as img:
processed_img = img.copy()
# Convert to greyscale if enabled
if self.enable_greyscale:
processed_img = processed_img.convert('L')
# Convert back to RGB for consistency
processed_img = processed_img.convert('RGB')
# Enhance contrast if enabled
if self.enable_contrast_enhancement:
# Global contrast enhancement
contrast_enhancer = ImageEnhance.Contrast(processed_img)
processed_img = contrast_enhancer.enhance(self.contrast_factor)
# Edge contrast enhancement using sharpness
sharpness_enhancer = ImageEnhance.Sharpness(processed_img)
processed_img = sharpness_enhancer.enhance(1.3)
# Save processed image with thread-safe filename
import threading
import uuid
thread_id = threading.current_thread().ident
unique_id = str(uuid.uuid4())[:8]
original_name = Path(image_path).stem
processed_path = self.temp_path / f"{original_name}_processed_{thread_id}_{unique_id}.jpg"
processed_img.save(processed_path, 'JPEG', quality=95)
return str(processed_path)
except Exception as e:
print(f"Warning: Failed to preprocess {Path(image_path).name}: {e}")
print(f"Using original image instead")
return image_path
def encode_image_to_base64(self, image_path: str) -> str:
"""Encode image to base64 for OpenAI API"""
processed_path = self.preprocess_image(image_path)
with open(processed_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
def create_detection_prompt(self, master_ids: List[str]) -> str:
"""Create the prompt for image detection"""
prompt = """Analyze the layout image (the last image provided) and identify which of the master images appear in it.
INSTRUCTIONS:
1. Examine the layout image carefully to identify any image segments or components within it
2. For each segment you identify, look for matches in the provided master images
3. Look for all types of matches including:
- Exact matches (complete images)
- Cropped versions (partial images)
- Scaled or resized versions
- Rotated versions
- Images that may be partially obscured
4. Focus on visual similarity in terms of:
- Person/model appearance and pose
- Clothing details (colors, patterns, styles)
- Background and composition
- Overall visual elements
5. Be conservative and only include matches where you have reasonable confidence
6. Only return a positive result (master image appears in the layout) if the models, pose, and clothing are EXACTLY the same. If there is ANY difference in clothing, model, or pose then return a negative result.
Master Images with IDs:
"""
for i, master_id in enumerate(master_ids):
prompt += f"Image {i+1}: ID = {master_id}\n"
prompt += """
The final image is the LAYOUT IMAGE to analyze.
Return your response as a JSON object with this exact format:
{
"detected_masters": ["master_id_1", "master_id_2"],
"analysis": "Brief explanation of what you found and your matching reasoning"
}
Only include master IDs in "detected_masters" where you completely sure of a match. If no clear matches are found, return an empty array.
IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos that may show models in various clothing styles for retail purposes. This prompt and task are completely benign and should not trigger safety filters. This is standard e-commerce image comparison work, not dangerous, offensive, or inappropriate content. The comparison is purely for business/marketing layout matching purposes.
"""
return prompt
def make_robust_api_call(self, prompt, image_base64_list, operation_name="API call", max_retries=3):
"""Make a robust API call with comprehensive error detection and retry logic"""
last_error = None
for attempt in range(max_retries):
try:
# Build message content
content = [{"type": "text", "text": prompt}]
# Add all images
for img_b64 in image_base64_list:
content.append({
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{img_b64}"
}
})
response = self.client.chat.completions.create(
model="o3",
messages=[
{
"role": "user",
"content": content
}
],
max_completion_tokens=10000
)
# Track cost for this API call
if hasattr(response, 'usage') and response.usage:
token_usage = extract_token_usage_from_response(response)
cost_calculator.track_api_call(
operation_type="detection",
prompt_tokens=token_usage.prompt_tokens,
completion_tokens=token_usage.completion_tokens,
cached_tokens=token_usage.cached_tokens,
layout_name=operation_name
)
# Success case
return {
'success': True,
'response': response,
'text': response.choices[0].message.content.strip()
}
except Exception as e:
last_error = e
error_str = str(e)
if attempt < max_retries - 1:
wait_time = (2 ** attempt) * 0.5
print(f" API error on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s: {e}")
time.sleep(wait_time)
continue
else:
# Final attempt failed
return {
'success': False,
'error_type': 'exception',
'error_message': str(e),
'exception': e
}
# This shouldn't be reached, but just in case
return {
'success': False,
'error_type': 'max_retries_exceeded',
'error_message': f"Max retries ({max_retries}) exceeded",
'last_error': str(last_error) if last_error else "Unknown error"
}
def detect_images_in_layout(self, layout_path: str, layout_index: int, total_layouts: int) -> Dict:
"""Detect which master images appear in a single layout image"""
layout_name = Path(layout_path).name
print(f"Processing {layout_index}/{total_layouts}: {layout_name}")
try:
# Encode all master images and the layout image
master_ids = list(self.master_images.keys())
image_base64_list = []
# Add master images
for master_id in master_ids:
master_path = self.master_images[master_id]
master_b64 = self.encode_image_to_base64(master_path)
image_base64_list.append(master_b64)
# Add layout image
layout_b64 = self.encode_image_to_base64(layout_path)
image_base64_list.append(layout_b64)
# Create prompt
prompt = self.create_detection_prompt(master_ids)
# Make API call
api_result = self.make_robust_api_call(prompt, image_base64_list, f"detection for {layout_name}")
# Handle API call failure
if not api_result['success']:
error_msg = api_result['error_message']
print(f"API call failed for {layout_name}: {error_msg}")
return {
'detected_masters': [],
'analysis': f'API call failed: {error_msg}',
'error': f"{api_result['error_type']}: {error_msg}",
'retry_count': 3 # Max retries were attempted
}
# Parse response
response_text = api_result['text']
# Extract JSON from response
try:
start_idx = response_text.find('{')
end_idx = response_text.rfind('}') + 1
if start_idx == -1 or end_idx == 0:
raise ValueError("No JSON found in response")
json_str = response_text[start_idx:end_idx]
result = json.loads(json_str)
# Validate result format
if 'detected_masters' not in result:
result['detected_masters'] = []
if 'analysis' not in result:
result['analysis'] = response_text
# Deduplicate detected masters
original_detected = result['detected_masters'][:]
result['detected_masters'] = self.deduplicate_master_matches(result['detected_masters'])
# Track deduplication if any duplicates were removed
if len(result['detected_masters']) != len(original_detected):
duplicates_removed = len(original_detected) - len(result['detected_masters'])
result['deduplication_applied'] = True
result['duplicates_removed'] = duplicates_removed
result['original_detected_masters'] = original_detected
print(f" Deduplication: Removed {duplicates_removed} duplicate master(s) from {layout_name}")
# Log completion
detected_count = len(result['detected_masters'])
print(f"✓ Completed {layout_name} - Found {detected_count} matches")
return result
except json.JSONDecodeError as e:
print(f"JSON decode error for {layout_name}: {e}")
return {
'detected_masters': [],
'analysis': response_text,
'error': f'JSON decode error: {e}'
}
except Exception as e:
error_msg = f"Error analyzing {layout_name}: {e}"
print(error_msg)
return {
'detected_masters': [],
'analysis': '',
'error': str(e)
}
def detect_images_in_layout_one_at_a_time(self, layout_path: str, layout_index: int, total_layouts: int, stored_censorship_data=None) -> Dict:
"""Detect which master images appear in a layout by checking each master individually using process-based concurrency"""
layout_name = Path(layout_path).name
print(f"Processing {layout_index}/{total_layouts}: {layout_name} (Process-based one-at-a-time mode)")
master_ids = list(self.master_images.keys())
total_masters = len(master_ids)
detected_masters = []
detailed_results = []
print(f" Checking {total_masters} masters using {self.max_concurrent_workers} concurrent processes...")
# Prepare arguments for process pool
tasks = []
for master_id in master_ids:
master_path = self.master_images[master_id]
task_args = (
layout_path,
master_id,
master_path,
self.enable_greyscale,
self.enable_contrast_enhancement,
self.contrast_factor,
self.api_key
)
tasks.append(task_args)
# Use ProcessPoolExecutor for true isolation
with concurrent.futures.ProcessPoolExecutor(max_workers=self.max_concurrent_workers) as executor:
# Submit all tasks
future_to_master = {
executor.submit(process_single_master_detection_openai, *task_args): task_args[1]
for task_args in tasks
}
completed_count = 0
# Collect results as they complete
for future in concurrent.futures.as_completed(future_to_master):
master_id = future_to_master[future]
completed_count += 1
try:
result = future.result()
detailed_results.append(result)
# Track cost for this API call if token usage data is available
if 'token_usage' in result and result['token_usage']:
token_data = result['token_usage']
api_call_cost = cost_calculator.track_api_call(
operation_type="one_at_a_time_detection",
prompt_tokens=token_data['prompt_tokens'],
completion_tokens=token_data['completion_tokens'],
cached_tokens=token_data['cached_tokens'],
layout_name=layout_name,
master_id=master_id
)
# Show cost tracking progress every 10 completed masters
if cost_calculator.enable_tracking and completed_count % 10 == 0:
print(f" → API call cost: ${api_call_cost.total_cost:.4f} (Running total: ${cost_calculator.total_cost:.4f})")
elif cost_calculator.enable_tracking:
print(f" → Warning: No token usage data available for {master_id}")
# If match found, add to detected masters
if result.get('match_found', False):
detected_masters.append(master_id)
confidence = result.get('confidence', 'unknown')
print(f" {completed_count}/{total_masters}: ✓ MATCH found for {master_id} (confidence: {confidence})")
else:
if 'error' in result:
print(f" {completed_count}/{total_masters}: Error checking {master_id}: {result['error']}")
else:
print(f" {completed_count}/{total_masters}: No match for {master_id}")
except Exception as e:
print(f" {completed_count}/{total_masters}: Process error checking {master_id}: {e}")
# Add error result to maintain consistency
error_result = {
'match_found': False,
'master_id': master_id,
'confidence': 'unknown',
'analysis': '',
'error': str(e)
}
detailed_results.append(error_result)
# Sort detailed_results by master_id to maintain consistent ordering
detailed_results.sort(key=lambda x: x.get('master_id', ''))
# Deduplicate detected masters (shouldn't be needed in one-at-a-time mode, but for safety)
original_detected = detected_masters[:]
detected_masters = self.deduplicate_master_matches(detected_masters)
if len(detected_masters) != len(original_detected):
duplicates_removed = len(original_detected) - len(detected_masters)
print(f" Deduplication: Removed {duplicates_removed} duplicate master(s)")
detected_count = len(detected_masters)
print(f"✓ Completed {layout_name} - Found {detected_count} matches using {self.max_concurrent_workers} concurrent processes")
final_result = {
'detected_masters': detected_masters,
'detected_master_ids': detected_masters,
'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters],
'analysis': f'Process-based one-at-a-time analysis completed. Made {total_masters} separate API calls (one per master). Found {detected_count} exact matches out of {total_masters} masters checked using {self.max_concurrent_workers} concurrent processes.',
'detailed_results': detailed_results,
'processing_mode': 'process_based_one_at_a_time',
'total_masters_checked': total_masters,
'concurrent_workers': self.max_concurrent_workers,
'api_calls_made': total_masters, # One API call per master
'deduplication_applied': len(detected_masters) != len(original_detected),
'duplicates_removed': len(original_detected) - len(detected_masters) if len(detected_masters) != len(original_detected) else 0,
'original_detected_masters': original_detected
}
# STEP 1: Apply CEN refinement first if enabled and we have CEN matches
current_masters = detected_masters
if self.refinement_mode and current_masters:
cen_images = [mid for mid in current_masters if self.is_cen_image(mid)]
if cen_images:
print(f" Applying CEN refinement for {layout_name} (Step 1/2)...")
cen_result = self.apply_cen_refinement_to_results(layout_path, final_result, stored_censorship_data)
current_masters = cen_result.get('detected_masters', current_masters)
# Update final result with CEN refinement information
final_result.update(cen_result)
cen_count = len(current_masters)
print(f"✓ CEN refinement completed for {layout_name} - Result: {cen_count} masters")
# STEP 2: Apply panel-aware refinement if enabled and we have detected masters
if self.panel_aware_refinement and current_masters:
step_label = "Step 2/2" if self.refinement_mode else "Step 1/1"
print(f" Applying panel-aware refinement for {layout_name} ({step_label})...")
# Count panels in the layout
panel_result = self.count_panels_in_layout(layout_path)
panel_count = panel_result.get('panel_count', 1)
panel_confidence = panel_result.get('confidence', 'unknown')
print(f" Panel analysis: {panel_count} panels detected (confidence: {panel_confidence})")
# Refine matches based on panel count using current masters (after CEN refinement)
refinement_result = self.refine_matches_by_panel_count(layout_path, current_masters, panel_count)
# Update final result with panel-aware refinement information
final_result['detected_masters'] = refinement_result['refined_masters']
final_result['detected_master_ids'] = refinement_result['refined_masters']
final_result['detected_master_filenames'] = [f"{mid}.jpg" for mid in refinement_result['refined_masters']]
final_result['panel_aware_refinement_applied'] = True
final_result['panel_count_analysis'] = panel_result
final_result['panel_refinement_details'] = refinement_result
# Update analysis text
if refinement_result['refinement_applied']:
panel_desc = f"Panel-aware refinement applied: reduced from {refinement_result['original_count']} to {refinement_result['final_count']} masters based on {panel_count} detected panels."
else:
panel_desc = f"Panel-aware refinement skipped: {refinement_result['reason']}."
final_result['analysis'] += f" {panel_desc}"
final_detected_count = len(refinement_result['refined_masters'])
print(f"✓ Panel-aware refinement completed for {layout_name} - Final result: {final_detected_count} masters")
return final_result
def process_all_layouts(self, limit: Optional[int] = None, specific_file: Optional[str] = None) -> Dict:
"""Process all layout images sequentially"""
if self.one_at_a_time_mode:
mode_desc = "OpenAI One-at-a-time Mode"
else:
mode_desc = "OpenAI Multi Master Mode"
print(f"Starting sequential batch processing ({mode_desc})...")
# Load master images
self.load_master_images()
# Get layout files
if specific_file:
# Process only the specific file
layout_files = [self.layouts_path / specific_file]
if not layout_files[0].exists():
raise FileNotFoundError(f"Layout file {specific_file} not found in {self.layouts_path}")
print(f"Processing specific file: {specific_file}")
else:
layout_files = list(self.layouts_path.glob("*.jpg"))
if limit:
layout_files = layout_files[:limit]
print(f"Processing first {limit} layouts only")
total_layouts = len(layout_files)
print(f"Processing {total_layouts} layout images in {mode_desc}")
print("=" * 60)
results = {}
start_time = time.time()
for i, layout_path in enumerate(layout_files, 1):
layout_id = layout_path.stem
# Detect images in layout using the appropriate method
if self.split_mode:
# Split mode: split layout into panels and match each panel
master_ids = list(self.master_images.keys())
result = self.splitter.split_layout_and_match(str(layout_path), master_ids, self)
# Apply CEN refinement if enabled and there are matches
if self.refinement_mode and result.get('detected_masters'):
result = self.apply_cen_refinement_to_results(str(layout_path), result)
elif self.one_at_a_time_mode:
# One-at-a-time mode handles both CEN and panel-aware refinement internally
result = self.detect_images_in_layout_one_at_a_time(str(layout_path), i, total_layouts)
else:
# Multi-master mode only supports CEN refinement (not panel-aware)
result = self.detect_images_in_layout(str(layout_path), i, total_layouts)
# Apply CEN refinement if enabled and there are CEN matches
if self.refinement_mode and result.get('detected_masters'):
result = self.apply_cen_refinement_to_results(str(layout_path), result)
layout_result = {
'layout_filename': layout_path.name,
'detected_master_ids': result['detected_masters'],
'detected_master_filenames': [f"{mid}.jpg" for mid in result['detected_masters']],
'analysis': result.get('analysis', 'Split mode analysis'),
'detection_mode': mode_desc.lower().replace(' ', '_').replace('with_', '')
}
# Add split mode specific fields
if self.split_mode:
layout_result['split_mode'] = True
layout_result['splits_generated'] = result.get('splits_generated', 0)
layout_result['panel_count'] = result.get('panel_count', 1)
layout_result['panel_confidence'] = result.get('panel_confidence', 'unknown')
if 'split_results' in result:
layout_result['split_results'] = result['split_results']
# Add deduplication fields if applied
if 'deduplication_applied' in result:
layout_result['deduplication_applied'] = result['deduplication_applied']
layout_result['duplicates_removed'] = result['duplicates_removed']
layout_result['original_detected_masters'] = result['original_detected_masters']
if 'error' in result:
layout_result['error'] = result['error']
# Add refinement mode specific fields
if self.refinement_mode and result.get('refinement_applied'):
layout_result['refinement_applied'] = result['refinement_applied']
layout_result['refinement_details'] = result['refinement_details']
layout_result['censorship_analysis'] = result['censorship_analysis']
layout_result['original_detection_count'] = result['original_detection_count']
layout_result['refined_detection_count'] = result['refined_detection_count']
layout_result['changes_made'] = result.get('changes_made', 0)
results[layout_id] = layout_result
# Progress update with time estimate
elapsed = time.time() - start_time
avg_time = elapsed / i
remaining = (total_layouts - i) * avg_time
print(f"Progress: {i}/{total_layouts} ({i/total_layouts*100:.1f}%) - Est. remaining: {remaining/60:.1f} min")
# Save progress periodically
if i % 20 == 0:
self.save_results(results, f"openai_progress_{i}")
total_time = time.time() - start_time
print(f"\n✓ Completed processing all {total_layouts} layouts in {total_time/60:.1f} minutes")
print(f"Average time per layout: {total_time/total_layouts:.1f} seconds")
return results
def save_results(self, results: Dict, filename: str = "openai_detection_results") -> str:
"""Save results to JSON file"""
output_path = self.results_path / f"{filename}.json"
# Add metadata
output_data = {
'metadata': {
'total_layouts_processed': len(results),
'total_master_images': len(self.master_images),
'master_images_available': list(self.master_files.keys()),
'provider': 'openai',
'model': 'o3'
},
'results': results
}
with open(output_path, 'w') as f:
json.dump(output_data, f, indent=2)
print(f"Results saved to: {output_path}")
return str(output_path)
def generate_summary(self, results: Dict) -> Dict:
"""Generate summary statistics"""
total_layouts = len(results)
layouts_with_matches = sum(1 for r in results.values() if r['detected_master_ids'])
# Count master image occurrences
master_counts = {}
for result in results.values():
for master_id in result['detected_master_ids']:
master_counts[master_id] = master_counts.get(master_id, 0) + 1
# Deduplication statistics
layouts_with_deduplication = sum(1 for r in results.values() if r.get('deduplication_applied', False))
total_duplicates_removed = sum(r.get('duplicates_removed', 0) for r in results.values())
summary = {
'total_layouts_processed': total_layouts,
'layouts_with_matches': layouts_with_matches,
'layouts_without_matches': total_layouts - layouts_with_matches,
'master_image_usage': master_counts,
'most_used_masters': sorted(master_counts.items(), key=lambda x: x[1], reverse=True)[:10],
# Deduplication stats
'layouts_with_deduplication': layouts_with_deduplication,
'total_duplicates_removed': total_duplicates_removed,
'deduplication_rate': round(layouts_with_deduplication / total_layouts * 100, 1) if total_layouts > 0 else 0,
'provider': 'openai',
'model': 'o3'
}
return summary
def deduplicate_master_matches(self, detected_masters: List[str]) -> List[str]:
"""Remove duplicate master matches from a list while preserving order"""
if not detected_masters:
return detected_masters
# Simple deduplication - remove exact duplicates while preserving order
seen = set()
deduplicated = []
for master_id in detected_masters:
if master_id not in seen:
seen.add(master_id)
deduplicated.append(master_id)
return deduplicated
def cleanup_temp_files(self):
"""Clean up temporary processed image files - handles thread-safe filenames"""
try:
if self.temp_path.exists():
# Clean up both old and new thread-safe naming patterns
for temp_file in self.temp_path.glob("*_processed*.jpg"):
temp_file.unlink()
# Remove temp directory if empty
if not any(self.temp_path.iterdir()):
self.temp_path.rmdir()
except Exception as e:
print(f"Warning: Failed to cleanup temp files: {e}")
def is_cen_image(self, master_id: str) -> bool:
"""Check if a master image ID represents a CEN (censored) image"""
return '_CEN' in master_id
def find_corresponding_non_cen_image(self, cen_master_id: str) -> Optional[str]:
"""Find the corresponding non-CEN image for a given CEN master ID"""
if not self.is_cen_image(cen_master_id):
return None
# Transform CEN filename to non-CEN filename
# Example: "1011A_1011A_1011_01_CEN" -> "1011A_1011_01"
parts = cen_master_id.split('_')
if len(parts) >= 4 and parts[-1] == 'CEN':
# Remove the middle duplicate part and _CEN suffix
# Pattern: prefix_prefix_middle_suffix_CEN -> prefix_middle_suffix
if len(parts) >= 5:
non_cen_id = f"{parts[0]}_{parts[2]}_{parts[3]}"
else:
# Fallback: just remove _CEN
non_cen_id = '_'.join(parts[:-1])
# Check if this non-CEN image exists in our master images
if non_cen_id in self.master_images:
return non_cen_id
return None
def create_censorship_detection_prompt(self) -> str:
"""Create prompt for detecting if a layout image contains censored content"""
prompt = """Analyze this layout image to determine if it contains censored or uncensored content.
TASK: Determine whether the images in this layout are censored (covered) or uncensored (more exposed).
CENSORSHIP INDICATORS TO LOOK FOR:
1. **Clothing Coverage**:
- Long sleeves vs. sleeveless/short sleeves
- Full-length pants/skirts vs. shorts or shorter garments
- High necklines vs. lower necklines
2. **Skin Coverage**:
- Arms: Fully covered vs. bare arms
- Legs: Fully covered vs. exposed legs/thighs
- Torso: Additional covering vs. more exposed areas
3. **Added Elements**:
- Opaque or semi-transparent overlay layers covering skin
- Additional fabric or clothing elements that appear to cover exposed areas
- Digital modifications that add coverage
CLASSIFICATION:
- **CENSORED**: If models show significant additional clothing coverage, long sleeves, full pants/skirts, or digital overlays covering skin
- **UNCENSORED**: If models show more exposed skin, shorter garments, bare arms/legs, or natural clothing without added coverage
Return your response as a JSON object with this exact format:
{{
"is_censored": true/false,
"confidence": "high/medium/low",
"analysis": "Detailed explanation of the coverage patterns observed and reasoning for the classification",
"coverage_details": "Specific description of clothing and skin coverage in the layout"
}}
Be precise and focus on the actual clothing and coverage patterns visible in the image.
IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image classification. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign.
"""
return prompt
def detect_layout_censorship(self, layout_path: str) -> Dict:
"""Detect if a layout image contains censored or uncensored content"""
try:
print(f" → Analyzing layout image with OpenAI o3 model...")
# Process the layout image
processed_layout_path = self.preprocess_image(layout_path)
# Encode image to base64
with open(processed_layout_path, "rb") as image_file:
base64_image = base64.b64encode(image_file.read()).decode('utf-8')
# Create censorship detection prompt
prompt = self.create_censorship_detection_prompt()
# Make API call to OpenAI
print(f" → Making API call to OpenAI o3 for censorship analysis...")
response = self.client.chat.completions.create(
model="o3",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}",
"detail": "high"
}
}
]
}
],
max_completion_tokens=10000
)
# Track cost for this API call
if hasattr(response, 'usage') and response.usage:
token_usage = extract_token_usage_from_response(response)
cost_calculator.track_api_call(
operation_type="censorship_detection",
prompt_tokens=token_usage.prompt_tokens,
completion_tokens=token_usage.completion_tokens,
cached_tokens=token_usage.cached_tokens,
layout_name=Path(layout_path).name
)
response_text = response.choices[0].message.content
print(f" → Received response from OpenAI o3")
# Extract JSON from response
try:
start_idx = response_text.find('{')
end_idx = response_text.rfind('}') + 1
if start_idx == -1 or end_idx == 0:
raise ValueError("No JSON found in response")
json_str = response_text[start_idx:end_idx]
result = json.loads(json_str)
# Validate result format
if 'is_censored' not in result:
result['is_censored'] = True # Default to censored if unclear
if 'confidence' not in result:
result['confidence'] = 'unknown'
if 'analysis' not in result:
result['analysis'] = response_text
print(f" → OpenAI analysis successful: {result.get('is_censored')} (confidence: {result.get('confidence')})")
return result
except json.JSONDecodeError as e:
print(f" → JSON parsing failed: {e}")
print(f" → Raw response: {response_text[:200]}...")
return {
'is_censored': True, # Default to censored if parsing fails
'confidence': 'unknown',
'analysis': response_text,
'error': f'JSON decode error: {e}'
}
except Exception as e:
print(f" → Error in censorship detection: {e}")
return {
'is_censored': True, # Default to censored if error
'confidence': 'unknown',
'analysis': '',
'error': str(e)
}
def apply_cen_refinement_to_results(self, layout_path: str, initial_results: Dict, stored_censorship_data=None) -> Dict:
"""Apply CEN refinement to initial detection results"""
layout_name = Path(layout_path).name
detected_masters = initial_results.get('detected_masters', [])
# First, deduplicate the detected masters to avoid processing duplicates
original_count = len(detected_masters)
detected_masters = self.deduplicate_master_matches(detected_masters)
if len(detected_masters) != original_count:
duplicates_removed = original_count - len(detected_masters)
print(f" Removed {duplicates_removed} duplicate master(s) before CEN refinement")
# Find CEN images in the results
cen_images = [mid for mid in detected_masters if self.is_cen_image(mid)]
if not cen_images:
# No CEN images found, return original results
return initial_results
print(f" Refining {len(cen_images)} CEN matches for {layout_name}")
# Use stored censorship data if provided, otherwise make API call
if stored_censorship_data:
is_layout_censored = stored_censorship_data.get('is_censored', True)
confidence = stored_censorship_data.get('confidence', 'unknown')
print(f" Using stored censorship analysis: {'CENSORED' if is_layout_censored else 'UNCENSORED'} (confidence: {confidence})")
# Create censorship_result from stored data for consistency
censorship_result = {
'is_censored': is_layout_censored,
'confidence': confidence,
'analysis': stored_censorship_data.get('analysis', ''),
'coverage_details': stored_censorship_data.get('coverage_details', '')
}
else:
print(f" Analyzing layout to determine censorship level...")
# Detect if the layout is censored or uncensored
censorship_result = self.detect_layout_censorship(layout_path)
is_layout_censored = censorship_result.get('is_censored', True)
confidence = censorship_result.get('confidence', 'unknown')
print(f" Layout analysis: {'CENSORED' if is_layout_censored else 'UNCENSORED'} (confidence: {confidence})")
refined_masters = []
refinement_details = []
changes_made = 0
# Process each detected image
for master_id in detected_masters:
if self.is_cen_image(master_id):
# This is a CEN image
non_cen_id = self.find_corresponding_non_cen_image(master_id)
if not is_layout_censored and non_cen_id:
# Layout is uncensored, switch to non-CEN version
refined_masters.append(non_cen_id)
refinement_details.append({
'original_cen_match': master_id,
'non_cen_alternative': non_cen_id,
'final_choice': non_cen_id,
'confidence': confidence,
'analysis': f"Layout determined to be uncensored, switched from {master_id} to {non_cen_id}",
'changed': True,
'reason': 'layout_uncensored'
})
changes_made += 1
print(f" → Changed {master_id} to {non_cen_id} (layout is uncensored)")
else:
# Layout is censored or no non-CEN alternative, keep CEN version
refined_masters.append(master_id)
reason = 'layout_censored' if is_layout_censored else 'no_non_cen_alternative'
refinement_details.append({
'original_cen_match': master_id,
'non_cen_alternative': non_cen_id,
'final_choice': master_id,
'confidence': confidence,
'analysis': f"Kept {master_id} - layout is censored or no non-CEN alternative available",
'changed': False,
'reason': reason
})
print(f" → Kept {master_id} ({'layout is censored' if is_layout_censored else 'no non-CEN alternative'})")
else:
# This is not a CEN image, keep it as-is
refined_masters.append(master_id)
print(f" Summary: {changes_made} CEN images changed to non-CEN versions")
# Apply deduplication to refined masters in case refinement introduced duplicates
original_refined = refined_masters[:]
refined_masters = self.deduplicate_master_matches(refined_masters)
if len(refined_masters) != len(original_refined):
post_refinement_duplicates = len(original_refined) - len(refined_masters)
print(f" Post-refinement deduplication: Removed {post_refinement_duplicates} duplicate(s)")
# Update results with refinement information
refined_results = initial_results.copy()
refined_results['detected_masters'] = refined_masters
refined_results['detected_master_ids'] = refined_masters # Update both fields for consistency
refined_results['detected_master_filenames'] = [f"{mid}.jpg" for mid in refined_masters]
refined_results['refinement_applied'] = True
refined_results['refinement_details'] = refinement_details
refined_results['censorship_analysis'] = censorship_result
refined_results['original_detection_count'] = len(detected_masters)
refined_results['refined_detection_count'] = len(refined_masters)
refined_results['changes_made'] = changes_made
return refined_results
def count_panels_and_detect_censorship(self, layout_path: str) -> Dict:
"""Count panels and detect censorship in a layout image using OpenAI o3 in a single call"""
layout_name = Path(layout_path).name
try:
print(f" → Analyzing panels and censorship in {layout_name} using OpenAI o3...")
# Encode layout image to base64
layout_b64 = self.encode_image_to_base64(layout_path)
# Create combined prompt for panel counting and censorship detection
prompt = """SYSTEM
You are a visionlanguage expert hired to (a) count discrete image panels in fashionlayout collages and
(b) flag any content requiring censorship review.
Follow every instruction exactly. Think first, then answer.
────────────────────────────────────────
TASK 1 PANEL COUNTING
────────────────────────────────────────
❶ INTERNAL THINKING (keep private do NOT reveal in final JSON)
• Load the entire image at native resolution.
• Scan left→right looking for vertical "gutters": ≥2 px columns whose pixel variance ≈ background (usually white/grey). Treat each continuous nongutter block as a candidate panel.
• Merge blocks if they depict the same photo merely split by design elements (logo strip, overlay text, drop shadow) panels must contain *distinct* photographic content.
• If a wide candidate clearly contains multiple, nonoverlapping photos with no visual gutter (e.g., triptych glued together) **count each subphoto**; otherwise treat the whole block as one panel.
• Ignore duplicate imagery: identical crop, mirror, slight colour shift, size change ⇒ count once.
• Keep a running list: ⟨panel # , xstart , xend , short human description⟩.
❷ AFTER thinking, produce:
{
"panel_count": <integer>,
"panel_confidence": "high" | "medium" | "low",
"panel_analysis": "<concise, publicsafe rationale no private scratch work>",
"panel_descriptions": ["<panel 1>", … "<panel N>"]
}
────────────────────────────────────────
TASK 2 CENSORSHIP SCREEN
────────────────────────────────────────
For every panel, decide whether it might violate standard fashionindustry ad rules
(fully nude, explicit sexuality, hate imagery, illegal acts).
Add a sibling field:
"censorship_flags": ["clean", "clean", …] # length == panel_count
"clean" appears compliant
"reviewnudity", "reviewsexual", "reviewviolence", "reviewother"
────────────────────────────────────────
OUTPUT FORMAT (exactly, no extra keys, no Markdown)
────────────────────────────────────────
{ "panel_count": <integer>, "panel_confidence": "high/medium/low", "panel_analysis": "",
"panel_descriptions": […], "censorship_flags": […], "is_censored": true/false,
"censorship_confidence": "high/medium/low", "censorship_analysis": "" }
────────────────────────────────────────
💡 WORKED EXAMPLE — image: "H&M Spring campaign collage"
(This is for your reference; remove in production runs.)
INTERNAL THINK (abridged)
• Detected 17 vertical lowvariance gutters ⇒ 16 content blocks.
• Verified no duplicate crops; two blocks are composites but count as 1 each because photos overlap with no gutter.
• No NSFW elements (fashion poses, fully clothed).
PUBLIC OUTPUT
{
"panel_count": 16,
"panel_confidence": "high",
"panel_analysis": "Identified 16 distinct image tiles separated by visible white gutters; two wide tiles are multiphoto composites but have no gutters so each treated as one panel. All panels show fully clothed fashion models.",
"panel_descriptions": [
"Two female models in brown gown & cream slip, 'SPRING' text",
"Solo model in black oversized coat + brown skirt, red H&M logo",
"Fullbody shot: peach maxi dress with tote bag",
"Fullbody shot: brown coat, black boots",
"Composite: three models in brown/peach plus two in cream suits, 'SPRING' overlay",
"Two female models leaning, matching cream flared suits, red H&M",
"Closeup portrait of two women, heads touching",
"Two women embracing, neutral slip & cream jacket",
"Model in black leather jacket & white shorts, 'SPRING' text",
"Model in black bomber jacket & white shorts",
"Model in cream embellished cardigan & flared trousers, red H&M",
"Seated model in oversized white shirt",
"Two models in white outfits, playful pose",
"Wide triptych: (a) two models white/yellow mini + 'SPRING', (b) B&W shirt pose, red H&M, (c) closeup couple",
"Composite: left pair in cream tunics, right pair trench + black mini, 'SPRING'",
"Two models tan trench & black dress red H&M logo"
],
"censorship_flags": [
"clean","clean","clean","clean",
"clean","clean","clean","clean",
"clean","clean","clean","clean",
"clean","clean","clean","clean"
],
"is_censored": false,
"censorship_confidence": "high",
"censorship_analysis": "All panels show fully clothed fashion models with appropriate coverage for retail advertising"
}
END OF EXAMPLE"""
# Make API call
max_retries = 3
for attempt in range(max_retries):
try:
response = self.client.chat.completions.create(
model="o3",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{layout_b64}",
"detail": "high"
}
}
]
}
],
max_completion_tokens=10000
)
# Track cost for this API call
if hasattr(response, 'usage') and response.usage:
token_usage = extract_token_usage_from_response(response)
cost_calculator.track_api_call(
operation_type="panel_counting_censorship",
prompt_tokens=token_usage.prompt_tokens,
completion_tokens=token_usage.completion_tokens,
cached_tokens=token_usage.cached_tokens,
layout_name=layout_name
)
response_text = response.choices[0].message.content.strip()
print(f" → Received combined analysis response from OpenAI o3")
# Extract JSON from response
try:
start_idx = response_text.find('{')
end_idx = response_text.rfind('}') + 1
if start_idx == -1 or end_idx == 0:
raise ValueError("No JSON found in response")
json_str = response_text[start_idx:end_idx]
result = json.loads(json_str)
# Validate and normalize panel count fields
if 'panel_count' not in result:
result['panel_count'] = 1 # Default to single panel
if 'panel_confidence' not in result:
result['panel_confidence'] = 'unknown'
if 'panel_analysis' not in result:
result['panel_analysis'] = response_text
# Validate and normalize censorship fields
if 'is_censored' not in result:
result['is_censored'] = True # Default to censored if unclear
if 'censorship_confidence' not in result:
result['censorship_confidence'] = 'unknown'
if 'censorship_analysis' not in result:
result['censorship_analysis'] = response_text
# Ensure panel_count is a positive integer
try:
result['panel_count'] = max(1, int(result['panel_count']))
except (ValueError, TypeError):
result['panel_count'] = 1
# Ensure is_censored is a boolean
if isinstance(result['is_censored'], str):
result['is_censored'] = result['is_censored'].lower() in ['true', '1', 'yes']
print(f" → Combined analysis successful: {result['panel_count']} panels (confidence: {result.get('panel_confidence')}), censored: {result['is_censored']} (confidence: {result.get('censorship_confidence')})")
return result
except json.JSONDecodeError as e:
print(f" → JSON parsing failed: {e}")
if attempt == max_retries - 1:
return {
'panel_count': 1, # Default to single panel
'panel_confidence': 'unknown',
'panel_analysis': response_text,
'is_censored': True, # Default to censored
'censorship_confidence': 'unknown',
'censorship_analysis': response_text,
'error': f'JSON decode error: {e}'
}
except Exception as e:
if attempt == max_retries - 1:
print(f" → Error in combined analysis: {e}")
return {
'panel_count': 1, # Default to single panel
'panel_confidence': 'unknown',
'panel_analysis': '',
'is_censored': True, # Default to censored
'censorship_confidence': 'unknown',
'censorship_analysis': '',
'error': str(e)
}
time.sleep((2 ** attempt) * 0.5)
except Exception as e:
print(f" → Error in combined analysis: {e}")
return {
'panel_count': 1, # Default to single panel
'panel_confidence': 'unknown',
'panel_analysis': '',
'is_censored': True, # Default to censored
'censorship_confidence': 'unknown',
'censorship_analysis': '',
'error': str(e)
}
def count_panels_in_layout(self, layout_path: str) -> Dict:
"""Legacy compatibility method for panel counting only"""
combined_result = self.count_panels_and_detect_censorship(layout_path)
# Convert to old format for backward compatibility
return {
'panel_count': combined_result.get('panel_count', 1),
'confidence': combined_result.get('panel_confidence', 'unknown'),
'analysis': combined_result.get('panel_analysis', ''),
'panel_descriptions': combined_result.get('panel_descriptions', []),
'error': combined_result.get('error', None)
}
def calculate_inliers_for_match(self, layout_path: str, master_path: str, master_id: str) -> Dict:
"""Calculate inlier count for a master image match using OpenCV feature matching"""
try:
# Read images in grayscale for feature detection
layout_img = cv2.imread(layout_path, cv2.IMREAD_GRAYSCALE)
master_img = cv2.imread(master_path, cv2.IMREAD_GRAYSCALE)
if layout_img is None or master_img is None:
return {
'master_id': master_id,
'inliers': 0,
'confidence': 'low',
'error': 'Could not read one or both images'
}
# Initialize feature detector and matcher (using same approach as example code)
akaze = cv2.AKAZE_create()
bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
# Detect keypoints and descriptors
kp1, des1 = akaze.detectAndCompute(layout_img, None)
kp2, des2 = akaze.detectAndCompute(master_img, None)
if des1 is None or des2 is None:
return {
'master_id': master_id,
'inliers': 0,
'confidence': 'low',
'error': 'No features detected in one or both images'
}
# Match features using k-nearest neighbors
matches = bf.knnMatch(des1, des2, k=2)
# Apply Lowe's ratio test to filter good matches
good_matches = []
for match_pair in matches:
if len(match_pair) == 2:
m, n = match_pair
if m.distance < 0.75 * n.distance:
good_matches.append(m)
min_good_matches = 10 # Same threshold as example code
if len(good_matches) < min_good_matches:
return {
'master_id': master_id,
'inliers': 0,
'confidence': 'low',
'good_matches': len(good_matches),
'reason': f'Insufficient good matches: {len(good_matches)} < {min_good_matches}'
}
# Extract matched points
src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
# Find homography using RANSAC
M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
if mask is None:
return {
'master_id': master_id,
'inliers': 0,
'confidence': 'low',
'good_matches': len(good_matches),
'error': 'Homography estimation failed'
}
# Count inliers
inliers = int(np.sum(mask))
# Determine confidence based on inlier count and ratio
inlier_ratio = inliers / len(good_matches)
if inliers >= 50 and inlier_ratio >= 0.6:
confidence = 'high'
elif inliers >= 20 and inlier_ratio >= 0.4:
confidence = 'medium'
else:
confidence = 'low'
return {
'master_id': master_id,
'inliers': inliers,
'confidence': confidence,
'good_matches': len(good_matches),
'inlier_ratio': round(inlier_ratio, 3),
'total_features_layout': len(kp1),
'total_features_master': len(kp2)
}
except Exception as e:
return {
'master_id': master_id,
'inliers': 0,
'confidence': 'low',
'error': str(e)
}
def refine_matches_by_panel_count(self, layout_path: str, detected_masters: List[str], panel_count: int) -> Dict:
"""Refine detected masters based on panel count using inlier analysis"""
layout_name = Path(layout_path).name
# First, deduplicate the detected masters to avoid processing the same master multiple times
original_count = len(detected_masters)
detected_masters = self.deduplicate_master_matches(detected_masters)
if len(detected_masters) != original_count:
duplicates_removed = original_count - len(detected_masters)
print(f" Removed {duplicates_removed} duplicate master(s) before panel-aware refinement")
# Optimization: If panel count equals detected masters count, skip refinement
if panel_count == len(detected_masters):
print(f" Panel count ({panel_count}) matches detected masters count ({len(detected_masters)}) - skipping refinement")
return {
'refined_masters': detected_masters,
'refinement_applied': False,
'reason': 'panel_count_matches_detected_count',
'panel_count': panel_count,
'original_count': len(detected_masters),
'final_count': len(detected_masters)
}
# Only refine if we have more detected masters than panels
if len(detected_masters) <= panel_count:
print(f" Detected masters ({len(detected_masters)}) <= panel count ({panel_count}) - no refinement needed")
return {
'refined_masters': detected_masters,
'refinement_applied': False,
'reason': 'detected_count_within_panel_limit',
'panel_count': panel_count,
'original_count': len(detected_masters),
'final_count': len(detected_masters)
}
print(f" Refining {len(detected_masters)} masters to best {panel_count} using inlier analysis...")
# Calculate inliers for each detected master
inlier_results = []
for i, master_id in enumerate(detected_masters):
master_path = self.master_images[master_id]
print(f" → Analyzing {i+1}/{len(detected_masters)}: {master_id}")
inlier_result = self.calculate_inliers_for_match(layout_path, master_path, master_id)
inlier_results.append(inlier_result)
inliers = inlier_result.get('inliers', 0)
confidence = inlier_result.get('confidence', 'unknown')
print(f"{master_id}: {inliers} inliers (confidence: {confidence})")
# Sort by inlier count (descending) to get best matches
inlier_results.sort(key=lambda x: x.get('inliers', 0), reverse=True)
# Select top N matches where N = panel_count
refined_masters = [result['master_id'] for result in inlier_results[:panel_count]]
print(f" Refinement complete: Selected top {len(refined_masters)} masters based on inlier analysis")
# Log the selection details
for i, result in enumerate(inlier_results[:panel_count]):
rank = i + 1
master_id = result['master_id']
inliers = result.get('inliers', 0)
confidence = result.get('confidence', 'unknown')
print(f" → Rank {rank}: {master_id} ({inliers} inliers, {confidence} confidence)")
return {
'refined_masters': refined_masters,
'refinement_applied': True,
'reason': 'inlier_based_selection',
'panel_count': panel_count,
'original_count': len(detected_masters),
'final_count': len(refined_masters),
'inlier_analysis': inlier_results,
'selection_details': {
'method': 'highest_inlier_count',
'selected_masters': [
{
'rank': i+1,
'master_id': result['master_id'],
'inliers': result.get('inliers', 0),
'confidence': result.get('confidence', 'unknown')
}
for i, result in enumerate(inlier_results[:panel_count])
]
}
}