master_adapt_detect/gemini_detector.py
2025-10-01 14:32:55 -05:00

1501 lines
No EOL
67 KiB
Python

#!/usr/bin/env python3
"""
Gemini Image Detector - Extracted ImageDetector class
Uses Google Gemini 2.5 Pro API to detect which master images appear in layout images
"""
import os
import json
import time
from pathlib import Path
from typing import List, Dict, Optional
import google.generativeai as genai
from dotenv import load_dotenv
from PIL import Image, ImageEnhance
import tempfile
import numpy as np
import pickle
import cv2
import concurrent.futures
import threading
import uuid
import multiprocessing
from functools import partial
from panel_splitter import PanelSplitter
def process_single_master_detection(layout_path, master_id, master_path, enable_greyscale, enable_contrast_enhancement, contrast_factor, safety_settings):
    """
    Check whether a single master image appears in a layout image.

    Standalone function for processing a single master detection in a separate
    process. Everything it needs is imported and configured inside the call so
    that no state is shared with other workers.

    Args:
        layout_path: Path of the layout image to inspect.
        master_id: Identifier of the master image; embedded in the prompt and
            used as the fallback 'master_id' of the result.
        master_path: Path of the master image file.
        enable_greyscale: Convert both images to greyscale before upload.
        enable_contrast_enhancement: Boost contrast and sharpness before upload.
        contrast_factor: Factor passed to the contrast enhancer.
        safety_settings: Safety settings forwarded to generate_content().

    Returns:
        dict with 'match_found', 'master_id', 'confidence' and 'analysis'.
        Exceptions are caught and reported as a dict with match_found=False,
        confidence='unknown' and an additional 'error' key.
    """
    def failure(analysis, error):
        """Build the negative result returned on every failure path."""
        return {
            'match_found': False,
            'master_id': master_id,
            'confidence': 'unknown',
            'analysis': analysis,
            'error': error
        }

    try:
        # Import and configure in each process to avoid shared state
        import os
        import json
        import random
        import time
        from pathlib import Path
        from PIL import Image, ImageEnhance
        import google.generativeai as genai
        from dotenv import load_dotenv
        import uuid
        import threading

        # Load environment in this process
        load_dotenv()
        api_key = os.getenv('GEMINI_API_KEY')
        if not api_key:
            raise ValueError("GEMINI_API_KEY not found in environment variables")
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel('gemini-2.5-pro')

        # Create temp directory for this process
        temp_path = Path("temp_processed")
        temp_path.mkdir(exist_ok=True)

        def preprocess_image_local(image_path: str) -> str:
            """Return the path of a preprocessed JPEG copy, or image_path on failure."""
            if not enable_greyscale and not enable_contrast_enhancement:
                return image_path
            try:
                with Image.open(image_path) as img:
                    processed_img = img.copy()
                    if enable_greyscale:
                        processed_img = processed_img.convert('L')
                    # The result is saved as JPEG, which cannot store alpha or
                    # palette modes; normalise to RGB so such inputs are
                    # actually preprocessed instead of falling back to the
                    # untouched original.
                    if processed_img.mode != 'RGB':
                        processed_img = processed_img.convert('RGB')
                    if enable_contrast_enhancement:
                        contrast_enhancer = ImageEnhance.Contrast(processed_img)
                        processed_img = contrast_enhancer.enhance(contrast_factor)
                        sharpness_enhancer = ImageEnhance.Sharpness(processed_img)
                        processed_img = sharpness_enhancer.enhance(1.3)
                    # Unique filename so concurrent workers never collide
                    thread_id = threading.current_thread().ident
                    unique_id = str(uuid.uuid4())[:8]
                    original_name = Path(image_path).stem
                    processed_path = temp_path / f"{original_name}_processed_{thread_id}_{unique_id}.jpg"
                    processed_img.save(processed_path, 'JPEG', quality=95)
                    return str(processed_path)
            except Exception:
                # Best effort: upload the original image if preprocessing fails
                return image_path

        def upload_single_image_local(image_path: str):
            """Upload one image with up to three attempts; return None on failure."""
            max_upload_retries = 3
            # Preprocess once: the output is the same for every attempt, and
            # redoing it per retry only leaves extra temp files behind.
            processed_path = preprocess_image_local(image_path)
            for upload_attempt in range(max_upload_retries):
                try:
                    return genai.upload_file(processed_path)
                except Exception:
                    if upload_attempt == max_upload_retries - 1:
                        return None
                    # Progressive backoff with jitter between attempts
                    jitter = random.uniform(0.1, 0.5)
                    time.sleep((0.5 * (upload_attempt + 1)) + jitter)
            return None

        def create_single_master_prompt_local(master_id: str) -> str:
            """Build the one-master-versus-layout comparison prompt."""
            prompt = f"""Analyze the layout image (the second image) and determine if the master image (the first image) appears in it.
INSTRUCTIONS:
1. Compare the master image (first image) with the layout image (second image)
2. Look for EXACT matches where the model, clothing, and pose are IDENTICAL
3. The layout image may contain the master image in various forms:
- Complete/exact match
- Cropped version
- Scaled or resized version
- Rotated version
- Partially obscured
4. Focus on visual similarity in terms of:
- Person/model appearance and pose (must be EXACTLY the same)
- Clothing details (colors, patterns, styles - must be EXACTLY the same)
- Background and composition
- Overall visual elements
5. CRITICAL: Only return a positive result if the models, pose, and clothing are EXACTLY the same.
If there is ANY difference in clothing, model, or pose then return a negative result.
Master Image ID: {master_id}
Return your response as a JSON object with this exact format:
{{
"match_found": true/false,
"master_id": "{master_id}",
"confidence": "high/medium/low",
"analysis": "Detailed explanation of your findings and reasoning"
}}
IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign.
"""
            return prompt

        # Upload both images
        master_file = upload_single_image_local(master_path)
        layout_file = upload_single_image_local(layout_path)
        if not master_file or not layout_file:
            raise Exception("Failed to upload images")

        # Create prompt and make API call
        prompt = create_single_master_prompt_local(master_id)
        max_retries = 3
        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                response = model.generate_content([prompt, master_file, layout_file], safety_settings=safety_settings)

                # No candidates is treated as a safety block and retried
                if not response.candidates:
                    if is_last_attempt:
                        return failure('No candidates returned', 'Safety block')
                    time.sleep((2 ** attempt) * 0.5)
                    continue

                candidate = response.candidates[0]
                # finish_reason 1 is normal completion; 3, 4 and 5 are retried
                if candidate.finish_reason and candidate.finish_reason != 1:
                    if candidate.finish_reason in [3, 4, 5] and not is_last_attempt:
                        time.sleep((2 ** attempt) * 0.5)
                        continue
                    return failure(
                        f'Finished with reason: {candidate.finish_reason}',
                        f'Finish reason: {candidate.finish_reason}'
                    )

                # Parse response: take the outermost {...} span as JSON
                response_text = response.text.strip()
                start_idx = response_text.find('{')
                end_idx = response_text.rfind('}') + 1
                if start_idx == -1 or end_idx == 0:
                    return failure(response_text, 'No JSON found in response')
                result = json.loads(response_text[start_idx:end_idx])

                # Validate result format: fill in any keys the model omitted
                result.setdefault('match_found', False)
                result.setdefault('master_id', master_id)
                result.setdefault('confidence', 'unknown')
                result.setdefault('analysis', response_text)
                return result
            except Exception as e:
                if is_last_attempt:
                    return failure('', str(e))
                time.sleep((2 ** attempt) * 0.5)
    except Exception as e:
        return failure('', str(e))
