1501 lines
No EOL
67 KiB
Python
1501 lines
No EOL
67 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Gemini Image Detector - Extracted ImageDetector class
|
|
Uses Google Gemini 2.5 Pro API to detect which master images appear in layout images
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
from typing import List, Dict, Optional
|
|
import google.generativeai as genai
|
|
from dotenv import load_dotenv
|
|
from PIL import Image, ImageEnhance
|
|
import tempfile
|
|
import numpy as np
|
|
import pickle
|
|
import cv2
|
|
import concurrent.futures
|
|
import threading
|
|
import uuid
|
|
import multiprocessing
|
|
from functools import partial
|
|
from panel_splitter import PanelSplitter
|
|
|
|
|
|
def process_single_master_detection(layout_path, master_id, master_path, enable_greyscale, enable_contrast_enhancement, contrast_factor, safety_settings):
    """
    Ask Gemini whether a single master image appears in a layout image.

    Standalone function for processing a single master detection in a separate process.
    All dependencies are imported and the Gemini client is configured inside the
    call, so nothing is shared with other workers.

    Args:
        layout_path: Path of the layout image (sent as the second image).
        master_id: Identifier of the master image; echoed in the prompt and result.
        master_path: Path of the master image (sent as the first image).
        enable_greyscale: Convert both images to greyscale before upload.
        enable_contrast_enhancement: Boost contrast and sharpness before upload.
        contrast_factor: Factor passed to the contrast enhancer.
        safety_settings: Safety settings forwarded to ``generate_content``.

    Returns:
        A dict with ``match_found``, ``master_id``, ``confidence`` and ``analysis``.
        Every failure path returns ``match_found=False``, ``confidence='unknown'``
        and an additional ``error`` key; this function never raises ``Exception``.
    """

    def failure(analysis, error):
        """Build the negative result returned on every failure path."""
        return {
            'match_found': False,
            'master_id': master_id,
            'confidence': 'unknown',
            'analysis': analysis,
            'error': error
        }

    try:
        # Import and configure in each process to avoid shared state
        import os
        import json
        import random
        import time
        from pathlib import Path
        from PIL import Image, ImageEnhance
        import google.generativeai as genai
        from dotenv import load_dotenv
        import uuid
        import threading

        # Load environment in this process
        load_dotenv()
        api_key = os.getenv('GEMINI_API_KEY')
        if not api_key:
            raise ValueError("GEMINI_API_KEY not found in environment variables")

        genai.configure(api_key=api_key)
        model = genai.GenerativeModel('gemini-2.5-pro')

        # Create temp directory for this process
        temp_path = Path("temp_processed")
        temp_path.mkdir(exist_ok=True)

        def preprocess_image_local(image_path: str) -> str:
            """Write a preprocessed JPEG copy and return its path (original path on failure)."""
            if not enable_greyscale and not enable_contrast_enhancement:
                return image_path

            try:
                with Image.open(image_path) as img:
                    processed_img = img.copy()

                if enable_greyscale:
                    # Round-trip through 'L' so the upload is greyscale but still RGB
                    processed_img = processed_img.convert('L')
                    processed_img = processed_img.convert('RGB')

                if enable_contrast_enhancement:
                    processed_img = ImageEnhance.Contrast(processed_img).enhance(contrast_factor)
                    processed_img = ImageEnhance.Sharpness(processed_img).enhance(1.3)

                # Thread id + random suffix keep concurrent workers from colliding
                thread_id = threading.current_thread().ident
                unique_id = str(uuid.uuid4())[:8]
                original_name = Path(image_path).stem
                processed_path = temp_path / f"{original_name}_processed_{thread_id}_{unique_id}.jpg"
                processed_img.save(processed_path, 'JPEG', quality=95)

                return str(processed_path)

            except Exception:
                # Best effort: fall back to the untouched original
                return image_path

        def upload_single_image_local(image_path: str):
            """Preprocess once, then upload with up to three attempts; None on failure."""
            max_upload_retries = 3
            # Preprocess outside the retry loop so a failed upload does not
            # write another temp file on every attempt.
            processed_path = preprocess_image_local(image_path)
            for attempt in range(max_upload_retries):
                try:
                    return genai.upload_file(processed_path)
                except Exception:
                    if attempt == max_upload_retries - 1:
                        return None
                    # Linear backoff plus jitter
                    jitter = random.uniform(0.1, 0.5)
                    time.sleep((0.5 * (attempt + 1)) + jitter)
            return None

        def create_single_master_prompt_local(master_id: str) -> str:
            """Build the one-master-versus-one-layout prompt."""
            prompt = f"""Analyze the layout image (the second image) and determine if the master image (the first image) appears in it.

INSTRUCTIONS:
1. Compare the master image (first image) with the layout image (second image)
2. Look for EXACT matches where the model, clothing, and pose are IDENTICAL
3. The layout image may contain the master image in various forms:
- Complete/exact match
- Cropped version
- Scaled or resized version
- Rotated version
- Partially obscured

4. Focus on visual similarity in terms of:
- Person/model appearance and pose (must be EXACTLY the same)
- Clothing details (colors, patterns, styles - must be EXACTLY the same)
- Background and composition
- Overall visual elements

5. CRITICAL: Only return a positive result if the models, pose, and clothing are EXACTLY the same.
If there is ANY difference in clothing, model, or pose then return a negative result.

Master Image ID: {master_id}

Return your response as a JSON object with this exact format:
{{
"match_found": true/false,
"master_id": "{master_id}",
"confidence": "high/medium/low",
"analysis": "Detailed explanation of your findings and reasoning"
}}

IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign.
"""
            return prompt

        # Upload both images
        master_file = upload_single_image_local(master_path)
        layout_file = upload_single_image_local(layout_path)

        if not master_file or not layout_file:
            raise Exception("Failed to upload images")

        # Create prompt and make API call
        prompt = create_single_master_prompt_local(master_id)

        max_retries = 3
        for attempt in range(max_retries):
            can_retry = attempt < max_retries - 1
            try:
                response = model.generate_content([prompt, master_file, layout_file], safety_settings=safety_settings)

                if not response.candidates:
                    if can_retry:
                        time.sleep((2 ** attempt) * 0.5)
                        continue
                    return failure('No candidates returned', 'Safety block')

                candidate = response.candidates[0]

                # finish_reason 1 is normal completion; 3, 4 and 5 are retried
                if candidate.finish_reason and candidate.finish_reason != 1:
                    if candidate.finish_reason in [3, 4, 5] and can_retry:
                        time.sleep((2 ** attempt) * 0.5)
                        continue
                    return failure(
                        f'Finished with reason: {candidate.finish_reason}',
                        f'Finish reason: {candidate.finish_reason}'
                    )

                # Parse response: take the outermost {...} span
                response_text = response.text.strip()
                start_idx = response_text.find('{')
                end_idx = response_text.rfind('}') + 1

                if start_idx == -1 or end_idx == 0:
                    return failure(response_text, 'No JSON found in response')

                result = json.loads(response_text[start_idx:end_idx])

                # Fill in any keys the model left out
                if 'match_found' not in result:
                    result['match_found'] = False
                if 'master_id' not in result:
                    result['master_id'] = master_id
                if 'confidence' not in result:
                    result['confidence'] = 'unknown'
                if 'analysis' not in result:
                    result['analysis'] = response_text

                return result

            except Exception as e:
                # Covers API errors and malformed JSON alike
                if not can_retry:
                    return failure('', str(e))
                time.sleep((2 ** attempt) * 0.5)

    except Exception as e:
        return failure('', str(e))
