1605 lines
No EOL
76 KiB
Python
1605 lines
No EOL
76 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
OpenAI Image Detector - Alternative to Gemini detector
|
||
Uses OpenAI o3 model to detect which master images appear in layout images
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import time
|
||
import base64
|
||
from pathlib import Path
|
||
from typing import List, Dict, Optional
|
||
from openai import OpenAI
|
||
from dotenv import load_dotenv
|
||
from PIL import Image, ImageEnhance
|
||
import tempfile
|
||
import numpy as np
|
||
import pickle
|
||
import cv2
|
||
import concurrent.futures
|
||
import threading
|
||
import uuid
|
||
import multiprocessing
|
||
from functools import partial
|
||
from panel_splitter import PanelSplitter
|
||
from cost_calculator import cost_calculator, extract_token_usage_from_response
|
||
|
||
|
||
def process_single_master_detection_openai(layout_path, master_id, master_path, enable_greyscale, enable_contrast_enhancement, contrast_factor, api_key):
    """
    Check whether one master image appears in one layout image using the OpenAI o3 model.

    Standalone (module-level) so it can be submitted to a process pool. All
    dependencies are imported inside the function so every worker process
    builds its own client and shares no state with the parent.

    Args:
        layout_path: Path of the layout image (sent as the second image).
        master_id: Identifier of the master image; echoed back in the result.
        master_path: Path of the master image (sent as the first image).
        enable_greyscale: Convert both images to greyscale before sending.
        enable_contrast_enhancement: Boost contrast and sharpness before sending.
        contrast_factor: Factor handed to the contrast enhancer.
        api_key: OpenAI API key used to build the per-process client.

    Returns:
        dict with 'match_found', 'master_id', 'confidence' and 'analysis'.
        'token_usage' is added whenever the response reported usage, and
        'error' is added when the check failed. Failures (including missing
        dependencies or unreadable images) are reported through 'error'
        instead of being raised.
    """
    def error_result(message, analysis=''):
        """Build the negative result used for every failure path."""
        return {
            'match_found': False,
            'master_id': master_id,
            'confidence': 'unknown',
            'analysis': analysis,
            'error': message
        }

    try:
        # Import and configure in each process to avoid shared state
        import os
        import json
        import time
        import base64
        from pathlib import Path
        from PIL import Image, ImageEnhance
        from openai import OpenAI
        import uuid
        import threading
        # Note: cost_calculator is deliberately not imported here; token usage
        # is returned to the parent process, which does the cost tracking.

        # Initialize OpenAI client in this process
        client = OpenAI(api_key=api_key)

        # Create temp directory for this process
        temp_path = Path("temp_processed")
        temp_path.mkdir(exist_ok=True)

        def preprocess_image_local(image_path: str) -> str:
            """Return the path of a preprocessed JPEG copy, or image_path itself if nothing was done."""
            if not enable_greyscale and not enable_contrast_enhancement:
                return image_path

            try:
                with Image.open(image_path) as img:
                    processed_img = img.copy()

                    if enable_greyscale:
                        # Round-trip through 'L' so the image is grey but still 3-channel
                        processed_img = processed_img.convert('L')
                        processed_img = processed_img.convert('RGB')

                    if enable_contrast_enhancement:
                        contrast_enhancer = ImageEnhance.Contrast(processed_img)
                        processed_img = contrast_enhancer.enhance(contrast_factor)

                        sharpness_enhancer = ImageEnhance.Sharpness(processed_img)
                        processed_img = sharpness_enhancer.enhance(1.3)

                    # Thread id + random suffix keep concurrent workers from
                    # writing to the same file name
                    thread_id = threading.current_thread().ident
                    unique_id = str(uuid.uuid4())[:8]
                    original_name = Path(image_path).stem
                    processed_path = temp_path / f"{original_name}_processed_{thread_id}_{unique_id}.jpg"
                    processed_img.save(processed_path, 'JPEG', quality=95)

                    return str(processed_path)

            except Exception:
                # Best effort: fall back to the untouched original image
                return image_path

        def encode_image_to_base64(image_path: str) -> str:
            """Encode the (preprocessed) image to base64 and discard the temp copy."""
            processed_path = preprocess_image_local(image_path)
            try:
                with open(processed_path, "rb") as image_file:
                    return base64.b64encode(image_file.read()).decode('utf-8')
            finally:
                # The uniquely named temp copy is only needed for this read;
                # remove it so temp_processed does not grow with every call.
                if processed_path != image_path:
                    try:
                        os.remove(processed_path)
                    except OSError:
                        pass

        def create_single_master_prompt_local(master_id: str) -> str:
            """Build the single-master comparison prompt."""
            prompt = f"""Analyze the layout image (the second image) and determine if the master image (the first image) appears in it.

INSTRUCTIONS:
1. Compare the master image (first image) with the layout image (second image)
2. Look for EXACT matches where the model, clothing, and pose are IDENTICAL
3. The layout image may contain the master image in various forms:
- Complete/exact match
- Cropped version
- Scaled or resized version
- Rotated version
- Partially obscured

4. Focus on visual similarity in terms of:
- Person/model appearance and pose (must be EXACTLY the same)
- Clothing details (colors, patterns, styles - must be EXACTLY the same)
- Background and composition
- Overall visual elements

5. CRITICAL: Only return a positive result if the models, pose, and clothing are EXACTLY the same.
If there is ANY difference in clothing, model, or pose then return a negative result.

Master Image ID: {master_id}

Return your response as a JSON object with this exact format:
{{
"match_found": true/false,
"master_id": "{master_id}",
"confidence": "high/medium/low",
"analysis": "Detailed explanation of your findings and reasoning"
}}

IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign.
"""
            return prompt

        def cached_token_count(usage) -> int:
            """Read the cached-token count from wherever the usage object carries it."""
            cached = getattr(usage, 'cached_tokens', None)
            if cached is None:
                # The Chat Completions usage object nests this value under
                # prompt_tokens_details rather than exposing it at top level.
                details = getattr(usage, 'prompt_tokens_details', None)
                cached = getattr(details, 'cached_tokens', None)
            return cached or 0

        # Encode both images to base64
        master_base64 = encode_image_to_base64(master_path)
        layout_base64 = encode_image_to_base64(layout_path)

        # The request payload is identical for every attempt, so build it once
        prompt = create_single_master_prompt_local(master_id)
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{master_base64}"
                        }
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{layout_base64}"
                        }
                    }
                ]
            }
        ]

        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = client.chat.completions.create(
                    model="o3",
                    messages=messages,
                    max_completion_tokens=10000
                )

                # Extract token usage for cost tracking
                token_usage_data = None
                usage = getattr(response, 'usage', None)
                if usage:
                    token_usage_data = {
                        'prompt_tokens': usage.prompt_tokens,
                        'completion_tokens': usage.completion_tokens,
                        'total_tokens': usage.total_tokens,
                        'cached_tokens': cached_token_count(usage)
                    }

                # Parse response
                content = response.choices[0].message.content
                if content is None:
                    # Raised inside the retry loop so it is retried like any
                    # other failed attempt, but with a readable message
                    raise ValueError("Model returned no message content")
                response_text = content.strip()
                start_idx = response_text.find('{')
                end_idx = response_text.rfind('}') + 1

                if start_idx == -1 or end_idx == 0:
                    result = error_result('No JSON found in response', analysis=response_text)
                else:
                    # A decode error here is caught below and triggers a retry
                    result = json.loads(response_text[start_idx:end_idx])

                    # Fill in any field the model left out
                    result.setdefault('match_found', False)
                    result.setdefault('master_id', master_id)
                    result.setdefault('confidence', 'unknown')
                    result.setdefault('analysis', response_text)

                # Include token usage data for cost tracking (the call was
                # billed even if the answer could not be parsed)
                if token_usage_data:
                    result['token_usage'] = token_usage_data

                return result

            except Exception as e:
                if attempt == max_retries - 1:
                    return error_result(str(e))
                # Exponential backoff: 0.5s, 1s, ...
                time.sleep((2 ** attempt) * 0.5)

    except Exception as e:
        return error_result(str(e))