class ImageDetector:
def __init__(self, enable_greyscale=True, enable_contrast_enhancement=True, contrast_factor=1.5, refinement_mode=False, one_at_a_time_mode=False, max_concurrent_workers=5, split_mode=False):
    """Set up the Gemini client, processing options, working folders and caches.

    Args:
        enable_greyscale: Stored on the instance; read by preprocess_image.
        enable_contrast_enhancement: Stored on the instance; read by
            preprocess_image.
        contrast_factor: Stored on the instance; passed to the contrast
            enhancer during preprocessing.
        refinement_mode: Stored on the instance.
        one_at_a_time_mode: Stored on the instance.
        max_concurrent_workers: Stored on the instance.
        split_mode: When true, a PanelSplitter is created as self.splitter.

    Raises:
        ValueError: If GEMINI_API_KEY is not set in the environment.
    """
    # Credentials come from the environment (optionally via a .env file)
    load_dotenv()
    gemini_key = os.getenv('GEMINI_API_KEY')
    if not gemini_key:
        raise ValueError("GEMINI_API_KEY not found in environment variables")
    genai.configure(api_key=gemini_key)
    self.model = genai.GenerativeModel('gemini-2.5-pro')

    # Concurrency settings
    self.max_concurrent_workers = max_concurrent_workers
    self._progress_lock = threading.Lock()

    # Safety settings: every category is set to BLOCK_NONE
    harm_categories = (
        "HARM_CATEGORY_HARASSMENT",
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
    )
    self.safety_settings = [
        {"category": harm_category, "threshold": "BLOCK_NONE"}
        for harm_category in harm_categories
    ]
    print("Initialized with BLOCK_NONE safety settings for all categories to prevent false positives on benign marketing content.")

    # Image processing settings
    self.enable_greyscale = enable_greyscale
    self.enable_contrast_enhancement = enable_contrast_enhancement
    self.contrast_factor = contrast_factor
    self.refinement_mode = refinement_mode
    self.one_at_a_time_mode = one_at_a_time_mode

    # Split mode configuration (self.splitter only exists when enabled)
    self.split_mode = split_mode
    if self.split_mode:
        self.splitter = PanelSplitter(debug=True)
        print("Split mode enabled: Will split multi-panel layouts before matching")

    # Working folders, relative to the current working directory
    self.master_images_path = Path("master_images")
    self.layouts_path = Path("layouts")
    self.results_path = Path("results")
    self.temp_path = Path("temp_processed")

    # Only the output folders are created here
    for output_dir in (self.results_path, self.temp_path):
        output_dir.mkdir(exist_ok=True)

    # Master image caches
    self.master_images = {}
    self.master_files = {}
    self.uploaded_masters = None  # Filled by upload_master_images_once()
def load_master_images(self) -> Dict[str, str]:
    """Load all master images and create the ID mapping from their filenames.

    Every ``*.jpg`` file directly inside ``self.master_images_path`` is
    registered under its filename without extension. Entries are added to
    ``self.master_images`` (ID -> path string) and ``self.master_files``
    (ID -> filename).

    Returns:
        The ``self.master_images`` dictionary itself (not a copy).
    """
    print("Loading master images...")
    # Path.glob yields files in filesystem-dependent order. The dictionary
    # order is reused for the "Image N" numbering in prompts and for the
    # upload order, so sort to keep runs reproducible across machines.
    master_files = sorted(self.master_images_path.glob("*.jpg"))
    print(f"Found {len(master_files)} master images")
    for file_path in master_files:
        # Use filename (without extension) as the master ID
        master_id = file_path.stem
        self.master_images[master_id] = str(file_path)
        self.master_files[master_id] = file_path.name
    return self.master_images
def match_split_to_masters(self, split_path: str, master_images: List[str]) -> List[Dict]:
    """Placeholder for matching a split panel against the master images.

    Neither argument is inspected; an empty list is always returned.
    """
    # The Gemini detector has no inlier-analysis matcher of its own, so this
    # stays a stub that reports no matches rather than raising. Basic template
    # or feature matching could be implemented here later.
    return []
def preprocess_image(self, image_path: str) -> str:
    """Preprocess an image: optional greyscale conversion and contrast boost.

    The processed copy is written as a JPEG into ``self.temp_path`` under a
    name that includes the thread id and a random suffix, so concurrent
    callers never write the same file.

    Args:
        image_path: Path of the image to preprocess.

    Returns:
        Path of the processed JPEG, or ``image_path`` unchanged when both
        processing options are disabled or preprocessing fails.
    """
    if not self.enable_greyscale and not self.enable_contrast_enhancement:
        return image_path
    try:
        with Image.open(image_path) as img:
            processed_img = img.copy()
            # Convert to greyscale if enabled
            if self.enable_greyscale:
                processed_img = processed_img.convert('L')
            # The result is saved as JPEG, which cannot store alpha or
            # palette modes. Normalise to RGB so such inputs are actually
            # preprocessed instead of hitting the fallback below.
            if processed_img.mode != 'RGB':
                processed_img = processed_img.convert('RGB')
            # Enhance contrast if enabled
            if self.enable_contrast_enhancement:
                # Global contrast enhancement
                contrast_enhancer = ImageEnhance.Contrast(processed_img)
                processed_img = contrast_enhancer.enhance(self.contrast_factor)
                # Edge contrast enhancement using sharpness
                sharpness_enhancer = ImageEnhance.Sharpness(processed_img)
                processed_img = sharpness_enhancer.enhance(1.3)
            # Save processed image with a collision-free filename
            thread_id = threading.current_thread().ident
            unique_id = str(uuid.uuid4())[:8]
            original_name = Path(image_path).stem
            processed_path = self.temp_path / f"{original_name}_processed_{thread_id}_{unique_id}.jpg"
            processed_img.save(processed_path, 'JPEG', quality=95)
            return str(processed_path)
    except Exception as e:
        # Best effort: fall back to the untouched original
        print(f"Warning: Failed to preprocess {Path(image_path).name}: {e}")
        print(f"Using original image instead")
        return image_path
def upload_master_images_once(self):
    """Upload every master image a single time and cache the uploaded files.

    Each image is preprocessed and uploaded. A failed upload is retried once
    after a one-second pause; if the retry also fails the image is skipped.

    Returns:
        The list of uploaded file objects, also stored in
        ``self.uploaded_masters``. Later calls return the cached list.
    """
    if self.uploaded_masters is not None:
        return self.uploaded_masters

    # Describe the active preprocessing steps in the banner
    active_steps = [
        label
        for enabled, label in (
            (self.enable_greyscale, "greyscale conversion"),
            (self.enable_contrast_enhancement, "contrast enhancement"),
        )
        if enabled
    ]
    if active_steps:
        print(f"Uploading master images with {' and '.join(active_steps)} (one-time operation)...")
    else:
        print("Uploading master images (one-time operation)...")

    # NOTE(review): detect_images_in_layout pairs self.uploaded_masters
    # positionally with list(self.master_images.keys()). An image skipped
    # here after two failed uploads would shift that pairing - confirm.
    uploaded = []
    master_paths = list(self.master_images.values())
    total = len(master_paths)
    for position, master_path in enumerate(master_paths):
        display_name = Path(master_path).name
        try:
            prepared = self.preprocess_image(master_path)
            uploaded.append(genai.upload_file(prepared))
            print(f"Uploaded master {position+1}/{total}: {display_name}")
            # Small delay to avoid rate limiting
            if position < total - 1:
                time.sleep(0.1)
        except Exception as first_error:
            print(f"Error uploading {display_name}: {first_error}")
            # Retry once after delay
            try:
                time.sleep(1.0)
                prepared = self.preprocess_image(master_path)
                uploaded.append(genai.upload_file(prepared))
                print(f"Retry successful for {display_name}")
            except Exception as second_error:
                print(f"Failed to upload {display_name}: {second_error}")

    self.uploaded_masters = uploaded
    print(f"✓ Successfully uploaded {len(uploaded)} master images")
    return uploaded