|
|
|
|
|
|
class ImageDetector:
|
|
def __init__(self, enable_greyscale=True, enable_contrast_enhancement=True, contrast_factor=1.5, refinement_mode=False, one_at_a_time_mode=False, max_concurrent_workers=5, split_mode=False):
    """
    Configure the Gemini client and set up detector state.

    Reads GEMINI_API_KEY from the environment (after loading .env), builds the
    model handle, stores the processing options on the instance and creates
    the results and temp directories.

    Raises:
        ValueError: If GEMINI_API_KEY is not set.
    """
    load_dotenv()

    gemini_key = os.getenv('GEMINI_API_KEY')
    if not gemini_key:
        raise ValueError("GEMINI_API_KEY not found in environment variables")

    genai.configure(api_key=gemini_key)
    self.model = genai.GenerativeModel('gemini-2.5-pro')

    # Concurrency settings
    self.max_concurrent_workers = max_concurrent_workers
    self._progress_lock = threading.Lock()

    # Every harm category is set to BLOCK_NONE to prevent false positives
    # on benign content.
    harm_categories = (
        "HARM_CATEGORY_HARASSMENT",
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
    )
    self.safety_settings = [
        {"category": harm_category, "threshold": "BLOCK_NONE"}
        for harm_category in harm_categories
    ]

    print("Initialized with BLOCK_NONE safety settings for all categories to prevent false positives on benign marketing content.")

    # Image processing and mode flags, stored as given
    self.enable_greyscale = enable_greyscale
    self.enable_contrast_enhancement = enable_contrast_enhancement
    self.contrast_factor = contrast_factor
    self.refinement_mode = refinement_mode
    self.one_at_a_time_mode = one_at_a_time_mode

    # The panel splitter is only created when split mode is on
    self.split_mode = split_mode
    if self.split_mode:
        self.splitter = PanelSplitter(debug=True)
        print("Split mode enabled: Will split multi-panel layouts before matching")

    # Working directories, relative to the current directory
    self.master_images_path = Path("master_images")
    self.layouts_path = Path("layouts")
    self.results_path = Path("results")
    self.temp_path = Path("temp_processed")

    # Only the output directories are created here
    for output_dir in (self.results_path, self.temp_path):
        output_dir.mkdir(exist_ok=True)

    # Master image caches
    self.master_images = {}
    self.master_files = {}
    self.uploaded_masters = None  # Filled by upload_master_images_once
|
|
|
|
|
|
def load_master_images(self) -> Dict[str, str]:
|
|
"""Load all master images and create ID mapping using filenames"""
|
|
print("Loading master images...")
|
|
|
|
master_files = list(self.master_images_path.glob("*.jpg"))
|
|
print(f"Found {len(master_files)} master images")
|
|
|
|
for file_path in master_files:
|
|
# Use filename (without extension) as the master ID
|
|
master_id = file_path.stem
|
|
self.master_images[master_id] = str(file_path)
|
|
self.master_files[master_id] = file_path.name
|
|
|
|
return self.master_images
|
|
|
|
def match_split_to_masters(self, split_path: str, master_images: List[str]) -> List[Dict]:
    """
    Placeholder for matching a split panel against master images.

    No matching is implemented for the Gemini detector yet, so both arguments
    are ignored and a new empty list is returned on every call.
    """
    # Basic template or feature matching could be added here later;
    # returning no matches keeps callers from erroring in the meantime.
    return []
|
|
|
|
def preprocess_image(self, image_path: str) -> str:
|
|
"""Preprocess image: convert to greyscale and enhance contrast - THREAD-SAFE VERSION"""
|
|
if not self.enable_greyscale and not self.enable_contrast_enhancement:
|
|
return image_path
|
|
|
|
try:
|
|
# Open the image
|
|
with Image.open(image_path) as img:
|
|
processed_img = img.copy()
|
|
|
|
# Convert to greyscale if enabled
|
|
if self.enable_greyscale:
|
|
processed_img = processed_img.convert('L')
|
|
# Convert back to RGB for consistency
|
|
processed_img = processed_img.convert('RGB')
|
|
|
|
# Enhance contrast if enabled
|
|
if self.enable_contrast_enhancement:
|
|
# Global contrast enhancement
|
|
contrast_enhancer = ImageEnhance.Contrast(processed_img)
|
|
processed_img = contrast_enhancer.enhance(self.contrast_factor)
|
|
|
|
# Edge contrast enhancement using sharpness
|
|
sharpness_enhancer = ImageEnhance.Sharpness(processed_img)
|
|
processed_img = sharpness_enhancer.enhance(1.3)
|
|
|
|
# Save processed image with thread-safe filename
|
|
import threading
|
|
import uuid
|
|
thread_id = threading.current_thread().ident
|
|
unique_id = str(uuid.uuid4())[:8]
|
|
original_name = Path(image_path).stem
|
|
processed_path = self.temp_path / f"{original_name}_processed_{thread_id}_{unique_id}.jpg"
|
|
processed_img.save(processed_path, 'JPEG', quality=95)
|
|
|
|
return str(processed_path)
|
|
|
|
except Exception as e:
|
|
print(f"Warning: Failed to preprocess {Path(image_path).name}: {e}")
|
|
print(f"Using original image instead")
|
|
return image_path
|
|
|
|
def upload_master_images_once(self):
|
|
"""Upload all master images ONCE and cache them (with preprocessing)"""
|
|
if self.uploaded_masters is not None:
|
|
return self.uploaded_masters
|
|
|
|
processing_msg = []
|
|
if self.enable_greyscale:
|
|
processing_msg.append("greyscale conversion")
|
|
if self.enable_contrast_enhancement:
|
|
processing_msg.append("contrast enhancement")
|
|
|
|
if processing_msg:
|
|
print(f"Uploading master images with {' and '.join(processing_msg)} (one-time operation)...")
|
|
else:
|
|
print("Uploading master images (one-time operation)...")
|
|
|
|
uploaded_masters = []
|
|
master_paths = list(self.master_images.values())
|
|
|
|
for i, path in enumerate(master_paths):
|
|
try:
|
|
# Preprocess the image
|
|
processed_path = self.preprocess_image(path)
|
|
|
|
# Upload the processed image
|
|
uploaded_file = genai.upload_file(processed_path)
|
|
uploaded_masters.append(uploaded_file)
|
|
print(f"Uploaded master {i+1}/{len(master_paths)}: {Path(path).name}")
|
|
|
|
# Small delay to avoid rate limiting
|
|
if i < len(master_paths) - 1:
|
|
time.sleep(0.1)
|
|
|
|
except Exception as e:
|
|
print(f"Error uploading {Path(path).name}: {e}")
|
|
# Retry once after delay
|
|
try:
|
|
time.sleep(1.0)
|
|
processed_path = self.preprocess_image(path)
|
|
uploaded_file = genai.upload_file(processed_path)
|
|
uploaded_masters.append(uploaded_file)
|
|
print(f"Retry successful for {Path(path).name}")
|
|
except Exception as e2:
|
|
print(f"Failed to upload {Path(path).name}: {e2}")
|
|
|
|
self.uploaded_masters = uploaded_masters
|
|
print(f"✓ Successfully uploaded {len(uploaded_masters)} master images")
|
|
return uploaded_masters
|
|
|
|
def upload_single_image(self, image_path: str) -> Optional[object]:
    """
    Upload a single image with preprocessing and retry logic.

    Args:
        image_path: Path of the image to preprocess and upload.

    Returns:
        The uploaded file handle, or ``None`` once all three attempts fail.
    """
    import random

    max_retries = 3
    # Preprocess once; doing it inside the retry loop wrote a fresh temp
    # JPEG for every failed attempt.
    processed_path = self.preprocess_image(image_path)

    for attempt in range(max_retries):
        try:
            return genai.upload_file(processed_path)
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"Failed to upload {Path(image_path).name}: {e}")
                return None
            print(f"Upload retry {attempt + 1}/{max_retries} for {Path(image_path).name}: {e}")
            # Progressive backoff with jitter to avoid thundering herd
            jitter = random.uniform(0.1, 0.5)
            sleep_time = (0.5 * (attempt + 1)) + jitter
            time.sleep(sleep_time)
    return None
