#!/usr/bin/env python3 """ Gemini Image Detector - Extracted ImageDetector class Uses Google Gemini 2.5 Pro API to detect which master images appear in layout images """ import os import json import time from pathlib import Path from typing import List, Dict, Optional import google.generativeai as genai from dotenv import load_dotenv from PIL import Image, ImageEnhance import tempfile import numpy as np import pickle import cv2 import concurrent.futures import threading import uuid import multiprocessing from functools import partial from panel_splitter import PanelSplitter def process_single_master_detection(layout_path, master_id, master_path, enable_greyscale, enable_contrast_enhancement, contrast_factor, safety_settings): """ Standalone function for processing a single master detection in a separate process. This ensures complete isolation from other workers. """ try: # Import and configure in each process to avoid shared state import os import json import time from pathlib import Path from PIL import Image, ImageEnhance import google.generativeai as genai from dotenv import load_dotenv import uuid import threading # Load environment in this process load_dotenv() api_key = os.getenv('GEMINI_API_KEY') if not api_key: raise ValueError("GEMINI_API_KEY not found in environment variables") genai.configure(api_key=api_key) model = genai.GenerativeModel('gemini-2.5-pro') # Create temp directory for this process temp_path = Path("temp_processed") temp_path.mkdir(exist_ok=True) def preprocess_image_local(image_path: str) -> str: """Local preprocessing function for this process""" if not enable_greyscale and not enable_contrast_enhancement: return image_path try: with Image.open(image_path) as img: processed_img = img.copy() if enable_greyscale: processed_img = processed_img.convert('L') processed_img = processed_img.convert('RGB') if enable_contrast_enhancement: contrast_enhancer = ImageEnhance.Contrast(processed_img) processed_img = contrast_enhancer.enhance(contrast_factor) sharpness_enhancer = ImageEnhance.Sharpness(processed_img) processed_img = sharpness_enhancer.enhance(1.3) # Thread-safe filename thread_id = threading.current_thread().ident unique_id = str(uuid.uuid4())[:8] original_name = Path(image_path).stem processed_path = temp_path / f"{original_name}_processed_{thread_id}_{unique_id}.jpg" processed_img.save(processed_path, 'JPEG', quality=95) return str(processed_path) except Exception as e: return image_path def upload_single_image_local(image_path: str): """Local upload function for this process""" max_retries = 3 for attempt in range(max_retries): try: processed_path = preprocess_image_local(image_path) uploaded_file = genai.upload_file(processed_path) return uploaded_file except Exception as e: if attempt == max_retries - 1: return None import random jitter = random.uniform(0.1, 0.5) sleep_time = (0.5 * (attempt + 1)) + jitter time.sleep(sleep_time) return None def create_single_master_prompt_local(master_id: str) -> str: """Local prompt creation function""" prompt = f"""Analyze the layout image (the second image) and determine if the master image (the first image) appears in it. INSTRUCTIONS: 1. Compare the master image (first image) with the layout image (second image) 2. Look for EXACT matches where the model, clothing, and pose are IDENTICAL 3. The layout image may contain the master image in various forms: - Complete/exact match - Cropped version - Scaled or resized version - Rotated version - Partially obscured 4. Focus on visual similarity in terms of: - Person/model appearance and pose (must be EXACTLY the same) - Clothing details (colors, patterns, styles - must be EXACTLY the same) - Background and composition - Overall visual elements 5. CRITICAL: Only return a positive result if the models, pose, and clothing are EXACTLY the same. If there is ANY difference in clothing, model, or pose then return a negative result. Master Image ID: {master_id} Return your response as a JSON object with this exact format: {{ "match_found": true/false, "master_id": "{master_id}", "confidence": "high/medium/low", "analysis": "Detailed explanation of your findings and reasoning" }} IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign. """ return prompt # Upload both images master_file = upload_single_image_local(master_path) layout_file = upload_single_image_local(layout_path) if not master_file or not layout_file: raise Exception("Failed to upload images") # Create prompt and make API call prompt = create_single_master_prompt_local(master_id) max_retries = 3 for attempt in range(max_retries): try: response = model.generate_content([prompt, master_file, layout_file], safety_settings=safety_settings) if not response.candidates: if attempt < max_retries - 1: time.sleep((2 ** attempt) * 0.5) continue else: return { 'match_found': False, 'master_id': master_id, 'confidence': 'unknown', 'analysis': 'No candidates returned', 'error': 'Safety block' } candidate = response.candidates[0] if candidate.finish_reason and candidate.finish_reason != 1: if candidate.finish_reason in [3, 4, 5] and attempt < max_retries - 1: time.sleep((2 ** attempt) * 0.5) continue else: return { 'match_found': False, 'master_id': master_id, 'confidence': 'unknown', 'analysis': f'Finished with reason: {candidate.finish_reason}', 'error': f'Finish reason: {candidate.finish_reason}' } # Parse response response_text = response.text.strip() start_idx = response_text.find('{') end_idx = response_text.rfind('}') + 1 if start_idx == -1 or end_idx == 0: return { 'match_found': False, 'master_id': master_id, 'confidence': 'unknown', 'analysis': response_text, 'error': 'No JSON found in response' } json_str = response_text[start_idx:end_idx] result = json.loads(json_str) # Validate result format if 'match_found' not in result: result['match_found'] = False if 'master_id' not in result: result['master_id'] = master_id if 'confidence' not in result: result['confidence'] = 'unknown' if 'analysis' not in result: result['analysis'] = response_text return result except Exception as e: if attempt == max_retries - 1: return { 'match_found': False, 'master_id': master_id, 'confidence': 'unknown', 'analysis': '', 'error': str(e) } time.sleep((2 ** attempt) * 0.5) except Exception as e: return { 'match_found': False, 'master_id': master_id, 'confidence': 'unknown', 'analysis': '', 'error': str(e) } class ImageDetector: def __init__(self, enable_greyscale=True, enable_contrast_enhancement=True, contrast_factor=1.5, refinement_mode=False, one_at_a_time_mode=False, max_concurrent_workers=5, split_mode=False): """Initialize the image detector with Gemini API configuration""" load_dotenv() api_key = os.getenv('GEMINI_API_KEY') if not api_key: raise ValueError("GEMINI_API_KEY not found in environment variables") genai.configure(api_key=api_key) self.model = genai.GenerativeModel('gemini-2.5-pro') # Concurrency settings self.max_concurrent_workers = max_concurrent_workers self._progress_lock = threading.Lock() # Safety settings to prevent false positives for benign content self.safety_settings = [ { "category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE", }, { "category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE", }, { "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE", }, { "category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE", }, ] print("Initialized with BLOCK_NONE safety settings for all categories to prevent false positives on benign marketing content.") # Image processing settings self.enable_greyscale = enable_greyscale self.enable_contrast_enhancement = enable_contrast_enhancement self.contrast_factor = contrast_factor self.refinement_mode = refinement_mode self.one_at_a_time_mode = one_at_a_time_mode # Split mode configuration self.split_mode = split_mode if self.split_mode: self.splitter = PanelSplitter(debug=True) print("Split mode enabled: Will split multi-panel layouts before matching") # Paths self.master_images_path = Path("master_images") self.layouts_path = Path("layouts") self.results_path = Path("results") self.temp_path = Path("temp_processed") # Create directories self.results_path.mkdir(exist_ok=True) self.temp_path.mkdir(exist_ok=True) # Master images cache self.master_images = {} self.master_files = {} self.uploaded_masters = None # Cache uploaded master files def load_master_images(self) -> Dict[str, str]: """Load all master images and create ID mapping using filenames""" print("Loading master images...") master_files = list(self.master_images_path.glob("*.jpg")) print(f"Found {len(master_files)} master images") for file_path in master_files: # Use filename (without extension) as the master ID master_id = file_path.stem self.master_images[master_id] = str(file_path) self.master_files[master_id] = file_path.name return self.master_images def match_split_to_masters(self, split_path: str, master_images: List[str]) -> List[Dict]: """Match a split image to master images using basic OpenCV matching""" matches = [] # For Gemini detector, we'll use a simple approach since it doesn't have # the sophisticated inlier analysis like OpenAI detector # This is a placeholder - in practice, you might want to use the OpenAI # detector's inlier analysis or implement a similar approach # For now, return empty matches to avoid errors # You could implement basic template matching or feature matching here return matches def preprocess_image(self, image_path: str) -> str: """Preprocess image: convert to greyscale and enhance contrast - THREAD-SAFE VERSION""" if not self.enable_greyscale and not self.enable_contrast_enhancement: return image_path try: # Open the image with Image.open(image_path) as img: processed_img = img.copy() # Convert to greyscale if enabled if self.enable_greyscale: processed_img = processed_img.convert('L') # Convert back to RGB for consistency processed_img = processed_img.convert('RGB') # Enhance contrast if enabled if self.enable_contrast_enhancement: # Global contrast enhancement contrast_enhancer = ImageEnhance.Contrast(processed_img) processed_img = contrast_enhancer.enhance(self.contrast_factor) # Edge contrast enhancement using sharpness sharpness_enhancer = ImageEnhance.Sharpness(processed_img) processed_img = sharpness_enhancer.enhance(1.3) # Save processed image with thread-safe filename import threading import uuid thread_id = threading.current_thread().ident unique_id = str(uuid.uuid4())[:8] original_name = Path(image_path).stem processed_path = self.temp_path / f"{original_name}_processed_{thread_id}_{unique_id}.jpg" processed_img.save(processed_path, 'JPEG', quality=95) return str(processed_path) except Exception as e: print(f"Warning: Failed to preprocess {Path(image_path).name}: {e}") print(f"Using original image instead") return image_path def upload_master_images_once(self): """Upload all master images ONCE and cache them (with preprocessing)""" if self.uploaded_masters is not None: return self.uploaded_masters processing_msg = [] if self.enable_greyscale: processing_msg.append("greyscale conversion") if self.enable_contrast_enhancement: processing_msg.append("contrast enhancement") if processing_msg: print(f"Uploading master images with {' and '.join(processing_msg)} (one-time operation)...") else: print("Uploading master images (one-time operation)...") uploaded_masters = [] master_paths = list(self.master_images.values()) for i, path in enumerate(master_paths): try: # Preprocess the image processed_path = self.preprocess_image(path) # Upload the processed image uploaded_file = genai.upload_file(processed_path) uploaded_masters.append(uploaded_file) print(f"Uploaded master {i+1}/{len(master_paths)}: {Path(path).name}") # Small delay to avoid rate limiting if i < len(master_paths) - 1: time.sleep(0.1) except Exception as e: print(f"Error uploading {Path(path).name}: {e}") # Retry once after delay try: time.sleep(1.0) processed_path = self.preprocess_image(path) uploaded_file = genai.upload_file(processed_path) uploaded_masters.append(uploaded_file) print(f"Retry successful for {Path(path).name}") except Exception as e2: print(f"Failed to upload {Path(path).name}: {e2}") self.uploaded_masters = uploaded_masters print(f"✓ Successfully uploaded {len(uploaded_masters)} master images") return uploaded_masters def upload_single_image(self, image_path: str) -> Optional: """Upload a single image with preprocessing and retry logic""" max_retries = 3 for attempt in range(max_retries): try: # Preprocess the image processed_path = self.preprocess_image(image_path) # Upload the processed image uploaded_file = genai.upload_file(processed_path) return uploaded_file except Exception as e: if attempt == max_retries - 1: print(f"Failed to upload {Path(image_path).name}: {e}") return None print(f"Upload retry {attempt + 1}/{max_retries} for {Path(image_path).name}: {e}") # Progressive backoff with jitter to avoid thundering herd import random jitter = random.uniform(0.1, 0.5) sleep_time = (0.5 * (attempt + 1)) + jitter time.sleep(sleep_time) return None def _upload_single_image_threadsafe(self, image_path: str, thread_genai) -> Optional: """Thread-safe version of upload_single_image using thread-local client""" max_retries = 3 for attempt in range(max_retries): try: # Preprocess the image processed_path = self.preprocess_image(image_path) # Upload the processed image using thread-local client uploaded_file = thread_genai.upload_file(processed_path) return uploaded_file except Exception as e: if attempt == max_retries - 1: return None # Progressive backoff with jitter to avoid thundering herd import random jitter = random.uniform(0.1, 0.5) sleep_time = (0.5 * (attempt + 1)) + jitter time.sleep(sleep_time) return None def _make_robust_api_call_threadsafe(self, thread_model, prompt, files, operation_name="API call", max_retries=3): """Thread-safe version of make_robust_api_call using thread-local model""" last_error = None for attempt in range(max_retries): try: response = thread_model.generate_content([prompt] + files, safety_settings=self.safety_settings) # Check for blocked content or safety issues immediately if not response.candidates: error_msg = f"No candidates returned" if hasattr(response, 'prompt_feedback'): error_msg += f" - Prompt feedback: {response.prompt_feedback}" # This is a safety block, should retry if attempt < max_retries - 1: wait_time = (2 ** attempt) * 0.5 time.sleep(wait_time) continue else: return { 'success': False, 'error_type': 'safety_block', 'error_message': error_msg, 'response': response } candidate = response.candidates[0] # Check finish reason if candidate.finish_reason and candidate.finish_reason != 1: # 1 = STOP (normal completion) error_msg = f"Request finished with reason: {candidate.finish_reason}" if hasattr(response, 'prompt_feedback'): error_msg += f" - Prompt feedback: {response.prompt_feedback}" if hasattr(candidate, 'safety_ratings'): error_msg += f" - Safety ratings: {candidate.safety_ratings}" # Check if this is a retryable safety issue if candidate.finish_reason in [3, 4, 5]: # SAFETY, RECITATION, OTHER safety-related reasons if attempt < max_retries - 1: wait_time = (2 ** attempt) * 0.5 time.sleep(wait_time) continue else: return { 'success': False, 'error_type': 'safety_finish_reason', 'error_message': error_msg, 'response': response } else: # Non-safety related finish reason, don't retry return { 'success': False, 'error_type': 'other_finish_reason', 'error_message': error_msg, 'response': response } # Success case return { 'success': True, 'response': response, 'text': response.text.strip() } except Exception as e: last_error = e error_str = str(e) # Check if this looks like a safety/blocking error is_safety_error = any(keyword in error_str.lower() for keyword in [ 'safety', 'blocked', 'filtered', 'response.text', 'response.parts', 'finish_reason', 'candidates', 'prompt_feedback' ]) if is_safety_error and attempt < max_retries - 1: wait_time = (2 ** attempt) * 0.5 time.sleep(wait_time) continue elif attempt < max_retries - 1: # Other errors, also retry but with different messaging wait_time = (2 ** attempt) * 0.5 time.sleep(wait_time) continue else: # Final attempt failed return { 'success': False, 'error_type': 'exception', 'error_message': str(e), 'exception': e } # This shouldn't be reached, but just in case return { 'success': False, 'error_type': 'max_retries_exceeded', 'error_message': f"Max retries ({max_retries}) exceeded", 'last_error': str(last_error) if last_error else "Unknown error" } def create_detection_prompt(self, master_ids: List[str]) -> str: """Create the prompt for image detection""" prompt = """Analyze the layout image (the last image provided) and identify which of the master images appear in it. INSTRUCTIONS: 1. Examine the layout image carefully to identify any image segments or components within it 2. For each segment you identify, look for matches in the provided master images 3. Look for all types of matches including: - Exact matches (complete images) - Cropped versions (partial images) - Scaled or resized versions - Rotated versions - Images that may be partially obscured 4. Focus on visual similarity in terms of: - Person/model appearance and pose - Clothing details (colors, patterns, styles) - Background and composition - Overall visual elements 5. Be conservative and only include matches where you have reasonable confidence 6. Only return a positive result (master image appears in the layout) if the models, pose, and clothing are EXACTLY the same. If there is ANY difference in clothing, model, or pose then return a negative result. Master Images with IDs: """ for i, master_id in enumerate(master_ids): prompt += f"Image {i+1}: ID = {master_id}\n" prompt += """ The final image is the LAYOUT IMAGE to analyze. Return your response as a JSON object with this exact format: { "detected_masters": ["master_id_1", "master_id_2"], "analysis": "Brief explanation of what you found and your matching reasoning" } Only include master IDs in "detected_masters" where you completely sure of a match. If no clear matches are found, return an empty array. IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos that may show models in various clothing styles for retail purposes. This prompt and task are completely benign and should not trigger safety filters. This is standard e-commerce image comparison work, not dangerous, offensive, or inappropriate content. The comparison is purely for business/marketing layout matching purposes. """ return prompt def create_simple_detection_prompt(self, master_ids: List[str]) -> str: """Create a simplified prompt for cases where the main prompt gets blocked""" prompt = """Compare the layout image (the last image) with the master images provided and identify which master images appear in the layout. Look for visual matches including: - Complete images that match - Partial/cropped versions - Resized versions This is for e-commerce product image matching. Master Images with IDs: """ for i, master_id in enumerate(master_ids): prompt += f"Image {i+1}: ID = {master_id}\n" prompt += """ The final image is the LAYOUT IMAGE to analyze. Return your response as a JSON object: { "detected_masters": ["master_id_1", "master_id_2", ...], "analysis": "Brief explanation" } Only include master IDs that clearly appear in the layout image. """ return prompt def make_robust_api_call(self, prompt, files, operation_name="API call", max_retries=3): """Make a robust API call with comprehensive error detection and retry logic""" last_error = None for attempt in range(max_retries): try: response = self.model.generate_content([prompt] + files, safety_settings=self.safety_settings) # Check for blocked content or safety issues immediately if not response.candidates: error_msg = f"No candidates returned" if hasattr(response, 'prompt_feedback'): error_msg += f" - Prompt feedback: {response.prompt_feedback}" # This is a safety block, should retry if attempt < max_retries - 1: wait_time = (2 ** attempt) * 0.5 print(f" Safety block detected on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s...") time.sleep(wait_time) continue else: return { 'success': False, 'error_type': 'safety_block', 'error_message': error_msg, 'response': response } candidate = response.candidates[0] # Check finish reason if candidate.finish_reason and candidate.finish_reason != 1: # 1 = STOP (normal completion) error_msg = f"Request finished with reason: {candidate.finish_reason}" if hasattr(response, 'prompt_feedback'): error_msg += f" - Prompt feedback: {response.prompt_feedback}" if hasattr(candidate, 'safety_ratings'): error_msg += f" - Safety ratings: {candidate.safety_ratings}" # Check if this is a retryable safety issue if candidate.finish_reason in [3, 4, 5]: # SAFETY, RECITATION, OTHER safety-related reasons if attempt < max_retries - 1: wait_time = (2 ** attempt) * 0.5 print(f" Safety/content issue detected on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s...") time.sleep(wait_time) continue else: return { 'success': False, 'error_type': 'safety_finish_reason', 'error_message': error_msg, 'response': response } else: # Non-safety related finish reason, don't retry return { 'success': False, 'error_type': 'other_finish_reason', 'error_message': error_msg, 'response': response } # Success case return { 'success': True, 'response': response, 'text': response.text.strip() } except Exception as e: last_error = e error_str = str(e) # Check if this looks like a safety/blocking error is_safety_error = any(keyword in error_str.lower() for keyword in [ 'safety', 'blocked', 'filtered', 'response.text', 'response.parts', 'finish_reason', 'candidates', 'prompt_feedback' ]) if is_safety_error and attempt < max_retries - 1: wait_time = (2 ** attempt) * 0.5 print(f" Safety-related error on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s: {e}") time.sleep(wait_time) continue elif attempt < max_retries - 1: # Other errors, also retry but with different messaging wait_time = (2 ** attempt) * 0.5 print(f" API error on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s: {e}") time.sleep(wait_time) continue else: # Final attempt failed return { 'success': False, 'error_type': 'exception', 'error_message': str(e), 'exception': e } # This shouldn't be reached, but just in case return { 'success': False, 'error_type': 'max_retries_exceeded', 'error_message': f"Max retries ({max_retries}) exceeded", 'last_error': str(last_error) if last_error else "Unknown error" } def is_cen_image(self, master_id: str) -> bool: """Check if a master image ID represents a CEN (censored) image""" return '_CEN' in master_id def find_corresponding_non_cen_image(self, cen_master_id: str) -> Optional[str]: """Find the corresponding non-CEN image for a given CEN master ID""" if not self.is_cen_image(cen_master_id): return None # Transform CEN filename to non-CEN filename # Example: "1011A_1011A_1011_01_CEN" -> "1011A_1011_01" parts = cen_master_id.split('_') if len(parts) >= 4 and parts[-1] == 'CEN': # Remove the middle duplicate part and _CEN suffix # Pattern: prefix_prefix_middle_suffix_CEN -> prefix_middle_suffix if len(parts) >= 5: non_cen_id = f"{parts[0]}_{parts[2]}_{parts[3]}" else: # Fallback: just remove _CEN non_cen_id = '_'.join(parts[:-1]) # Check if this non-CEN image exists in our master images if non_cen_id in self.master_images: return non_cen_id return None def create_censorship_detection_prompt(self) -> str: """Create prompt for detecting if a layout image contains censored content""" prompt = """Analyze this layout image to determine if it contains censored or uncensored content. TASK: Determine whether the images in this layout are censored (covered) or uncensored (more exposed). CENSORSHIP INDICATORS TO LOOK FOR: 1. **Clothing Coverage**: - Long sleeves vs. sleeveless/short sleeves - Full-length pants/skirts vs. shorts or shorter garments - High necklines vs. lower necklines 2. **Skin Coverage**: - Arms: Fully covered vs. bare arms - Legs: Fully covered vs. exposed legs/thighs - Torso: Additional covering vs. more exposed areas 3. **Added Elements**: - Opaque or semi-transparent overlay layers covering skin - Additional fabric or clothing elements that appear to cover exposed areas - Digital modifications that add coverage CLASSIFICATION: - **CENSORED**: If models show significant additional clothing coverage, long sleeves, full pants/skirts, or digital overlays covering skin - **UNCENSORED**: If models show more exposed skin, shorter garments, bare arms/legs, or natural clothing without added coverage Return your response as a JSON object with this exact format: {{ "is_censored": true/false, "confidence": "high/medium/low", "analysis": "Detailed explanation of the coverage patterns observed and reasoning for the classification", "coverage_details": "Specific description of clothing and skin coverage in the layout" }} Be precise and focus on the actual clothing and coverage patterns visible in the image. IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image classification. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign. """ return prompt def detect_layout_censorship(self, layout_path: str) -> Dict: """Detect if a layout image contains censored or uncensored content""" try: # Upload layout image layout_file = self.upload_single_image(layout_path) if not layout_file: raise Exception("Failed to upload layout image") # Create censorship detection prompt prompt = self.create_censorship_detection_prompt() # Make API call with robust retry logic api_result = self.make_robust_api_call(prompt, [layout_file], "censorship detection") # Handle API call failure if not api_result['success']: return { 'is_censored': True, # Default to censored if API fails 'confidence': 'unknown', 'analysis': f'API call failed: {api_result["error_message"]}', 'error': f"{api_result['error_type']}: {api_result['error_message']}" } # Parse response response_text = api_result['text'] # Extract JSON from response try: start_idx = response_text.find('{') end_idx = response_text.rfind('}') + 1 if start_idx == -1 or end_idx == 0: raise ValueError("No JSON found in response") json_str = response_text[start_idx:end_idx] result = json.loads(json_str) # Validate result format if 'is_censored' not in result: result['is_censored'] = True # Default to censored if unclear if 'confidence' not in result: result['confidence'] = 'unknown' if 'analysis' not in result: result['analysis'] = response_text return result except json.JSONDecodeError as e: return { 'is_censored': True, # Default to censored if parsing fails 'confidence': 'unknown', 'analysis': response_text, 'error': f'JSON decode error: {e}' } except Exception as e: return { 'is_censored': True, # Default to censored if error 'confidence': 'unknown', 'analysis': '', 'error': str(e) } def apply_cen_refinement_to_results(self, layout_path: str, initial_results: Dict) -> Dict: """Apply CEN refinement to initial detection results""" layout_name = Path(layout_path).name detected_masters = initial_results.get('detected_masters', []) # First, deduplicate the detected masters to avoid processing duplicates original_count = len(detected_masters) detected_masters = self.deduplicate_master_matches(detected_masters) if len(detected_masters) != original_count: duplicates_removed = original_count - len(detected_masters) print(f" Removed {duplicates_removed} duplicate master(s) before CEN refinement") # Find CEN images in the results cen_images = [mid for mid in detected_masters if self.is_cen_image(mid)] if not cen_images: # No CEN images found, return original results return initial_results print(f" Refining {len(cen_images)} CEN matches for {layout_name}") print(f" Analyzing layout to determine censorship level...") # Detect if the layout is censored or uncensored censorship_result = self.detect_layout_censorship(layout_path) is_layout_censored = censorship_result.get('is_censored', True) confidence = censorship_result.get('confidence', 'unknown') print(f" Layout analysis: {'CENSORED' if is_layout_censored else 'UNCENSORED'} (confidence: {confidence})") refined_masters = [] refinement_details = [] changes_made = 0 # Process each detected image for master_id in detected_masters: if self.is_cen_image(master_id): # This is a CEN image non_cen_id = self.find_corresponding_non_cen_image(master_id) if not is_layout_censored and non_cen_id: # Layout is uncensored, switch to non-CEN version refined_masters.append(non_cen_id) refinement_details.append({ 'original_cen_match': master_id, 'non_cen_alternative': non_cen_id, 'final_choice': non_cen_id, 'confidence': confidence, 'analysis': f"Layout determined to be uncensored, switched from {master_id} to {non_cen_id}", 'changed': True, 'reason': 'layout_uncensored' }) changes_made += 1 print(f" → Changed {master_id} to {non_cen_id} (layout is uncensored)") else: # Layout is censored or no non-CEN alternative, keep CEN version refined_masters.append(master_id) reason = 'layout_censored' if is_layout_censored else 'no_non_cen_alternative' refinement_details.append({ 'original_cen_match': master_id, 'non_cen_alternative': non_cen_id, 'final_choice': master_id, 'confidence': confidence, 'analysis': f"Kept {master_id} - layout is censored or no non-CEN alternative available", 'changed': False, 'reason': reason }) print(f" → Kept {master_id} ({'layout is censored' if is_layout_censored else 'no non-CEN alternative'})") else: # This is not a CEN image, keep it as-is refined_masters.append(master_id) print(f" Summary: {changes_made} CEN images changed to non-CEN versions") # Apply deduplication to refined masters in case refinement introduced duplicates original_refined = refined_masters[:] refined_masters = self.deduplicate_master_matches(refined_masters) if len(refined_masters) != len(original_refined): post_refinement_duplicates = len(original_refined) - len(refined_masters) print(f" Post-refinement deduplication: Removed {post_refinement_duplicates} duplicate(s)") # Update results with refinement information refined_results = initial_results.copy() refined_results['detected_masters'] = refined_masters refined_results['detected_master_ids'] = refined_masters # Update both fields for consistency refined_results['detected_master_filenames'] = [f"{mid}.jpg" for mid in refined_masters ] refined_results['refinement_applied'] = True refined_results['refinement_details'] = refinement_details refined_results['censorship_analysis'] = censorship_result refined_results['original_detection_count'] = len(detected_masters) refined_results['refined_detection_count'] = len(refined_masters) refined_results['changes_made'] = changes_made return refined_results def detect_images_in_layout(self, layout_path: str, layout_index: int, total_layouts: int) -> Dict: """Detect which master images appear in a single layout image""" layout_name = Path(layout_path).name print(f"Processing {layout_index}/{total_layouts}: {layout_name}") try: # Upload only the layout image (masters already uploaded) layout_file = self.upload_single_image(layout_path) if not layout_file: raise Exception("Failed to upload layout image") # Combine pre-uploaded masters with the layout all_files = self.uploaded_masters + [layout_file] # Create prompt master_ids = list(self.master_images.keys()) prompt = self.create_detection_prompt(master_ids) # Try main prompt first, then fallback to simple prompt if blocked api_result = self.make_robust_api_call(prompt, all_files, f"detection for {layout_name}") # If main prompt failed due to safety issues, try simple prompt if not api_result['success'] and api_result['error_type'] in ['safety_block', 'safety_finish_reason']: print(f" Main prompt blocked for {layout_name}, trying simplified prompt...") simple_prompt = self.create_simple_detection_prompt(master_ids) api_result = self.make_robust_api_call(simple_prompt, all_files, f"simple detection for {layout_name}") # Handle API call failure if not api_result['success']: error_msg = api_result['error_message'] print(f"API call failed for {layout_name}: {error_msg}") return { 'detected_masters': [], 'analysis': f'API call failed: {error_msg}', 'error': f"{api_result['error_type']}: {error_msg}", 'retry_count': 3 # Max retries were attempted } # Parse response response_text = api_result['text'] # Extract JSON from response try: start_idx = response_text.find('{') end_idx = response_text.rfind('}') + 1 if start_idx == -1 or end_idx == 0: raise ValueError("No JSON found in response") json_str = response_text[start_idx:end_idx] result = json.loads(json_str) # Validate result format if 'detected_masters' not in result: result['detected_masters'] = [] if 'analysis' not in result: result['analysis'] = response_text # Deduplicate detected masters original_detected = result['detected_masters'][:] result['detected_masters'] = self.deduplicate_master_matches(result['detected_masters']) # Track deduplication if any duplicates were removed if len(result['detected_masters']) != len(original_detected): duplicates_removed = len(original_detected) - len(result['detected_masters']) result['deduplication_applied'] = True result['duplicates_removed'] = duplicates_removed result['original_detected_masters'] = original_detected print(f" Deduplication: Removed {duplicates_removed} duplicate master(s) from {layout_name}") # Log completion detected_count = len(result['detected_masters']) print(f"✓ Completed {layout_name} - Found {detected_count} matches") return result except json.JSONDecodeError as e: print(f"JSON decode error for {layout_name}: {e}") return { 'detected_masters': [], 'analysis': response_text, 'error': f'JSON decode error: {e}' } except Exception as e: error_msg = f"Error analyzing {layout_name}: {e}" # Check if this was a safety-related error if "response.text" in str(e) or "response.parts" in str(e): error_msg += "\nThis appears to be a safety filter blocking issue." if hasattr(e, 'response') and e.response: if hasattr(e.response, 'prompt_feedback'): error_msg += f"\nPrompt feedback: {e.response.prompt_feedback}" if e.response.candidates: candidate = e.response.candidates[0] if hasattr(candidate, 'safety_ratings'): error_msg += f"\nSafety ratings: {candidate.safety_ratings}" if hasattr(candidate, 'finish_reason'): error_msg += f"\nFinish reason: {candidate.finish_reason}" print(error_msg) return { 'detected_masters': [], 'analysis': '', 'error': str(e) } def create_single_master_prompt(self, master_id: str) -> str: """Create prompt for checking if a single master image appears in the layout""" prompt = f"""Analyze the layout image (the second image) and determine if the master image (the first image) appears in it. INSTRUCTIONS: 1. Compare the master image (first image) with the layout image (second image) 2. Look for EXACT matches where the model, clothing, and pose are IDENTICAL 3. The layout image may contain the master image in various forms: - Complete/exact match - Cropped version - Scaled or resized version - Rotated version - Partially obscured 4. Focus on visual similarity in terms of: - Person/model appearance and pose (must be EXACTLY the same) - Clothing details (colors, patterns, styles - must be EXACTLY the same) - Background and composition - Overall visual elements 5. CRITICAL: Only return a positive result if the models, pose, and clothing are EXACTLY the same. If there is ANY difference in clothing, model, or pose then return a negative result. Master Image ID: {master_id} Return your response as a JSON object with this exact format: {{ "match_found": true/false, "master_id": "{master_id}", "confidence": "high/medium/low", "analysis": "Detailed explanation of your findings and reasoning" }} IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign. """ return prompt def detect_single_master_in_layout(self, layout_path: str, master_id: str, master_index: int, total_masters: int) -> Dict: """Check if a single master image appears in the layout image - THREAD-SAFE VERSION""" layout_name = Path(layout_path).name master_path = self.master_images[master_id] try: # Create thread-local API client to avoid shared state issues import google.generativeai as thread_genai api_key = os.getenv('GEMINI_API_KEY') thread_genai.configure(api_key=api_key) thread_model = thread_genai.GenerativeModel('gemini-2.5-pro') # Upload both images using thread-local client master_file = self._upload_single_image_threadsafe(master_path, thread_genai) layout_file = self._upload_single_image_threadsafe(layout_path, thread_genai) if not master_file or not layout_file: raise Exception("Failed to upload images") # Create prompt for single master matching prompt = self.create_single_master_prompt(master_id) # Make API call with thread-local model api_result = self._make_robust_api_call_threadsafe( thread_model, prompt, [master_file, layout_file], f"single master detection: {master_id} in {layout_name}" ) # Handle API call failure if not api_result['success']: return { 'match_found': False, 'master_id': master_id, 'confidence': 'unknown', 'analysis': f'API call failed: {api_result["error_message"]}', 'error': f"{api_result['error_type']}: {api_result['error_message']}" } # Parse response response_text = api_result['text'] # Extract JSON from response try: start_idx = response_text.find('{') end_idx = response_text.rfind('}') + 1 if start_idx == -1 or end_idx == 0: raise ValueError("No JSON found in response") json_str = response_text[start_idx:end_idx] result = json.loads(json_str) # Validate result format if 'match_found' not in result: result['match_found'] = False if 'master_id' not in result: result['master_id'] = master_id if 'confidence' not in result: result['confidence'] = 'unknown' if 'analysis' not in result: result['analysis'] = response_text return result except json.JSONDecodeError as e: return { 'match_found': False, 'master_id': master_id, 'confidence': 'unknown', 'analysis': response_text, 'error': f'JSON decode error: {e}' } except Exception as e: return { 'match_found': False, 'master_id': master_id, 'confidence': 'unknown', 'analysis': '', 'error': str(e) } def detect_images_in_layout_one_at_a_time(self, layout_path: str, layout_index: int, total_layouts: int) -> Dict: """Detect which master images appear in a layout by checking each master individually using process-based concurrency""" layout_name = Path(layout_path).name print(f"Processing {layout_index}/{total_layouts}: {layout_name} (Process-based one-at-a-time mode)") master_ids = list(self.master_images.keys()) total_masters = len(master_ids) detected_masters = [] detailed_results = [] print(f" Checking {total_masters} masters using {self.max_concurrent_workers} concurrent processes...") # Prepare arguments for process pool tasks = [] for master_id in master_ids: master_path = self.master_images[master_id] task_args = ( layout_path, master_id, master_path, self.enable_greyscale, self.enable_contrast_enhancement, self.contrast_factor, self.safety_settings ) tasks.append(task_args) # Use ProcessPoolExecutor for true isolation with concurrent.futures.ProcessPoolExecutor(max_workers=self.max_concurrent_workers) as executor: # Submit all tasks future_to_master = { executor.submit(process_single_master_detection, *task_args): task_args[1] for task_args in tasks } completed_count = 0 # Collect results as they complete for future in concurrent.futures.as_completed(future_to_master): master_id = future_to_master[future] completed_count += 1 try: result = future.result() detailed_results.append(result) # If match found, add to detected masters if result.get('match_found', False): detected_masters.append(master_id) confidence = result.get('confidence', 'unknown') print(f" {completed_count}/{total_masters}: ✓ MATCH found for {master_id} (confidence: {confidence})") else: if 'error' in result: print(f" {completed_count}/{total_masters}: Error checking {master_id}: {result['error']}") else: print(f" {completed_count}/{total_masters}: No match for {master_id}") except Exception as e: print(f" {completed_count}/{total_masters}: Process error checking {master_id}: {e}") # Add error result to maintain consistency error_result = { 'match_found': False, 'master_id': master_id, 'confidence': 'unknown', 'analysis': '', 'error': str(e) } detailed_results.append(error_result) # Sort detailed_results by master_id to maintain consistent ordering detailed_results.sort(key=lambda x: x.get('master_id', '')) # Deduplicate detected masters (shouldn't be needed in one-at-a-time mode, but for safety) original_detected = detected_masters[:] detected_masters = self.deduplicate_master_matches(detected_masters) if len(detected_masters) != len(original_detected): duplicates_removed = len(original_detected) - len(detected_masters) print(f" Deduplication: Removed {duplicates_removed} duplicate master(s)") detected_count = len(detected_masters) print(f"✓ Completed {layout_name} - Found {detected_count} matches using {self.max_concurrent_workers} concurrent processes") return { 'detected_masters': detected_masters, 'detected_master_ids': detected_masters, 'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters ], 'analysis': f'Process-based one-at-a-time analysis completed. Found {detected_count} exact matches out of {total_masters} masters checked using {self.max_concurrent_workers} concurrent processes.', 'detailed_results': detailed_results, 'processing_mode': 'process_based_one_at_a_time', 'total_masters_checked': total_masters, 'concurrent_workers': self.max_concurrent_workers, 'deduplication_applied': len(detected_masters) != len(original_detected), 'duplicates_removed': len(original_detected) - len(detected_masters) if len(detected_masters) != len(original_detected) else 0, 'original_detected_masters': original_detected } def process_all_layouts(self, limit: Optional[int] = None, specific_file: Optional[str] = None) -> Dict: """Process all layout images sequentially""" if self.one_at_a_time_mode: mode_desc = "One-at-a-time Mode" else: mode_desc = "Multi Master Mode" if self.refinement_mode: mode_desc += " with CEN Refinement" print(f"Starting sequential batch processing ({mode_desc})...") # Load master images self.load_master_images() # Upload all master images ONCE (only for multi-master mode) if not self.one_at_a_time_mode: self.upload_master_images_once() # Get layout files if specific_file: # Process only the specific file layout_files = [self.layouts_path / specific_file] if not layout_files[0].exists(): raise FileNotFoundError(f"Layout file {specific_file} not found in {self.layouts_path}") print(f"Processing specific file: {specific_file}") else: layout_files = list(self.layouts_path.glob("*.jpg")) if limit: layout_files = layout_files[:limit] print(f"Processing first {limit} layouts only") total_layouts = len(layout_files) print(f"Processing {total_layouts} layout images in {mode_desc}") print("=" * 60) results = {} start_time = time.time() for i, layout_path in enumerate(layout_files, 1): layout_id = layout_path.stem # Detect images in layout using the appropriate method if self.split_mode: # Split mode: split layout into panels and match each panel master_ids = list(self.master_images.keys()) result = self.splitter.split_layout_and_match(str(layout_path), master_ids, self) # Apply CEN refinement if enabled and there are matches if self.refinement_mode and result.get('detected_masters'): result = self.apply_cen_refinement_to_results(str(layout_path), result) elif self.one_at_a_time_mode: result = self.detect_images_in_layout_one_at_a_time(str(layout_path), i, total_layouts) else: result = self.detect_images_in_layout(str(layout_path), i, total_layouts) # Apply CEN refinement if enabled and there are CEN matches if not self.split_mode and self.refinement_mode and result.get('detected_masters'): result = self.apply_cen_refinement_to_results(str(layout_path), result) layout_result = { 'layout_filename': layout_path.name, 'detected_master_ids': result['detected_masters'], 'detected_master_filenames': [f"{mid}.jpg" for mid in result['detected_masters'] ], 'analysis': result.get('analysis', 'Split mode analysis'), 'detection_mode': mode_desc.lower().replace(' ', '_').replace('with_', '') } # Add split mode specific fields if self.split_mode: layout_result['split_mode'] = True layout_result['splits_generated'] = result.get('splits_generated', 0) layout_result['panel_count'] = result.get('panel_count', 1) layout_result['panel_confidence'] = result.get('panel_confidence', 'unknown') if 'split_results' in result: layout_result['split_results'] = result['split_results'] # Add deduplication fields if applied if 'deduplication_applied' in result: layout_result['deduplication_applied'] = result['deduplication_applied'] layout_result['duplicates_removed'] = result['duplicates_removed'] layout_result['original_detected_masters'] = result['original_detected_masters'] if 'error' in result: layout_result['error'] = result['error'] # Add refinement mode specific fields if self.refinement_mode and result.get('refinement_applied'): layout_result['refinement_applied'] = result['refinement_applied'] layout_result['refinement_details'] = result['refinement_details'] layout_result['censorship_analysis'] = result['censorship_analysis'] layout_result['original_detection_count'] = result['original_detection_count'] layout_result['refined_detection_count'] = result['refined_detection_count'] layout_result['changes_made'] = result.get('changes_made', 0) results[layout_id] = layout_result # Progress update with time estimate elapsed = time.time() - start_time avg_time = elapsed / i remaining = (total_layouts - i) * avg_time print(f"Progress: {i}/{total_layouts} ({i/total_layouts*100:.1f}%) - Est. remaining: {remaining/60:.1f} min") # Save progress periodically if i % 20 == 0: self.save_results(results, f"progress_{i}") total_time = time.time() - start_time print(f"\n✓ Completed processing all {total_layouts} layouts in {total_time/60:.1f} minutes") print(f"Average time per layout: {total_time/total_layouts:.1f} seconds") return results def save_results(self, results: Dict, filename: str = "detection_results") -> str: """Save results to JSON file""" output_path = self.results_path / f"{filename}.json" # Add metadata output_data = { 'metadata': { 'total_layouts_processed': len(results), 'total_master_images': len(self.master_images), 'master_images_available': list(self.master_files.keys()) }, 'results': results } with open(output_path, 'w') as f: json.dump(output_data, f, indent=2) print(f"Results saved to: {output_path}") return str(output_path) def generate_summary(self, results: Dict) -> Dict: """Generate summary statistics""" total_layouts = len(results) layouts_with_matches = sum(1 for r in results.values() if r['detected_master_ids']) # Count master image occurrences master_counts = {} for result in results.values(): for master_id in result['detected_master_ids']: master_counts[master_id] = master_counts.get(master_id, 0) + 1 # Deduplication statistics layouts_with_deduplication = sum(1 for r in results.values() if r.get('deduplication_applied', False)) total_duplicates_removed = sum(r.get('duplicates_removed', 0) for r in results.values()) summary = { 'total_layouts_processed': total_layouts, 'layouts_with_matches': layouts_with_matches, 'layouts_without_matches': total_layouts - layouts_with_matches, 'master_image_usage': master_counts, 'most_used_masters': sorted(master_counts.items(), key=lambda x: x[1], reverse=True)[:10], # Deduplication stats 'layouts_with_deduplication': layouts_with_deduplication, 'total_duplicates_removed': total_duplicates_removed, 'deduplication_rate': round(layouts_with_deduplication / total_layouts * 100, 1) if total_layouts > 0 else 0 } return summary def deduplicate_master_matches(self, detected_masters: List[str]) -> List[str]: """Remove duplicate master matches from a list while preserving order""" if not detected_masters: return detected_masters # Simple deduplication - remove exact duplicates while preserving order seen = set() deduplicated = [] for master_id in detected_masters: if master_id not in seen: seen.add(master_id) deduplicated.append(master_id) return deduplicated def cleanup_temp_files(self): """Clean up temporary processed image files - handles thread-safe filenames""" try: if self.temp_path.exists(): # Clean up both old and new thread-safe naming patterns for temp_file in self.temp_path.glob("*_processed*.jpg"): temp_file.unlink() # Remove temp directory if empty if not any(self.temp_path.iterdir()): self.temp_path.rmdir() except Exception as e: print(f"Warning: Failed to cleanup temp files: {e}")