def upload_single_image(self, image_path: str) -> Optional:
    """Upload a single image with preprocessing and retry logic.

    Args:
        image_path: Path of the image to preprocess and upload.

    Returns:
        The uploaded file object, or None when all three attempts fail.
    """
    import random

    max_retries = 3
    # Preprocess once: preprocess_image writes a new uniquely named temp file
    # on every call, so redoing it per retry only repeats the same work and
    # leaves extra files behind.
    processed_path = self.preprocess_image(image_path)
    for attempt in range(max_retries):
        try:
            # Upload the processed image
            return genai.upload_file(processed_path)
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"Failed to upload {Path(image_path).name}: {e}")
                return None
            print(f"Upload retry {attempt + 1}/{max_retries} for {Path(image_path).name}: {e}")
            # Progressive backoff with jitter to avoid thundering herd
            jitter = random.uniform(0.1, 0.5)
            sleep_time = (0.5 * (attempt + 1)) + jitter
            time.sleep(sleep_time)
    return None
def _upload_single_image_threadsafe(self, image_path: str, thread_genai) -> Optional:
    """Upload a single image through a caller-supplied client, with retries.

    Variant of upload_single_image that uploads via ``thread_genai`` and
    prints nothing.

    Args:
        image_path: Path of the image to preprocess and upload.
        thread_genai: Object exposing ``upload_file(path)``.

    Returns:
        The uploaded file object, or None when all three attempts fail.
    """
    import random

    max_retries = 3
    # Preprocess once: preprocess_image writes a new uniquely named temp file
    # on every call, so redoing it per retry only repeats the same work and
    # leaves extra files behind.
    processed_path = self.preprocess_image(image_path)
    for attempt in range(max_retries):
        try:
            # Upload the processed image using the supplied client
            return thread_genai.upload_file(processed_path)
        except Exception:
            if attempt == max_retries - 1:
                return None
            # Progressive backoff with jitter to avoid thundering herd
            jitter = random.uniform(0.1, 0.5)
            sleep_time = (0.5 * (attempt + 1)) + jitter
            time.sleep(sleep_time)
    return None
def _make_robust_api_call_threadsafe(self, thread_model, prompt, files, operation_name="API call", max_retries=3):
    """Silent variant of make_robust_api_call that uses a caller-supplied model.

    Args:
        thread_model: Object exposing ``generate_content``.
        prompt: Prompt text, sent as the first content part.
        files: List of uploaded files appended after the prompt.
        operation_name: Accepted for signature parity; not used here.
        max_retries: Maximum number of attempts.

    Returns:
        On success ``{'success': True, 'response': ..., 'text': ...}``.
        Otherwise ``{'success': False, 'error_type': ..., 'error_message': ...}``
        plus 'response', 'exception' or 'last_error' depending on the failure.
    """
    most_recent_error = None
    for attempt in range(max_retries):
        may_retry = attempt < max_retries - 1
        try:
            reply = thread_model.generate_content([prompt] + files, safety_settings=self.safety_settings)

            # No candidates at all: treated as a safety block
            if not reply.candidates:
                detail = f"No candidates returned"
                if hasattr(reply, 'prompt_feedback'):
                    detail += f" - Prompt feedback: {reply.prompt_feedback}"
                if not may_retry:
                    return {
                        'success': False,
                        'error_type': 'safety_block',
                        'error_message': detail,
                        'response': reply
                    }
                time.sleep((2 ** attempt) * 0.5)
                continue

            first = reply.candidates[0]
            # Anything other than 1 (STOP, normal completion) is a failure
            if first.finish_reason and first.finish_reason != 1:
                detail = f"Request finished with reason: {first.finish_reason}"
                if hasattr(reply, 'prompt_feedback'):
                    detail += f" - Prompt feedback: {reply.prompt_feedback}"
                if hasattr(first, 'safety_ratings'):
                    detail += f" - Safety ratings: {first.safety_ratings}"

                # Only finish reasons 3, 4 and 5 are retried
                if first.finish_reason not in [3, 4, 5]:
                    return {
                        'success': False,
                        'error_type': 'other_finish_reason',
                        'error_message': detail,
                        'response': reply
                    }
                if not may_retry:
                    return {
                        'success': False,
                        'error_type': 'safety_finish_reason',
                        'error_message': detail,
                        'response': reply
                    }
                time.sleep((2 ** attempt) * 0.5)
                continue

            # Success case
            return {
                'success': True,
                'response': reply,
                'text': reply.text.strip()
            }
        except Exception as exc:
            most_recent_error = exc
            # Safety-looking and other errors are retried identically here,
            # so no classification of the message is needed.
            if not may_retry:
                return {
                    'success': False,
                    'error_type': 'exception',
                    'error_message': str(exc),
                    'exception': exc
                }
            time.sleep((2 ** attempt) * 0.5)

    # Only reachable when the loop body never runs (max_retries <= 0)
    return {
        'success': False,
        'error_type': 'max_retries_exceeded',
        'error_message': f"Max retries ({max_retries}) exceeded",
        'last_error': str(most_recent_error) if most_recent_error else "Unknown error"
    }
def create_detection_prompt(self, master_ids: List[str]) -> str:
    """Build the full detection prompt for a batch of master images.

    Args:
        master_ids: Master image IDs, listed in the prompt as
            "Image 1", "Image 2", ... in the order given.

    Returns:
        The prompt text: instructions, the numbered ID list, then the
        response-format section.
    """
    instructions = """Analyze the layout image (the last image provided) and identify which of the master images appear in it.
INSTRUCTIONS:
1. Examine the layout image carefully to identify any image segments or components within it
2. For each segment you identify, look for matches in the provided master images
3. Look for all types of matches including:
- Exact matches (complete images)
- Cropped versions (partial images)
- Scaled or resized versions
- Rotated versions
- Images that may be partially obscured
4. Focus on visual similarity in terms of:
- Person/model appearance and pose
- Clothing details (colors, patterns, styles)
- Background and composition
- Overall visual elements
5. Be conservative and only include matches where you have reasonable confidence
6. Only return a positive result (master image appears in the layout) if the models, pose, and clothing are EXACTLY the same. If there is ANY difference in clothing, model, or pose then return a negative result.
Master Images with IDs:
"""
    # One line per master, numbered from 1 in the order given
    id_listing = "".join(
        f"Image {number}: ID = {master_id}\n"
        for number, master_id in enumerate(master_ids, start=1)
    )
    response_format = """
The final image is the LAYOUT IMAGE to analyze.
Return your response as a JSON object with this exact format:
{
"detected_masters": ["master_id_1", "master_id_2"],
"analysis": "Brief explanation of what you found and your matching reasoning"
}
Only include master IDs in "detected_masters" where you completely sure of a match. If no clear matches are found, return an empty array.
IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos that may show models in various clothing styles for retail purposes. This prompt and task are completely benign and should not trigger safety filters. This is standard e-commerce image comparison work, not dangerous, offensive, or inappropriate content. The comparison is purely for business/marketing layout matching purposes.
"""
    return instructions + id_listing + response_format