|
|
|
|
def _upload_single_image_threadsafe(self, image_path: str, thread_genai) -> Optional:
|
|
"""Thread-safe version of upload_single_image using thread-local client"""
|
|
max_retries = 3
|
|
for attempt in range(max_retries):
|
|
try:
|
|
# Preprocess the image
|
|
processed_path = self.preprocess_image(image_path)
|
|
|
|
# Upload the processed image using thread-local client
|
|
uploaded_file = thread_genai.upload_file(processed_path)
|
|
return uploaded_file
|
|
except Exception as e:
|
|
if attempt == max_retries - 1:
|
|
return None
|
|
# Progressive backoff with jitter to avoid thundering herd
|
|
import random
|
|
jitter = random.uniform(0.1, 0.5)
|
|
sleep_time = (0.5 * (attempt + 1)) + jitter
|
|
time.sleep(sleep_time)
|
|
return None
|
|
|
|
def _make_robust_api_call_threadsafe(self, thread_model, prompt, files, operation_name="API call", max_retries=3):
|
|
"""Thread-safe version of make_robust_api_call using thread-local model"""
|
|
last_error = None
|
|
|
|
for attempt in range(max_retries):
|
|
try:
|
|
response = thread_model.generate_content([prompt] + files, safety_settings=self.safety_settings)
|
|
|
|
# Check for blocked content or safety issues immediately
|
|
if not response.candidates:
|
|
error_msg = f"No candidates returned"
|
|
if hasattr(response, 'prompt_feedback'):
|
|
error_msg += f" - Prompt feedback: {response.prompt_feedback}"
|
|
|
|
# This is a safety block, should retry
|
|
if attempt < max_retries - 1:
|
|
wait_time = (2 ** attempt) * 0.5
|
|
time.sleep(wait_time)
|
|
continue
|
|
else:
|
|
return {
|
|
'success': False,
|
|
'error_type': 'safety_block',
|
|
'error_message': error_msg,
|
|
'response': response
|
|
}
|
|
|
|
candidate = response.candidates[0]
|
|
|
|
# Check finish reason
|
|
if candidate.finish_reason and candidate.finish_reason != 1: # 1 = STOP (normal completion)
|
|
error_msg = f"Request finished with reason: {candidate.finish_reason}"
|
|
if hasattr(response, 'prompt_feedback'):
|
|
error_msg += f" - Prompt feedback: {response.prompt_feedback}"
|
|
if hasattr(candidate, 'safety_ratings'):
|
|
error_msg += f" - Safety ratings: {candidate.safety_ratings}"
|
|
|
|
# Check if this is a retryable safety issue
|
|
if candidate.finish_reason in [3, 4, 5]: # SAFETY, RECITATION, OTHER safety-related reasons
|
|
if attempt < max_retries - 1:
|
|
wait_time = (2 ** attempt) * 0.5
|
|
time.sleep(wait_time)
|
|
continue
|
|
else:
|
|
return {
|
|
'success': False,
|
|
'error_type': 'safety_finish_reason',
|
|
'error_message': error_msg,
|
|
'response': response
|
|
}
|
|
else:
|
|
# Non-safety related finish reason, don't retry
|
|
return {
|
|
'success': False,
|
|
'error_type': 'other_finish_reason',
|
|
'error_message': error_msg,
|
|
'response': response
|
|
}
|
|
|
|
# Success case
|
|
return {
|
|
'success': True,
|
|
'response': response,
|
|
'text': response.text.strip()
|
|
}
|
|
|
|
except Exception as e:
|
|
last_error = e
|
|
error_str = str(e)
|
|
|
|
# Check if this looks like a safety/blocking error
|
|
is_safety_error = any(keyword in error_str.lower() for keyword in [
|
|
'safety', 'blocked', 'filtered', 'response.text', 'response.parts',
|
|
'finish_reason', 'candidates', 'prompt_feedback'
|
|
])
|
|
|
|
if is_safety_error and attempt < max_retries - 1:
|
|
wait_time = (2 ** attempt) * 0.5
|
|
time.sleep(wait_time)
|
|
continue
|
|
elif attempt < max_retries - 1:
|
|
# Other errors, also retry but with different messaging
|
|
wait_time = (2 ** attempt) * 0.5
|
|
time.sleep(wait_time)
|
|
continue
|
|
else:
|
|
# Final attempt failed
|
|
return {
|
|
'success': False,
|
|
'error_type': 'exception',
|
|
'error_message': str(e),
|
|
'exception': e
|
|
}
|
|
|
|
# This shouldn't be reached, but just in case
|
|
return {
|
|
'success': False,
|
|
'error_type': 'max_retries_exceeded',
|
|
'error_message': f"Max retries ({max_retries}) exceeded",
|
|
'last_error': str(last_error) if last_error else "Unknown error"
|
|
}
|
|
|
|
def create_detection_prompt(self, master_ids: List[str]) -> str:
    """
    Build the multi-master detection prompt.

    The prompt has three parts: fixed instructions, one numbered
    ``Image N: ID = ...`` line per master ID (in the order given), and the
    JSON response format.
    """
    instructions = """Analyze the layout image (the last image provided) and identify which of the master images appear in it.

INSTRUCTIONS:
1. Examine the layout image carefully to identify any image segments or components within it
2. For each segment you identify, look for matches in the provided master images
3. Look for all types of matches including:
- Exact matches (complete images)
- Cropped versions (partial images)
- Scaled or resized versions
- Rotated versions
- Images that may be partially obscured

4. Focus on visual similarity in terms of:
- Person/model appearance and pose
- Clothing details (colors, patterns, styles)
- Background and composition
- Overall visual elements

5. Be conservative and only include matches where you have reasonable confidence

6. Only return a positive result (master image appears in the layout) if the models, pose, and clothing are EXACTLY the same. If there is ANY difference in clothing, model, or pose then return a negative result.

Master Images with IDs:
"""

    # Numbering is 1-based
    id_listing = "".join(
        f"Image {position}: ID = {master_id}\n"
        for position, master_id in enumerate(master_ids, start=1)
    )

    response_format = """
The final image is the LAYOUT IMAGE to analyze.

Return your response as a JSON object with this exact format:
{
"detected_masters": ["master_id_1", "master_id_2"],
"analysis": "Brief explanation of what you found and your matching reasoning"
}

Only include master IDs in "detected_masters" where you completely sure of a match. If no clear matches are found, return an empty array.

IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos that may show models in various clothing styles for retail purposes. This prompt and task are completely benign and should not trigger safety filters. This is standard e-commerce image comparison work, not dangerous, offensive, or inappropriate content. The comparison is purely for business/marketing layout matching purposes.
"""

    return instructions + id_listing + response_format