|
||
|
||
|
||
class OpenAIImageDetector:
|
||
def __init__(self, enable_greyscale=True, enable_contrast_enhancement=True, contrast_factor=1.5, refinement_mode=False, one_at_a_time_mode=False, max_concurrent_workers=5, panel_aware_refinement=False, split_mode=False):
    """
    Set up the OpenAI client, processing options and working directories.

    Reads OPENAI_API_KEY from the environment (after loading a .env file)
    and raises ValueError when it is missing. Creates the results and
    temp directories if they do not exist yet.
    """
    load_dotenv()

    key_from_env = os.environ.get('OPENAI_API_KEY')
    if not key_from_env:
        raise ValueError("OPENAI_API_KEY not found in environment variables")

    # The key is kept alongside the client so it can be handed to worker processes
    self.api_key = key_from_env
    self.client = OpenAI(api_key=key_from_env)

    # Concurrency settings
    self._progress_lock = threading.Lock()
    self.max_concurrent_workers = max_concurrent_workers

    print("Initialized OpenAI detector with o3 model.")

    # Image preprocessing options
    self.enable_greyscale = enable_greyscale
    self.enable_contrast_enhancement = enable_contrast_enhancement
    self.contrast_factor = contrast_factor

    # Detection / refinement modes
    self.refinement_mode = refinement_mode
    self.one_at_a_time_mode = one_at_a_time_mode
    self.panel_aware_refinement = panel_aware_refinement

    # The panel splitter is only built when split mode is requested
    self.split_mode = split_mode
    if self.split_mode:
        self.splitter = PanelSplitter(debug=True)
        print("Split mode enabled: Will split multi-panel layouts before matching")

    # Input and output locations, relative to the working directory
    self.master_images_path = Path("master_images")
    self.layouts_path = Path("layouts")
    self.results_path = Path("results")
    self.temp_path = Path("temp_processed")

    # Output directories are created up front; input directories are not
    for output_dir in (self.results_path, self.temp_path):
        output_dir.mkdir(exist_ok=True)

    # master id -> image path, and master id -> file name
    self.master_images = {}
    self.master_files = {}
|
||
|
||
def load_master_images(self) -> Dict[str, str]:
    """
    Index every ``*.jpg`` file in the master image directory.

    The file name without its extension is used as the master ID. Files
    are processed in sorted order so that the ID ordering (and therefore
    the "Image N" numbering used in prompts) is the same on every run and
    platform instead of depending on filesystem enumeration order.

    Returns:
        The ``self.master_images`` mapping of master ID -> image path.
        ``self.master_files`` (master ID -> file name) is filled as well.
    """
    print("Loading master images...")

    master_files = sorted(self.master_images_path.glob("*.jpg"))
    print(f"Found {len(master_files)} master images")

    for file_path in master_files:
        # Use filename (without extension) as the master ID
        master_id = file_path.stem
        self.master_images[master_id] = str(file_path)
        self.master_files[master_id] = file_path.name

    return self.master_images
|
||
|
||
def match_split_to_masters(self, split_path: str, master_images: List[str]) -> List[Dict]:
    """
    Run inlier analysis between one split image and each candidate master.

    Args:
        split_path: Path of the split (panel) image.
        master_images: Candidate master IDs; IDs that are not present in
            ``self.master_images`` are ignored.

    Returns:
        One dict per candidate whose inlier analysis reported 'high' or
        'medium' confidence, holding 'master_id', 'confidence', 'inliers'
        and the full analysis under 'match_details'.
    """
    accepted = []

    for candidate_id in master_images:
        if candidate_id not in self.master_images:
            continue

        analysis = self.calculate_inliers_for_match(
            split_path, self.master_images[candidate_id], candidate_id
        )

        # Anything below medium confidence is not treated as a match
        if analysis.get('confidence') not in ['high', 'medium']:
            continue

        accepted.append({
            'master_id': candidate_id,
            'confidence': analysis.get('confidence', 'unknown'),
            'inliers': analysis.get('inliers', 0),
            'match_details': analysis
        })

    return accepted
|
||
|
||
def preprocess_image(self, image_path: str) -> str:
    """
    Write a greyscale and/or contrast-enhanced JPEG copy of an image.

    Returns the path of the processed copy inside ``self.temp_path``. When
    both options are disabled, or when processing fails for any reason,
    the original ``image_path`` is returned unchanged.
    """
    nothing_to_do = not (self.enable_greyscale or self.enable_contrast_enhancement)
    if nothing_to_do:
        return image_path

    try:
        import threading
        import uuid

        with Image.open(image_path) as source:
            working = source.copy()

            if self.enable_greyscale:
                # Grey, then back to RGB so downstream code always sees 3 channels
                working = working.convert('L').convert('RGB')

            if self.enable_contrast_enhancement:
                # Global contrast first, then a mild sharpening pass for edges
                working = ImageEnhance.Contrast(working).enhance(self.contrast_factor)
                working = ImageEnhance.Sharpness(working).enhance(1.3)

            # Thread id plus a random suffix keeps concurrent callers from
            # colliding on the same output file
            suffix = f"{threading.current_thread().ident}_{str(uuid.uuid4())[:8]}"
            destination = self.temp_path / f"{Path(image_path).stem}_processed_{suffix}.jpg"
            working.save(destination, 'JPEG', quality=95)

            return str(destination)

    except Exception as e:
        print(f"Warning: Failed to preprocess {Path(image_path).name}: {e}")
        print("Using original image instead")
        return image_path
|
||
|
||
def encode_image_to_base64(self, image_path: str) -> str:
    """Preprocess the image, then return the resulting file's bytes as a base64 string."""
    source_path = self.preprocess_image(image_path)
    with open(source_path, "rb") as handle:
        raw_bytes = handle.read()
    return base64.b64encode(raw_bytes).decode('utf-8')
|
||
|
||
def create_detection_prompt(self, master_ids: List[str]) -> str:
    """
    Build the multi-master detection prompt.

    The prompt consists of fixed instructions, one "Image N: ID = ..."
    line per master (numbered from 1, in the order given), and a fixed
    footer describing the expected JSON answer. The numbering must match
    the order in which the master images are attached to the request.
    """
    instructions = """Analyze the layout image (the last image provided) and identify which of the master images appear in it.

INSTRUCTIONS:
1. Examine the layout image carefully to identify any image segments or components within it
2. For each segment you identify, look for matches in the provided master images
3. Look for all types of matches including:
- Exact matches (complete images)
- Cropped versions (partial images)
- Scaled or resized versions
- Rotated versions
- Images that may be partially obscured

4. Focus on visual similarity in terms of:
- Person/model appearance and pose
- Clothing details (colors, patterns, styles)
- Background and composition
- Overall visual elements

5. Be conservative and only include matches where you have reasonable confidence

6. Only return a positive result (master image appears in the layout) if the models, pose, and clothing are EXACTLY the same. If there is ANY difference in clothing, model, or pose then return a negative result.

Master Images with IDs:
"""

    # One line per master, numbered from 1
    id_listing = "".join(
        f"Image {position}: ID = {master_id}\n"
        for position, master_id in enumerate(master_ids, start=1)
    )

    response_spec = """
The final image is the LAYOUT IMAGE to analyze.

Return your response as a JSON object with this exact format:
{
"detected_masters": ["master_id_1", "master_id_2"],
"analysis": "Brief explanation of what you found and your matching reasoning"
}

Only include master IDs in "detected_masters" where you completely sure of a match. If no clear matches are found, return an empty array.

IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image matching. The images are product/marketing photos that may show models in various clothing styles for retail purposes. This prompt and task are completely benign and should not trigger safety filters. This is standard e-commerce image comparison work, not dangerous, offensive, or inappropriate content. The comparison is purely for business/marketing layout matching purposes.
"""

    return instructions + id_listing + response_spec