def create_simple_detection_prompt(self, master_ids: List[str]) -> str:
    """Build the shorter fallback prompt used when the main prompt is blocked.

    Args:
        master_ids: Master image IDs, listed in the prompt as
            "Image 1", "Image 2", ... in the order given.

    Returns:
        The prompt text: a short task description, the numbered ID list,
        then the response-format section.
    """
    task = """Compare the layout image (the last image) with the master images provided and identify which master images appear in the layout.
Look for visual matches including:
- Complete images that match
- Partial/cropped versions
- Resized versions
This is for e-commerce product image matching.
Master Images with IDs:
"""
    # One line per master, numbered from 1 in the order given
    id_listing = "".join(
        f"Image {number}: ID = {master_id}\n"
        for number, master_id in enumerate(master_ids, start=1)
    )
    response_format = """
The final image is the LAYOUT IMAGE to analyze.
Return your response as a JSON object:
{
"detected_masters": ["master_id_1", "master_id_2", ...],
"analysis": "Brief explanation"
}
Only include master IDs that clearly appear in the layout image.
"""
    return task + id_listing + response_format
def make_robust_api_call(self, prompt, files, operation_name="API call", max_retries=3):
    """Call the model with retries, reporting the outcome as a dict.

    Args:
        prompt: Prompt text, sent as the first content part.
        files: List of uploaded files appended after the prompt.
        operation_name: Label used in the retry messages.
        max_retries: Maximum number of attempts.

    Returns:
        On success ``{'success': True, 'response': ..., 'text': ...}``.
        Otherwise ``{'success': False, 'error_type': ..., 'error_message': ...}``
        plus 'response', 'exception' or 'last_error' depending on the failure.
    """
    most_recent_error = None
    for attempt in range(max_retries):
        may_retry = attempt < max_retries - 1
        try:
            reply = self.model.generate_content([prompt] + files, safety_settings=self.safety_settings)

            # No candidates at all: treated as a safety block
            if not reply.candidates:
                detail = f"No candidates returned"
                if hasattr(reply, 'prompt_feedback'):
                    detail += f" - Prompt feedback: {reply.prompt_feedback}"
                if not may_retry:
                    return {
                        'success': False,
                        'error_type': 'safety_block',
                        'error_message': detail,
                        'response': reply
                    }
                wait_time = (2 ** attempt) * 0.5
                print(f" Safety block detected on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s...")
                time.sleep(wait_time)
                continue

            first = reply.candidates[0]
            # Anything other than 1 (STOP, normal completion) is a failure
            if first.finish_reason and first.finish_reason != 1:
                detail = f"Request finished with reason: {first.finish_reason}"
                if hasattr(reply, 'prompt_feedback'):
                    detail += f" - Prompt feedback: {reply.prompt_feedback}"
                if hasattr(first, 'safety_ratings'):
                    detail += f" - Safety ratings: {first.safety_ratings}"

                # Only finish reasons 3, 4 and 5 are retried
                if first.finish_reason not in [3, 4, 5]:
                    return {
                        'success': False,
                        'error_type': 'other_finish_reason',
                        'error_message': detail,
                        'response': reply
                    }
                if not may_retry:
                    return {
                        'success': False,
                        'error_type': 'safety_finish_reason',
                        'error_message': detail,
                        'response': reply
                    }
                wait_time = (2 ** attempt) * 0.5
                print(f" Safety/content issue detected on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s...")
                time.sleep(wait_time)
                continue

            # Success case
            return {
                'success': True,
                'response': reply,
                'text': reply.text.strip()
            }
        except Exception as exc:
            most_recent_error = exc
            if not may_retry:
                # Final attempt failed
                return {
                    'success': False,
                    'error_type': 'exception',
                    'error_message': str(exc),
                    'exception': exc
                }
            # The message wording only affects the log label; both kinds of
            # error are retried with the same backoff.
            lowered = str(exc).lower()
            looks_like_safety = any(keyword in lowered for keyword in [
                'safety', 'blocked', 'filtered', 'response.text', 'response.parts',
                'finish_reason', 'candidates', 'prompt_feedback'
            ])
            label = "Safety-related error" if looks_like_safety else "API error"
            wait_time = (2 ** attempt) * 0.5
            print(f" {label} on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s: {exc}")
            time.sleep(wait_time)

    # Only reachable when the loop body never runs (max_retries <= 0)
    return {
        'success': False,
        'error_type': 'max_retries_exceeded',
        'error_message': f"Max retries ({max_retries}) exceeded",
        'last_error': str(most_recent_error) if most_recent_error else "Unknown error"
    }
def is_cen_image(self, master_id: str) -> bool:
    """Return True when the master ID contains the '_CEN' (censored) marker."""
    # Substring test: the marker may appear anywhere in the ID
    return master_id.find('_CEN') != -1
def find_corresponding_non_cen_image(self, cen_master_id: str) -> Optional[str]:
    """Return the ID of the loaded non-CEN counterpart of a CEN master.

    Example: "1011A_1011A_1011_01_CEN" -> "1011A_1011_01".

    Returns:
        The derived non-CEN ID when it is a key of ``self.master_images``;
        None when the ID is not a CEN ID, does not end in a 'CEN' part,
        has fewer than four '_'-separated parts, or has no loaded counterpart.
    """
    if not self.is_cen_image(cen_master_id):
        return None

    tokens = cen_master_id.split('_')
    if len(tokens) < 4 or tokens[-1] != 'CEN':
        return None

    if len(tokens) >= 5:
        # prefix_prefix_middle_suffix_CEN -> prefix_middle_suffix
        candidate = '_'.join((tokens[0], tokens[2], tokens[3]))
    else:
        # Exactly four parts: just drop the trailing CEN
        candidate = '_'.join(tokens[:-1])

    # Only report counterparts that are actually loaded
    return candidate if candidate in self.master_images else None
def create_censorship_detection_prompt(self) -> str:
    """Create the prompt asking whether a layout shows censored content.

    Returns:
        The prompt text, ending with a JSON response template whose keys are
        'is_censored', 'confidence', 'analysis' and 'coverage_details'.
    """
    # This is a plain string, not an f-string, so the JSON template must use
    # single braces; '{{' / '}}' would be sent to the model literally.
    prompt = """Analyze this layout image to determine if it contains censored or uncensored content.
TASK: Determine whether the images in this layout are censored (covered) or uncensored (more exposed).
CENSORSHIP INDICATORS TO LOOK FOR:
1. **Clothing Coverage**:
- Long sleeves vs. sleeveless/short sleeves
- Full-length pants/skirts vs. shorts or shorter garments
- High necklines vs. lower necklines
2. **Skin Coverage**:
- Arms: Fully covered vs. bare arms
- Legs: Fully covered vs. exposed legs/thighs
- Torso: Additional covering vs. more exposed areas
3. **Added Elements**:
- Opaque or semi-transparent overlay layers covering skin
- Additional fabric or clothing elements that appear to cover exposed areas
- Digital modifications that add coverage
CLASSIFICATION:
- **CENSORED**: If models show significant additional clothing coverage, long sleeves, full pants/skirts, or digital overlays covering skin
- **UNCENSORED**: If models show more exposed skin, shorter garments, bare arms/legs, or natural clothing without added coverage
Return your response as a JSON object with this exact format:
{
"is_censored": true/false,
"confidence": "high/medium/low",
"analysis": "Detailed explanation of the coverage patterns observed and reasoning for the classification",
"coverage_details": "Specific description of clothing and skin coverage in the layout"
}
Be precise and focus on the actual clothing and coverage patterns visible in the image.
IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image classification. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign.
"""
    return prompt