|
|
|
|
|
|
|
|
|
|
|
|
def create_simple_detection_prompt(self, master_ids: List[str]) -> str:
    """
    Build a shorter detection prompt for cases where the main prompt gets blocked.

    Same layout as the full prompt: a short instruction block, one numbered
    ``Image N: ID = ...`` line per master ID, then the JSON response format.
    """
    instructions = """Compare the layout image (the last image) with the master images provided and identify which master images appear in the layout.

Look for visual matches including:
- Complete images that match
- Partial/cropped versions
- Resized versions

This is for e-commerce product image matching.

Master Images with IDs:
"""

    # Numbering is 1-based
    id_listing = "".join(
        f"Image {position}: ID = {master_id}\n"
        for position, master_id in enumerate(master_ids, start=1)
    )

    response_format = """
The final image is the LAYOUT IMAGE to analyze.

Return your response as a JSON object:
{
"detected_masters": ["master_id_1", "master_id_2", ...],
"analysis": "Brief explanation"
}

Only include master IDs that clearly appear in the layout image.
"""

    return instructions + id_listing + response_format
|
|
|
|
def make_robust_api_call(self, prompt, files, operation_name="API call", max_retries=3):
|
|
"""Make a robust API call with comprehensive error detection and retry logic"""
|
|
last_error = None
|
|
|
|
for attempt in range(max_retries):
|
|
try:
|
|
response = self.model.generate_content([prompt] + files, safety_settings=self.safety_settings)
|
|
|
|
# Check for blocked content or safety issues immediately
|
|
if not response.candidates:
|
|
error_msg = f"No candidates returned"
|
|
if hasattr(response, 'prompt_feedback'):
|
|
error_msg += f" - Prompt feedback: {response.prompt_feedback}"
|
|
|
|
# This is a safety block, should retry
|
|
if attempt < max_retries - 1:
|
|
wait_time = (2 ** attempt) * 0.5
|
|
print(f" Safety block detected on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s...")
|
|
time.sleep(wait_time)
|
|
continue
|
|
else:
|
|
return {
|
|
'success': False,
|
|
'error_type': 'safety_block',
|
|
'error_message': error_msg,
|
|
'response': response
|
|
}
|
|
|
|
candidate = response.candidates[0]
|
|
|
|
# Check finish reason
|
|
if candidate.finish_reason and candidate.finish_reason != 1: # 1 = STOP (normal completion)
|
|
error_msg = f"Request finished with reason: {candidate.finish_reason}"
|
|
if hasattr(response, 'prompt_feedback'):
|
|
error_msg += f" - Prompt feedback: {response.prompt_feedback}"
|
|
if hasattr(candidate, 'safety_ratings'):
|
|
error_msg += f" - Safety ratings: {candidate.safety_ratings}"
|
|
|
|
# Check if this is a retryable safety issue
|
|
if candidate.finish_reason in [3, 4, 5]: # SAFETY, RECITATION, OTHER safety-related reasons
|
|
if attempt < max_retries - 1:
|
|
wait_time = (2 ** attempt) * 0.5
|
|
print(f" Safety/content issue detected on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s...")
|
|
time.sleep(wait_time)
|
|
continue
|
|
else:
|
|
return {
|
|
'success': False,
|
|
'error_type': 'safety_finish_reason',
|
|
'error_message': error_msg,
|
|
'response': response
|
|
}
|
|
else:
|
|
# Non-safety related finish reason, don't retry
|
|
return {
|
|
'success': False,
|
|
'error_type': 'other_finish_reason',
|
|
'error_message': error_msg,
|
|
'response': response
|
|
}
|
|
|
|
# Success case
|
|
return {
|
|
'success': True,
|
|
'response': response,
|
|
'text': response.text.strip()
|
|
}
|
|
|
|
except Exception as e:
|
|
last_error = e
|
|
error_str = str(e)
|
|
|
|
# Check if this looks like a safety/blocking error
|
|
is_safety_error = any(keyword in error_str.lower() for keyword in [
|
|
'safety', 'blocked', 'filtered', 'response.text', 'response.parts',
|
|
'finish_reason', 'candidates', 'prompt_feedback'
|
|
])
|
|
|
|
if is_safety_error and attempt < max_retries - 1:
|
|
wait_time = (2 ** attempt) * 0.5
|
|
print(f" Safety-related error on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s: {e}")
|
|
time.sleep(wait_time)
|
|
continue
|
|
elif attempt < max_retries - 1:
|
|
# Other errors, also retry but with different messaging
|
|
wait_time = (2 ** attempt) * 0.5
|
|
print(f" API error on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s: {e}")
|
|
time.sleep(wait_time)
|
|
continue
|
|
else:
|
|
# Final attempt failed
|
|
return {
|
|
'success': False,
|
|
'error_type': 'exception',
|
|
'error_message': str(e),
|
|
'exception': e
|
|
}
|
|
|
|
# This shouldn't be reached, but just in case
|
|
return {
|
|
'success': False,
|
|
'error_type': 'max_retries_exceeded',
|
|
'error_message': f"Max retries ({max_retries}) exceeded",
|
|
'last_error': str(last_error) if last_error else "Unknown error"
|
|
}
|
|
|
|
def is_cen_image(self, master_id: str) -> bool:
    """Return True when the master ID contains the ``_CEN`` (censored) marker anywhere."""
    cen_marker = '_CEN'
    return cen_marker in master_id
|
|
|
|
def find_corresponding_non_cen_image(self, cen_master_id: str) -> Optional[str]:
|
|
"""Find the corresponding non-CEN image for a given CEN master ID"""
|
|
if not self.is_cen_image(cen_master_id):
|
|
return None
|
|
|
|
# Transform CEN filename to non-CEN filename
|
|
# Example: "1011A_1011A_1011_01_CEN" -> "1011A_1011_01"
|
|
parts = cen_master_id.split('_')
|
|
if len(parts) >= 4 and parts[-1] == 'CEN':
|
|
# Remove the middle duplicate part and _CEN suffix
|
|
# Pattern: prefix_prefix_middle_suffix_CEN -> prefix_middle_suffix
|
|
if len(parts) >= 5:
|
|
non_cen_id = f"{parts[0]}_{parts[2]}_{parts[3]}"
|
|
else:
|
|
# Fallback: just remove _CEN
|
|
non_cen_id = '_'.join(parts[:-1])
|
|
|
|
# Check if this non-CEN image exists in our master images
|
|
if non_cen_id in self.master_images:
|
|
return non_cen_id
|
|
|
|
return None
|
|
|
|
def create_censorship_detection_prompt(self) -> str:
    """
    Build the prompt that asks whether a layout shows censored or uncensored content.

    The prompt requests a JSON object with ``is_censored``, ``confidence``,
    ``analysis`` and ``coverage_details``.
    """
    # This is a plain string, not an f-string, so the JSON example must use
    # single braces; doubled braces would be sent to the model literally.
    prompt = """Analyze this layout image to determine if it contains censored or uncensored content.

TASK: Determine whether the images in this layout are censored (covered) or uncensored (more exposed).

CENSORSHIP INDICATORS TO LOOK FOR:
1. **Clothing Coverage**:
- Long sleeves vs. sleeveless/short sleeves
- Full-length pants/skirts vs. shorts or shorter garments
- High necklines vs. lower necklines

2. **Skin Coverage**:
- Arms: Fully covered vs. bare arms
- Legs: Fully covered vs. exposed legs/thighs
- Torso: Additional covering vs. more exposed areas

3. **Added Elements**:
- Opaque or semi-transparent overlay layers covering skin
- Additional fabric or clothing elements that appear to cover exposed areas
- Digital modifications that add coverage

CLASSIFICATION:
- **CENSORED**: If models show significant additional clothing coverage, long sleeves, full pants/skirts, or digital overlays covering skin
- **UNCENSORED**: If models show more exposed skin, shorter garments, bare arms/legs, or natural clothing without added coverage

Return your response as a JSON object with this exact format:
{
"is_censored": true/false,
"confidence": "high/medium/low",
"analysis": "Detailed explanation of the coverage patterns observed and reasoning for the classification",
"coverage_details": "Specific description of clothing and skin coverage in the layout"
}

Be precise and focus on the actual clothing and coverage patterns visible in the image.

IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image classification. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign.
"""
    return prompt