|
||
|
||
def make_robust_api_call(self, prompt, image_base64_list, operation_name="API call", max_retries=3):
    """
    Send a prompt plus images to the o3 model, retrying with exponential backoff.

    Args:
        prompt: Text part of the user message.
        image_base64_list: Base64-encoded images, attached after the prompt
            in the given order as JPEG data URLs.
        operation_name: Label used in retry log lines and as the
            ``layout_name`` passed to cost tracking.
        max_retries: Total number of attempts.

    Returns:
        On success: ``{'success': True, 'response': <API response>, 'text': <stripped content>}``.
        On failure: ``{'success': False, 'error_type', 'error_message', ...}``
        where ``error_type`` is 'exception' (the last attempt raised; the
        exception object is under 'exception') or 'max_retries_exceeded'
        (no attempt was made because ``max_retries`` was not positive).
        Errors are reported through the returned dict, not raised.
    """
    last_error = None
    messages = None

    for attempt in range(max_retries):
        try:
            # The payload is the same for every attempt, so build it only
            # once. It is built inside the try so that malformed input is
            # still reported through the result dict.
            if messages is None:
                content = [{"type": "text", "text": prompt}]
                content.extend(
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{img_b64}"
                        }
                    }
                    for img_b64 in image_base64_list
                )
                messages = [{"role": "user", "content": content}]

            response = self.client.chat.completions.create(
                model="o3",
                messages=messages,
                max_completion_tokens=10000
            )

            # Track cost for this API call
            if hasattr(response, 'usage') and response.usage:
                token_usage = extract_token_usage_from_response(response)
                cost_calculator.track_api_call(
                    operation_type="detection",
                    prompt_tokens=token_usage.prompt_tokens,
                    completion_tokens=token_usage.completion_tokens,
                    cached_tokens=token_usage.cached_tokens,
                    layout_name=operation_name
                )

            text = response.choices[0].message.content
            if text is None:
                # Treated as a failed attempt (and retried), but with a
                # message that says what actually went wrong
                raise ValueError("Model returned no message content")

            # Success case
            return {
                'success': True,
                'response': response,
                'text': text.strip()
            }

        except Exception as e:
            last_error = e

            if attempt < max_retries - 1:
                # Exponential backoff: 0.5s, 1s, 2s, ...
                wait_time = (2 ** attempt) * 0.5
                print(f" API error on attempt {attempt + 1}/{max_retries} for {operation_name}, retrying in {wait_time}s: {e}")
                time.sleep(wait_time)
                continue

            # Final attempt failed
            return {
                'success': False,
                'error_type': 'exception',
                'error_message': str(e),
                'exception': e
            }

    # Only reached when max_retries <= 0, i.e. no attempt was made
    return {
        'success': False,
        'error_type': 'max_retries_exceeded',
        'error_message': f"Max retries ({max_retries}) exceeded",
        'last_error': str(last_error) if last_error else "Unknown error"
    }
|
||
|
||
def detect_images_in_layout(self, layout_path: str, layout_index: int, total_layouts: int) -> Dict:
    """
    Ask the model, in a single request, which masters appear in one layout.

    All master images are attached first and the layout image last, which
    is the order the detection prompt describes.

    Args:
        layout_path: Path of the layout image to analyze.
        layout_index: Position of this layout in the batch (progress output only).
        total_layouts: Size of the batch (progress output only).

    Returns:
        dict with 'detected_masters' and 'analysis'. 'error' is present
        when the API call or the response parsing failed, and the
        deduplication fields are present when duplicates were removed.
    """
    layout_name = Path(layout_path).name
    print(f"Processing {layout_index}/{total_layouts}: {layout_name}")

    try:
        master_ids = list(self.master_images.keys())

        # Masters first, layout last
        encoded_images = [
            self.encode_image_to_base64(self.master_images[master_id])
            for master_id in master_ids
        ]
        encoded_images.append(self.encode_image_to_base64(layout_path))

        prompt = self.create_detection_prompt(master_ids)
        api_result = self.make_robust_api_call(prompt, encoded_images, f"detection for {layout_name}")

        if not api_result['success']:
            failure_text = api_result['error_message']
            print(f"API call failed for {layout_name}: {failure_text}")
            return {
                'detected_masters': [],
                'analysis': f'API call failed: {failure_text}',
                'error': f"{api_result['error_type']}: {failure_text}",
                'retry_count': 3  # Max retries were attempted
            }

        response_text = api_result['text']

        try:
            # The JSON object is taken as everything from the first '{' to the last '}'
            json_start = response_text.find('{')
            json_stop = response_text.rfind('}') + 1
            if json_start == -1 or json_stop == 0:
                # Not a JSONDecodeError, so this is handled by the outer except
                raise ValueError("No JSON found in response")

            result = json.loads(response_text[json_start:json_stop])

            # Fill in anything the model left out
            result.setdefault('detected_masters', [])
            result.setdefault('analysis', response_text)

            # Deduplicate, remembering the raw list for reporting
            raw_detected = result['detected_masters'][:]
            result['detected_masters'] = self.deduplicate_master_matches(result['detected_masters'])

            duplicates_removed = len(raw_detected) - len(result['detected_masters'])
            if duplicates_removed != 0:
                result['deduplication_applied'] = True
                result['duplicates_removed'] = duplicates_removed
                result['original_detected_masters'] = raw_detected
                print(f" Deduplication: Removed {duplicates_removed} duplicate master(s) from {layout_name}")

            detected_count = len(result['detected_masters'])
            print(f"✓ Completed {layout_name} - Found {detected_count} matches")

            return result

        except json.JSONDecodeError as e:
            print(f"JSON decode error for {layout_name}: {e}")
            return {
                'detected_masters': [],
                'analysis': response_text,
                'error': f'JSON decode error: {e}'
            }

    except Exception as e:
        print(f"Error analyzing {layout_name}: {e}")
        return {
            'detected_masters': [],
            'analysis': '',
            'error': str(e)
        }