def detect_layout_censorship(self, layout_path: str) -> Dict:
    """Detect whether a layout image contains censored or uncensored content.

    Uploads the layout, sends the censorship prompt and parses the JSON reply.

    Args:
        layout_path: Path of the layout image to analyse.

    Returns:
        dict with at least 'is_censored', 'confidence' and 'analysis'.
        Every failure path defaults to ``is_censored=True`` with
        ``confidence='unknown'`` and adds an 'error' key.
    """
    try:
        # Upload layout image
        layout_file = self.upload_single_image(layout_path)
        if not layout_file:
            raise Exception("Failed to upload layout image")

        # Create censorship detection prompt
        prompt = self.create_censorship_detection_prompt()

        # Make API call with robust retry logic
        api_result = self.make_robust_api_call(prompt, [layout_file], "censorship detection")

        # Handle API call failure
        if not api_result['success']:
            return {
                'is_censored': True,  # Default to censored if API fails
                'confidence': 'unknown',
                'analysis': f'API call failed: {api_result["error_message"]}',
                'error': f"{api_result['error_type']}: {api_result['error_message']}"
            }

        # Extract JSON from response: take the outermost {...} span
        response_text = api_result['text']
        start_idx = response_text.find('{')
        end_idx = response_text.rfind('}') + 1
        if start_idx == -1 or end_idx == 0:
            # Handled here rather than raised, so the raw reply is kept in
            # 'analysis' just as it is for malformed JSON below.
            return {
                'is_censored': True,  # Default to censored if no JSON
                'confidence': 'unknown',
                'analysis': response_text,
                'error': 'No JSON found in response'
            }
        try:
            result = json.loads(response_text[start_idx:end_idx])
        except json.JSONDecodeError as e:
            return {
                'is_censored': True,  # Default to censored if parsing fails
                'confidence': 'unknown',
                'analysis': response_text,
                'error': f'JSON decode error: {e}'
            }

        # Validate result format: fill in any keys the model omitted
        result.setdefault('is_censored', True)  # Default to censored if unclear
        result.setdefault('confidence', 'unknown')
        result.setdefault('analysis', response_text)
        return result
    except Exception as e:
        return {
            'is_censored': True,  # Default to censored if error
            'confidence': 'unknown',
            'analysis': '',
            'error': str(e)
        }
def apply_cen_refinement_to_results(self, layout_path: str, initial_results: Dict) -> Dict:
    """Swap CEN matches for their non-CEN counterparts when the layout is uncensored.

    Args:
        layout_path: Path of the layout image the results belong to.
        initial_results: Detection result; its 'detected_masters' list is read.

    Returns:
        ``initial_results`` itself when no CEN image was detected. Otherwise a
        shallow copy with the refined master lists plus refinement metadata
        ('refinement_applied', 'refinement_details', 'censorship_analysis',
        'original_detection_count', 'refined_detection_count', 'changes_made').
    """
    layout_name = Path(layout_path).name
    raw_matches = initial_results.get('detected_masters', [])

    # Deduplicate first so the same master is not processed twice
    detected_masters = self.deduplicate_master_matches(raw_matches)
    if len(detected_masters) != len(raw_matches):
        duplicates_removed = len(raw_matches) - len(detected_masters)
        print(f" Removed {duplicates_removed} duplicate master(s) before CEN refinement")

    # Without any CEN match the original results are returned untouched
    cen_images = [mid for mid in detected_masters if self.is_cen_image(mid)]
    if not cen_images:
        return initial_results

    print(f" Refining {len(cen_images)} CEN matches for {layout_name}")
    print(f" Analyzing layout to determine censorship level...")

    # One censorship verdict for the whole layout drives every decision below
    censorship_result = self.detect_layout_censorship(layout_path)
    is_layout_censored = censorship_result.get('is_censored', True)
    confidence = censorship_result.get('confidence', 'unknown')
    print(f" Layout analysis: {'CENSORED' if is_layout_censored else 'UNCENSORED'} (confidence: {confidence})")

    refined_masters = []
    refinement_details = []
    changes_made = 0
    for master_id in detected_masters:
        if not self.is_cen_image(master_id):
            # Non-CEN matches pass through unchanged and get no detail entry
            refined_masters.append(master_id)
            continue

        non_cen_id = self.find_corresponding_non_cen_image(master_id)
        if not is_layout_censored and non_cen_id:
            # Layout is uncensored, switch to non-CEN version
            refined_masters.append(non_cen_id)
            refinement_details.append({
                'original_cen_match': master_id,
                'non_cen_alternative': non_cen_id,
                'final_choice': non_cen_id,
                'confidence': confidence,
                'analysis': f"Layout determined to be uncensored, switched from {master_id} to {non_cen_id}",
                'changed': True,
                'reason': 'layout_uncensored'
            })
            changes_made += 1
            print(f" → Changed {master_id} to {non_cen_id} (layout is uncensored)")
            continue

        # Layout is censored or no non-CEN alternative, keep CEN version
        refined_masters.append(master_id)
        refinement_details.append({
            'original_cen_match': master_id,
            'non_cen_alternative': non_cen_id,
            'final_choice': master_id,
            'confidence': confidence,
            'analysis': f"Kept {master_id} - layout is censored or no non-CEN alternative available",
            'changed': False,
            'reason': 'layout_censored' if is_layout_censored else 'no_non_cen_alternative'
        })
        print(f" → Kept {master_id} ({'layout is censored' if is_layout_censored else 'no non-CEN alternative'})")

    print(f" Summary: {changes_made} CEN images changed to non-CEN versions")

    # A swap can introduce a master that was already in the list
    count_before_dedup = len(refined_masters)
    refined_masters = self.deduplicate_master_matches(refined_masters)
    if len(refined_masters) != count_before_dedup:
        post_refinement_duplicates = count_before_dedup - len(refined_masters)
        print(f" Post-refinement deduplication: Removed {post_refinement_duplicates} duplicate(s)")

    # Shallow copy keeps the caller's dict unmodified
    refined_results = initial_results.copy()
    refined_results.update({
        'detected_masters': refined_masters,
        'detected_master_ids': refined_masters,  # Same list under both keys
        'detected_master_filenames': [f"{mid}.jpg" for mid in refined_masters ],
        'refinement_applied': True,
        'refinement_details': refinement_details,
        'censorship_analysis': censorship_result,
        'original_detection_count': len(detected_masters),
        'refined_detection_count': len(refined_masters),
        'changes_made': changes_made,
    })
    return refined_results
