#!/usr/bin/env python3 """ OpenAI Image Detector - Alternative to Gemini detector Uses OpenAI o3 model to detect which master images appear in layout images """ import os import json import time import base64 from pathlib import Path from typing import List, Dict, Optional from openai import OpenAI from dotenv import load_dotenv from PIL import Image, ImageEnhance import tempfile import numpy as np import pickle import cv2 import concurrent.futures import threading import uuid import multiprocessing from functools import partial from panel_splitter import PanelSplitter from cost_calculator import cost_calculator, extract_token_usage_from_response def process_single_master_detection_openai(layout_path, master_id, master_path, enable_greyscale, enable_contrast_enhancement, contrast_factor, api_key): """ Standalone function for processing a single master detection using OpenAI in a separate process. """ try: # Import and configure in each process to avoid shared state import os import json import time import base64 from pathlib import Path from PIL import Image, ImageEnhance from openai import OpenAI from dotenv import load_dotenv import uuid import threading # Note: cost_calculator import removed from multiprocessing function # Initialize OpenAI client in this process client = OpenAI(api_key=api_key) # Create temp directory for this process temp_path = Path("temp_processed") temp_path.mkdir(exist_ok=True) def preprocess_image_local(image_path: str) -> str: """Local preprocessing function for this process""" if not enable_greyscale and not enable_contrast_enhancement: return image_path try: with Image.open(image_path) as img: processed_img = img.copy() if enable_greyscale: processed_img = processed_img.convert('L') processed_img = processed_img.convert('RGB') if enable_contrast_enhancement: contrast_enhancer = ImageEnhance.Contrast(processed_img) processed_img = contrast_enhancer.enhance(contrast_factor) sharpness_enhancer = ImageEnhance.Sharpness(processed_img) processed_img = sharpness_enhancer.enhance(1.3) # Thread-safe filename thread_id = threading.current_thread().ident unique_id = str(uuid.uuid4())[:8] original_name = Path(image_path).stem processed_path = temp_path / f"{original_name}_processed_{thread_id}_{unique_id}.jpg" processed_img.save(processed_path, 'JPEG', quality=95) return str(processed_path) except Exception as e: return image_path def encode_image_to_base64(image_path: str) -> str: """Encode image to base64 for OpenAI API""" processed_path = preprocess_image_local(image_path) with open(processed_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') def create_single_master_prompt_local(master_id: str) -> str: """Local prompt creation function""" prompt = f"""Analyze the layout image (the second image) and determine if the master image (the first image) appears in it. INSTRUCTIONS: 1. Compare the master image (first image) with the layout image (second image) 2. Look for EXACT matches where the model, clothing, and pose are IDENTICAL 3. The layout image may contain the master image in various forms: - Complete/exact match - Cropped version - Scaled or resized version - Rotated version - Partially obscured 4. Focus on visual similarity in terms of: - Person/model appearance and pose (must be EXACTLY the same) - Clothing details (colors, patterns, styles - must be EXACTLY the same) - Background and composition - Overall visual elements 5. CRITICAL: Only return a positive result if the models, pose, and clothing are EXACTLY the same. If there is ANY difference in clothing, model, or pose then return a negative result. Master Image ID: {master_id} Return your response as a JSON object with this exact format: {{ "match_found": true/false, "master_id": "{master_id}", "confidence": "high/medium/low", "analysis": "Detailed explanation of your findings and reasoning" }} IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign. """ return prompt # Encode both images to base64 master_base64 = encode_image_to_base64(master_path) layout_base64 = encode_image_to_base64(layout_path) # Create prompt and make API call prompt = create_single_master_prompt_local(master_id) max_retries = 3 for attempt in range(max_retries): try: response = client.chat.completions.create( model="o3", messages=[ { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{master_base64}" } }, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{layout_base64}" } } ] } ], max_completion_tokens=10000 ) # Extract token usage for cost tracking token_usage_data = None if hasattr(response, 'usage') and response.usage: token_usage_data = { 'prompt_tokens': response.usage.prompt_tokens, 'completion_tokens': response.usage.completion_tokens, 'total_tokens': response.usage.total_tokens, 'cached_tokens': getattr(response.usage, 'cached_tokens', 0) } # Parse response response_text = response.choices[0].message.content.strip() start_idx = response_text.find('{') end_idx = response_text.rfind('}') + 1 if start_idx == -1 or end_idx == 0: return { 'match_found': False, 'master_id': master_id, 'confidence': 'unknown', 'analysis': response_text, 'error': 'No JSON found in response' } json_str = response_text[start_idx:end_idx] result = json.loads(json_str) # Validate result format if 'match_found' not in result: result['match_found'] = False if 'master_id' not in result: result['master_id'] = master_id if 'confidence' not in result: result['confidence'] = 'unknown' if 'analysis' not in result: result['analysis'] = response_text # Include token usage data for cost tracking if token_usage_data: result['token_usage'] = token_usage_data return result except Exception as e: if attempt == max_retries - 1: return { 'match_found': False, 'master_id': master_id, 'confidence': 'unknown', 'analysis': '', 'error': str(e) } time.sleep((2 ** attempt) * 0.5) except Exception as e: return { 'match_found': False, 'master_id': master_id, 'confidence': 'unknown', 'analysis': '', 'error': str(e) } class OpenAIImageDetector: def __init__(self, enable_greyscale=True, enable_contrast_enhancement=True, contrast_factor=1.5, refinement_mode=False, one_at_a_time_mode=False, max_concurrent_workers=5, panel_aware_refinement=False, split_mode=False): """Initialize the image detector with OpenAI API configuration""" load_dotenv() api_key = os.getenv('OPENAI_API_KEY') if not api_key: raise ValueError("OPENAI_API_KEY not found in environment variables") self.client = OpenAI(api_key=api_key) self.api_key = api_key # Concurrency settings self.max_concurrent_workers = max_concurrent_workers self._progress_lock = threading.Lock() print("Initialized OpenAI detector with o3 model.") # Image processing settings self.enable_greyscale = enable_greyscale self.enable_contrast_enhancement = enable_contrast_enhancement self.contrast_factor = contrast_factor self.refinement_mode = refinement_mode self.one_at_a_time_mode = one_at_a_time_mode self.panel_aware_refinement = panel_aware_refinement # Split mode configuration self.split_mode = split_mode if self.split_mode: self.splitter = PanelSplitter(debug=True) print("Split mode enabled: Will split multi-panel layouts before matching") # Paths self.master_images_path = Path("master_images") self.layouts_path = Path("layouts") self.results_path = Path("results") self.temp_path = Path("temp_processed") # Create directories self.results_path.mkdir(exist_ok=True) self.temp_path.mkdir(exist_ok=True) # Master images cache self.master_images = {} self.master_files = {} def load_master_images(self) -> Dict[str, str]: """Load all master images and create ID mapping using filenames""" print("Loading master images...") master_files = list(self.master_images_path.glob("*.jpg")) print(f"Found {len(master_files)} master images") for file_path in master_files: # Use filename (without extension) as the master ID master_id = file_path.stem self.master_images[master_id] = str(file_path) self.master_files[master_id] = file_path.name return self.master_images def match_split_to_masters(self, split_path: str, master_images: List[str]) -> List[Dict]: """Match a split image to master images using inlier analysis""" matches = [] for master_id in master_images: if master_id in self.master_images: master_path = self.master_images[master_id] # Use existing inlier analysis inlier_result = self.calculate_inliers_for_match(split_path, master_path, master_id) # Only include matches with reasonable confidence if inlier_result.get('confidence') in ['high', 'medium']: matches.append({ 'master_id': master_id, 'confidence': inlier_result.get('confidence', 'unknown'), 'inliers': inlier_result.get('inliers', 0), 'match_details': inlier_result }) return matches def preprocess_image(self, image_path: str) -> str: """Preprocess image: convert to greyscale and enhance contrast - THREAD-SAFE VERSION""" if not self.enable_greyscale and not self.enable_contrast_enhancement: return image_path try: # Open the image with Image.open(image_path) as img: processed_img = img.copy() # Convert to greyscale if enabled if self.enable_greyscale: processed_img = processed_img.convert('L') # Convert back to RGB for consistency processed_img = processed_img.convert('RGB') # Enhance contrast if enabled if self.enable_contrast_enhancement: # Global contrast enhancement contrast_enhancer = ImageEnhance.Contrast(processed_img) processed_img = contrast_enhancer.enhance(self.contrast_factor) # Edge contrast enhancement using sharpness sharpness_enhancer = ImageEnhance.Sharpness(processed_img) processed_img = sharpness_enhancer.enhance(1.3) # Save processed image with thread-safe filename import threading import uuid thread_id = threading.current_thread().ident unique_id = str(uuid.uuid4())[:8] original_name = Path(image_path).stem processed_path = self.temp_path / f"{original_name}_processed_{thread_id}_{unique_id}.jpg" processed_img.save(processed_path, 'JPEG', quality=95) return str(processed_path) except Exception as e: print(f"Warning: Failed to preprocess {Path(image_path).name}: {e}") print(f"Using original image instead") return image_path def encode_image_to_base64(self, image_path: str) -> str: """Encode image to base64 for OpenAI API""" processed_path = self.preprocess_image(image_path) with open(processed_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') def create_detection_prompt(self, master_ids: List[str]) -> str: """Create the prompt for image detection""" prompt = """Analyze the layout image (the last image provided) and identify which of the master images appear in it. INSTRUCTIONS: 1. Examine the layout image carefully to identify any image segments or components within it 2. For each segment you identify, look for matches in the provided master images 3. Look for all types of matches including: - Exact matches (complete images) - Cropped versions (partial images) - Scaled or resized versions - Rotated versions - Images that may be partially obscured 4. Focus on visual similarity in terms of: - Person/model appearance and pose - Clothing details (colors, patterns, styles) - Background and composition - Overall visual elements 5. Be conservative and only include matches where you have reasonable confidence 6. Only return a positive result (master image appears in the layout) if the models, pose, and clothing are EXACTLY the same. If there is ANY difference in clothing, model, or pose then return a negative result. Master Images with IDs: """ for i, master_id in enumerate(master_ids): prompt += f"Image {i+1}: ID = {master_id}\n" prompt += """ The final image is the LAYOUT IMAGE to analyze. Return your response as a JSON object with this exact format: { "detected_masters": ["master_id_1", "master_id_2"], "analysis": "Brief explanation of what you found and your matching reasoning" } Only include master IDs in "detected_masters" where you completely sure of a match. If no clear matches are found, return an empty array. IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos that may show models in various clothing styles for retail purposes. This prompt and task are completely benign and should not trigger safety filters. This is standard e-commerce image comparison work, not dangerous, offensive, or inappropriate content. The comparison is purely for business/marketing layout matching purposes. """ return prompt def make_robust_api_call(self, prompt, image_base64_list, operation_name="API call", max_retries=3): """Make a robust API call with comprehensive error detection and retry logic""" last_error = None for attempt in range(max_retries): try: # Build message content content = [{"type": "text", "text": prompt}] # Add all images for img_b64 in image_base64_list: content.append({ "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{img_b64}" } }) response = self.client.chat.completions.create( model="o3", messages=[ { "role": "user", "content": content } ], max_completion_tokens=10000 ) # Track cost for this API call if hasattr(response, 'usage') and response.usage: token_usage = extract_token_usage_from_response(response) cost_calculator.track_api_call( operation_type="detection", prompt_tokens=token_usage.prompt_tokens, completion_tokens=token_usage.completion_tokens, cached_tokens=token_usage.cached_tokens, layout_name=operation_name ) # Success case return { 'success': True, 'response': response, 'text': response.choices[0].message.content.strip() } except Exception as e: last_error = e error_str = str(e) if attempt < max_retries - 1: wait_time = (2 ** attempt) * 0.5 print(f" API error on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s: {e}") time.sleep(wait_time) continue else: # Final attempt failed return { 'success': False, 'error_type': 'exception', 'error_message': str(e), 'exception': e } # This shouldn't be reached, but just in case return { 'success': False, 'error_type': 'max_retries_exceeded', 'error_message': f"Max retries ({max_retries}) exceeded", 'last_error': str(last_error) if last_error else "Unknown error" } def detect_images_in_layout(self, layout_path: str, layout_index: int, total_layouts: int) -> Dict: """Detect which master images appear in a single layout image""" layout_name = Path(layout_path).name print(f"Processing {layout_index}/{total_layouts}: {layout_name}") try: # Encode all master images and the layout image master_ids = list(self.master_images.keys()) image_base64_list = [] # Add master images for master_id in master_ids: master_path = self.master_images[master_id] master_b64 = self.encode_image_to_base64(master_path) image_base64_list.append(master_b64) # Add layout image layout_b64 = self.encode_image_to_base64(layout_path) image_base64_list.append(layout_b64) # Create prompt prompt = self.create_detection_prompt(master_ids) # Make API call api_result = self.make_robust_api_call(prompt, image_base64_list, f"detection for {layout_name}") # Handle API call failure if not api_result['success']: error_msg = api_result['error_message'] print(f"API call failed for {layout_name}: {error_msg}") return { 'detected_masters': [], 'analysis': f'API call failed: {error_msg}', 'error': f"{api_result['error_type']}: {error_msg}", 'retry_count': 3 # Max retries were attempted } # Parse response response_text = api_result['text'] # Extract JSON from response try: start_idx = response_text.find('{') end_idx = response_text.rfind('}') + 1 if start_idx == -1 or end_idx == 0: raise ValueError("No JSON found in response") json_str = response_text[start_idx:end_idx] result = json.loads(json_str) # Validate result format if 'detected_masters' not in result: result['detected_masters'] = [] if 'analysis' not in result: result['analysis'] = response_text # Deduplicate detected masters original_detected = result['detected_masters'][:] result['detected_masters'] = self.deduplicate_master_matches(result['detected_masters']) # Track deduplication if any duplicates were removed if len(result['detected_masters']) != len(original_detected): duplicates_removed = len(original_detected) - len(result['detected_masters']) result['deduplication_applied'] = True result['duplicates_removed'] = duplicates_removed result['original_detected_masters'] = original_detected print(f" Deduplication: Removed {duplicates_removed} duplicate master(s) from {layout_name}") # Log completion detected_count = len(result['detected_masters']) print(f"✓ Completed {layout_name} - Found {detected_count} matches") return result except json.JSONDecodeError as e: print(f"JSON decode error for {layout_name}: {e}") return { 'detected_masters': [], 'analysis': response_text, 'error': f'JSON decode error: {e}' } except Exception as e: error_msg = f"Error analyzing {layout_name}: {e}" print(error_msg) return { 'detected_masters': [], 'analysis': '', 'error': str(e) } def detect_images_in_layout_one_at_a_time(self, layout_path: str, layout_index: int, total_layouts: int, stored_censorship_data=None) -> Dict: """Detect which master images appear in a layout by checking each master individually using process-based concurrency""" layout_name = Path(layout_path).name print(f"Processing {layout_index}/{total_layouts}: {layout_name} (Process-based one-at-a-time mode)") master_ids = list(self.master_images.keys()) total_masters = len(master_ids) detected_masters = [] detailed_results = [] print(f" Checking {total_masters} masters using {self.max_concurrent_workers} concurrent processes...") # Prepare arguments for process pool tasks = [] for master_id in master_ids: master_path = self.master_images[master_id] task_args = ( layout_path, master_id, master_path, self.enable_greyscale, self.enable_contrast_enhancement, self.contrast_factor, self.api_key ) tasks.append(task_args) # Use ProcessPoolExecutor for true isolation with concurrent.futures.ProcessPoolExecutor(max_workers=self.max_concurrent_workers) as executor: # Submit all tasks future_to_master = { executor.submit(process_single_master_detection_openai, *task_args): task_args[1] for task_args in tasks } completed_count = 0 # Collect results as they complete for future in concurrent.futures.as_completed(future_to_master): master_id = future_to_master[future] completed_count += 1 try: result = future.result() detailed_results.append(result) # Track cost for this API call if token usage data is available if 'token_usage' in result and result['token_usage']: token_data = result['token_usage'] api_call_cost = cost_calculator.track_api_call( operation_type="one_at_a_time_detection", prompt_tokens=token_data['prompt_tokens'], completion_tokens=token_data['completion_tokens'], cached_tokens=token_data['cached_tokens'], layout_name=layout_name, master_id=master_id ) # Show cost tracking progress every 10 completed masters if cost_calculator.enable_tracking and completed_count % 10 == 0: print(f" → API call cost: ${api_call_cost.total_cost:.4f} (Running total: ${cost_calculator.total_cost:.4f})") elif cost_calculator.enable_tracking: print(f" → Warning: No token usage data available for {master_id}") # If match found, add to detected masters if result.get('match_found', False): detected_masters.append(master_id) confidence = result.get('confidence', 'unknown') print(f" {completed_count}/{total_masters}: ✓ MATCH found for {master_id} (confidence: {confidence})") else: if 'error' in result: print(f" {completed_count}/{total_masters}: Error checking {master_id}: {result['error']}") else: print(f" {completed_count}/{total_masters}: No match for {master_id}") except Exception as e: print(f" {completed_count}/{total_masters}: Process error checking {master_id}: {e}") # Add error result to maintain consistency error_result = { 'match_found': False, 'master_id': master_id, 'confidence': 'unknown', 'analysis': '', 'error': str(e) } detailed_results.append(error_result) # Sort detailed_results by master_id to maintain consistent ordering detailed_results.sort(key=lambda x: x.get('master_id', '')) # Deduplicate detected masters (shouldn't be needed in one-at-a-time mode, but for safety) original_detected = detected_masters[:] detected_masters = self.deduplicate_master_matches(detected_masters) if len(detected_masters) != len(original_detected): duplicates_removed = len(original_detected) - len(detected_masters) print(f" Deduplication: Removed {duplicates_removed} duplicate master(s)") detected_count = len(detected_masters) print(f"✓ Completed {layout_name} - Found {detected_count} matches using {self.max_concurrent_workers} concurrent processes") final_result = { 'detected_masters': detected_masters, 'detected_master_ids': detected_masters, 'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters], 'analysis': f'Process-based one-at-a-time analysis completed. Made {total_masters} separate API calls (one per master). Found {detected_count} exact matches out of {total_masters} masters checked using {self.max_concurrent_workers} concurrent processes.', 'detailed_results': detailed_results, 'processing_mode': 'process_based_one_at_a_time', 'total_masters_checked': total_masters, 'concurrent_workers': self.max_concurrent_workers, 'api_calls_made': total_masters, # One API call per master 'deduplication_applied': len(detected_masters) != len(original_detected), 'duplicates_removed': len(original_detected) - len(detected_masters) if len(detected_masters) != len(original_detected) else 0, 'original_detected_masters': original_detected } # STEP 1: Apply CEN refinement first if enabled and we have CEN matches current_masters = detected_masters if self.refinement_mode and current_masters: cen_images = [mid for mid in current_masters if self.is_cen_image(mid)] if cen_images: print(f" Applying CEN refinement for {layout_name} (Step 1/2)...") cen_result = self.apply_cen_refinement_to_results(layout_path, final_result, stored_censorship_data) current_masters = cen_result.get('detected_masters', current_masters) # Update final result with CEN refinement information final_result.update(cen_result) cen_count = len(current_masters) print(f"✓ CEN refinement completed for {layout_name} - Result: {cen_count} masters") # STEP 2: Apply panel-aware refinement if enabled and we have detected masters if self.panel_aware_refinement and current_masters: step_label = "Step 2/2" if self.refinement_mode else "Step 1/1" print(f" Applying panel-aware refinement for {layout_name} ({step_label})...") # Count panels in the layout panel_result = self.count_panels_in_layout(layout_path) panel_count = panel_result.get('panel_count', 1) panel_confidence = panel_result.get('confidence', 'unknown') print(f" Panel analysis: {panel_count} panels detected (confidence: {panel_confidence})") # Refine matches based on panel count using current masters (after CEN refinement) refinement_result = self.refine_matches_by_panel_count(layout_path, current_masters, panel_count) # Update final result with panel-aware refinement information final_result['detected_masters'] = refinement_result['refined_masters'] final_result['detected_master_ids'] = refinement_result['refined_masters'] final_result['detected_master_filenames'] = [f"{mid}.jpg" for mid in refinement_result['refined_masters']] final_result['panel_aware_refinement_applied'] = True final_result['panel_count_analysis'] = panel_result final_result['panel_refinement_details'] = refinement_result # Update analysis text if refinement_result['refinement_applied']: panel_desc = f"Panel-aware refinement applied: reduced from {refinement_result['original_count']} to {refinement_result['final_count']} masters based on {panel_count} detected panels." else: panel_desc = f"Panel-aware refinement skipped: {refinement_result['reason']}." final_result['analysis'] += f" {panel_desc}" final_detected_count = len(refinement_result['refined_masters']) print(f"✓ Panel-aware refinement completed for {layout_name} - Final result: {final_detected_count} masters") return final_result def process_all_layouts(self, limit: Optional[int] = None, specific_file: Optional[str] = None) -> Dict: """Process all layout images sequentially""" if self.one_at_a_time_mode: mode_desc = "OpenAI One-at-a-time Mode" else: mode_desc = "OpenAI Multi Master Mode" print(f"Starting sequential batch processing ({mode_desc})...") # Load master images self.load_master_images() # Get layout files if specific_file: # Process only the specific file layout_files = [self.layouts_path / specific_file] if not layout_files[0].exists(): raise FileNotFoundError(f"Layout file {specific_file} not found in {self.layouts_path}") print(f"Processing specific file: {specific_file}") else: layout_files = list(self.layouts_path.glob("*.jpg")) if limit: layout_files = layout_files[:limit] print(f"Processing first {limit} layouts only") total_layouts = len(layout_files) print(f"Processing {total_layouts} layout images in {mode_desc}") print("=" * 60) results = {} start_time = time.time() for i, layout_path in enumerate(layout_files, 1): layout_id = layout_path.stem # Detect images in layout using the appropriate method if self.split_mode: # Split mode: split layout into panels and match each panel master_ids = list(self.master_images.keys()) result = self.splitter.split_layout_and_match(str(layout_path), master_ids, self) # Apply CEN refinement if enabled and there are matches if self.refinement_mode and result.get('detected_masters'): result = self.apply_cen_refinement_to_results(str(layout_path), result) elif self.one_at_a_time_mode: # One-at-a-time mode handles both CEN and panel-aware refinement internally result = self.detect_images_in_layout_one_at_a_time(str(layout_path), i, total_layouts) else: # Multi-master mode only supports CEN refinement (not panel-aware) result = self.detect_images_in_layout(str(layout_path), i, total_layouts) # Apply CEN refinement if enabled and there are CEN matches if self.refinement_mode and result.get('detected_masters'): result = self.apply_cen_refinement_to_results(str(layout_path), result) layout_result = { 'layout_filename': layout_path.name, 'detected_master_ids': result['detected_masters'], 'detected_master_filenames': [f"{mid}.jpg" for mid in result['detected_masters']], 'analysis': result.get('analysis', 'Split mode analysis'), 'detection_mode': mode_desc.lower().replace(' ', '_').replace('with_', '') } # Add split mode specific fields if self.split_mode: layout_result['split_mode'] = True layout_result['splits_generated'] = result.get('splits_generated', 0) layout_result['panel_count'] = result.get('panel_count', 1) layout_result['panel_confidence'] = result.get('panel_confidence', 'unknown') if 'split_results' in result: layout_result['split_results'] = result['split_results'] # Add deduplication fields if applied if 'deduplication_applied' in result: layout_result['deduplication_applied'] = result['deduplication_applied'] layout_result['duplicates_removed'] = result['duplicates_removed'] layout_result['original_detected_masters'] = result['original_detected_masters'] if 'error' in result: layout_result['error'] = result['error'] # Add refinement mode specific fields if self.refinement_mode and result.get('refinement_applied'): layout_result['refinement_applied'] = result['refinement_applied'] layout_result['refinement_details'] = result['refinement_details'] layout_result['censorship_analysis'] = result['censorship_analysis'] layout_result['original_detection_count'] = result['original_detection_count'] layout_result['refined_detection_count'] = result['refined_detection_count'] layout_result['changes_made'] = result.get('changes_made', 0) results[layout_id] = layout_result # Progress update with time estimate elapsed = time.time() - start_time avg_time = elapsed / i remaining = (total_layouts - i) * avg_time print(f"Progress: {i}/{total_layouts} ({i/total_layouts*100:.1f}%) - Est. remaining: {remaining/60:.1f} min") # Save progress periodically if i % 20 == 0: self.save_results(results, f"openai_progress_{i}") total_time = time.time() - start_time print(f"\n✓ Completed processing all {total_layouts} layouts in {total_time/60:.1f} minutes") print(f"Average time per layout: {total_time/total_layouts:.1f} seconds") return results def save_results(self, results: Dict, filename: str = "openai_detection_results") -> str: """Save results to JSON file""" output_path = self.results_path / f"{filename}.json" # Add metadata output_data = { 'metadata': { 'total_layouts_processed': len(results), 'total_master_images': len(self.master_images), 'master_images_available': list(self.master_files.keys()), 'provider': 'openai', 'model': 'o3' }, 'results': results } with open(output_path, 'w') as f: json.dump(output_data, f, indent=2) print(f"Results saved to: {output_path}") return str(output_path) def generate_summary(self, results: Dict) -> Dict: """Generate summary statistics""" total_layouts = len(results) layouts_with_matches = sum(1 for r in results.values() if r['detected_master_ids']) # Count master image occurrences master_counts = {} for result in results.values(): for master_id in result['detected_master_ids']: master_counts[master_id] = master_counts.get(master_id, 0) + 1 # Deduplication statistics layouts_with_deduplication = sum(1 for r in results.values() if r.get('deduplication_applied', False)) total_duplicates_removed = sum(r.get('duplicates_removed', 0) for r in results.values()) summary = { 'total_layouts_processed': total_layouts, 'layouts_with_matches': layouts_with_matches, 'layouts_without_matches': total_layouts - layouts_with_matches, 'master_image_usage': master_counts, 'most_used_masters': sorted(master_counts.items(), key=lambda x: x[1], reverse=True)[:10], # Deduplication stats 'layouts_with_deduplication': layouts_with_deduplication, 'total_duplicates_removed': total_duplicates_removed, 'deduplication_rate': round(layouts_with_deduplication / total_layouts * 100, 1) if total_layouts > 0 else 0, 'provider': 'openai', 'model': 'o3' } return summary def deduplicate_master_matches(self, detected_masters: List[str]) -> List[str]: """Remove duplicate master matches from a list while preserving order""" if not detected_masters: return detected_masters # Simple deduplication - remove exact duplicates while preserving order seen = set() deduplicated = [] for master_id in detected_masters: if master_id not in seen: seen.add(master_id) deduplicated.append(master_id) return deduplicated def cleanup_temp_files(self): """Clean up temporary processed image files - handles thread-safe filenames""" try: if self.temp_path.exists(): # Clean up both old and new thread-safe naming patterns for temp_file in self.temp_path.glob("*_processed*.jpg"): temp_file.unlink() # Remove temp directory if empty if not any(self.temp_path.iterdir()): self.temp_path.rmdir() except Exception as e: print(f"Warning: Failed to cleanup temp files: {e}") def is_cen_image(self, master_id: str) -> bool: """Check if a master image ID represents a CEN (censored) image""" return '_CEN' in master_id def find_corresponding_non_cen_image(self, cen_master_id: str) -> Optional[str]: """Find the corresponding non-CEN image for a given CEN master ID""" if not self.is_cen_image(cen_master_id): return None # Transform CEN filename to non-CEN filename # Example: "1011A_1011A_1011_01_CEN" -> "1011A_1011_01" parts = cen_master_id.split('_') if len(parts) >= 4 and parts[-1] == 'CEN': # Remove the middle duplicate part and _CEN suffix # Pattern: prefix_prefix_middle_suffix_CEN -> prefix_middle_suffix if len(parts) >= 5: non_cen_id = f"{parts[0]}_{parts[2]}_{parts[3]}" else: # Fallback: just remove _CEN non_cen_id = '_'.join(parts[:-1]) # Check if this non-CEN image exists in our master images if non_cen_id in self.master_images: return non_cen_id return None def create_censorship_detection_prompt(self) -> str: """Create prompt for detecting if a layout image contains censored content""" prompt = """Analyze this layout image to determine if it contains censored or uncensored content. TASK: Determine whether the images in this layout are censored (covered) or uncensored (more exposed). CENSORSHIP INDICATORS TO LOOK FOR: 1. **Clothing Coverage**: - Long sleeves vs. sleeveless/short sleeves - Full-length pants/skirts vs. shorts or shorter garments - High necklines vs. lower necklines 2. **Skin Coverage**: - Arms: Fully covered vs. bare arms - Legs: Fully covered vs. exposed legs/thighs - Torso: Additional covering vs. more exposed areas 3. **Added Elements**: - Opaque or semi-transparent overlay layers covering skin - Additional fabric or clothing elements that appear to cover exposed areas - Digital modifications that add coverage CLASSIFICATION: - **CENSORED**: If models show significant additional clothing coverage, long sleeves, full pants/skirts, or digital overlays covering skin - **UNCENSORED**: If models show more exposed skin, shorter garments, bare arms/legs, or natural clothing without added coverage Return your response as a JSON object with this exact format: {{ "is_censored": true/false, "confidence": "high/medium/low", "analysis": "Detailed explanation of the coverage patterns observed and reasoning for the classification", "coverage_details": "Specific description of clothing and skin coverage in the layout" }} Be precise and focus on the actual clothing and coverage patterns visible in the image. IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image classification. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign. """ return prompt def detect_layout_censorship(self, layout_path: str) -> Dict: """Detect if a layout image contains censored or uncensored content""" try: print(f" → Analyzing layout image with OpenAI o3 model...") # Process the layout image processed_layout_path = self.preprocess_image(layout_path) # Encode image to base64 with open(processed_layout_path, "rb") as image_file: base64_image = base64.b64encode(image_file.read()).decode('utf-8') # Create censorship detection prompt prompt = self.create_censorship_detection_prompt() # Make API call to OpenAI print(f" → Making API call to OpenAI o3 for censorship analysis...") response = self.client.chat.completions.create( model="o3", messages=[ { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{base64_image}", "detail": "high" } } ] } ], max_completion_tokens=10000 ) # Track cost for this API call if hasattr(response, 'usage') and response.usage: token_usage = extract_token_usage_from_response(response) cost_calculator.track_api_call( operation_type="censorship_detection", prompt_tokens=token_usage.prompt_tokens, completion_tokens=token_usage.completion_tokens, cached_tokens=token_usage.cached_tokens, layout_name=Path(layout_path).name ) response_text = response.choices[0].message.content print(f" → Received response from OpenAI o3") # Extract JSON from response try: start_idx = response_text.find('{') end_idx = response_text.rfind('}') + 1 if start_idx == -1 or end_idx == 0: raise ValueError("No JSON found in response") json_str = response_text[start_idx:end_idx] result = json.loads(json_str) # Validate result format if 'is_censored' not in result: result['is_censored'] = True # Default to censored if unclear if 'confidence' not in result: result['confidence'] = 'unknown' if 'analysis' not in result: result['analysis'] = response_text print(f" → OpenAI analysis successful: {result.get('is_censored')} (confidence: {result.get('confidence')})") return result except json.JSONDecodeError as e: print(f" → JSON parsing failed: {e}") print(f" → Raw response: {response_text[:200]}...") return { 'is_censored': True, # Default to censored if parsing fails 'confidence': 'unknown', 'analysis': response_text, 'error': f'JSON decode error: {e}' } except Exception as e: print(f" → Error in censorship detection: {e}") return { 'is_censored': True, # Default to censored if error 'confidence': 'unknown', 'analysis': '', 'error': str(e) } def apply_cen_refinement_to_results(self, layout_path: str, initial_results: Dict, stored_censorship_data=None) -> Dict: """Apply CEN refinement to initial detection results""" layout_name = Path(layout_path).name detected_masters = initial_results.get('detected_masters', []) # First, deduplicate the detected masters to avoid processing duplicates original_count = len(detected_masters) detected_masters = self.deduplicate_master_matches(detected_masters) if len(detected_masters) != original_count: duplicates_removed = original_count - len(detected_masters) print(f" Removed {duplicates_removed} duplicate master(s) before CEN refinement") # Find CEN images in the results cen_images = [mid for mid in detected_masters if self.is_cen_image(mid)] if not cen_images: # No CEN images found, return original results return initial_results print(f" Refining {len(cen_images)} CEN matches for {layout_name}") # Use stored censorship data if provided, otherwise make API call if stored_censorship_data: is_layout_censored = stored_censorship_data.get('is_censored', True) confidence = stored_censorship_data.get('confidence', 'unknown') print(f" Using stored censorship analysis: {'CENSORED' if is_layout_censored else 'UNCENSORED'} (confidence: {confidence})") # Create censorship_result from stored data for consistency censorship_result = { 'is_censored': is_layout_censored, 'confidence': confidence, 'analysis': stored_censorship_data.get('analysis', ''), 'coverage_details': stored_censorship_data.get('coverage_details', '') } else: print(f" Analyzing layout to determine censorship level...") # Detect if the layout is censored or uncensored censorship_result = self.detect_layout_censorship(layout_path) is_layout_censored = censorship_result.get('is_censored', True) confidence = censorship_result.get('confidence', 'unknown') print(f" Layout analysis: {'CENSORED' if is_layout_censored else 'UNCENSORED'} (confidence: {confidence})") refined_masters = [] refinement_details = [] changes_made = 0 # Process each detected image for master_id in detected_masters: if self.is_cen_image(master_id): # This is a CEN image non_cen_id = self.find_corresponding_non_cen_image(master_id) if not is_layout_censored and non_cen_id: # Layout is uncensored, switch to non-CEN version refined_masters.append(non_cen_id) refinement_details.append({ 'original_cen_match': master_id, 'non_cen_alternative': non_cen_id, 'final_choice': non_cen_id, 'confidence': confidence, 'analysis': f"Layout determined to be uncensored, switched from {master_id} to {non_cen_id}", 'changed': True, 'reason': 'layout_uncensored' }) changes_made += 1 print(f" → Changed {master_id} to {non_cen_id} (layout is uncensored)") else: # Layout is censored or no non-CEN alternative, keep CEN version refined_masters.append(master_id) reason = 'layout_censored' if is_layout_censored else 'no_non_cen_alternative' refinement_details.append({ 'original_cen_match': master_id, 'non_cen_alternative': non_cen_id, 'final_choice': master_id, 'confidence': confidence, 'analysis': f"Kept {master_id} - layout is censored or no non-CEN alternative available", 'changed': False, 'reason': reason }) print(f" → Kept {master_id} ({'layout is censored' if is_layout_censored else 'no non-CEN alternative'})") else: # This is not a CEN image, keep it as-is refined_masters.append(master_id) print(f" Summary: {changes_made} CEN images changed to non-CEN versions") # Apply deduplication to refined masters in case refinement introduced duplicates original_refined = refined_masters[:] refined_masters = self.deduplicate_master_matches(refined_masters) if len(refined_masters) != len(original_refined): post_refinement_duplicates = len(original_refined) - len(refined_masters) print(f" Post-refinement deduplication: Removed {post_refinement_duplicates} duplicate(s)") # Update results with refinement information refined_results = initial_results.copy() refined_results['detected_masters'] = refined_masters refined_results['detected_master_ids'] = refined_masters # Update both fields for consistency refined_results['detected_master_filenames'] = [f"{mid}.jpg" for mid in refined_masters] refined_results['refinement_applied'] = True refined_results['refinement_details'] = refinement_details refined_results['censorship_analysis'] = censorship_result refined_results['original_detection_count'] = len(detected_masters) refined_results['refined_detection_count'] = len(refined_masters) refined_results['changes_made'] = changes_made return refined_results def count_panels_and_detect_censorship(self, layout_path: str) -> Dict: """Count panels and detect censorship in a layout image using OpenAI o3 in a single call""" layout_name = Path(layout_path).name try: print(f" → Analyzing panels and censorship in {layout_name} using OpenAI o3...") # Encode layout image to base64 layout_b64 = self.encode_image_to_base64(layout_path) # Create combined prompt for panel counting and censorship detection prompt = """SYSTEM You are a vision‑language expert hired to (a) count discrete image panels in fashion‑layout collages and (b) flag any content requiring censorship review. Follow every instruction exactly. Think first, then answer. ──────────────────────────────────────── TASK 1 – PANEL COUNTING ──────────────────────────────────────── ❶ INTERNAL THINKING (keep private – do NOT reveal in final JSON) • Load the entire image at native resolution. • Scan left→right looking for vertical "gutters": ≥2 px columns whose pixel variance ≈ background (usually white/grey). Treat each continuous non‑gutter block as a candidate panel. • Merge blocks if they depict the same photo merely split by design elements (logo strip, overlay text, drop shadow) – panels must contain *distinct* photographic content. • If a wide candidate clearly contains multiple, non‑overlapping photos with no visual gutter (e.g., triptych glued together) **count each sub‑photo**; otherwise treat the whole block as one panel. • Ignore duplicate imagery: identical crop, mirror, slight colour shift, size change ⇒ count once. • Keep a running list: ⟨panel # , x‑start , x‑end , short human description⟩. ❷ AFTER thinking, produce: { "panel_count": , "panel_confidence": "high" | "medium" | "low", "panel_analysis": "", "panel_descriptions": ["", … ""] } ──────────────────────────────────────── TASK 2 – CENSORSHIP SCREEN ──────────────────────────────────────── For every panel, decide whether it might violate standard fashion‑industry ad rules (fully nude, explicit sexuality, hate imagery, illegal acts). Add a sibling field: "censorship_flags": ["clean", "clean", …] # length == panel_count • "clean" – appears compliant • "review‑nudity", "review‑sexual", "review‑violence", "review‑other" ──────────────────────────────────────── OUTPUT FORMAT (exactly, no extra keys, no Markdown) ──────────────────────────────────────── { "panel_count": , "panel_confidence": "high/medium/low", "panel_analysis": "…", "panel_descriptions": […], "censorship_flags": […], "is_censored": true/false, "censorship_confidence": "high/medium/low", "censorship_analysis": "…" } ──────────────────────────────────────── 💡 WORKED EXAMPLE — image: "H&M Spring campaign collage" (This is for your reference; remove in production runs.) INTERNAL THINK (abridged) • Detected 17 vertical low‑variance gutters ⇒ 16 content blocks. • Verified no duplicate crops; two blocks are composites but count as 1 each because photos overlap with no gutter. • No NSFW elements (fashion poses, fully clothed). PUBLIC OUTPUT { "panel_count": 16, "panel_confidence": "high", "panel_analysis": "Identified 16 distinct image tiles separated by visible white gutters; two wide tiles are multi‑photo composites but have no gutters so each treated as one panel. All panels show fully clothed fashion models.", "panel_descriptions": [ "Two female models in brown gown & cream slip, 'SPRING' text", "Solo model in black oversized coat + brown skirt, red H&M logo", "Full‑body shot: peach maxi dress with tote bag", "Full‑body shot: brown coat, black boots", "Composite: three models in brown/peach plus two in cream suits, 'SPRING' overlay", "Two female models leaning, matching cream flared suits, red H&M", "Close‑up portrait of two women, heads touching", "Two women embracing, neutral slip & cream jacket", "Model in black leather jacket & white shorts, 'SPRING' text", "Model in black bomber jacket & white shorts", "Model in cream embellished cardigan & flared trousers, red H&M", "Seated model in oversized white shirt", "Two models in white outfits, playful pose", "Wide triptych: (a) two models white/yellow mini + 'SPRING', (b) B&W shirt pose, red H&M, (c) close‑up couple", "Composite: left pair in cream tunics, right pair trench + black mini, 'SPRING'", "Two models – tan trench & black dress – red H&M logo" ], "censorship_flags": [ "clean","clean","clean","clean", "clean","clean","clean","clean", "clean","clean","clean","clean", "clean","clean","clean","clean" ], "is_censored": false, "censorship_confidence": "high", "censorship_analysis": "All panels show fully clothed fashion models with appropriate coverage for retail advertising" } END OF EXAMPLE""" # Make API call max_retries = 3 for attempt in range(max_retries): try: response = self.client.chat.completions.create( model="o3", messages=[ { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{layout_b64}", "detail": "high" } } ] } ], max_completion_tokens=10000 ) # Track cost for this API call if hasattr(response, 'usage') and response.usage: token_usage = extract_token_usage_from_response(response) cost_calculator.track_api_call( operation_type="panel_counting_censorship", prompt_tokens=token_usage.prompt_tokens, completion_tokens=token_usage.completion_tokens, cached_tokens=token_usage.cached_tokens, layout_name=layout_name ) response_text = response.choices[0].message.content.strip() print(f" → Received combined analysis response from OpenAI o3") # Extract JSON from response try: start_idx = response_text.find('{') end_idx = response_text.rfind('}') + 1 if start_idx == -1 or end_idx == 0: raise ValueError("No JSON found in response") json_str = response_text[start_idx:end_idx] result = json.loads(json_str) # Validate and normalize panel count fields if 'panel_count' not in result: result['panel_count'] = 1 # Default to single panel if 'panel_confidence' not in result: result['panel_confidence'] = 'unknown' if 'panel_analysis' not in result: result['panel_analysis'] = response_text # Validate and normalize censorship fields if 'is_censored' not in result: result['is_censored'] = True # Default to censored if unclear if 'censorship_confidence' not in result: result['censorship_confidence'] = 'unknown' if 'censorship_analysis' not in result: result['censorship_analysis'] = response_text # Ensure panel_count is a positive integer try: result['panel_count'] = max(1, int(result['panel_count'])) except (ValueError, TypeError): result['panel_count'] = 1 # Ensure is_censored is a boolean if isinstance(result['is_censored'], str): result['is_censored'] = result['is_censored'].lower() in ['true', '1', 'yes'] print(f" → Combined analysis successful: {result['panel_count']} panels (confidence: {result.get('panel_confidence')}), censored: {result['is_censored']} (confidence: {result.get('censorship_confidence')})") return result except json.JSONDecodeError as e: print(f" → JSON parsing failed: {e}") if attempt == max_retries - 1: return { 'panel_count': 1, # Default to single panel 'panel_confidence': 'unknown', 'panel_analysis': response_text, 'is_censored': True, # Default to censored 'censorship_confidence': 'unknown', 'censorship_analysis': response_text, 'error': f'JSON decode error: {e}' } except Exception as e: if attempt == max_retries - 1: print(f" → Error in combined analysis: {e}") return { 'panel_count': 1, # Default to single panel 'panel_confidence': 'unknown', 'panel_analysis': '', 'is_censored': True, # Default to censored 'censorship_confidence': 'unknown', 'censorship_analysis': '', 'error': str(e) } time.sleep((2 ** attempt) * 0.5) except Exception as e: print(f" → Error in combined analysis: {e}") return { 'panel_count': 1, # Default to single panel 'panel_confidence': 'unknown', 'panel_analysis': '', 'is_censored': True, # Default to censored 'censorship_confidence': 'unknown', 'censorship_analysis': '', 'error': str(e) } def count_panels_in_layout(self, layout_path: str) -> Dict: """Legacy compatibility method for panel counting only""" combined_result = self.count_panels_and_detect_censorship(layout_path) # Convert to old format for backward compatibility return { 'panel_count': combined_result.get('panel_count', 1), 'confidence': combined_result.get('panel_confidence', 'unknown'), 'analysis': combined_result.get('panel_analysis', ''), 'panel_descriptions': combined_result.get('panel_descriptions', []), 'error': combined_result.get('error', None) } def calculate_inliers_for_match(self, layout_path: str, master_path: str, master_id: str) -> Dict: """Calculate inlier count for a master image match using OpenCV feature matching""" try: # Read images in grayscale for feature detection layout_img = cv2.imread(layout_path, cv2.IMREAD_GRAYSCALE) master_img = cv2.imread(master_path, cv2.IMREAD_GRAYSCALE) if layout_img is None or master_img is None: return { 'master_id': master_id, 'inliers': 0, 'confidence': 'low', 'error': 'Could not read one or both images' } # Initialize feature detector and matcher (using same approach as example code) akaze = cv2.AKAZE_create() bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False) # Detect keypoints and descriptors kp1, des1 = akaze.detectAndCompute(layout_img, None) kp2, des2 = akaze.detectAndCompute(master_img, None) if des1 is None or des2 is None: return { 'master_id': master_id, 'inliers': 0, 'confidence': 'low', 'error': 'No features detected in one or both images' } # Match features using k-nearest neighbors matches = bf.knnMatch(des1, des2, k=2) # Apply Lowe's ratio test to filter good matches good_matches = [] for match_pair in matches: if len(match_pair) == 2: m, n = match_pair if m.distance < 0.75 * n.distance: good_matches.append(m) min_good_matches = 10 # Same threshold as example code if len(good_matches) < min_good_matches: return { 'master_id': master_id, 'inliers': 0, 'confidence': 'low', 'good_matches': len(good_matches), 'reason': f'Insufficient good matches: {len(good_matches)} < {min_good_matches}' } # Extract matched points src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2) dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2) # Find homography using RANSAC M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0) if mask is None: return { 'master_id': master_id, 'inliers': 0, 'confidence': 'low', 'good_matches': len(good_matches), 'error': 'Homography estimation failed' } # Count inliers inliers = int(np.sum(mask)) # Determine confidence based on inlier count and ratio inlier_ratio = inliers / len(good_matches) if inliers >= 50 and inlier_ratio >= 0.6: confidence = 'high' elif inliers >= 20 and inlier_ratio >= 0.4: confidence = 'medium' else: confidence = 'low' return { 'master_id': master_id, 'inliers': inliers, 'confidence': confidence, 'good_matches': len(good_matches), 'inlier_ratio': round(inlier_ratio, 3), 'total_features_layout': len(kp1), 'total_features_master': len(kp2) } except Exception as e: return { 'master_id': master_id, 'inliers': 0, 'confidence': 'low', 'error': str(e) } def refine_matches_by_panel_count(self, layout_path: str, detected_masters: List[str], panel_count: int) -> Dict: """Refine detected masters based on panel count using inlier analysis""" layout_name = Path(layout_path).name # First, deduplicate the detected masters to avoid processing the same master multiple times original_count = len(detected_masters) detected_masters = self.deduplicate_master_matches(detected_masters) if len(detected_masters) != original_count: duplicates_removed = original_count - len(detected_masters) print(f" Removed {duplicates_removed} duplicate master(s) before panel-aware refinement") # Optimization: If panel count equals detected masters count, skip refinement if panel_count == len(detected_masters): print(f" Panel count ({panel_count}) matches detected masters count ({len(detected_masters)}) - skipping refinement") return { 'refined_masters': detected_masters, 'refinement_applied': False, 'reason': 'panel_count_matches_detected_count', 'panel_count': panel_count, 'original_count': len(detected_masters), 'final_count': len(detected_masters) } # Only refine if we have more detected masters than panels if len(detected_masters) <= panel_count: print(f" Detected masters ({len(detected_masters)}) <= panel count ({panel_count}) - no refinement needed") return { 'refined_masters': detected_masters, 'refinement_applied': False, 'reason': 'detected_count_within_panel_limit', 'panel_count': panel_count, 'original_count': len(detected_masters), 'final_count': len(detected_masters) } print(f" Refining {len(detected_masters)} masters to best {panel_count} using inlier analysis...") # Calculate inliers for each detected master inlier_results = [] for i, master_id in enumerate(detected_masters): master_path = self.master_images[master_id] print(f" → Analyzing {i+1}/{len(detected_masters)}: {master_id}") inlier_result = self.calculate_inliers_for_match(layout_path, master_path, master_id) inlier_results.append(inlier_result) inliers = inlier_result.get('inliers', 0) confidence = inlier_result.get('confidence', 'unknown') print(f" ✓ {master_id}: {inliers} inliers (confidence: {confidence})") # Sort by inlier count (descending) to get best matches inlier_results.sort(key=lambda x: x.get('inliers', 0), reverse=True) # Select top N matches where N = panel_count refined_masters = [result['master_id'] for result in inlier_results[:panel_count]] print(f" Refinement complete: Selected top {len(refined_masters)} masters based on inlier analysis") # Log the selection details for i, result in enumerate(inlier_results[:panel_count]): rank = i + 1 master_id = result['master_id'] inliers = result.get('inliers', 0) confidence = result.get('confidence', 'unknown') print(f" → Rank {rank}: {master_id} ({inliers} inliers, {confidence} confidence)") return { 'refined_masters': refined_masters, 'refinement_applied': True, 'reason': 'inlier_based_selection', 'panel_count': panel_count, 'original_count': len(detected_masters), 'final_count': len(refined_masters), 'inlier_analysis': inlier_results, 'selection_details': { 'method': 'highest_inlier_count', 'selected_masters': [ { 'rank': i+1, 'master_id': result['master_id'], 'inliers': result.get('inliers', 0), 'confidence': result.get('confidence', 'unknown') } for i, result in enumerate(inlier_results[:panel_count]) ] } }