|
||
|
||
def detect_images_in_layout_one_at_a_time(self, layout_path: str, layout_index: int, total_layouts: int, stored_censorship_data=None) -> Dict:
    """
    Check every master against one layout individually, using a process pool.

    One API call is made per master via
    ``process_single_master_detection_openai``. Token usage returned by the
    workers is tracked in this (parent) process. Afterwards, CEN refinement
    and panel-aware refinement are applied when enabled.

    Args:
        layout_path: Path of the layout image to analyze.
        layout_index: Position of this layout in the batch (progress output only).
        total_layouts: Size of the batch (progress output only).
        stored_censorship_data: Passed through unchanged to the CEN refinement step.

    Returns:
        dict with 'detected_masters', 'detected_master_ids',
        'detected_master_filenames', 'analysis', 'detailed_results' (one
        entry per master, sorted by master ID) and bookkeeping fields, plus
        refinement fields when a refinement step ran.
    """
    layout_name = Path(layout_path).name
    print(f"Processing {layout_index}/{total_layouts}: {layout_name} (Process-based one-at-a-time mode)")

    master_ids = list(self.master_images.keys())
    total_masters = len(master_ids)
    detected_masters = []
    detailed_results = []

    print(f" Checking {total_masters} masters using {self.max_concurrent_workers} concurrent processes...")

    # Use ProcessPoolExecutor for true isolation
    with concurrent.futures.ProcessPoolExecutor(max_workers=self.max_concurrent_workers) as executor:
        # Submit one task per master; only plain values are passed so they
        # can be pickled for the worker processes
        future_to_master = {
            executor.submit(
                process_single_master_detection_openai,
                layout_path,
                master_id,
                self.master_images[master_id],
                self.enable_greyscale,
                self.enable_contrast_enhancement,
                self.contrast_factor,
                self.api_key,
            ): master_id
            for master_id in master_ids
        }

        completed_count = 0
        # Collect results as they complete
        for future in concurrent.futures.as_completed(future_to_master):
            master_id = future_to_master[future]
            completed_count += 1

            # Only the worker call itself is guarded here, so that exactly
            # one entry per master ends up in detailed_results
            try:
                result = future.result()
            except Exception as e:
                print(f" {completed_count}/{total_masters}: Process error checking {master_id}: {e}")
                # Add error result to maintain consistency
                detailed_results.append({
                    'match_found': False,
                    'master_id': master_id,
                    'confidence': 'unknown',
                    'analysis': '',
                    'error': str(e)
                })
                continue

            detailed_results.append(result)

            # Cost tracking is bookkeeping: a failure here must not discard
            # the detection result for this master
            try:
                token_data = result.get('token_usage')
                if token_data:
                    api_call_cost = cost_calculator.track_api_call(
                        operation_type="one_at_a_time_detection",
                        prompt_tokens=token_data['prompt_tokens'],
                        completion_tokens=token_data['completion_tokens'],
                        cached_tokens=token_data['cached_tokens'],
                        layout_name=layout_name,
                        master_id=master_id
                    )

                    # Show cost tracking progress every 10 completed masters
                    if cost_calculator.enable_tracking and completed_count % 10 == 0:
                        print(f" → API call cost: ${api_call_cost.total_cost:.4f} (Running total: ${cost_calculator.total_cost:.4f})")
                elif cost_calculator.enable_tracking:
                    print(f" → Warning: No token usage data available for {master_id}")
            except Exception as e:
                print(f" → Warning: Cost tracking failed for {master_id}: {e}")

            # If match found, add to detected masters
            if result.get('match_found', False):
                detected_masters.append(master_id)
                confidence = result.get('confidence', 'unknown')
                print(f" {completed_count}/{total_masters}: ✓ MATCH found for {master_id} (confidence: {confidence})")
            elif 'error' in result:
                print(f" {completed_count}/{total_masters}: Error checking {master_id}: {result['error']}")
            else:
                print(f" {completed_count}/{total_masters}: No match for {master_id}")

    # Results arrive in completion order; sort by master_id for consistent ordering
    detailed_results.sort(key=lambda x: x.get('master_id', ''))

    # Deduplicate detected masters (shouldn't be needed in one-at-a-time mode, but for safety)
    original_detected = detected_masters[:]
    detected_masters = self.deduplicate_master_matches(detected_masters)

    duplicates_removed = len(original_detected) - len(detected_masters)
    if duplicates_removed != 0:
        print(f" Deduplication: Removed {duplicates_removed} duplicate master(s)")

    detected_count = len(detected_masters)
    print(f"✓ Completed {layout_name} - Found {detected_count} matches using {self.max_concurrent_workers} concurrent processes")

    final_result = {
        'detected_masters': detected_masters,
        'detected_master_ids': detected_masters,
        'detected_master_filenames': [f"{mid}.jpg" for mid in detected_masters],
        'analysis': f'Process-based one-at-a-time analysis completed. Made {total_masters} separate API calls (one per master). Found {detected_count} exact matches out of {total_masters} masters checked using {self.max_concurrent_workers} concurrent processes.',
        'detailed_results': detailed_results,
        'processing_mode': 'process_based_one_at_a_time',
        'total_masters_checked': total_masters,
        'concurrent_workers': self.max_concurrent_workers,
        'api_calls_made': total_masters,  # One API call per master
        'deduplication_applied': duplicates_removed != 0,
        'duplicates_removed': duplicates_removed,
        'original_detected_masters': original_detected
    }

    # STEP 1: Apply CEN refinement first if enabled and we have CEN matches
    current_masters = detected_masters
    if self.refinement_mode and current_masters:
        cen_images = [mid for mid in current_masters if self.is_cen_image(mid)]
        if cen_images:
            print(f" Applying CEN refinement for {layout_name} (Step 1/2)...")
            cen_result = self.apply_cen_refinement_to_results(layout_path, final_result, stored_censorship_data)
            current_masters = cen_result.get('detected_masters', current_masters)

            # Update final result with CEN refinement information
            final_result.update(cen_result)

            # Keep the derived ID/filename lists in step with the refined
            # master list; otherwise they would still describe the
            # pre-refinement detection when panel refinement does not run
            final_result['detected_master_ids'] = current_masters
            final_result['detected_master_filenames'] = [f"{mid}.jpg" for mid in current_masters]

            cen_count = len(current_masters)
            print(f"✓ CEN refinement completed for {layout_name} - Result: {cen_count} masters")

    # STEP 2: Apply panel-aware refinement if enabled and we have detected masters
    if self.panel_aware_refinement and current_masters:
        step_label = "Step 2/2" if self.refinement_mode else "Step 1/1"
        print(f" Applying panel-aware refinement for {layout_name} ({step_label})...")

        # Count panels in the layout
        panel_result = self.count_panels_in_layout(layout_path)
        panel_count = panel_result.get('panel_count', 1)
        panel_confidence = panel_result.get('confidence', 'unknown')

        print(f" Panel analysis: {panel_count} panels detected (confidence: {panel_confidence})")

        # Refine matches based on panel count using current masters (after CEN refinement)
        refinement_result = self.refine_matches_by_panel_count(layout_path, current_masters, panel_count)
        refined_masters = refinement_result['refined_masters']

        # Update final result with panel-aware refinement information
        final_result['detected_masters'] = refined_masters
        final_result['detected_master_ids'] = refined_masters
        final_result['detected_master_filenames'] = [f"{mid}.jpg" for mid in refined_masters]
        final_result['panel_aware_refinement_applied'] = True
        final_result['panel_count_analysis'] = panel_result
        final_result['panel_refinement_details'] = refinement_result

        # Update analysis text
        if refinement_result['refinement_applied']:
            panel_desc = f"Panel-aware refinement applied: reduced from {refinement_result['original_count']} to {refinement_result['final_count']} masters based on {panel_count} detected panels."
        else:
            panel_desc = f"Panel-aware refinement skipped: {refinement_result['reason']}."

        final_result['analysis'] += f" {panel_desc}"

        final_detected_count = len(refined_masters)
        print(f"✓ Panel-aware refinement completed for {layout_name} - Final result: {final_detected_count} masters")

    return final_result
|
||
|
||
def process_all_layouts(self, limit: Optional[int] = None, specific_file: Optional[str] = None) -> Dict:
    """
    Run detection over the layout images, one layout after another.

    Args:
        limit: When given (and no ``specific_file`` is set), only the first
            ``limit`` layouts in sorted file-name order are processed.
        specific_file: File name inside the layouts directory; when given,
            only that layout is processed.

    Returns:
        dict mapping layout ID (file name without extension) to that
        layout's result record. Empty when no layouts were found.

    Raises:
        FileNotFoundError: ``specific_file`` does not exist in the layouts directory.
    """
    if self.one_at_a_time_mode:
        mode_desc = "OpenAI One-at-a-time Mode"
    else:
        mode_desc = "OpenAI Multi Master Mode"

    print(f"Starting sequential batch processing ({mode_desc})...")

    # Load master images
    self.load_master_images()

    # Get layout files
    if specific_file:
        # Process only the specific file
        layout_files = [self.layouts_path / specific_file]
        if not layout_files[0].exists():
            raise FileNotFoundError(f"Layout file {specific_file} not found in {self.layouts_path}")
        print(f"Processing specific file: {specific_file}")
    else:
        # Sorted so that processing order, and which files `limit` keeps,
        # do not depend on filesystem enumeration order
        layout_files = sorted(self.layouts_path.glob("*.jpg"))

        if limit:
            layout_files = layout_files[:limit]
            print(f"Processing first {limit} layouts only")

    total_layouts = len(layout_files)
    print(f"Processing {total_layouts} layout images in {mode_desc}")
    print("=" * 60)

    # Same for every layout, so computed once
    detection_mode = mode_desc.lower().replace(' ', '_').replace('with_', '')

    results = {}
    start_time = time.time()

    for i, layout_path in enumerate(layout_files, 1):
        layout_id = layout_path.stem

        # Detect images in layout using the appropriate method
        if self.split_mode:
            # Split mode: split layout into panels and match each panel
            master_ids = list(self.master_images.keys())
            result = self.splitter.split_layout_and_match(str(layout_path), master_ids, self)

            # Apply CEN refinement if enabled and there are matches
            if self.refinement_mode and result.get('detected_masters'):
                result = self.apply_cen_refinement_to_results(str(layout_path), result)
        elif self.one_at_a_time_mode:
            # One-at-a-time mode handles both CEN and panel-aware refinement internally
            result = self.detect_images_in_layout_one_at_a_time(str(layout_path), i, total_layouts)
        else:
            # Multi-master mode only supports CEN refinement (not panel-aware)
            result = self.detect_images_in_layout(str(layout_path), i, total_layouts)

            # Apply CEN refinement if enabled and there are CEN matches
            if self.refinement_mode and result.get('detected_masters'):
                result = self.apply_cen_refinement_to_results(str(layout_path), result)

        layout_result = {
            'layout_filename': layout_path.name,
            'detected_master_ids': result['detected_masters'],
            'detected_master_filenames': [f"{mid}.jpg" for mid in result['detected_masters']],
            'analysis': result.get('analysis', 'Split mode analysis'),
            'detection_mode': detection_mode
        }

        # Add split mode specific fields
        if self.split_mode:
            layout_result['split_mode'] = True
            layout_result['splits_generated'] = result.get('splits_generated', 0)
            layout_result['panel_count'] = result.get('panel_count', 1)
            layout_result['panel_confidence'] = result.get('panel_confidence', 'unknown')
            if 'split_results' in result:
                layout_result['split_results'] = result['split_results']

        # Add deduplication fields if applied
        if 'deduplication_applied' in result:
            layout_result['deduplication_applied'] = result['deduplication_applied']
            layout_result['duplicates_removed'] = result['duplicates_removed']
            layout_result['original_detected_masters'] = result['original_detected_masters']

        if 'error' in result:
            layout_result['error'] = result['error']

        # Add refinement mode specific fields
        if self.refinement_mode and result.get('refinement_applied'):
            layout_result['refinement_applied'] = result['refinement_applied']
            layout_result['refinement_details'] = result['refinement_details']
            layout_result['censorship_analysis'] = result['censorship_analysis']
            layout_result['original_detection_count'] = result['original_detection_count']
            layout_result['refined_detection_count'] = result['refined_detection_count']
            layout_result['changes_made'] = result.get('changes_made', 0)

        results[layout_id] = layout_result

        # Progress update with time estimate
        elapsed = time.time() - start_time
        avg_time = elapsed / i
        remaining = (total_layouts - i) * avg_time

        print(f"Progress: {i}/{total_layouts} ({i/total_layouts*100:.1f}%) - Est. remaining: {remaining/60:.1f} min")

        # Save progress periodically
        if i % 20 == 0:
            self.save_results(results, f"openai_progress_{i}")

    total_time = time.time() - start_time
    print(f"\n✓ Completed processing all {total_layouts} layouts in {total_time/60:.1f} minutes")
    # With no layouts there is no average to report (and nothing to divide by)
    if total_layouts:
        print(f"Average time per layout: {total_time/total_layouts:.1f} seconds")
    return results