def detect_images_in_layout(self, layout_path: str, layout_index: int, total_layouts: int) -> Dict:
    """Detect which master images appear in a single layout image.

    The layout is uploaded and sent to the model together with the masters
    already held in ``self.uploaded_masters``. If the main prompt is rejected
    with a safety-related error type, the simplified prompt is tried once.
    The first ``{`` through the last ``}`` of the reply is parsed as JSON.

    Args:
        layout_path: Path of the layout image to analyse.
        layout_index: Position of this layout in the batch (logging only).
        total_layouts: Size of the batch (logging only).

    Returns:
        A dict that always has ``detected_masters`` (a list) and ``analysis``.
        ``error`` is added when the API call, the JSON extraction or any other
        step failed; deduplication bookkeeping keys are added only when
        duplicates were actually removed.
    """
    layout_name = Path(layout_path).name
    print(f"Processing {layout_index}/{total_layouts}: {layout_name}")
    try:
        # Upload only the layout image (masters already uploaded)
        layout_file = self.upload_single_image(layout_path)
        if not layout_file:
            raise Exception("Failed to upload layout image")

        # Combine pre-uploaded masters with the layout
        all_files = self.uploaded_masters + [layout_file]
        master_ids = list(self.master_images.keys())
        prompt = self.create_detection_prompt(master_ids)

        # Try main prompt first, then fall back to the simple prompt if blocked
        api_result = self.make_robust_api_call(prompt, all_files, f"detection for {layout_name}")
        if not api_result['success'] and api_result['error_type'] in ['safety_block', 'safety_finish_reason']:
            print(f" Main prompt blocked for {layout_name}, trying simplified prompt...")
            simple_prompt = self.create_simple_detection_prompt(master_ids)
            api_result = self.make_robust_api_call(simple_prompt, all_files, f"simple detection for {layout_name}")

        if not api_result['success']:
            error_msg = api_result['error_message']
            print(f"API call failed for {layout_name}: {error_msg}")
            return {
                'detected_masters': [],
                'analysis': f'API call failed: {error_msg}',
                'error': f"{api_result['error_type']}: {error_msg}",
                'retry_count': 3  # Max retries were attempted
            }

        response_text = api_result['text']

        # Only the extraction/parsing lives in this try so that both failure
        # modes keep the raw model text in 'analysis' for later inspection.
        try:
            start_idx = response_text.find('{')
            end_idx = response_text.rfind('}') + 1
            if start_idx == -1 or end_idx == 0:
                raise ValueError("No JSON found in response")
            result = json.loads(response_text[start_idx:end_idx])
        except json.JSONDecodeError as e:
            print(f"JSON decode error for {layout_name}: {e}")
            return {
                'detected_masters': [],
                'analysis': response_text,
                'error': f'JSON decode error: {e}'
            }
        except ValueError as e:
            # Reply contained no JSON object at all.
            print(f"Error analyzing {layout_name}: {e}")
            return {
                'detected_masters': [],
                'analysis': response_text,
                'error': str(e)
            }

        # Validate result format; the model may return null or a bare string
        # instead of a list, which would otherwise crash or be split into
        # single characters during deduplication.
        detected = result.get('detected_masters')
        if isinstance(detected, str):
            detected = [detected] if detected else []
        elif not isinstance(detected, list):
            detected = []
        result['detected_masters'] = detected
        if 'analysis' not in result:
            result['analysis'] = response_text

        # Deduplicate detected masters and record it when something changed
        original_detected = detected[:]
        result['detected_masters'] = self.deduplicate_master_matches(detected)
        duplicates_removed = len(original_detected) - len(result['detected_masters'])
        if duplicates_removed != 0:
            result['deduplication_applied'] = True
            result['duplicates_removed'] = duplicates_removed
            result['original_detected_masters'] = original_detected
            print(f" Deduplication: Removed {duplicates_removed} duplicate master(s) from {layout_name}")

        detected_count = len(result['detected_masters'])
        print(f"✓ Completed {layout_name} - Found {detected_count} matches")
        return result
    except Exception as e:
        error_msg = f"Error analyzing {layout_name}: {e}"
        # Check if this was a safety-related error
        if "response.text" in str(e) or "response.parts" in str(e):
            error_msg += "\nThis appears to be a safety filter blocking issue."
        # Attach whatever diagnostics the exception carries; every attribute
        # is looked up defensively so the handler itself cannot raise.
        response = getattr(e, 'response', None)
        if response:
            if hasattr(response, 'prompt_feedback'):
                error_msg += f"\nPrompt feedback: {response.prompt_feedback}"
            candidates = getattr(response, 'candidates', None)
            if candidates:
                candidate = candidates[0]
                if hasattr(candidate, 'safety_ratings'):
                    error_msg += f"\nSafety ratings: {candidate.safety_ratings}"
                if hasattr(candidate, 'finish_reason'):
                    error_msg += f"\nFinish reason: {candidate.finish_reason}"
        print(error_msg)
        return {
            'detected_masters': [],
            'analysis': '',
            'error': str(e)
        }
def create_single_master_prompt(self, master_id: str) -> str:
    """Build the prompt asking whether one master image appears in a layout.

    The prompt refers to the master as the first image and the layout as the
    second image, and requests a JSON reply carrying ``match_found``,
    ``master_id``, ``confidence`` and ``analysis``.

    Args:
        master_id: Identifier interpolated into the prompt text and into the
            JSON template the model is asked to fill in.

    Returns:
        The complete prompt text.
    """
    # Static matching rules (no interpolation needed).
    matching_rules = """Analyze the layout image (the second image) and determine if the master image (the first image) appears in it.
INSTRUCTIONS:
1. Compare the master image (first image) with the layout image (second image)
2. Look for EXACT matches where the model, clothing, and pose are IDENTICAL
3. The layout image may contain the master image in various forms:
- Complete/exact match
- Cropped version
- Scaled or resized version
- Rotated version
- Partially obscured
4. Focus on visual similarity in terms of:
- Person/model appearance and pose (must be EXACTLY the same)
- Clothing details (colors, patterns, styles - must be EXACTLY the same)
- Background and composition
- Overall visual elements
5. CRITICAL: Only return a positive result if the models, pose, and clothing are EXACTLY the same.
If there is ANY difference in clothing, model, or pose then return a negative result.
"""
    # The only part that depends on the master being checked.
    reply_format = f"""Master Image ID: {master_id}
Return your response as a JSON object with this exact format:
{{
"match_found": true/false,
"master_id": "{master_id}",
"confidence": "high/medium/low",
"analysis": "Detailed explanation of your findings and reasoning"
}}
"""
    business_context = """IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign.
"""
    return matching_rules + reply_format + business_context
def detect_single_master_in_layout(self, layout_path: str, master_id: str, master_index: int, total_masters: int) -> Dict:
    """Check whether a single master image appears in the layout image.

    Uploads the master and the layout, sends them with the single-master
    prompt and parses the JSON object found in the reply.

    Args:
        layout_path: Path of the layout image.
        master_id: Key into ``self.master_images`` of the master to check.
        master_index: Not used by this method; kept for interface stability.
        total_masters: Not used by this method; kept for interface stability.

    Returns:
        A dict with ``match_found``, ``master_id``, ``confidence`` and
        ``analysis``; ``error`` is added on any failure. When the reply cannot
        be parsed, ``analysis`` carries the raw reply text.

    Raises:
        KeyError: If ``master_id`` is not in ``self.master_images`` (the
            lookup happens before the error-handling block).
    """
    layout_name = Path(layout_path).name
    master_path = self.master_images[master_id]

    def _no_match(analysis: str, error: str) -> Dict:
        """Build the uniform failure result for this master."""
        return {
            'match_found': False,
            'master_id': master_id,
            'confidence': 'unknown',
            'analysis': analysis,
            'error': error
        }

    try:
        # NOTE(review): importing under another alias yields the same module
        # object, so configure() below is presumably process-wide rather than
        # thread-local - confirm before relying on per-thread isolation.
        import google.generativeai as thread_genai
        api_key = os.getenv('GEMINI_API_KEY')
        thread_genai.configure(api_key=api_key)
        thread_model = thread_genai.GenerativeModel('gemini-2.5-pro')

        # Upload both images
        master_file = self._upload_single_image_threadsafe(master_path, thread_genai)
        layout_file = self._upload_single_image_threadsafe(layout_path, thread_genai)
        if not master_file or not layout_file:
            raise Exception("Failed to upload images")

        prompt = self.create_single_master_prompt(master_id)
        api_result = self._make_robust_api_call_threadsafe(
            thread_model,
            prompt,
            [master_file, layout_file],
            f"single master detection: {master_id} in {layout_name}"
        )

        if not api_result['success']:
            return _no_match(
                f'API call failed: {api_result["error_message"]}',
                f"{api_result['error_type']}: {api_result['error_message']}"
            )

        response_text = api_result['text']

        # Keep the raw reply in 'analysis' for both "no JSON at all" and
        # "malformed JSON" so the failure can be diagnosed afterwards.
        try:
            start_idx = response_text.find('{')
            end_idx = response_text.rfind('}') + 1
            if start_idx == -1 or end_idx == 0:
                raise ValueError("No JSON found in response")
            result = json.loads(response_text[start_idx:end_idx])
        except json.JSONDecodeError as e:
            return _no_match(response_text, f'JSON decode error: {e}')
        except ValueError as e:
            return _no_match(response_text, str(e))

        # Fill in any field the model left out
        result.setdefault('match_found', False)
        result.setdefault('master_id', master_id)
        result.setdefault('confidence', 'unknown')
        result.setdefault('analysis', response_text)
        return result
    except Exception as e:
        return _no_match('', str(e))