|
|
|
|
def detect_layout_censorship(self, layout_path: str) -> Dict:
|
|
"""Detect if a layout image contains censored or uncensored content"""
|
|
try:
|
|
# Upload layout image
|
|
layout_file = self.upload_single_image(layout_path)
|
|
if not layout_file:
|
|
raise Exception("Failed to upload layout image")
|
|
|
|
# Create censorship detection prompt
|
|
prompt = self.create_censorship_detection_prompt()
|
|
|
|
# Make API call with robust retry logic
|
|
api_result = self.make_robust_api_call(prompt, [layout_file], "censorship detection")
|
|
|
|
# Handle API call failure
|
|
if not api_result['success']:
|
|
return {
|
|
'is_censored': True, # Default to censored if API fails
|
|
'confidence': 'unknown',
|
|
'analysis': f'API call failed: {api_result["error_message"]}',
|
|
'error': f"{api_result['error_type']}: {api_result['error_message']}"
|
|
}
|
|
|
|
# Parse response
|
|
response_text = api_result['text']
|
|
|
|
# Extract JSON from response
|
|
try:
|
|
start_idx = response_text.find('{')
|
|
end_idx = response_text.rfind('}') + 1
|
|
|
|
if start_idx == -1 or end_idx == 0:
|
|
raise ValueError("No JSON found in response")
|
|
|
|
json_str = response_text[start_idx:end_idx]
|
|
result = json.loads(json_str)
|
|
|
|
# Validate result format
|
|
if 'is_censored' not in result:
|
|
result['is_censored'] = True # Default to censored if unclear
|
|
if 'confidence' not in result:
|
|
result['confidence'] = 'unknown'
|
|
if 'analysis' not in result:
|
|
result['analysis'] = response_text
|
|
|
|
return result
|
|
|
|
except json.JSONDecodeError as e:
|
|
return {
|
|
'is_censored': True, # Default to censored if parsing fails
|
|
'confidence': 'unknown',
|
|
'analysis': response_text,
|
|
'error': f'JSON decode error: {e}'
|
|
}
|
|
|
|
except Exception as e:
|
|
return {
|
|
'is_censored': True, # Default to censored if error
|
|
'confidence': 'unknown',
|
|
'analysis': '',
|
|
'error': str(e)
|
|
}
|
|
|
|
def apply_cen_refinement_to_results(self, layout_path: str, initial_results: Dict) -> Dict:
    """Swap CEN master matches for their non-CEN twins on uncensored layouts.

    The detected ids are deduplicated first. If none of them is a CEN id
    (per ``self.is_cen_image``) no model call is made. Otherwise the layout
    is classified once with ``self.detect_layout_censorship`` and every CEN
    id is either kept (layout censored, or no non-CEN counterpart found) or
    replaced by the id from ``self.find_corresponding_non_cen_image``.

    Args:
        layout_path: Path of the layout image the results belong to.
        initial_results: Detection result dict; ``detected_masters`` is read.

    Returns:
        ``initial_results`` itself when nothing changed; otherwise a shallow
        copy carrying the updated id lists. When refinement ran, the copy
        also carries the refinement bookkeeping fields.
    """
    layout_name = Path(layout_path).name
    reported_masters = initial_results.get('detected_masters', [])

    # First, deduplicate the detected masters to avoid processing duplicates
    detected_masters = self.deduplicate_master_matches(reported_masters)
    duplicates_removed = len(reported_masters) - len(detected_masters)
    if duplicates_removed:
        print(f" Removed {duplicates_removed} duplicate master(s) before CEN refinement")

    cen_images = [mid for mid in detected_masters if self.is_cen_image(mid)]

    if not cen_images:
        if not duplicates_removed:
            # Nothing to refine and nothing was removed
            return initial_results
        # Honour the deduplication that was just reported instead of handing
        # back the list that still contains the duplicates.
        deduplicated_results = initial_results.copy()
        deduplicated_results['detected_masters'] = detected_masters
        return deduplicated_results

    print(f" Refining {len(cen_images)} CEN matches for {layout_name}")
    print(f" Analyzing layout to determine censorship level...")

    # One censorship verdict per layout drives every CEN decision below
    censorship_result = self.detect_layout_censorship(layout_path)
    is_layout_censored = censorship_result.get('is_censored', True)
    confidence = censorship_result.get('confidence', 'unknown')

    print(f" Layout analysis: {'CENSORED' if is_layout_censored else 'UNCENSORED'} (confidence: {confidence})")

    refined_masters = []
    refinement_details = []
    changes_made = 0

    for master_id in detected_masters:
        if not self.is_cen_image(master_id):
            # Not a CEN image, keep it as-is
            refined_masters.append(master_id)
            continue

        non_cen_id = self.find_corresponding_non_cen_image(master_id)

        if not is_layout_censored and non_cen_id:
            # Layout is uncensored, switch to non-CEN version
            refined_masters.append(non_cen_id)
            refinement_details.append({
                'original_cen_match': master_id,
                'non_cen_alternative': non_cen_id,
                'final_choice': non_cen_id,
                'confidence': confidence,
                'analysis': f"Layout determined to be uncensored, switched from {master_id} to {non_cen_id}",
                'changed': True,
                'reason': 'layout_uncensored'
            })
            changes_made += 1
            print(f" → Changed {master_id} to {non_cen_id} (layout is uncensored)")
        else:
            # Layout is censored or no non-CEN alternative, keep CEN version
            refined_masters.append(master_id)
            reason = 'layout_censored' if is_layout_censored else 'no_non_cen_alternative'
            refinement_details.append({
                'original_cen_match': master_id,
                'non_cen_alternative': non_cen_id,
                'final_choice': master_id,
                'confidence': confidence,
                'analysis': f"Kept {master_id} - layout is censored or no non-CEN alternative available",
                'changed': False,
                'reason': reason
            })
            print(f" → Kept {master_id} ({'layout is censored' if is_layout_censored else 'no non-CEN alternative'})")

    print(f" Summary: {changes_made} CEN images changed to non-CEN versions")

    # A swapped-in non-CEN id may already be in the list, so deduplicate again
    before_dedup = len(refined_masters)
    refined_masters = self.deduplicate_master_matches(refined_masters)
    if len(refined_masters) != before_dedup:
        post_refinement_duplicates = before_dedup - len(refined_masters)
        print(f" Post-refinement deduplication: Removed {post_refinement_duplicates} duplicate(s)")

    # Update results with refinement information
    refined_results = initial_results.copy()
    refined_results['detected_masters'] = refined_masters
    refined_results['detected_master_ids'] = refined_masters  # Update both fields for consistency
    refined_results['detected_master_filenames'] = [f"{mid}.jpg" for mid in refined_masters]
    refined_results['refinement_applied'] = True
    refined_results['refinement_details'] = refinement_details
    refined_results['censorship_analysis'] = censorship_result
    refined_results['original_detection_count'] = len(detected_masters)
    refined_results['refined_detection_count'] = len(refined_masters)
    refined_results['changes_made'] = changes_made

    return refined_results