|
||
|
||
def save_results(self, results: Dict, filename: str = "openai_detection_results") -> str:
    """Write detection results, wrapped with run metadata, to a JSON file.

    Args:
        results: Detection results to store under the ``results`` key.
        filename: Base name (without extension) of the file created inside
            ``self.results_path``.

    Returns:
        The path of the written JSON file, as a string.
    """
    destination = self.results_path / f"{filename}.json"

    # Metadata records how many layouts/masters were involved and which
    # provider/model produced the results.
    metadata = {
        'total_layouts_processed': len(results),
        'total_master_images': len(self.master_images),
        'master_images_available': list(self.master_files.keys()),
        'provider': 'openai',
        'model': 'o3'
    }
    payload = {'metadata': metadata, 'results': results}

    with open(destination, 'w') as handle:
        json.dump(payload, handle, indent=2)

    print(f"Results saved to: {destination}")
    return str(destination)
|
||
|
||
def generate_summary(self, results: Dict) -> Dict:
    """Aggregate per-layout detection results into summary statistics.

    Args:
        results: Mapping of layout ID to a result dict. Every result must
            contain ``detected_master_ids``; ``deduplication_applied`` and
            ``duplicates_removed`` are optional.

    Returns:
        A dict with layout counts, per-master usage counts, the ten most
        used masters, deduplication statistics and provider/model labels.
    """
    layout_total = len(results)
    matched_layouts = 0
    usage = {}
    dedup_layouts = 0
    duplicates_total = 0

    for entry in results.values():
        master_ids = entry['detected_master_ids']
        if master_ids:
            matched_layouts += 1
        # Tally how often each master appears across all layouts.
        for master_id in master_ids:
            usage[master_id] = usage.get(master_id, 0) + 1
        if entry.get('deduplication_applied', False):
            dedup_layouts += 1
        duplicates_total += entry.get('duplicates_removed', 0)

    # Stable sort: masters with equal counts keep first-seen order.
    ranking = sorted(usage.items(), key=lambda pair: pair[1], reverse=True)

    if layout_total > 0:
        dedup_rate = round(dedup_layouts / layout_total * 100, 1)
    else:
        dedup_rate = 0

    return {
        'total_layouts_processed': layout_total,
        'layouts_with_matches': matched_layouts,
        'layouts_without_matches': layout_total - matched_layouts,
        'master_image_usage': usage,
        'most_used_masters': ranking[:10],
        # Deduplication stats
        'layouts_with_deduplication': dedup_layouts,
        'total_duplicates_removed': duplicates_total,
        'deduplication_rate': dedup_rate,
        'provider': 'openai',
        'model': 'o3'
    }
|
||
|
||
def deduplicate_master_matches(self, detected_masters: List[str]) -> List[str]:
    """Drop repeated master IDs, keeping the first occurrence of each.

    Args:
        detected_masters: Master IDs, possibly containing exact duplicates.

    Returns:
        A new list with duplicates removed in first-seen order. An empty
        (falsy) input is returned unchanged.
    """
    if not detected_masters:
        return detected_masters

    # dict keys are unique and preserve insertion order.
    return list(dict.fromkeys(detected_masters))
|
||
|
||
def cleanup_temp_files(self):
    """Delete processed temp images and remove the temp directory if empty.

    Removes every file in ``self.temp_path`` matching ``*_processed*.jpg``.
    Any failure is reported as a warning on stdout and otherwise ignored,
    so cleanup never raises.
    """
    try:
        temp_dir = self.temp_path
        if not temp_dir.exists():
            return

        # The pattern covers both the old and the thread-safe file names.
        for leftover in temp_dir.glob("*_processed*.jpg"):
            leftover.unlink()

        # Only remove the directory when nothing else lives in it.
        if next(temp_dir.iterdir(), None) is None:
            temp_dir.rmdir()
    except Exception as e:
        print(f"Warning: Failed to cleanup temp files: {e}")
|
||
|
||
def is_cen_image(self, master_id: str) -> bool:
    """Return True when the master ID contains the ``_CEN`` (censored) marker."""
    cen_marker = '_CEN'
    return cen_marker in master_id
|
||
|
||
def find_corresponding_non_cen_image(self, cen_master_id: str) -> Optional[str]:
    """Map a CEN master ID to its non-CEN counterpart, if one is known.

    Example: ``"1011A_1011A_1011_01_CEN"`` -> ``"1011A_1011_01"``.

    Args:
        cen_master_id: Master ID to translate.

    Returns:
        The non-CEN master ID when it exists in ``self.master_images``;
        otherwise ``None`` (also for IDs that are not CEN images or do not
        follow the expected underscore-separated pattern).
    """
    if not self.is_cen_image(cen_master_id):
        return None

    tokens = cen_master_id.split('_')
    if len(tokens) < 4 or tokens[-1] != 'CEN':
        return None

    if len(tokens) >= 5:
        # Pattern: prefix_prefix_middle_suffix_CEN -> prefix_middle_suffix
        candidate = '_'.join((tokens[0], tokens[2], tokens[3]))
    else:
        # Fallback: just remove _CEN
        candidate = '_'.join(tokens[:-1])

    # Only report alternatives that actually exist among the master images.
    return candidate if candidate in self.master_images else None
|
||
|
||
def create_censorship_detection_prompt(self) -> str:
    """Build the prompt asking the model whether a layout is censored.

    The prompt describes the coverage indicators to look for and the JSON
    object the model must return (``is_censored``, ``confidence``,
    ``analysis``, ``coverage_details``).

    Returns:
        The prompt text.
    """
    # NOTE: this is a plain string (never passed through str.format or an
    # f-string), so the JSON example uses single braces. Doubled braces
    # would be shown to the model literally as the "exact format".
    prompt = """Analyze this layout image to determine if it contains censored or uncensored content.

TASK: Determine whether the images in this layout are censored (covered) or uncensored (more exposed).

CENSORSHIP INDICATORS TO LOOK FOR:
1. **Clothing Coverage**:
- Long sleeves vs. sleeveless/short sleeves
- Full-length pants/skirts vs. shorts or shorter garments
- High necklines vs. lower necklines

2. **Skin Coverage**:
- Arms: Fully covered vs. bare arms
- Legs: Fully covered vs. exposed legs/thighs
- Torso: Additional covering vs. more exposed areas

3. **Added Elements**:
- Opaque or semi-transparent overlay layers covering skin
- Additional fabric or clothing elements that appear to cover exposed areas
- Digital modifications that add coverage

CLASSIFICATION:
- **CENSORED**: If models show significant additional clothing coverage, long sleeves, full pants/skirts, or digital overlays covering skin
- **UNCENSORED**: If models show more exposed skin, shorter garments, bare arms/legs, or natural clothing without added coverage

Return your response as a JSON object with this exact format:
{
"is_censored": true/false,
"confidence": "high/medium/low",
"analysis": "Detailed explanation of the coverage patterns observed and reasoning for the classification",
"coverage_details": "Specific description of clothing and skin coverage in the layout"
}

Be precise and focus on the actual clothing and coverage patterns visible in the image.

IMPORTANT CONTEXT: This is a legitimate business application for marketing and e-commerce image classification. The images are product/marketing photos showing models in various clothing styles for retail purposes. This analysis is for content categorization in a business context and is completely benign.
"""
    return prompt