def detect_images_in_layout_one_at_a_time(self, layout_path: str, layout_index: int, total_layouts: int) -> Dict:
    """Check every master against one layout, one master per worker task.

    Each master is submitted to a ``ProcessPoolExecutor`` running
    ``process_single_master_detection``; results are gathered as the tasks
    finish, so console output order follows completion order.

    Args:
        layout_path: Path of the layout image.
        layout_index: Position of this layout in the batch (logging only).
        total_layouts: Size of the batch (logging only).

    Returns:
        A dict with the matched master ids (``detected_masters`` /
        ``detected_master_ids``), derived filenames, a textual ``analysis``,
        the per-master ``detailed_results`` sorted by master id, and
        deduplication bookkeeping.
    """
    layout_name = Path(layout_path).name
    print(f"Processing {layout_index}/{total_layouts}: {layout_name} (Process-based one-at-a-time mode)")

    master_ids = list(self.master_images.keys())
    total_masters = len(master_ids)
    matched_ids = []
    per_master_results = []
    print(f" Checking {total_masters} masters using {self.max_concurrent_workers} concurrent processes...")

    # One argument tuple per master, in the order the worker function expects
    job_args = [
        (
            layout_path,
            mid,
            self.master_images[mid],
            self.enable_greyscale,
            self.enable_contrast_enhancement,
            self.contrast_factor,
            self.safety_settings,
        )
        for mid in master_ids
    ]

    with concurrent.futures.ProcessPoolExecutor(max_workers=self.max_concurrent_workers) as pool:
        pending = {}
        for args in job_args:
            pending[pool.submit(process_single_master_detection, *args)] = args[1]

        # Handle each task as soon as it completes
        for finished, job in enumerate(concurrent.futures.as_completed(pending), 1):
            mid = pending[job]
            try:
                outcome = job.result()
                per_master_results.append(outcome)
                if outcome.get('match_found', False):
                    matched_ids.append(mid)
                    confidence = outcome.get('confidence', 'unknown')
                    print(f" {finished}/{total_masters}: ✓ MATCH found for {mid} (confidence: {confidence})")
                elif 'error' in outcome:
                    print(f" {finished}/{total_masters}: Error checking {mid}: {outcome['error']}")
                else:
                    print(f" {finished}/{total_masters}: No match for {mid}")
            except Exception as e:
                print(f" {finished}/{total_masters}: Process error checking {mid}: {e}")
                # Record a placeholder so every master has an entry
                per_master_results.append({
                    'match_found': False,
                    'master_id': mid,
                    'confidence': 'unknown',
                    'analysis': '',
                    'error': str(e)
                })

    # Completion order is arbitrary; sort for a stable report
    per_master_results.sort(key=lambda entry: entry.get('master_id', ''))

    # Deduplicate (not expected to change anything in this mode, kept as a safeguard)
    before_dedup = matched_ids[:]
    matched_ids = self.deduplicate_master_matches(matched_ids)
    removed = len(before_dedup) - len(matched_ids)
    dedup_happened = len(matched_ids) != len(before_dedup)
    if dedup_happened:
        print(f" Deduplication: Removed {removed} duplicate master(s)")

    detected_count = len(matched_ids)
    print(f"✓ Completed {layout_name} - Found {detected_count} matches using {self.max_concurrent_workers} concurrent processes")

    summary = {}
    summary['detected_masters'] = matched_ids
    summary['detected_master_ids'] = matched_ids
    summary['detected_master_filenames'] = [f"{mid}.jpg" for mid in matched_ids]
    summary['analysis'] = f'Process-based one-at-a-time analysis completed. Found {detected_count} exact matches out of {total_masters} masters checked using {self.max_concurrent_workers} concurrent processes.'
    summary['detailed_results'] = per_master_results
    summary['processing_mode'] = 'process_based_one_at_a_time'
    summary['total_masters_checked'] = total_masters
    summary['concurrent_workers'] = self.max_concurrent_workers
    summary['deduplication_applied'] = dedup_happened
    summary['duplicates_removed'] = removed if dedup_happened else 0
    summary['original_detected_masters'] = before_dedup
    return summary