|
|
|
|
|
|
|
|
|
|
|
|
def detect_images_in_layout(self, layout_path: str, layout_index: int, total_layouts: int) -> Dict:
    """Detect which master images appear in a single layout image.

    The layout is uploaded and sent together with the already uploaded
    masters (``self.uploaded_masters``) and the detection prompt. If the
    main prompt is rejected with a safety error type, the simplified prompt
    is tried once. The span from the first ``{`` to the last ``}`` of the
    reply is parsed as JSON and the reported ids are deduplicated.

    Args:
        layout_path: Path of the layout image.
        layout_index: 1-based position of this layout, used for progress output.
        total_layouts: Number of layouts in the run, used for progress output.

    Returns:
        A dict with ``detected_masters`` and ``analysis``. ``error`` is set
        on any failure, in which case ``detected_masters`` is empty.
        Deduplication bookkeeping keys are added only when duplicates were
        actually removed.
    """
    layout_name = Path(layout_path).name
    print(f"Processing {layout_index}/{total_layouts}: {layout_name}")

    try:
        # Upload only the layout image (masters already uploaded)
        layout_file = self.upload_single_image(layout_path)
        if not layout_file:
            raise Exception("Failed to upload layout image")

        # Combine pre-uploaded masters with the layout
        all_files = self.uploaded_masters + [layout_file]

        master_ids = list(self.master_images.keys())
        prompt = self.create_detection_prompt(master_ids)

        # Try main prompt first, then fallback to simple prompt if blocked
        api_result = self.make_robust_api_call(prompt, all_files, f"detection for {layout_name}")

        if not api_result['success'] and api_result['error_type'] in ['safety_block', 'safety_finish_reason']:
            print(f" Main prompt blocked for {layout_name}, trying simplified prompt...")
            simple_prompt = self.create_simple_detection_prompt(master_ids)
            api_result = self.make_robust_api_call(simple_prompt, all_files, f"simple detection for {layout_name}")

        if not api_result['success']:
            error_msg = api_result['error_message']
            print(f"API call failed for {layout_name}: {error_msg}")
            return {
                'detected_masters': [],
                'analysis': f'API call failed: {error_msg}',
                'error': f"{api_result['error_type']}: {error_msg}",
                'retry_count': 3  # Max retries were attempted
            }

        response_text = api_result['text']

        # Locate the outermost JSON object inside the free-form reply
        start_idx = response_text.find('{')
        end_idx = response_text.rfind('}') + 1
        if start_idx == -1 or end_idx == 0:
            # Keep the raw reply in 'analysis' so it can be inspected;
            # previously this path discarded it.
            no_json = "No JSON found in response"
            print(f"Error analyzing {layout_name}: {no_json}")
            return {
                'detected_masters': [],
                'analysis': response_text,
                'error': no_json
            }

        try:
            result = json.loads(response_text[start_idx:end_idx])
        except json.JSONDecodeError as e:
            print(f"JSON decode error for {layout_name}: {e}")
            return {
                'detected_masters': [],
                'analysis': response_text,
                'error': f'JSON decode error: {e}'
            }

        # Fill in any field the model left out
        result.setdefault('detected_masters', [])
        result.setdefault('analysis', response_text)

        # Deduplicate detected masters
        original_detected = result['detected_masters'][:]
        result['detected_masters'] = self.deduplicate_master_matches(result['detected_masters'])

        # Track deduplication if any duplicates were removed
        duplicates_removed = len(original_detected) - len(result['detected_masters'])
        if duplicates_removed:
            result['deduplication_applied'] = True
            result['duplicates_removed'] = duplicates_removed
            result['original_detected_masters'] = original_detected
            print(f" Deduplication: Removed {duplicates_removed} duplicate master(s) from {layout_name}")

        detected_count = len(result['detected_masters'])
        print(f"✓ Completed {layout_name} - Found {detected_count} matches")

        return result

    except Exception as e:
        error_msg = f"Error analyzing {layout_name}: {e}"

        # Errors that mention response.text / response.parts are treated as
        # safety-filter blocks and get extra diagnostics in the log line.
        if "response.text" in str(e) or "response.parts" in str(e):
            error_msg += "\nThis appears to be a safety filter blocking issue."
            response = getattr(e, 'response', None)
            if response:
                if hasattr(response, 'prompt_feedback'):
                    error_msg += f"\nPrompt feedback: {response.prompt_feedback}"
                # getattr: a response without `candidates` must not raise
                # from inside this handler.
                candidates = getattr(response, 'candidates', None)
                if candidates:
                    candidate = candidates[0]
                    if hasattr(candidate, 'safety_ratings'):
                        error_msg += f"\nSafety ratings: {candidate.safety_ratings}"
                    if hasattr(candidate, 'finish_reason'):
                        error_msg += f"\nFinish reason: {candidate.finish_reason}"

        print(error_msg)
        return {
            'detected_masters': [],
            'analysis': '',
            'error': str(e)
        }
|
|
|
|
def create_single_master_prompt(self, master_id: str) -> str:
    """Build the prompt for a one-master-versus-layout comparison.

    The text tells the model that the first image is the master and the
    second is the layout, asks for an exact match only, and requests a JSON
    reply with ``match_found``, ``master_id``, ``confidence`` and
    ``analysis``. ``master_id`` is embedded twice: in the instruction line
    and in the JSON template.

    Args:
        master_id: Identifier of the master image being checked.

    Returns:
        The complete prompt text.
    """
    return f"""Analyze the layout image (the second image) and determine if the master image (the first image) appears in it.

INSTRUCTIONS:
1. Compare the master image (first image) with the layout image (second image)
2. Look for EXACT matches where the model, clothing, and pose are IDENTICAL
3. The layout image may contain the master image in various forms:
- Complete/exact match
- Cropped version
- Scaled or resized version
- Rotated version
- Partially obscured

4. Focus on visual similarity in terms of:
- Person/model appearance and pose (must be EXACTLY the same)
- Clothing details (colors, patterns, styles - must be EXACTLY the same)
- Background and composition
- Overall visual elements

5. CRITICAL: Only return a positive result if the models, pose, and clothing are EXACTLY the same.
If there is ANY difference in clothing, model, or pose then return a negative result.

Master Image ID: {master_id}

Return your response as a JSON object with this exact format:
{{
"match_found": true/false,
"master_id": "{master_id}",
"confidence": "high/medium/low",
"analysis": "Detailed explanation of your findings and reasoning"
}}

IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign.
"""
|
|
|
|
def detect_single_master_in_layout(self, layout_path: str, master_id: str, master_index: int, total_masters: int) -> Dict:
    """Check whether one master image appears in the layout image.

    Both images are uploaded, the single-master prompt is sent with them,
    and the span from the first ``{`` to the last ``}`` of the reply is
    parsed as JSON. Any failure yields a ``match_found=False`` result that
    carries an ``error`` entry.

    Args:
        layout_path: Path of the layout image.
        master_id: Key into ``self.master_images``; an unknown id raises
            ``KeyError`` because the lookup happens before the ``try``.
        master_index: Accepted but not used by this method.
        total_masters: Accepted but not used by this method.

    Returns:
        A dict with ``match_found``, ``master_id``, ``confidence`` and
        ``analysis``, plus ``error`` when something went wrong.
    """
    layout_name = Path(layout_path).name
    master_path = self.master_images[master_id]

    def no_match(error: str, analysis: str = '') -> Dict:
        # Every failure is reported as "no match" with the reason attached.
        return {
            'match_found': False,
            'master_id': master_id,
            'confidence': 'unknown',
            'analysis': analysis,
            'error': error,
        }

    try:
        # NOTE(review): this import binds the same module object as any other
        # import of google.generativeai; whether configure() state is really
        # per-thread is not visible from here - confirm before relying on it.
        import google.generativeai as thread_genai
        api_key = os.getenv('GEMINI_API_KEY')
        thread_genai.configure(api_key=api_key)
        thread_model = thread_genai.GenerativeModel('gemini-2.5-pro')

        # Upload both images using the client created above
        master_file = self._upload_single_image_threadsafe(master_path, thread_genai)
        layout_file = self._upload_single_image_threadsafe(layout_path, thread_genai)

        if not master_file or not layout_file:
            raise Exception("Failed to upload images")

        prompt = self.create_single_master_prompt(master_id)

        # Master first, layout second: the prompt refers to them in that order
        api_result = self._make_robust_api_call_threadsafe(
            thread_model,
            prompt,
            [master_file, layout_file],
            f"single master detection: {master_id} in {layout_name}"
        )

        if not api_result['success']:
            return no_match(
                f"{api_result['error_type']}: {api_result['error_message']}",
                f'API call failed: {api_result["error_message"]}',
            )

        response_text = api_result['text']

        # Locate the outermost JSON object inside the free-form reply
        start_idx = response_text.find('{')
        end_idx = response_text.rfind('}') + 1
        if start_idx == -1 or end_idx == 0:
            # Keep the raw reply in 'analysis' so it can be inspected;
            # previously this path discarded it.
            return no_match("No JSON found in response", response_text)

        try:
            result = json.loads(response_text[start_idx:end_idx])
        except json.JSONDecodeError as e:
            return no_match(f'JSON decode error: {e}', response_text)

        # Fill in any field the model left out
        result.setdefault('match_found', False)
        result.setdefault('master_id', master_id)
        result.setdefault('confidence', 'unknown')
        result.setdefault('analysis', response_text)
        return result

    except Exception as e:
        return no_match(str(e))