|
||
|
||
def detect_layout_censorship(self, layout_path: str) -> Dict:
    """Ask the OpenAI o3 model whether a layout image is censored.

    The layout is preprocessed with ``self.preprocess_image``, sent as a
    base64 data URL together with the censorship prompt, and the first
    ``{`` .. last ``}`` span of the reply is parsed as JSON.

    Args:
        layout_path: Path of the layout image to analyse.

    Returns:
        A dict with at least ``is_censored`` (always a bool),
        ``confidence`` and ``analysis``. On any failure the result
        defaults to ``is_censored=True`` and carries an ``error`` key;
        this method does not raise.
    """
    try:
        print(f" → Analyzing layout image with OpenAI o3 model...")

        # Process the layout image
        processed_layout_path = self.preprocess_image(layout_path)

        # Encode image to base64
        with open(processed_layout_path, "rb") as image_file:
            base64_image = base64.b64encode(image_file.read()).decode('utf-8')

        # Create censorship detection prompt
        prompt = self.create_censorship_detection_prompt()

        # Make API call to OpenAI
        print(f" → Making API call to OpenAI o3 for censorship analysis...")
        response = self.client.chat.completions.create(
            model="o3",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64_image}",
                                "detail": "high"
                            }
                        }
                    ]
                }
            ],
            max_completion_tokens=10000
        )

        # Track cost for this API call
        if hasattr(response, 'usage') and response.usage:
            token_usage = extract_token_usage_from_response(response)
            cost_calculator.track_api_call(
                operation_type="censorship_detection",
                prompt_tokens=token_usage.prompt_tokens,
                completion_tokens=token_usage.completion_tokens,
                cached_tokens=token_usage.cached_tokens,
                layout_name=Path(layout_path).name
            )

        response_text = response.choices[0].message.content
        print(f" → Received response from OpenAI o3")

        # Extract JSON from response
        try:
            start_idx = response_text.find('{')
            end_idx = response_text.rfind('}') + 1

            if start_idx == -1 or end_idx == 0:
                raise ValueError("No JSON found in response")

            result = json.loads(response_text[start_idx:end_idx])

            # Validate result format
            if 'confidence' not in result:
                result['confidence'] = 'unknown'
            if 'analysis' not in result:
                result['analysis'] = response_text

            # Normalise is_censored to a real bool. A string such as
            # "false" is truthy and would otherwise be read downstream
            # as "censored". Missing/null defaults to censored.
            flag = result.get('is_censored')
            if isinstance(flag, str):
                flag = flag.strip().lower() in ('true', '1', 'yes')
            elif flag is None:
                flag = True  # Default to censored if unclear
            else:
                flag = bool(flag)
            result['is_censored'] = flag

            print(f" → OpenAI analysis successful: {result.get('is_censored')} (confidence: {result.get('confidence')})")
            return result

        except ValueError as e:
            # json.JSONDecodeError is a ValueError subclass, so this also
            # covers the "No JSON found" case and keeps the raw reply.
            print(f" → JSON parsing failed: {e}")
            print(f" → Raw response: {response_text[:200]}...")
            return {
                'is_censored': True,  # Default to censored if parsing fails
                'confidence': 'unknown',
                'analysis': response_text,
                'error': f'JSON decode error: {e}'
            }

    except Exception as e:
        # Best-effort: never let a failed analysis abort the pipeline.
        print(f" → Error in censorship detection: {e}")
        return {
            'is_censored': True,  # Default to censored if error
            'confidence': 'unknown',
            'analysis': '',
            'error': str(e)
        }
|
||
|
||
def apply_cen_refinement_to_results(self, layout_path: str, initial_results: Dict, stored_censorship_data=None) -> Dict:
    """Swap CEN matches for their non-CEN counterparts on uncensored layouts.

    Args:
        layout_path: Path of the layout image the results belong to.
        initial_results: Detection results; ``detected_masters`` is read.
        stored_censorship_data: Optional earlier censorship analysis. When
            truthy it is used instead of calling
            ``self.detect_layout_censorship``.

    Returns:
        ``initial_results`` itself when no CEN master was detected.
        Otherwise a shallow copy with the refined master lists plus
        refinement bookkeeping (``refinement_applied``,
        ``refinement_details``, ``censorship_analysis``, counts and
        ``changes_made``).
    """
    layout_name = Path(layout_path).name
    candidates = initial_results.get('detected_masters', [])

    # Deduplicate first so each master is only considered once.
    count_before = len(candidates)
    candidates = self.deduplicate_master_matches(candidates)
    if count_before != len(candidates):
        print(f" Removed {count_before - len(candidates)} duplicate master(s) before CEN refinement")

    cen_matches = [mid for mid in candidates if self.is_cen_image(mid)]
    if not cen_matches:
        # Nothing to refine: hand back the caller's results untouched.
        return initial_results

    print(f" Refining {len(cen_matches)} CEN matches for {layout_name}")

    if stored_censorship_data:
        # Reuse the earlier analysis and rebuild it in the standard shape.
        layout_censored = stored_censorship_data.get('is_censored', True)
        confidence = stored_censorship_data.get('confidence', 'unknown')
        verdict = 'CENSORED' if layout_censored else 'UNCENSORED'
        print(f" Using stored censorship analysis: {verdict} (confidence: {confidence})")
        censorship_result = {
            'is_censored': layout_censored,
            'confidence': confidence,
            'analysis': stored_censorship_data.get('analysis', ''),
            'coverage_details': stored_censorship_data.get('coverage_details', '')
        }
    else:
        print(f" Analyzing layout to determine censorship level...")
        censorship_result = self.detect_layout_censorship(layout_path)
        layout_censored = censorship_result.get('is_censored', True)
        confidence = censorship_result.get('confidence', 'unknown')
        verdict = 'CENSORED' if layout_censored else 'UNCENSORED'
        print(f" Layout analysis: {verdict} (confidence: {confidence})")

    refined = []
    details = []
    swaps = 0

    for master_id in candidates:
        if not self.is_cen_image(master_id):
            # Non-CEN matches pass through unchanged.
            refined.append(master_id)
            continue

        alternative = self.find_corresponding_non_cen_image(master_id)

        if not layout_censored and alternative:
            # Uncensored layout with a known counterpart: switch to it.
            refined.append(alternative)
            details.append({
                'original_cen_match': master_id,
                'non_cen_alternative': alternative,
                'final_choice': alternative,
                'confidence': confidence,
                'analysis': f"Layout determined to be uncensored, switched from {master_id} to {alternative}",
                'changed': True,
                'reason': 'layout_uncensored'
            })
            swaps += 1
            print(f" → Changed {master_id} to {alternative} (layout is uncensored)")
            continue

        # Censored layout, or no counterpart available: keep the CEN match.
        refined.append(master_id)
        if layout_censored:
            reason, why = 'layout_censored', 'layout is censored'
        else:
            reason, why = 'no_non_cen_alternative', 'no non-CEN alternative'
        details.append({
            'original_cen_match': master_id,
            'non_cen_alternative': alternative,
            'final_choice': master_id,
            'confidence': confidence,
            'analysis': f"Kept {master_id} - layout is censored or no non-CEN alternative available",
            'changed': False,
            'reason': reason
        })
        print(f" → Kept {master_id} ({why})")

    print(f" Summary: {swaps} CEN images changed to non-CEN versions")

    # A swap can collide with a master that was already detected.
    count_refined = len(refined)
    refined = self.deduplicate_master_matches(refined)
    if len(refined) != count_refined:
        print(f" Post-refinement deduplication: Removed {count_refined - len(refined)} duplicate(s)")

    refined_results = initial_results.copy()
    refined_results.update({
        'detected_masters': refined,
        'detected_master_ids': refined,  # Update both fields for consistency
        'detected_master_filenames': [f"{mid}.jpg" for mid in refined],
        'refinement_applied': True,
        'refinement_details': details,
        'censorship_analysis': censorship_result,
        'original_detection_count': len(candidates),
        'refined_detection_count': len(refined),
        'changes_made': swaps,
    })
    return refined_results