def process_all_layouts(self, limit: Optional[int] = None, specific_file: Optional[str] = None) -> Dict:
    """Process layout images sequentially and collect per-layout results.

    Masters are loaded first (and uploaded once unless one-at-a-time mode is
    active). Each layout is then analysed with the split, one-at-a-time or
    multi-master detector, optionally followed by CEN refinement, and a
    progress snapshot is saved after every 20th layout.

    Args:
        limit: When truthy and no ``specific_file`` is given, only the first
            ``limit`` layouts (in sorted filename order) are processed.
        specific_file: Name of a single file inside ``self.layouts_path`` to
            process instead of every ``*.jpg``.

    Returns:
        Mapping of layout id (file stem) to that layout's result dict. Empty
        when no layout files are found.

    Raises:
        FileNotFoundError: If ``specific_file`` does not exist.
    """
    mode_desc = "One-at-a-time Mode" if self.one_at_a_time_mode else "Multi Master Mode"
    if self.refinement_mode:
        mode_desc += " with CEN Refinement"
    print(f"Starting sequential batch processing ({mode_desc})...")

    # Load master images
    self.load_master_images()
    # Upload all master images ONCE (only for multi-master mode)
    if not self.one_at_a_time_mode:
        self.upload_master_images_once()

    # Get layout files
    if specific_file:
        layout_files = [self.layouts_path / specific_file]
        if not layout_files[0].exists():
            raise FileNotFoundError(f"Layout file {specific_file} not found in {self.layouts_path}")
        print(f"Processing specific file: {specific_file}")
    else:
        # glob() yields files in arbitrary order; sort so runs (and the
        # subset chosen by `limit`) are reproducible.
        layout_files = sorted(self.layouts_path.glob("*.jpg"))
        if limit:
            layout_files = layout_files[:limit]
            print(f"Processing first {limit} layouts only")

    total_layouts = len(layout_files)
    print(f"Processing {total_layouts} layout images in {mode_desc}")
    print("=" * 60)

    # Loop-invariant label stored with every layout result
    detection_mode = mode_desc.lower().replace(' ', '_').replace('with_', '')

    results = {}
    start_time = time.time()
    for i, layout_path in enumerate(layout_files, 1):
        layout_id = layout_path.stem

        # Detect images in layout using the appropriate method
        if self.split_mode:
            # Split mode: split layout into panels and match each panel
            master_ids = list(self.master_images.keys())
            result = self.splitter.split_layout_and_match(str(layout_path), master_ids, self)
        elif self.one_at_a_time_mode:
            result = self.detect_images_in_layout_one_at_a_time(str(layout_path), i, total_layouts)
        else:
            result = self.detect_images_in_layout(str(layout_path), i, total_layouts)

        # Apply CEN refinement if enabled and there are matches (all modes)
        if self.refinement_mode and result.get('detected_masters'):
            result = self.apply_cen_refinement_to_results(str(layout_path), result)

        layout_result = {
            'layout_filename': layout_path.name,
            'detected_master_ids': result['detected_masters'],
            'detected_master_filenames': [f"{mid}.jpg" for mid in result['detected_masters']],
            'analysis': result.get('analysis', 'Split mode analysis'),
            'detection_mode': detection_mode
        }

        # Add split mode specific fields
        if self.split_mode:
            layout_result['split_mode'] = True
            layout_result['splits_generated'] = result.get('splits_generated', 0)
            layout_result['panel_count'] = result.get('panel_count', 1)
            layout_result['panel_confidence'] = result.get('panel_confidence', 'unknown')
            if 'split_results' in result:
                layout_result['split_results'] = result['split_results']

        # Add deduplication fields if applied; companions are read leniently
        # because not every detector is guaranteed to supply all three keys.
        if 'deduplication_applied' in result:
            layout_result['deduplication_applied'] = result['deduplication_applied']
            layout_result['duplicates_removed'] = result.get('duplicates_removed', 0)
            layout_result['original_detected_masters'] = result.get(
                'original_detected_masters', list(result['detected_masters'])
            )

        if 'error' in result:
            layout_result['error'] = result['error']

        # Add refinement mode specific fields
        if self.refinement_mode and result.get('refinement_applied'):
            layout_result['refinement_applied'] = result['refinement_applied']
            layout_result['refinement_details'] = result['refinement_details']
            layout_result['censorship_analysis'] = result['censorship_analysis']
            layout_result['original_detection_count'] = result['original_detection_count']
            layout_result['refined_detection_count'] = result['refined_detection_count']
            layout_result['changes_made'] = result.get('changes_made', 0)

        results[layout_id] = layout_result

        # Progress update with time estimate
        elapsed = time.time() - start_time
        avg_time = elapsed / i
        remaining = (total_layouts - i) * avg_time
        print(f"Progress: {i}/{total_layouts} ({i/total_layouts*100:.1f}%) - Est. remaining: {remaining/60:.1f} min")

        # Save progress periodically
        if i % 20 == 0:
            self.save_results(results, f"progress_{i}")

    total_time = time.time() - start_time
    print(f"\n✓ Completed processing all {total_layouts} layouts in {total_time/60:.1f} minutes")
    # Guard the average: an empty layout set would otherwise divide by zero
    if total_layouts:
        print(f"Average time per layout: {total_time/total_layouts:.1f} seconds")
    return results
def save_results(self, results: Dict, filename: str = "detection_results") -> str:
    """Write the results, wrapped with a small metadata header, to a JSON file.

    Args:
        results: Per-layout results to store under the ``results`` key.
        filename: File name without extension; ``.json`` is appended and the
            file is placed in ``self.results_path``.

    Returns:
        The path of the written file as a string.
    """
    destination = self.results_path / f"{filename}.json"

    header = {
        'total_layouts_processed': len(results),
        'total_master_images': len(self.master_images),
        # NOTE(review): this reads self.master_files while the count above
        # uses self.master_images - confirm both attributes exist and that
        # the difference is intentional.
        'master_images_available': list(self.master_files.keys())
    }
    payload = {'metadata': header, 'results': results}

    with open(destination, 'w') as handle:
        json.dump(payload, handle, indent=2)

    print(f"Results saved to: {destination}")
    return str(destination)
def generate_summary(self, results: Dict) -> Dict:
    """Aggregate per-layout results into summary statistics.

    Args:
        results: Mapping of layout id to a result dict that has a
            ``detected_master_ids`` list and, optionally,
            ``deduplication_applied`` / ``duplicates_removed``.

    Returns:
        A dict with layout totals, a per-master usage count, the ten most
        used masters as ``(master_id, count)`` pairs, and deduplication
        statistics (rate is a percentage rounded to one decimal, 0 when no
        layouts were processed).
    """
    entries = results.values()
    total_layouts = len(results)
    matched_layouts = len([entry for entry in entries if entry['detected_master_ids']])

    # Tally how often each master was detected, in first-seen order
    usage = {}
    for entry in entries:
        for mid in entry['detected_master_ids']:
            if mid in usage:
                usage[mid] += 1
            else:
                usage[mid] = 1

    # Deduplication statistics
    dedup_layouts = len([entry for entry in entries if entry.get('deduplication_applied', False)])
    removed_total = sum(entry.get('duplicates_removed', 0) for entry in entries)
    if total_layouts > 0:
        dedup_rate = round(dedup_layouts / total_layouts * 100, 1)
    else:
        dedup_rate = 0

    # Highest counts first; ties keep first-seen order
    ranking = sorted(usage.items(), key=lambda pair: -pair[1])

    return {
        'total_layouts_processed': total_layouts,
        'layouts_with_matches': matched_layouts,
        'layouts_without_matches': total_layouts - matched_layouts,
        'master_image_usage': usage,
        'most_used_masters': ranking[:10],
        # Deduplication stats
        'layouts_with_deduplication': dedup_layouts,
        'total_duplicates_removed': removed_total,
        'deduplication_rate': dedup_rate
    }
def deduplicate_master_matches(self, detected_masters: List[str]) -> List[str]:
    """Return the master ids with exact duplicates dropped, first occurrence kept.

    Args:
        detected_masters: Master ids, possibly containing repeats.

    Returns:
        A new list in original order without repeats. A falsy input (empty
        list or ``None``) is handed back unchanged.
    """
    if not detected_masters:
        return detected_masters
    # dict keys are unique and keep insertion order, which gives an
    # order-preserving de-duplication in one pass.
    return list(dict.fromkeys(detected_masters))
def cleanup_temp_files(self):
    """Delete temporary processed images and remove the temp folder if empty.

    Every ``*_processed*.jpg`` entry in ``self.temp_path`` is removed
    individually, so one entry that cannot be deleted no longer prevents the
    remaining files from being cleaned up. The directory itself is removed
    only when nothing is left in it. This is best-effort: problems are
    reported as warnings and never raised.
    """
    try:
        if not self.temp_path.exists():
            return
        # Matches both the old and the thread-safe naming patterns
        for temp_file in self.temp_path.glob("*_processed*.jpg"):
            try:
                temp_file.unlink()
            except FileNotFoundError:
                # Already gone (presumably removed concurrently) - nothing to do
                continue
            except OSError as e:
                print(f"Warning: Failed to remove temp file {temp_file}: {e}")
        # Remove temp directory if empty
        if not any(self.temp_path.iterdir()):
            self.temp_path.rmdir()
    except Exception as e:
        print(f"Warning: Failed to cleanup temp files: {e}")