|
|
|
|
def detect_images_in_layout_one_at_a_time(self, layout_path: str, layout_index: int, total_layouts: int) -> Dict:
    """Check every master against one layout, one worker task per master.

    Each master in ``self.master_images`` is submitted to a
    ``ProcessPoolExecutor`` (``self.max_concurrent_workers`` workers) that
    runs ``process_single_master_detection``. Results are collected as the
    tasks finish. A task that raises is recorded as a non-match with the
    exception text in ``error``.

    Args:
        layout_path: Path of the layout image.
        layout_index: 1-based position of this layout, used for progress output.
        total_layouts: Number of layouts in the run, used for progress output.

    Returns:
        A dict with the matched ids (``detected_masters`` and
        ``detected_master_ids``), their ``.jpg`` filenames, a summary
        ``analysis`` string, the per-master ``detailed_results`` and
        processing and deduplication bookkeeping.
    """
    layout_name = Path(layout_path).name
    print(f"Processing {layout_index}/{total_layouts}: {layout_name} (Process-based one-at-a-time mode)")

    master_ids = list(self.master_images.keys())
    total_masters = len(master_ids)
    detected_masters = []
    detailed_results = []

    print(f" Checking {total_masters} masters using {self.max_concurrent_workers} concurrent processes...")

    # Use ProcessPoolExecutor so each check runs in its own process
    with concurrent.futures.ProcessPoolExecutor(max_workers=self.max_concurrent_workers) as executor:
        future_to_master = {
            executor.submit(
                process_single_master_detection,
                layout_path,
                master_id,
                self.master_images[master_id],
                self.enable_greyscale,
                self.enable_contrast_enhancement,
                self.contrast_factor,
                self.safety_settings,
            ): master_id
            for master_id in master_ids
        }

        # Collect results as they complete
        finished = concurrent.futures.as_completed(future_to_master)
        for completed_count, future in enumerate(finished, 1):
            master_id = future_to_master[future]

            try:
                result = future.result()
                detailed_results.append(result)

                if result.get('match_found', False):
                    detected_masters.append(master_id)
                    confidence = result.get('confidence', 'unknown')
                    print(f" {completed_count}/{total_masters}: ✓ MATCH found for {master_id} (confidence: {confidence})")
                elif 'error' in result:
                    print(f" {completed_count}/{total_masters}: Error checking {master_id}: {result['error']}")
                else:
                    print(f" {completed_count}/{total_masters}: No match for {master_id}")

            except Exception as e:
                print(f" {completed_count}/{total_masters}: Process error checking {master_id}: {e}")
                # Add error result to maintain consistency
                detailed_results.append({
                    'match_found': False,
                    'master_id': master_id,
                    'confidence': 'unknown',
                    'analysis': '',
                    'error': str(e)
                })

    # Sort detailed_results by master_id to maintain consistent ordering.
    # str() keeps the sort from raising if a worker reported a non-string id.
    detailed_results.sort(key=lambda entry: str(entry.get('master_id', '')))

    # Matches were appended in completion order, which varies between runs;
    # put them back into the order of the master list so output is stable.
    position = {mid: idx for idx, mid in enumerate(master_ids)}
    detected_masters.sort(key=position.__getitem__)

    # Deduplicate detected masters (shouldn't be needed in one-at-a-time mode, but for safety)
    original_detected = detected_masters[:]
    detected_masters = self.deduplicate_master_matches(detected_masters)

    duplicates_removed = len(original_detected) - len(detected_masters)
    if duplicates_removed:
        print(f" Deduplication: Removed {duplicates_removed} duplicate master(s)")

    detected_count = len(detected_masters)
    print(f"✓ Completed {layout_name} - Found {detected_count} matches using {self.max_concurrent_workers} concurrent processes")

    return {
        'detected_masters': detected_masters,
        'detected_master_ids': detected_masters,
        'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters],
        'analysis': f'Process-based one-at-a-time analysis completed. Found {detected_count} exact matches out of {total_masters} masters checked using {self.max_concurrent_workers} concurrent processes.',
        'detailed_results': detailed_results,
        'processing_mode': 'process_based_one_at_a_time',
        'total_masters_checked': total_masters,
        'concurrent_workers': self.max_concurrent_workers,
        'deduplication_applied': duplicates_removed != 0,
        'duplicates_removed': duplicates_removed,
        'original_detected_masters': original_detected
    }