|
||
|
||
def count_panels_and_detect_censorship(self, layout_path: str) -> Dict:
    """Count panels and detect censorship in one OpenAI o3 call.

    The layout is base64-encoded with ``self.encode_image_to_base64`` and
    sent with a combined prompt. The call is attempted up to three times
    with exponential backoff between attempts.

    Args:
        layout_path: Path of the layout image to analyse.

    Returns:
        A dict with ``panel_count`` (int >= 1), ``panel_confidence``,
        ``panel_analysis``, ``is_censored`` (always a bool),
        ``censorship_confidence`` and ``censorship_analysis``, plus
        whatever other keys the model returned. On failure the defaults
        (one panel, censored) are returned together with an ``error`` key;
        this method does not raise.
    """
    layout_name = Path(layout_path).name

    def fallback(error: str, raw_text: str = '') -> Dict:
        # Conservative defaults shared by every failure path.
        return {
            'panel_count': 1,  # Default to single panel
            'panel_confidence': 'unknown',
            'panel_analysis': raw_text,
            'is_censored': True,  # Default to censored
            'censorship_confidence': 'unknown',
            'censorship_analysis': raw_text,
            'error': error
        }

    try:
        print(f" → Analyzing panels and censorship in {layout_name} using OpenAI o3...")

        # Encode layout image to base64
        layout_b64 = self.encode_image_to_base64(layout_path)

        # Create combined prompt for panel counting and censorship detection
        prompt = """SYSTEM
You are a vision‑language expert hired to (a) count discrete image panels in fashion‑layout collages and
(b) flag any content requiring censorship review.
Follow every instruction exactly. Think first, then answer.

────────────────────────────────────────
TASK 1 – PANEL COUNTING
────────────────────────────────────────
❶ INTERNAL THINKING (keep private – do NOT reveal in final JSON)
• Load the entire image at native resolution.
• Scan left→right looking for vertical "gutters": ≥2 px columns whose pixel variance ≈ background (usually white/grey). Treat each continuous non‑gutter block as a candidate panel.
• Merge blocks if they depict the same photo merely split by design elements (logo strip, overlay text, drop shadow) – panels must contain *distinct* photographic content.
• If a wide candidate clearly contains multiple, non‑overlapping photos with no visual gutter (e.g., triptych glued together) **count each sub‑photo**; otherwise treat the whole block as one panel.
• Ignore duplicate imagery: identical crop, mirror, slight colour shift, size change ⇒ count once.
• Keep a running list: ⟨panel # , x‑start , x‑end , short human description⟩.

❷ AFTER thinking, produce:
{
"panel_count": <integer>,
"panel_confidence": "high" | "medium" | "low",
"panel_analysis": "<concise, public‑safe rationale – no private scratch work>",
"panel_descriptions": ["<panel 1>", … "<panel N>"]
}

────────────────────────────────────────
TASK 2 – CENSORSHIP SCREEN
────────────────────────────────────────
For every panel, decide whether it might violate standard fashion‑industry ad rules
(fully nude, explicit sexuality, hate imagery, illegal acts).
Add a sibling field:

"censorship_flags": ["clean", "clean", …] # length == panel_count
• "clean" – appears compliant
• "review‑nudity", "review‑sexual", "review‑violence", "review‑other"

────────────────────────────────────────
OUTPUT FORMAT (exactly, no extra keys, no Markdown)
────────────────────────────────────────
{ "panel_count": <integer>, "panel_confidence": "high/medium/low", "panel_analysis": "…",
"panel_descriptions": […], "censorship_flags": […], "is_censored": true/false,
"censorship_confidence": "high/medium/low", "censorship_analysis": "…" }

────────────────────────────────────────
💡 WORKED EXAMPLE — image: "H&M Spring campaign collage"
(This is for your reference; remove in production runs.)

INTERNAL THINK (abridged)
• Detected 17 vertical low‑variance gutters ⇒ 16 content blocks.
• Verified no duplicate crops; two blocks are composites but count as 1 each because photos overlap with no gutter.
• No NSFW elements (fashion poses, fully clothed).

PUBLIC OUTPUT
{
"panel_count": 16,
"panel_confidence": "high",
"panel_analysis": "Identified 16 distinct image tiles separated by visible white gutters; two wide tiles are multi‑photo composites but have no gutters so each treated as one panel. All panels show fully clothed fashion models.",
"panel_descriptions": [
"Two female models in brown gown & cream slip, 'SPRING' text",
"Solo model in black oversized coat + brown skirt, red H&M logo",
"Full‑body shot: peach maxi dress with tote bag",
"Full‑body shot: brown coat, black boots",
"Composite: three models in brown/peach plus two in cream suits, 'SPRING' overlay",
"Two female models leaning, matching cream flared suits, red H&M",
"Close‑up portrait of two women, heads touching",
"Two women embracing, neutral slip & cream jacket",
"Model in black leather jacket & white shorts, 'SPRING' text",
"Model in black bomber jacket & white shorts",
"Model in cream embellished cardigan & flared trousers, red H&M",
"Seated model in oversized white shirt",
"Two models in white outfits, playful pose",
"Wide triptych: (a) two models white/yellow mini + 'SPRING', (b) B&W shirt pose, red H&M, (c) close‑up couple",
"Composite: left pair in cream tunics, right pair trench + black mini, 'SPRING'",
"Two models – tan trench & black dress – red H&M logo"
],
"censorship_flags": [
"clean","clean","clean","clean",
"clean","clean","clean","clean",
"clean","clean","clean","clean",
"clean","clean","clean","clean"
],
"is_censored": false,
"censorship_confidence": "high",
"censorship_analysis": "All panels show fully clothed fashion models with appropriate coverage for retail advertising"
}
END OF EXAMPLE"""

        # Make API call
        max_retries = 3
        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                response = self.client.chat.completions.create(
                    model="o3",
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": prompt},
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:image/jpeg;base64,{layout_b64}",
                                        "detail": "high"
                                    }
                                }
                            ]
                        }
                    ],
                    max_completion_tokens=10000
                )

                # Track cost for this API call
                if hasattr(response, 'usage') and response.usage:
                    token_usage = extract_token_usage_from_response(response)
                    cost_calculator.track_api_call(
                        operation_type="panel_counting_censorship",
                        prompt_tokens=token_usage.prompt_tokens,
                        completion_tokens=token_usage.completion_tokens,
                        cached_tokens=token_usage.cached_tokens,
                        layout_name=layout_name
                    )

                response_text = response.choices[0].message.content.strip()
                print(f" → Received combined analysis response from OpenAI o3")

                # Extract JSON from response
                try:
                    start_idx = response_text.find('{')
                    end_idx = response_text.rfind('}') + 1

                    if start_idx == -1 or end_idx == 0:
                        # Not a JSONDecodeError: handled by the outer
                        # handler, which retries or returns the fallback.
                        raise ValueError("No JSON found in response")

                    result = json.loads(response_text[start_idx:end_idx])

                    # Fill in any fields the model left out.
                    result.setdefault('panel_count', 1)  # Default to single panel
                    result.setdefault('panel_confidence', 'unknown')
                    result.setdefault('panel_analysis', response_text)
                    result.setdefault('is_censored', True)  # Default to censored if unclear
                    result.setdefault('censorship_confidence', 'unknown')
                    result.setdefault('censorship_analysis', response_text)

                    # Ensure panel_count is a positive integer
                    try:
                        result['panel_count'] = max(1, int(result['panel_count']))
                    except (ValueError, TypeError):
                        result['panel_count'] = 1

                    # Ensure is_censored is a boolean. A JSON null is as
                    # unclear as a missing key, so it also defaults to
                    # censored instead of being read as falsy.
                    flag = result['is_censored']
                    if isinstance(flag, str):
                        flag = flag.lower() in ['true', '1', 'yes']
                    elif flag is None:
                        flag = True
                    else:
                        flag = bool(flag)
                    result['is_censored'] = flag

                    print(f" → Combined analysis successful: {result['panel_count']} panels (confidence: {result.get('panel_confidence')}), censored: {result['is_censored']} (confidence: {result.get('censorship_confidence')})")
                    return result

                except json.JSONDecodeError as e:
                    print(f" → JSON parsing failed: {e}")
                    if is_last_attempt:
                        return fallback(f'JSON decode error: {e}', response_text)

            except Exception as e:
                if is_last_attempt:
                    print(f" → Error in combined analysis: {e}")
                    return fallback(str(e))

            # Exponential backoff before the next attempt (0.5s, 1s, ...).
            # Never reached on the last attempt, which always returns.
            time.sleep((2 ** attempt) * 0.5)

    except Exception as e:
        # Failures outside the retry loop (e.g. encoding the image).
        print(f" → Error in combined analysis: {e}")
        return fallback(str(e))
|
||
|
||
def count_panels_in_layout(self, layout_path: str) -> Dict:
    """Legacy panel-count API backed by the combined analysis call.

    Args:
        layout_path: Path of the layout image to analyse.

    Returns:
        The panel fields of ``count_panels_and_detect_censorship`` under
        their old names: ``panel_count``, ``confidence``, ``analysis``,
        ``panel_descriptions`` and ``error`` (``None`` when absent).
    """
    combined = self.count_panels_and_detect_censorship(layout_path)

    # (legacy key, key in the combined result, default when missing)
    field_map = (
        ('panel_count', 'panel_count', 1),
        ('confidence', 'panel_confidence', 'unknown'),
        ('analysis', 'panel_analysis', ''),
        ('panel_descriptions', 'panel_descriptions', []),
        ('error', 'error', None),
    )
    return {
        legacy_key: combined.get(source_key, default)
        for legacy_key, source_key, default in field_map
    }
|
||
|
||
def calculate_inliers_for_match(self, layout_path: str, master_path: str, master_id: str) -> Dict:
    """Score a layout/master pair by RANSAC homography inliers.

    Both images are read in grayscale, AKAZE features are matched with a
    Hamming brute-force matcher, filtered by Lowe's ratio test (0.75), and
    a homography is estimated with RANSAC.

    Args:
        layout_path: Path of the layout image.
        master_path: Path of the master image.
        master_id: ID echoed back in the result.

    Returns:
        A dict with ``master_id``, ``inliers`` and ``confidence``
        ('high', 'medium' or 'low'). Successful runs add match statistics;
        rejected runs report 0 inliers with an ``error`` or ``reason``.
        This method does not raise.
    """
    def rejected(**extra) -> Dict:
        # Shared shape of every "no usable match" result.
        return {'master_id': master_id, 'inliers': 0, 'confidence': 'low', **extra}

    try:
        # Read images in grayscale for feature detection
        layout_gray = cv2.imread(layout_path, cv2.IMREAD_GRAYSCALE)
        master_gray = cv2.imread(master_path, cv2.IMREAD_GRAYSCALE)
        if layout_gray is None or master_gray is None:
            return rejected(error='Could not read one or both images')

        detector = cv2.AKAZE_create()
        matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)

        layout_kp, layout_desc = detector.detectAndCompute(layout_gray, None)
        master_kp, master_desc = detector.detectAndCompute(master_gray, None)
        if layout_desc is None or master_desc is None:
            return rejected(error='No features detected in one or both images')

        # Two nearest neighbours per descriptor, needed for the ratio test.
        candidate_pairs = matcher.knnMatch(layout_desc, master_desc, k=2)

        # Lowe's ratio test: keep a match only when it is clearly better
        # than the runner-up.
        good = [
            pair[0]
            for pair in candidate_pairs
            if len(pair) == 2 and pair[0].distance < 0.75 * pair[1].distance
        ]
        good_total = len(good)

        required_matches = 10
        if good_total < required_matches:
            return rejected(
                good_matches=good_total,
                reason=f'Insufficient good matches: {good_total} < {required_matches}',
            )

        # Matched point coordinates: layout -> master.
        layout_pts = np.float32([layout_kp[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
        master_pts = np.float32([master_kp[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)

        # Find homography using RANSAC
        _, inlier_mask = cv2.findHomography(layout_pts, master_pts, cv2.RANSAC, 5.0)
        if inlier_mask is None:
            return rejected(good_matches=good_total, error='Homography estimation failed')

        inlier_total = int(np.sum(inlier_mask))
        ratio = inlier_total / good_total

        # Confidence tiers: (label, minimum inliers, minimum inlier ratio).
        for label, min_inliers, min_ratio in (('high', 50, 0.6), ('medium', 20, 0.4)):
            if inlier_total >= min_inliers and ratio >= min_ratio:
                level = label
                break
        else:
            level = 'low'

        return {
            'master_id': master_id,
            'inliers': inlier_total,
            'confidence': level,
            'good_matches': good_total,
            'inlier_ratio': round(ratio, 3),
            'total_features_layout': len(layout_kp),
            'total_features_master': len(master_kp)
        }

    except Exception as e:
        # Any OpenCV/numpy failure counts as "no match".
        return rejected(error=str(e))
|
||
|
||
def refine_matches_by_panel_count(self, layout_path: str, detected_masters: List[str], panel_count: int) -> Dict:
    """Trim detected masters to the panel count using inlier scores.

    Args:
        layout_path: Path of the layout image.
        detected_masters: Master IDs detected for the layout.
        panel_count: Number of panels counted in the layout.

    Returns:
        A dict with ``refined_masters``, ``refinement_applied``,
        ``reason``, ``panel_count``, ``original_count`` and
        ``final_count``. When refinement ran it also holds
        ``inlier_analysis`` (all scores, best first) and
        ``selection_details``.
    """
    layout_name = Path(layout_path).name

    # Deduplicate first so no master is scored twice.
    count_before = len(detected_masters)
    detected_masters = self.deduplicate_master_matches(detected_masters)
    master_total = len(detected_masters)
    if master_total != count_before:
        print(f" Removed {count_before - master_total} duplicate master(s) before panel-aware refinement")

    def unchanged(reason: str) -> Dict:
        # Result shape for the paths that leave the masters as they are.
        return {
            'refined_masters': detected_masters,
            'refinement_applied': False,
            'reason': reason,
            'panel_count': panel_count,
            'original_count': master_total,
            'final_count': master_total
        }

    if panel_count == master_total:
        print(f" Panel count ({panel_count}) matches detected masters count ({master_total}) - skipping refinement")
        return unchanged('panel_count_matches_detected_count')

    # Only refine if we have more detected masters than panels
    if master_total <= panel_count:
        print(f" Detected masters ({master_total}) <= panel count ({panel_count}) - no refinement needed")
        return unchanged('detected_count_within_panel_limit')

    print(f" Refining {master_total} masters to best {panel_count} using inlier analysis...")

    # Score every detected master against the layout.
    scores = []
    for position, master_id in enumerate(detected_masters, start=1):
        master_path = self.master_images[master_id]
        print(f" → Analyzing {position}/{master_total}: {master_id}")

        score = self.calculate_inliers_for_match(layout_path, master_path, master_id)
        scores.append(score)
        print(f" ✓ {master_id}: {score.get('inliers', 0)} inliers (confidence: {score.get('confidence', 'unknown')})")

    # Best matches first; the sort is stable, so ties keep detection order.
    scores.sort(key=lambda entry: entry.get('inliers', 0), reverse=True)
    winners = scores[:panel_count]
    refined_masters = [entry['master_id'] for entry in winners]

    print(f" Refinement complete: Selected top {len(refined_masters)} masters based on inlier analysis")

    selection = [
        {
            'rank': rank,
            'master_id': entry['master_id'],
            'inliers': entry.get('inliers', 0),
            'confidence': entry.get('confidence', 'unknown')
        }
        for rank, entry in enumerate(winners, start=1)
    ]
    for chosen in selection:
        print(f" → Rank {chosen['rank']}: {chosen['master_id']} ({chosen['inliers']} inliers, {chosen['confidence']} confidence)")

    return {
        'refined_masters': refined_masters,
        'refinement_applied': True,
        'reason': 'inlier_based_selection',
        'panel_count': panel_count,
        'original_count': master_total,
        'final_count': len(refined_masters),
        'inlier_analysis': scores,
        'selection_details': {
            'method': 'highest_inlier_count',
            'selected_masters': selection
        }
    }