|
|
|
|
def process_all_layouts(self, limit: Optional[int] = None, specific_file: Optional[str] = None) -> Dict:
    """Run detection over the layout images one after another.

    Masters are loaded first (and uploaded once unless one-at-a-time mode is
    on). Each layout is then handled by the split, one-at-a-time or
    multi-master detector depending on the instance flags, optionally
    followed by CEN refinement. Progress is saved through
    ``self.save_results`` after every 20th layout.

    Args:
        limit: When truthy, only the first ``limit`` layouts of the directory
            listing are processed. Layouts are taken in sorted filename
            order so the selection is reproducible.
        specific_file: When given, only this file (relative to
            ``self.layouts_path``) is processed.

    Returns:
        A dict mapping each layout's filename stem to its result record.
        Empty when there are no layouts.

    Raises:
        FileNotFoundError: ``specific_file`` does not exist in the layouts
            directory.
    """
    mode_desc = "One-at-a-time Mode" if self.one_at_a_time_mode else "Multi Master Mode"
    if self.refinement_mode:
        mode_desc += " with CEN Refinement"

    print(f"Starting sequential batch processing ({mode_desc})...")

    self.load_master_images()

    # Upload all master images ONCE (only for multi-master mode)
    if not self.one_at_a_time_mode:
        self.upload_master_images_once()

    if specific_file:
        # Process only the specific file
        layout_files = [self.layouts_path / specific_file]
        if not layout_files[0].exists():
            raise FileNotFoundError(f"Layout file {specific_file} not found in {self.layouts_path}")
        print(f"Processing specific file: {specific_file}")
    else:
        # glob() order depends on the file system; sort so that `limit`
        # always selects the same layouts.
        layout_files = sorted(self.layouts_path.glob("*.jpg"))

        if limit:
            layout_files = layout_files[:limit]
            print(f"Processing first {limit} layouts only")

    total_layouts = len(layout_files)
    print(f"Processing {total_layouts} layout images in {mode_desc}")
    print("=" * 60)

    detection_mode = mode_desc.lower().replace(' ', '_').replace('with_', '')
    results = {}
    start_time = time.time()

    for i, layout_path in enumerate(layout_files, 1):
        layout_id = layout_path.stem

        # Detect images in layout using the appropriate method
        if self.split_mode:
            # Split mode: split layout into panels and match each panel
            master_ids = list(self.master_images.keys())
            result = self.splitter.split_layout_and_match(str(layout_path), master_ids, self)
        elif self.one_at_a_time_mode:
            result = self.detect_images_in_layout_one_at_a_time(str(layout_path), i, total_layouts)
        else:
            result = self.detect_images_in_layout(str(layout_path), i, total_layouts)

        # Apply CEN refinement if enabled and there are matches (all modes)
        if self.refinement_mode and result.get('detected_masters'):
            result = self.apply_cen_refinement_to_results(str(layout_path), result)

        layout_result = {
            'layout_filename': layout_path.name,
            'detected_master_ids': result['detected_masters'],
            'detected_master_filenames': [f"{mid}.jpg" for mid in result['detected_masters']],
            'analysis': result.get('analysis', 'Split mode analysis'),
            'detection_mode': detection_mode
        }

        # Add split mode specific fields
        if self.split_mode:
            layout_result['split_mode'] = True
            layout_result['splits_generated'] = result.get('splits_generated', 0)
            layout_result['panel_count'] = result.get('panel_count', 1)
            layout_result['panel_confidence'] = result.get('panel_confidence', 'unknown')
            if 'split_results' in result:
                layout_result['split_results'] = result['split_results']

        # Add deduplication fields if applied
        if 'deduplication_applied' in result:
            for key in ('deduplication_applied', 'duplicates_removed', 'original_detected_masters'):
                layout_result[key] = result[key]

        if 'error' in result:
            layout_result['error'] = result['error']

        # Add refinement mode specific fields
        if self.refinement_mode and result.get('refinement_applied'):
            for key in (
                'refinement_applied',
                'refinement_details',
                'censorship_analysis',
                'original_detection_count',
                'refined_detection_count',
            ):
                layout_result[key] = result[key]
            layout_result['changes_made'] = result.get('changes_made', 0)

        results[layout_id] = layout_result

        # Progress update with time estimate
        elapsed = time.time() - start_time
        avg_time = elapsed / i
        remaining = (total_layouts - i) * avg_time

        print(f"Progress: {i}/{total_layouts} ({i/total_layouts*100:.1f}%) - Est. remaining: {remaining/60:.1f} min")

        # Save progress periodically
        if i % 20 == 0:
            self.save_results(results, f"progress_{i}")

    total_time = time.time() - start_time
    print(f"\n✓ Completed processing all {total_layouts} layouts in {total_time/60:.1f} minutes")
    # With no layouts there is no average; the division used to raise
    # ZeroDivisionError here.
    if total_layouts:
        print(f"Average time per layout: {total_time/total_layouts:.1f} seconds")
    return results
|
|
|
|
def save_results(self, results: Dict, filename: str = "detection_results") -> str:
    """Write the results, wrapped with run metadata, to a JSON file.

    The file is ``<filename>.json`` inside ``self.results_path`` and holds a
    ``metadata`` section (layout count, master count, master ids) followed
    by the ``results`` mapping, indented by two spaces.

    Args:
        results: Per-layout result records to store.
        filename: Output name without the ``.json`` extension.

    Returns:
        The path of the written file as a string.
    """
    output_path = self.results_path / f"{filename}.json"

    # NOTE(review): the id list comes from self.master_files while the count
    # comes from self.master_images - confirm both attributes exist and are
    # meant to differ.
    metadata = {
        'total_layouts_processed': len(results),
        'total_master_images': len(self.master_images),
        'master_images_available': list(self.master_files.keys()),
    }
    payload = {'metadata': metadata, 'results': results}

    with open(output_path, 'w') as handle:
        json.dump(payload, handle, indent=2)

    print(f"Results saved to: {output_path}")
    return str(output_path)
|
|
|
|
def generate_summary(self, results: Dict) -> Dict:
    """Aggregate per-layout results into run-level statistics.

    Args:
        results: Mapping of layout id to result record; each record must
            have ``detected_master_ids`` and may have
            ``deduplication_applied`` / ``duplicates_removed``.

    Returns:
        A dict with layout counts (with / without matches), how often each
        master was detected, the ten most frequently detected masters as
        ``(master_id, count)`` pairs, and deduplication totals including the
        percentage of layouts where deduplication was applied.
    """
    records = list(results.values())
    total_layouts = len(records)
    layouts_with_matches = len([rec for rec in records if rec['detected_master_ids']])

    # Tally how many layouts each master was detected in
    master_counts = {}
    for rec in records:
        for master_id in rec['detected_master_ids']:
            master_counts.setdefault(master_id, 0)
            master_counts[master_id] += 1

    ranked_masters = sorted(master_counts.items(), key=lambda pair: pair[1], reverse=True)

    # Deduplication statistics
    layouts_with_deduplication = len(
        [rec for rec in records if rec.get('deduplication_applied', False)]
    )
    total_duplicates_removed = sum(rec.get('duplicates_removed', 0) for rec in records)

    if total_layouts > 0:
        deduplication_rate = round(layouts_with_deduplication / total_layouts * 100, 1)
    else:
        deduplication_rate = 0

    return {
        'total_layouts_processed': total_layouts,
        'layouts_with_matches': layouts_with_matches,
        'layouts_without_matches': total_layouts - layouts_with_matches,
        'master_image_usage': master_counts,
        'most_used_masters': ranked_masters[:10],
        'layouts_with_deduplication': layouts_with_deduplication,
        'total_duplicates_removed': total_duplicates_removed,
        'deduplication_rate': deduplication_rate,
    }
|
|
|
|
def deduplicate_master_matches(self, detected_masters: List[str]) -> List[str]:
    """Drop repeated master ids, keeping the first occurrence of each.

    Args:
        detected_masters: Master ids, possibly with repeats.

    Returns:
        The input object itself when it is empty or otherwise falsy; a new
        list with the repeats removed, in first-seen order, in every other
        case.
    """
    if not detected_masters:
        return detected_masters

    # dict keys are unique and keep insertion order, which gives an
    # order-preserving exact-duplicate filter.
    return list(dict.fromkeys(detected_masters))
|
|
|
|
def cleanup_temp_files(self):
    """Delete preprocessed temp images and drop the temp directory if empty.

    Every ``*_processed*.jpg`` file directly inside ``self.temp_path`` is
    removed. The directory itself is removed only when nothing else is left
    in it. Any failure is reported as a printed warning and is not raised.
    """
    try:
        temp_dir = self.temp_path
        if not temp_dir.exists():
            return

        # The pattern covers both the plain and the suffixed file names
        for leftover in temp_dir.glob("*_processed*.jpg"):
            leftover.unlink()

        # Remove temp directory if empty
        if next(temp_dir.iterdir(), None) is None:
            temp_dir.rmdir()
    except Exception as e:
        print(f"Warning: Failed to cleanup temp files: {e}")