ai_qc/backend/api_server.py
nickviljoen e136f1f905 Fix backend web UI 404 error with absolute path
Applied the same web_ui.html path fix to backend/api_server.py that
was previously applied to the root api_server.py. The backend version
needs to reference the parent directory since web_ui.html is located
at /opt/ai_qc/web_ui.html while api_server.py is in /opt/ai_qc/backend/.

This fixes the 404 error when running via Waitress production server.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-06 22:07:23 +02:00

3547 lines
No EOL
159 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
API server for Visual AI QC application.
Provides API endpoints for visual quality control checks without web UI.
"""
import os
import sys
import json
import base64
import importlib
import traceback
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from flask import Flask, request, jsonify, Response, make_response
from dotenv import load_dotenv
# Determine environment and load appropriate config
def load_environment_config():
"""Load environment-specific configuration"""
# Check for environment variable first
environment = os.environ.get('ENVIRONMENT', 'development')
# Determine config file path based on environment
base_dir = os.path.dirname(os.path.abspath(__file__))
# Try new config structure first
config_path = os.path.join(base_dir, 'config', f'{environment}.env')
# Fall back to old config.env if new structure doesn't exist
if not os.path.exists(config_path):
old_config_path = os.path.join(base_dir, 'config.env')
if os.path.exists(old_config_path):
config_path = old_config_path
environment = 'production' # Assume production for backward compatibility
print(f"Using legacy config file: {config_path}")
else:
print(f"No configuration file found. Checked: {config_path} and {old_config_path}")
return environment
# Load the configuration
load_dotenv(config_path)
print(f"Environment: {environment}")
print(f"Loaded configuration from: {config_path}")
print(f"OPENAI_API_KEY set: {'OPENAI_API_KEY' in os.environ}")
print(f"GOOGLE_API_KEY set: {'GOOGLE_API_KEY' in os.environ}")
print(f"Port: {os.environ.get('PORT', 'not set')}")
return environment
# Load environment configuration
current_environment = load_environment_config()
# Add the parent directory to the Python path to ensure imports work correctly
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Import QC utilities and model configuration
from visual_qc_apps.utils import get_image_from_asset
from llm_config import run_visual_qc, get_model_info
from profile_config import QC_CHECKS, PROFILES, get_profile, get_check_llm_map
from brand_guidelines_db import BrandGuidelinesDB
from auth_middleware import AuthMiddleware
from PIL import Image
import io
# Create Flask app
app = Flask(__name__)
# Configure app based on environment
upload_folder = os.environ.get('UPLOAD_FOLDER', 'uploads')
output_folder = os.environ.get('OUTPUT_FOLDER', 'output')
debug_mode = os.environ.get('DEBUG_MODE', 'false').lower() == 'true'
app.config['UPLOAD_FOLDER'] = upload_folder
app.config['OUTPUT_FOLDER'] = output_folder
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024 # 100MB max file size
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'default-secret-change-this')
app.debug = debug_mode
print(f"Upload folder: {upload_folder}")
print(f"Output folder: {output_folder}")
print(f"Debug mode: {debug_mode}")
# Ensure directories exist
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs(app.config['OUTPUT_FOLDER'], exist_ok=True)
# Initialize authentication middleware
auth = AuthMiddleware(app)
# Initialize brand guidelines database
brand_db = BrandGuidelinesDB()
# Global progress tracking
progress_tracker = {}
# Dictionary to store QC app instances and prompts
qc_apps = {}
# Define QC checks that require reference assets
REFERENCE_ASSET_REQUIRED_CHECKS = {'brand_assets_visibility', 'visual_hierarchy', 'logo_visibility'}
def extract_json_from_response(response_text):
"""Extract JSON objects from the LLM's response"""
# First, try to find JSON blocks (```json ... ```)
json_pattern = r'```json\s*(.*?)\s*```'
json_matches = re.finditer(json_pattern, response_text, re.DOTALL)
# Get all JSON blocks as a list
json_objects = []
for match in json_matches:
try:
json_data = json.loads(match.group(1).strip())
json_objects.append(json_data)
except Exception as e:
print(f"Could not parse JSON block: {e}")
# If we found multiple JSON blocks, merge them (later blocks override earlier blocks)
if json_objects:
merged_json = {}
for json_obj in json_objects:
if json_obj: # If not empty
merged_json.update(json_obj)
if merged_json:
return merged_json
# If we couldn't extract JSON blocks or they were empty, look for JSON directly
try:
# Try to find pure JSON in response (without code blocks)
# Remove markdown code formatting first
clean_response = re.sub(r'```.*?```', '', response_text, flags=re.DOTALL)
# Look for text that looks like JSON (between { and })
json_pattern = r'\{.*\}'
json_match = re.search(json_pattern, clean_response, re.DOTALL)
if json_match:
try:
json_data = json.loads(json_match.group(0))
return json_data
except json.JSONDecodeError:
pass
except Exception as e:
print(f"Failed to extract direct JSON: {e}")
# If we couldn't find valid JSON, return an empty dict
return {}
def detect_and_crop_main_element(image_path, file_type_hint=None):
"""Detect main element in POS files and return cropped region coordinates"""
try:
if not file_type_hint or 'pos' not in file_type_hint.lower():
return None
# Create a prompt to detect the main marketing element
crop_detection_prompt = """
This appears to be a POS (Point of Sale) material. Please identify the main marketing element that should be used for QC analysis.
Look for the primary branded content area (usually the center panel or main marketing message) and ignore peripheral elements like:
- Side panels with fine print
- Edge decorations
- Background patterns
- Border elements
Respond with JSON format:
{
"main_element_detected": true/false,
"crop_coordinates": {
"x": left_position_percentage,
"y": top_position_percentage,
"width": width_percentage,
"height": height_percentage
},
"description": "description of the main element found"
}
Coordinates should be percentages (0-100) of the total image dimensions.
"""
# Run AI analysis to detect main element
result = run_visual_qc(
prompt=crop_detection_prompt,
asset_path=image_path,
model_name="Gemini"
)
# Extract crop coordinates
crop_data = extract_json_from_response(result['response'])
if crop_data.get('main_element_detected') and 'crop_coordinates' in crop_data:
return crop_data['crop_coordinates']
return None
except Exception as e:
print(f"Error detecting main element: {e}")
return None
def apply_crop_to_analysis(image_path, crop_coordinates):
"""Apply cropping to image for analysis (if coordinates provided)"""
if not crop_coordinates:
return image_path
try:
from PIL import Image
import os
# Load original image
with Image.open(image_path) as img:
width, height = img.size
# Convert percentage coordinates to pixels
x = int((crop_coordinates['x'] / 100) * width)
y = int((crop_coordinates['y'] / 100) * height)
crop_width = int((crop_coordinates['width'] / 100) * width)
crop_height = int((crop_coordinates['height'] / 100) * height)
# Ensure coordinates are within image bounds
x = max(0, min(x, width))
y = max(0, min(y, height))
crop_width = min(crop_width, width - x)
crop_height = min(crop_height, height - y)
# Crop the image
cropped_img = img.crop((x, y, x + crop_width, y + crop_height))
# Save cropped version
filename, ext = os.path.splitext(image_path)
cropped_path = f"{filename}_cropped{ext}"
cropped_img.save(cropped_path)
return cropped_path
except Exception as e:
print(f"Error applying crop: {e}")
return image_path
return image_path
def extract_score_from_result(result, profile_config=None, check_name=None):
"""Extract score from LLM result response with Unilever-specific logic"""
score = None
try:
# Use our extraction function to get score from JSON blocks
json_data = extract_json_from_response(result['response'])
# Unilever Key Visual profile specific logic
if (profile_config and
((hasattr(profile_config, 'name') and profile_config.name == 'Unilever Key Visual') or
(hasattr(profile_config, 'get') and profile_config.get('name') == 'Unilever Key Visual')) and
check_name in ['face_visibility', 'new_visibility', 'face_gaze_direction']):
# Check for zero score conditions based on missing elements
if check_name == 'face_visibility' and json_data.get('face_present') == False:
print(f"Unilever profile: No face detected for {check_name}, setting score to 0")
return 0
elif check_name == 'new_visibility' and json_data.get('new_present') == False:
print(f"Unilever profile: No 'new' element detected for {check_name}, setting score to 0")
return 0
elif check_name == 'face_gaze_direction' and json_data.get('face_present') == False:
print(f"Unilever profile: No face detected for {check_name}, setting score to 0")
return 0
# Standard scoring logic
if 'score' in json_data:
score = json_data.get('score')
print(f"Extracted score from JSON block: {score}")
# If we still don't have a score, look for any score in text
if score is None:
# Try to find a score pattern in the text
score_pattern = r'["\']score["\']\s*:\s*(\d+)'
score_match = re.search(score_pattern, result['response'])
if score_match:
score = int(score_match.group(1))
print(f"Extracted score from regex: {score}")
else:
# Look for descriptive scores in text
descriptive_score_pattern = r'score(?:\s+is|\s*:\s*|\s+of\s+)(?:\s*)(\d+)(?:\s*out\s*of\s*10)?'
descriptive_match = re.search(descriptive_score_pattern, result['response'].lower())
if descriptive_match:
score = int(descriptive_match.group(1))
print(f"Extracted score from descriptive text: {score}")
else:
# Try to determine score from pass/fail status (legacy mode)
result_text = result.get('response', '').upper()
if "PASS" in result_text:
score = 10 # Pass = 10/10
print("Detected PASS keyword, setting score to 10")
elif "FAIL" in result_text:
score = 3 # Fail = 3/10
print("Detected FAIL keyword, setting score to 3")
else:
score = 5 # Default middle score
print(f"Could not extract score, using default of 5")
except Exception as parse_error:
print(f"Error parsing score from response: {parse_error}")
score = 5 # Default to middle score
return score if score is not None else 5
def determine_grade(overall_score):
"""Determine Pass/Fail based on overall score"""
# Convert overall score to individual check average (1-10 scale)
avg_individual_score = overall_score / 10
if avg_individual_score >= 6:
return 'Pass'
else:
return 'Fail'
def process_single_check(check_name, qc_apps, profile_config, profile_weights, file_path,
analysis_reference_asset, brand_db, progress_tracker, session_id,
check_index, total_checks):
"""Process a single QC check - designed to run in parallel"""
try:
# Check if this check requires a reference asset but none is provided
if check_name in REFERENCE_ASSET_REQUIRED_CHECKS and not analysis_reference_asset:
# Return automatic fail with score 0
fail_response = f"Reference asset is required for the '{check_name}' QC check but was not provided."
return {
'check_name': check_name,
'status': 'success',
'score': 0,
'result': 'Fail',
'response': fail_response,
'json_data': {},
'requires_brand_guidelines': False,
'brand_guidelines_status': None
}
check_prompt = qc_apps[check_name]['prompt']
llm_model = profile_config.get_check_llm(check_name)
# Handle brand guidelines if needed
requires_brand_guidelines = any(keyword in check_prompt.lower() for keyword in [
'brand guideline', 'brand standard', 'brand requirement', 'brand specification',
'brand compliance', 'brand rule', 'brand policy'
])
brand_guidelines_status = None
detected_brand = None
# Since we skip triage, we won't have detected_brand automatically
# Could be enhanced with direct brand detection if needed
if requires_brand_guidelines and detected_brand:
brand_guidelines = brand_db.get_brand_guidelines(detected_brand)
if brand_guidelines:
brand_guidelines_status = f"Brand guidelines found for {detected_brand} ({len(brand_guidelines)} files)."
# Add guidelines to prompt (simplified version)
check_prompt += f"\n\nBrand Analysis Context: Using brand guidelines for {detected_brand}."
else:
brand_guidelines_status = f"INFO: Brand detected as '{detected_brand}' but no brand guidelines found."
# Add pre-analysis instructions if available
final_prompt = check_prompt
if profile_config.pre_analysis_instructions:
final_prompt = profile_config.pre_analysis_instructions + "\n\n" + check_prompt
# Add reference asset content if selected
reference_image_path = None
if analysis_reference_asset:
reference_content = get_reference_asset_content(analysis_reference_asset)
if reference_content:
final_prompt = reference_content + "\n\n" + final_prompt
print(f"Added reference asset {analysis_reference_asset} to {check_name} prompt")
# Also get the actual reference image path for LLM
reference_image_path = get_reference_asset_image_path(analysis_reference_asset)
print(f"Running check {check_index + 1}/{total_checks}: {check_name}")
result = run_visual_qc(
prompt=final_prompt,
asset_path=file_path,
reference_path=reference_image_path,
model_name=llm_model
)
# Extract score and data
json_data = extract_json_from_response(result['response'])
score = extract_score_from_result(result, profile_config, check_name)
weight = profile_weights.get(check_name, 0.1)
weighted_score = score * weight if score is not None else 0
return {
'check_name': check_name,
'status': 'success',
'result': result,
'response': result['response'],
'brand_guidelines_status': brand_guidelines_status,
'requires_brand_guidelines': requires_brand_guidelines,
'json_data': json_data,
'score': score,
'weight': weight,
'weighted_score': weighted_score,
'model_used': result.get('model_info', {}),
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'display_name': check_name.replace('_', ' ').title()
}
except Exception as e:
print(f"Error in check {check_name}: {str(e)}")
weight = profile_weights.get(check_name, 0.1)
return {
'check_name': check_name,
'status': 'error',
'error': str(e),
'weight': weight,
'score': 0,
'weighted_score': 0
}
def process_checks_in_batches(enabled_checks, qc_apps, profile_config, profile_weights,
file_path, analysis_reference_asset, brand_db, progress_tracker,
session_id, batch_size=15):
"""Process QC checks in parallel batches"""
check_results = {}
total_checks = len(enabled_checks)
completed_checks = 0
print(f"Processing {total_checks} checks in batches of {batch_size}")
# Split checks into batches
for batch_start in range(0, total_checks, batch_size):
batch_end = min(batch_start + batch_size, total_checks)
batch_checks = enabled_checks[batch_start:batch_end]
batch_number = (batch_start // batch_size) + 1
total_batches = (total_checks + batch_size - 1) // batch_size
print(f"Processing batch {batch_number}/{total_batches} ({len(batch_checks)} checks)")
# Update progress for batch start
progress_tracker[session_id].update({
'current_batch': batch_number,
'total_batches': total_batches,
'current_check': f"Batch {batch_number}",
'current_check_display': f"Processing batch {batch_number}/{total_batches}",
'percentage': 10 + ((completed_checks / total_checks) * 80)
})
# Process batch in parallel
with ThreadPoolExecutor(max_workers=batch_size) as executor:
# Submit all checks in the batch
future_to_check = {}
for i, check_name in enumerate(batch_checks):
future = executor.submit(
process_single_check,
check_name, qc_apps, profile_config, profile_weights, file_path,
analysis_reference_asset, brand_db, progress_tracker, session_id,
batch_start + i, total_checks
)
future_to_check[future] = check_name
# Collect results as they complete
batch_results = {}
for future in as_completed(future_to_check):
check_name = future_to_check[future]
try:
result = future.result()
batch_results[result['check_name']] = result
completed_checks += 1
# Update progress for each completed check
progress_tracker[session_id].update({
'completed_checks': completed_checks,
'percentage': 10 + ((completed_checks / total_checks) * 80)
})
print(f"Completed check: {check_name} ({completed_checks}/{total_checks})")
except Exception as e:
print(f"Error getting result for check {check_name}: {str(e)}")
weight = profile_weights.get(check_name, 0.1)
batch_results[check_name] = {
'check_name': check_name,
'status': 'error',
'error': str(e),
'weight': weight,
'score': 0,
'weighted_score': 0
}
completed_checks += 1
# Add batch results to main results
check_results.update(batch_results)
print(f"Completed batch {batch_number}/{total_batches}")
# Small delay between batches to avoid overwhelming the APIs
if batch_number < total_batches:
time.sleep(0.5)
print(f"Completed all {total_checks} checks in {total_batches} batches")
return check_results
def process_single_check_with_triage(check_name, qc_apps, profile_config, profile_weights, file_path,
reference_asset, brand_db, progress_tracker, session_id,
check_index, total_checks):
"""Process a single QC check with triage logic - designed to run in parallel"""
try:
# Check if this check requires a reference asset but none is provided
if check_name in REFERENCE_ASSET_REQUIRED_CHECKS and not reference_asset:
# Return automatic fail with score 0
fail_response = f"Reference asset is required for the '{check_name}' QC check but was not provided."
return {
'check_name': check_name,
'status': 'success',
'score': 0,
'result': 'Fail',
'response': fail_response,
'json_data': {},
'requires_brand_guidelines': False,
'brand_guidelines_status': None
}
check_prompt = qc_apps[check_name]['prompt']
llm_model = profile_config.get_check_llm(check_name)
# Check if this check requires brand guidelines
requires_brand_guidelines = any(keyword in check_prompt.lower() for keyword in [
'brand guideline', 'brand standard', 'brand requirement', 'brand specification',
'brand compliance', 'brand rule', 'brand policy'
])
brand_guidelines_status = None
# Try to extract brand from triage results or detection
detected_brand = None
# Since we skip triage, we won't have detected_brand automatically
# Could be enhanced with direct brand detection if needed
if requires_brand_guidelines:
if detected_brand:
brand_guidelines = brand_db.get_brand_guidelines(detected_brand)
if not brand_guidelines:
brand_guidelines_status = f"INFO: Brand detected as '{detected_brand}' but no brand guidelines found. Performing generic brand analysis."
check_prompt += f"\n\nBrand Analysis Context: The detected brand is '{detected_brand}'. While specific brand guidelines are not available, please analyze the visual content for general brand consistency, professional appearance, and adherence to common branding best practices for this brand if you're familiar with it."
else:
brand_guidelines_status = f"Brand guidelines found for {detected_brand} ({len(brand_guidelines)} files)."
# Add brand guidelines content to the prompt
guidelines_content = "\n\n=== BRAND GUIDELINES REFERENCE ===\n"
guidelines_content += f"The following brand guidelines have been provided for {detected_brand}:\n\n"
for guideline in brand_guidelines:
guidelines_content += f"**Guideline File: {guideline.get('original_filename', 'Unknown')}**\n"
if guideline.get('description'):
guidelines_content += f"Description: {guideline['description']}\n"
if guideline.get('tags'):
guidelines_content += f"Tags: {', '.join(guideline['tags'])}\n"
# Try to read file content if it's a text-based file
guideline_file_path = guideline.get('file_path')
if guideline_file_path and os.path.exists(guideline_file_path):
try:
file_ext = os.path.splitext(guideline_file_path)[1].lower()
if file_ext in ['.txt', '.md', '.json']:
with open(guideline_file_path, 'r', encoding='utf-8') as f:
content = f.read()
if len(content) > 2000: # Limit content length
content = content[:2000] + "... [content truncated]"
guidelines_content += f"Content:\n{content}\n\n"
else:
guidelines_content += f"[File type {file_ext} - content not directly readable, but file is available as reference]\n\n"
except Exception as e:
guidelines_content += f"[Error reading file content: {str(e)}]\n\n"
else:
guidelines_content += "[File path not found]\n\n"
guidelines_content += "Please use these brand guidelines as reference when performing your analysis. Pay special attention to brand colors, fonts, logo usage, tone of voice, and any specific requirements mentioned in the guidelines.\n"
guidelines_content += "=== END BRAND GUIDELINES REFERENCE ===\n"
check_prompt += guidelines_content
else:
brand_guidelines_status = "INFO: Brand could not be determined. Performing generic analysis."
check_prompt += "\n\nGeneric Analysis: Since the brand could not be determined from the image, please analyze the visual content for general quality, professional appearance, and adherence to common design best practices."
# Add pre-analysis instructions if available
final_prompt = check_prompt
if profile_config.pre_analysis_instructions:
final_prompt = profile_config.pre_analysis_instructions + "\n\n" + check_prompt
# Add reference asset content if selected
reference_image_path = None
if reference_asset:
reference_content = get_reference_asset_content(reference_asset)
if reference_content:
final_prompt = reference_content + "\n\n" + final_prompt
print(f"Added reference asset {reference_asset} to {check_name} prompt")
# Also get the actual reference image path for LLM
reference_image_path = get_reference_asset_image_path(reference_asset)
print(f"Running check {check_index + 1}/{total_checks}: {check_name}")
result = run_visual_qc(
prompt=final_prompt,
asset_path=file_path,
reference_path=reference_image_path,
model_name=llm_model
)
# Extract score and data
json_data = extract_json_from_response(result['response'])
score = extract_score_from_result(result, profile_config, check_name)
weight = profile_weights.get(check_name, 0.1)
weighted_score = score * weight if score is not None else 0
return {
'check_name': check_name,
'status': 'completed',
'response': result['response'],
'brand_guidelines_status': brand_guidelines_status,
'requires_brand_guidelines': requires_brand_guidelines,
'json_data': json_data,
'score': score,
'weight': weight,
'weighted_score': weighted_score,
'model_used': result.get('model_info', {}),
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
except Exception as e:
print(f"Error in check {check_name}: {str(e)}")
weight = profile_weights.get(check_name, 0.1)
return {
'check_name': check_name,
'status': 'error',
'error': str(e),
'weight': weight,
'score': 0,
'weighted_score': 0
}
def process_checks_in_batches_with_triage(enabled_checks, qc_apps, profile_config, profile_weights,
file_path, reference_asset, brand_db, progress_tracker,
session_id, batch_size=15, base_percentage=10, percentage_range=80):
"""Process QC checks in parallel batches with triage logic"""
check_results = {}
total_checks = len(enabled_checks)
completed_checks = 0
print(f"Processing {total_checks} checks in batches of {batch_size}")
# Split checks into batches
for batch_start in range(0, total_checks, batch_size):
batch_end = min(batch_start + batch_size, total_checks)
batch_checks = enabled_checks[batch_start:batch_end]
batch_number = (batch_start // batch_size) + 1
total_batches = (total_checks + batch_size - 1) // batch_size
print(f"Processing batch {batch_number}/{total_batches} ({len(batch_checks)} checks)")
# Update progress for batch start
progress_tracker[session_id].update({
'current_batch': batch_number,
'total_batches': total_batches,
'current_check': f"Batch {batch_number}",
'current_check_display': f"Processing batch {batch_number}/{total_batches}",
'percentage': base_percentage + ((completed_checks / total_checks) * percentage_range)
})
# Process batch in parallel
with ThreadPoolExecutor(max_workers=batch_size) as executor:
# Submit all checks in the batch
future_to_check = {}
for i, check_name in enumerate(batch_checks):
future = executor.submit(
process_single_check_with_triage,
check_name, qc_apps, profile_config, profile_weights, file_path,
reference_asset, brand_db, progress_tracker, session_id,
batch_start + i, total_checks
)
future_to_check[future] = check_name
# Collect results as they complete
batch_results = {}
for future in as_completed(future_to_check):
check_name = future_to_check[future]
try:
result = future.result()
batch_results[result['check_name']] = result
completed_checks += 1
# Update progress for each completed check
progress_tracker[session_id].update({
'completed_checks': completed_checks,
'percentage': base_percentage + ((completed_checks / total_checks) * percentage_range)
})
print(f"Completed check: {check_name} ({completed_checks}/{total_checks})")
except Exception as e:
print(f"Error getting result for check {check_name}: {str(e)}")
weight = profile_weights.get(check_name, 0.1)
batch_results[check_name] = {
'check_name': check_name,
'status': 'error',
'error': str(e),
'weight': weight,
'score': 0,
'weighted_score': 0
}
completed_checks += 1
# Add batch results to main results
check_results.update(batch_results)
print(f"Completed batch {batch_number}/{total_batches}")
# Small delay between batches to avoid overwhelming the APIs
if batch_number < total_batches:
time.sleep(0.5)
print(f"Completed all {total_checks} checks in {total_batches} batches")
return check_results
def create_thumbnail_base64(file_path, max_size=(300, 300)):
"""Create a base64 encoded thumbnail of the input file"""
try:
# Get the image using the existing utility
pil_image = get_image_from_asset(file_path)
if not pil_image:
return None
# Create thumbnail
thumbnail = pil_image.copy()
thumbnail.thumbnail(max_size, Image.Resampling.LANCZOS)
# Convert to base64
buffer = io.BytesIO()
# Convert to RGB if necessary (for PNG with transparency)
if thumbnail.mode in ('RGBA', 'LA'):
background = Image.new('RGB', thumbnail.size, (255, 255, 255))
background.paste(thumbnail, mask=thumbnail.split()[-1] if thumbnail.mode == 'RGBA' else None)
thumbnail = background
thumbnail.save(buffer, format='JPEG', quality=85)
img_str = base64.b64encode(buffer.getvalue()).decode()
return f"data:image/jpeg;base64,{img_str}"
except Exception as e:
print(f"Error creating thumbnail: {e}")
return None
def save_results_to_file(report_data, filename, output_mode='html', session_id=None, file_path=None):
"""Save analysis results to file and return file path"""
print(f"DEBUG: save_results_to_file called with output_mode: '{output_mode}'")
if not session_id:
session_id = datetime.now().strftime('%Y%m%d_%H%M%S')
# Create filename base
base_filename = f"{session_id}_{filename.replace(' ', '_')}"
if output_mode == 'html':
print(f"DEBUG: Creating HTML file because output_mode == 'html'")
# Save HTML file
output_filename = f"{base_filename}_report.html"
output_path = os.path.join(app.config['OUTPUT_FOLDER'], output_filename)
html_content = generate_html_content(report_data, filename, file_path)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(html_content)
return output_path
else:
print(f"DEBUG: Creating JSON file because output_mode != 'html' (it's '{output_mode}')")
# Save JSON file
output_filename = f"{base_filename}_data.json"
output_path = os.path.join(app.config['OUTPUT_FOLDER'], output_filename)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(report_data, f, indent=2, ensure_ascii=False)
return output_path
def generate_html_content(report_data, filename, file_path=None):
"""Generate HTML content for report data with expandable sections"""
# Define a function to get color based on score
def get_score_result(score):
if score >= 6:
return "Pass", "#28a745" # Green for pass
else:
return "Fail", "#dc3545" # Red for fail
# Get reference asset information from profile selection
profile_selection = report_data.get('profile_selection', {})
reference_asset = profile_selection.get('reference_asset', None)
reference_asset_used = profile_selection.get('reference_asset_used', False)
# Build HTML for each check result with expandable sections
check_results_html = ""
for check_name, check_data in report_data['results'].items():
if check_data['status'] == 'success':
display_name = check_data.get('display_name', check_name)
score = check_data.get('score', 0)
result_text, score_color = get_score_result(score)
weight = check_data.get('weight', 0)
weighted_score = check_data.get('weighted_score', 0)
# Extract response text (strip JSON blocks for cleaner display)
response_text = check_data['response']
# Remove JSON code blocks for cleaner reading
response_text = re.sub(r'```json.*?```', '', response_text, flags=re.DOTALL)
response_text = response_text.strip()
# If response is empty after removing JSON, provide a fallback explanation
if not response_text:
score = check_data.get('score', 0)
result_text, _ = get_score_result(score)
if score == 0 and check_name in REFERENCE_ASSET_REQUIRED_CHECKS:
response_text = f"Reference asset is required for the '{display_name}' QC check but was not provided."
else:
response_text = f"QC check result: {result_text} (Score: {score}/10)"
# Create expandable section for each check
check_results_html += f"""
<div class="expandable-section">
<div class="expandable-header" onclick="toggleSection('{check_name}')">
<div class="check-title">
<h3>{display_name}</h3>
<span class="score-badge" style="background-color: {score_color};">{result_text}</span>
</div>
<div class="chevron" id="chevron-{check_name}">▼</div>
</div>
<div class="expandable-content" id="content-{check_name}">
<div class="check-metadata">
<p><strong>Score:</strong> {score}/10 | <strong>Weight:</strong> {weight:.1%} | <strong>Weighted Score:</strong> {weighted_score:.2f}</p>
{'<p><strong>⭐ Bonus Check:</strong> If missing required element, this scores 0</p>' if check_name in ['face_gaze_direction', 'face_visibility', 'new_visibility'] else ''}
<p><strong>Reference Asset:</strong> {'✅ Used' if check_name in REFERENCE_ASSET_REQUIRED_CHECKS and reference_asset_used else ('🚨 Required but missing' if check_name in REFERENCE_ASSET_REQUIRED_CHECKS else ' Not required')}</p>
{f'<p><strong>Reference Asset Details:</strong> {reference_asset}</p>' if check_name in REFERENCE_ASSET_REQUIRED_CHECKS and reference_asset_used and reference_asset else ''}
</div>
<div class="analysis-section">
<h4>Analysis Details:</h4>
<div class="response-text">{response_text.replace(chr(10), '<br>')}</div>
</div>
</div>
</div>
"""
# Get summary score result
overall_score = report_data['summary']['overall_score']
overall_result, overall_color = get_score_result(overall_score/10) # Normalize to 0-10 scale
# Determine the correct total score based on profile
profile_id = report_data.get('profile_id', '')
if profile_id == 'unilever_key_visual':
score_total = 120
else:
score_total = 100
html_content = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Visual AI QC Results for {filename}</title>
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; margin: 20px; padding: 0; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 100vh; }}
.container {{ max-width: 1200px; margin: 0 auto; background-color: white; padding: 30px; border-radius: 20px; box-shadow: 0 20px 40px rgba(0,0,0,0.1); }}
h1, h2, h3 {{ color: #2c3e50; }}
h1 {{ margin-bottom: 10px; font-size: 2.5em; text-align: center; }}
.file-preview {{ background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%); padding: 20px; border-radius: 15px; margin: 20px 0; border-left: 5px solid #28a745; }}
.file-info {{ display: flex; align-items: center; gap: 20px; }}
.thumbnail {{ max-width: 150px; max-height: 150px; border-radius: 8px; box-shadow: 0 4px 8px rgba(0,0,0,0.1); border: 2px solid #dee2e6; }}
.file-details {{ flex: 1; }}
.filename {{ font-size: 1.2em; font-weight: bold; color: #495057; margin-bottom: 5px; }}
.file-meta {{ color: #6c757d; font-size: 0.9em; }}
.summary {{ background: linear-gradient(135deg, #e3f2fd 0%, #f3e5f5 100%); padding: 25px; border-radius: 15px; margin: 30px 0; border-left: 5px solid #667eea; }}
.summary-grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 20px; margin-top: 15px; }}
.summary-item {{ background: white; padding: 15px; border-radius: 10px; text-align: center; }}
.score-display {{ font-size: 2.5em; font-weight: bold; color: {overall_color}; margin-bottom: 5px; }}
.grade {{ font-size: 1.3em; font-weight: bold; color: #495057; }}
.expandable-section {{ margin-bottom: 15px; border: 2px solid #e9ecef; border-radius: 12px; overflow: hidden; background: white; }}
.expandable-header {{ background: #f8f9fa; padding: 20px; cursor: pointer; display: flex; justify-content: space-between; align-items: center; transition: background-color 0.3s ease; }}
.expandable-header:hover {{ background: #e9ecef; }}
.check-title {{ display: flex; align-items: center; gap: 15px; }}
.check-title h3 {{ margin: 0; color: #495057; }}
.score-badge {{ padding: 8px 12px; border-radius: 20px; color: white; font-weight: bold; font-size: 0.9em; }}
.chevron {{ font-size: 1.2em; transition: transform 0.3s ease; color: #667eea; }}
.chevron.expanded {{ transform: rotate(180deg); }}
.expandable-content {{ padding: 0; max-height: 0; overflow: hidden; transition: all 0.3s ease; }}
.expandable-content.expanded {{ padding: 20px; max-height: 1000px; }}
.check-metadata {{ background: #f8f9fa; padding: 15px; border-radius: 8px; margin-bottom: 15px; }}
.analysis-section h4 {{ color: #495057; margin-bottom: 10px; }}
.response-text {{ background: #f8f9fa; padding: 15px; border-radius: 8px; line-height: 1.6; font-family: Georgia, serif; }}
.json-toggle {{ cursor: pointer; color: #667eea; text-decoration: underline; padding: 15px; text-align: center; font-weight: bold; }}
.json-view {{ display: none; margin-top: 20px; }}
.json-view pre {{ background-color: #2d3748; color: #e2e8f0; padding: 20px; border-radius: 10px; overflow-x: auto; font-size: 0.9em; }}
</style>
</head>
<body>
<div class="container">
<h1>🤖 Visual AI QC Results</h1>
<p style="text-align: center; color: #6c757d; font-size: 1.1em;">
Analysis completed on: {report_data['timestamp']}
</p>
<div class="file-preview">
<h3>📎 Analyzed File</h3>
<div class="file-info">
<img src="{create_thumbnail_base64(file_path) if file_path else 'data:image/svg+xml;base64,PHN2ZyB3aWR0aD0iMTUwIiBoZWlnaHQ9IjE1MCIgdmlld0JveD0iMCAwIDE1MCAxNTAiIGZpbGw9Im5vbmUiIHhtbG5zPSJodHRwOi8vd3d3LnczLm9yZy8yMDAwL3N2ZyI+CjxyZWN0IHdpZHRoPSIxNTAiIGhlaWdodD0iMTUwIiBmaWxsPSIjZjhmOWZhIiBzdHJva2U9IiNkZWUyZTYiIHN0cm9rZS13aWR0aD0iMiIvPgo8dGV4dCB4PSI3NSIgeT0iNzUiIHRleHQtYW5jaG9yPSJtaWRkbGUiIGZpbGw9IiM2Yzc1N2QiIGZvbnQtZmFtaWx5PSJBcmlhbCIgZm9udC1zaXplPSIxNCIgZHk9IjAuMzVlbSI+4p2MIEZpbGU8L3RleHQ+Cjx0ZXh0IHg9Ijc1IiB5PSI5NSIgdGV4dC1hbmNob3I9Im1pZGRsZSIgZmlsbD0iIzZjNzU3ZCIgZm9udC1mYW1pbHk9IkFyaWFsIiBmb250LXNpemU9IjEyIiBkeT0iMC4zNWVtIj5UaHVtYm5haWw8L3RleHQ+Cjwvc3ZnPg=='}" alt="File thumbnail" class="thumbnail" id="fileThumbnail">
<div class="file-details">
<div class="filename">{filename}</div>
<div class="file-meta">Original file processed for quality control analysis</div>
</div>
</div>
</div>
<div class="summary">
<h2>📊 Analysis Summary</h2>
<div class="summary-grid">
<div class="summary-item">
<div class="score-display">{overall_score}/{score_total}</div>
<div>Overall Score</div>
</div>
<div class="summary-item">
<div class="grade">{report_data['summary']['grade']}</div>
<div>Grade</div>
</div>
<div class="summary-item">
<div style="font-size: 1.5em; font-weight: bold; color: #495057;">{report_data['summary']['checks_count']}</div>
<div>Checks Performed</div>
</div>
<div class="summary-item">
<div style="font-size: 1.2em; color: #667eea; font-weight: bold;">{report_data['profile_name']}</div>
<div>Profile Used</div>
</div>
<div class="summary-item">
<div style="font-size: 1.2em; color: {'#28a745' if reference_asset_used else '#6c757d'}; font-weight: bold;">{'✅ Used' if reference_asset_used else ' None'}</div>
<div>Reference Asset</div>
</div>
</div>
</div>
<h2>🔍 Detailed Analysis Results</h2>
<p style="color: #6c757d; margin-bottom: 20px; font-style: italic;">
Click on any section below to expand and view detailed analysis
</p>
{check_results_html}
<div class="json-toggle" onclick="document.getElementById('json-data').style.display = document.getElementById('json-data').style.display === 'none' ? 'block' : 'none';">
📄 Show/Hide Raw JSON Data
</div>
<div id="json-data" class="json-view">
<pre>{json.dumps(report_data, indent=2)}</pre>
</div>
</div>
<script>
function toggleSection(checkName) {{
const content = document.getElementById('content-' + checkName);
const chevron = document.getElementById('chevron-' + checkName);
if (content.classList.contains('expanded')) {{
content.classList.remove('expanded');
chevron.classList.remove('expanded');
}} else {{
content.classList.add('expanded');
chevron.classList.add('expanded');
}}
}}
</script>
</body>
</html>
"""
return html_content
def generate_html_response(report_data, filename, save_to_file=False, session_id=None, file_path=None):
"""Generate HTML response for report data with optional file saving"""
html_content = generate_html_content(report_data, filename, file_path)
if save_to_file:
# Save to file and return file path info
output_path = save_results_to_file(report_data, filename, 'html', session_id, file_path)
return Response(html_content, mimetype='text/html'), output_path
else:
return Response(html_content, mimetype='text/html')
def generate_comprehensive_html_report(analysis_result, filename, file_path=None):
"""Generate comprehensive HTML report similar to the web UI format"""
summary = analysis_result.get('summary', {})
qc_analysis = analysis_result.get('qc_analysis', {})
profile_selection = analysis_result.get('profile_selection', {})
check_results = qc_analysis.get('check_results', {})
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
overall_score = summary.get('overall_score', 0)
profile_name = profile_selection.get('suggested_profile', 'Unknown Profile')
total_checks = qc_analysis.get('total_checks', 0)
completed_checks = qc_analysis.get('completed_checks', 0)
reference_asset = profile_selection.get('reference_asset', None)
reference_asset_used = profile_selection.get('reference_asset_used', False)
# Generate check results HTML
check_results_html = ''
for check_name, result in check_results.items():
if result.get('status') == 'completed':
score = result.get('score', 0)
result_text = "Pass" if score >= 6 else "Fail"
score_color = '#28a745' if score >= 6 else '#dc3545'
response = result.get('response', 'No response available')
display_name = check_name.replace('_', ' ').replace(chr(32).join([w.capitalize() for w in check_name.split('_')]), check_name.replace('_', ' ').title())
# Remove JSON blocks for cleaner display and handle empty responses
response = re.sub(r'```json.*?```', '', response, flags=re.DOTALL).strip()
if not response:
if score == 0 and check_name in REFERENCE_ASSET_REQUIRED_CHECKS:
response = f"Reference asset is required for the '{display_name}' QC check but was not provided."
else:
response = f"QC check result: {result_text} (Score: {score}/10)"
check_results_html += f'''
<div class="expandable-section">
<div class="expandable-header" onclick="toggleSection('{check_name}')">
<div class="check-title">
<h3>{display_name}</h3>
<span class="score-badge" style="background-color: {score_color};">{result_text}</span>
</div>
<div class="chevron" id="chevron-{check_name}">▼</div>
</div>
<div class="expandable-content" id="content-{check_name}">
<div class="check-metadata">
<p><strong>Score:</strong> {score}/10 | <strong>Weight:</strong> {result.get('weight', 0):.1%} | <strong>Weighted Score:</strong> {result.get('weighted_score', 0):.2f}</p>
{'<p><strong>⭐ Bonus Check:</strong> If missing required element, this scores 0</p>' if check_name in ['face_gaze_direction', 'face_visibility', 'new_visibility'] else ''}
<p><strong>Reference Asset:</strong> {'✅ Used' if check_name in REFERENCE_ASSET_REQUIRED_CHECKS and reference_asset_used else ('🚨 Required but missing' if check_name in REFERENCE_ASSET_REQUIRED_CHECKS else ' Not required')}</p>
{f'<p><strong>Reference Asset Details:</strong> {reference_asset}</p>' if check_name in REFERENCE_ASSET_REQUIRED_CHECKS and reference_asset_used and reference_asset else ''}
</div>
<div class="analysis-section">
<h4>Analysis Details:</h4>
<div class="response-text">{response.replace(chr(10), '<br>')}</div>
</div>
</div>
</div>
'''
# Convert overall score to pass/fail based on average of individual check scores
avg_individual_score = overall_score / 10 # Normalize to 1-10 scale
grade_text = 'Pass' if avg_individual_score >= 6 else 'Fail'
score_color = '#28a745' if avg_individual_score >= 6 else '#dc3545'
return f'''<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Visual AI QC Results for {filename}</title>
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; margin: 20px; padding: 0; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 100vh; }}
.container {{ max-width: 1200px; margin: 0 auto; background-color: white; padding: 30px; border-radius: 20px; box-shadow: 0 20px 40px rgba(0,0,0,0.1); }}
h1, h2, h3 {{ color: #2c3e50; }}
h1 {{ margin-bottom: 10px; font-size: 2.5em; text-align: center; }}
.file-preview {{ background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%); padding: 20px; border-radius: 15px; margin: 20px 0; border-left: 5px solid #28a745; }}
.file-info {{ display: flex; align-items: center; gap: 20px; }}
.thumbnail {{ max-width: 150px; max-height: 150px; border-radius: 8px; box-shadow: 0 4px 8px rgba(0,0,0,0.1); border: 2px solid #dee2e6; }}
.file-details {{ flex: 1; }}
.filename {{ font-size: 1.2em; font-weight: bold; color: #495057; margin-bottom: 5px; }}
.file-meta {{ color: #6c757d; font-size: 0.9em; }}
.summary {{ background: linear-gradient(135deg, #e3f2fd 0%, #f3e5f5 100%); padding: 25px; border-radius: 15px; margin: 30px 0; border-left: 5px solid #667eea; }}
.summary-grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 20px; margin-top: 15px; }}
.summary-item {{ background: white; padding: 15px; border-radius: 10px; text-align: center; }}
.score-display {{ font-size: 2.5em; font-weight: bold; color: {score_color}; margin-bottom: 5px; }}
.grade {{ font-size: 1.3em; font-weight: bold; color: #495057; }}
.expandable-section {{ margin-bottom: 15px; border: 2px solid #e9ecef; border-radius: 12px; overflow: hidden; background: white; }}
.expandable-header {{ background: #f8f9fa; padding: 20px; cursor: pointer; display: flex; justify-content: space-between; align-items: center; transition: background-color 0.3s ease; }}
.expandable-header:hover {{ background: #e9ecef; }}
.check-title {{ display: flex; align-items: center; gap: 15px; }}
.check-title h3 {{ margin: 0; color: #495057; }}
.score-badge {{ padding: 8px 12px; border-radius: 20px; color: white; font-weight: bold; font-size: 0.9em; }}
.chevron {{ font-size: 1.2em; transition: transform 0.3s ease; color: #667eea; }}
.chevron.expanded {{ transform: rotate(180deg); }}
.expandable-content {{ padding: 0; max-height: 0; overflow: hidden; transition: all 0.3s ease; }}
.expandable-content.expanded {{ padding: 20px; max-height: 1000px; }}
.check-metadata {{ background: #f8f9fa; padding: 15px; border-radius: 8px; margin-bottom: 15px; }}
.analysis-section h4 {{ color: #495057; margin-bottom: 10px; }}
.response-text {{ background: #f8f9fa; padding: 15px; border-radius: 8px; line-height: 1.6; font-family: Georgia, serif; }}
</style>
</head>
<body>
<div class="container">
<h1>🤖 Visual AI QC Results</h1>
<p style="text-align: center; color: #6c757d; font-size: 1.1em;">
Analysis completed on: {timestamp}
</p>
<div class="file-preview">
<h3>📎 Analyzed File</h3>
<div class="file-info">
<img src="{create_thumbnail_base64(file_path) if file_path else 'data:image/svg+xml;base64,PHN2ZyB3aWR0aD0iMTUwIiBoZWlnaHQ9IjE1MCIgdmlld0JveD0iMCAwIDE1MCAxNTAiIGZpbGw9Im5vbmUiIHhtbG5zPSJodHRwOi8vd3d3LnczLm9yZy8yMDAwL3N2ZyI+CjxyZWN0IHdpZHRoPSIxNTAiIGhlaWdodD0iMTUwIiBmaWxsPSIjZjhmOWZhIiBzdHJva2U9IiNkZWUyZTYiIHN0cm9rZS13aWR0aD0iMiIvPgo8dGV4dCB4PSI3NSIgeT0iNzUiIHRleHQtYW5jaG9yPSJtaWRkbGUiIGZpbGw9IiM2Yzc1N2QiIGZvbnQtZmFtaWx5PSJBcmlhbCIgZm9udC1zaXplPSIxNCIgZHk9IjAuMzVlbSI+4p2MIEZpbGU8L3RleHQ+Cjx0ZXh0IHg9Ijc1IiB5PSI5NSIgdGV4dC1hbmNob3I9Im1pZGRsZSIgZmlsbD0iIzZjNzU3ZCIgZm9udC1mYW1pbHk9IkFyaWFsIiBmb250LXNpemU9IjEyIiBkeT0iMC4zNWVtIj5UaHVtYm5haWw8L3RleHQ+Cjwvc3ZnPg=='}" alt="File thumbnail" class="thumbnail" id="fileThumbnail">
<div class="file-details">
<div class="filename">{filename}</div>
<div class="file-meta">Original file processed for quality control analysis</div>
</div>
</div>
</div>
<div class="summary">
<h2>📊 Analysis Summary</h2>
<div class="summary-grid">
<div class="summary-item">
<div class="score-display">{overall_score}/100</div>
<div>Overall Score</div>
</div>
<div class="summary-item">
<div class="grade">{grade_text}</div>
<div>Grade</div>
</div>
<div class="summary-item">
<div style="font-size: 1.5em; font-weight: bold; color: #495057;">{completed_checks}</div>
<div>Checks Performed</div>
</div>
<div class="summary-item">
<div style="font-size: 1.2em; color: #667eea; font-weight: bold;">{profile_name}</div>
<div>Profile Used</div>
</div>
<div class="summary-item">
<div style="font-size: 1.2em; color: {'#28a745' if reference_asset_used else '#6c757d'}; font-weight: bold;">{'✅ Used' if reference_asset_used else ' None'}</div>
<div>Reference Asset</div>
</div>
</div>
</div>
<h2>🔍 Detailed Analysis Results</h2>
<p style="color: #6c757d; margin-bottom: 20px; font-style: italic;">
Click on any section below to expand and view detailed analysis
</p>
{check_results_html}
</div>
<script>
function toggleSection(checkName) {{
const content = document.getElementById('content-' + checkName);
const chevron = document.getElementById('chevron-' + checkName);
if (content.classList.contains('expanded')) {{
content.classList.remove('expanded');
chevron.classList.remove('expanded');
}} else {{
content.classList.add('expanded');
chevron.classList.add('expanded');
}}
}}
</script>
</body>
</html>'''
def get_reference_image_path(check_name):
"""Find a matching reference image - deprecated function, returns None"""
# This function is deprecated since numbered criteria images are no longer used
# Reference assets are now handled through the brand guidelines system
return None
def get_reference_asset_image_path(reference_asset_id):
"""
Get the actual file path for a reference asset image to send to LLM.
Args:
reference_asset_id: ID of the reference asset to retrieve
Returns:
File path to the reference image, or None if not found or not an image
"""
if not reference_asset_id or not reference_asset_id.strip():
return None
try:
# Get the reference asset file information from brand guidelines DB
file_record = brand_db.db["files"].get(reference_asset_id)
if not file_record:
print(f"DEBUG: Reference asset not found: {reference_asset_id}")
return None
file_path = file_record.get("stored_path", "")
if not file_path or not os.path.exists(file_path):
print(f"DEBUG: Reference asset file not found at path: {file_path}")
return None
# Check if it's an image file
image_extensions = ['.png', '.jpg', '.jpeg', '.bmp', '.webp', '.gif', '.tiff']
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext not in image_extensions:
print(f"DEBUG: Reference asset is not an image: {file_path}")
return None
print(f"DEBUG: Found reference image at: {file_path}")
return file_path
except Exception as e:
print(f"DEBUG: Error getting reference asset image path: {e}")
return None
def get_reference_asset_content(reference_asset_id):
"""
Retrieve and format reference asset content for use in QC prompts.
Args:
reference_asset_id: ID of the reference asset to retrieve
Returns:
Formatted string with reference asset information, or empty string if not found
"""
if not reference_asset_id or not reference_asset_id.strip():
return ""
try:
# Get the reference asset file information from brand guidelines DB
file_record = brand_db.db["files"].get(reference_asset_id)
if not file_record:
print(f"Reference asset not found: {reference_asset_id}")
return ""
brand_name = file_record["brand_name"]
description = file_record.get("description", "")
file_path = file_record.get("stored_path", "")
original_filename = file_record.get("original_filename", "")
# Build reference asset context for the prompt
reference_content = "\n\n=== REFERENCE ASSET GUIDELINES ===\n"
reference_content += f"Brand: {brand_name}\n"
reference_content += f"Reference File: {original_filename}\n"
if description:
reference_content += f"Description: {description}\n"
reference_content += "\nPlease use this reference asset as your guideline for analysis. "
reference_content += "Compare the uploaded image against these brand standards and requirements. "
reference_content += "Pay special attention to brand consistency, visual standards, and any specific "
reference_content += "requirements shown in the reference material.\n"
# Try to read text-based reference files if possible
if file_path and os.path.exists(file_path):
try:
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext in ['.txt', '.md', '.json']:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if len(content) > 3000: # Limit content length for prompts
content = content[:3000] + "... [content truncated]"
reference_content += f"\nReference Content:\n{content}\n"
else:
reference_content += f"\nReference file ({file_ext}) is available for visual comparison.\n"
except Exception as e:
print(f"Error reading reference asset content: {e}")
reference_content += "\n[Reference file available but content could not be read]\n"
reference_content += "=== END REFERENCE ASSET GUIDELINES ===\n"
return reference_content
except Exception as e:
print(f"Error retrieving reference asset {reference_asset_id}: {e}")
return ""
def load_qc_apps():
"""Load all QC apps and their prompts"""
for check_name in QC_CHECKS:
try:
# Import the app module
try:
module_path = f"visual_qc_apps.{check_name}.app"
module = importlib.import_module(module_path)
# Get app class name
app_class_name = None
for key in dir(module):
if key.endswith('App') and key != 'FlaskAppTemplate':
app_class_name = key
break
except ImportError as e:
print(f"Import error for {check_name}: {e}")
continue
if app_class_name:
# Get the app class
app_class = getattr(module, app_class_name)
# Create an instance to get the prompt
app_instance = app_class()
# Find reference image for this check
reference_image = get_reference_image_path(check_name)
# Store the prompt, app class, and reference image
qc_apps[check_name] = {
"name": app_class_name,
"prompt": app_instance.prompt,
"instance": app_instance,
"display_name": app_class_name.replace("App", "").replace("_", " "),
"reference_image": reference_image
}
print(f"Loaded QC check: {check_name}" + (f" with reference image" if reference_image else ""))
else:
print(f"No app class found in {module_path}")
except Exception as e:
print(f"Error loading {check_name} app: {e}")
traceback.print_exc()
@app.route('/', methods=['GET'])
def serve_web_ui():
"""Serve the web UI"""
try:
# Use absolute path to web_ui.html (located in parent directory)
base_dir = os.path.dirname(os.path.abspath(__file__))
web_ui_path = os.path.join(os.path.dirname(base_dir), 'web_ui.html')
with open(web_ui_path, 'r') as f:
html_content = f.read()
return Response(html_content, mimetype='text/html')
except FileNotFoundError:
return jsonify({'error': 'Web UI not found'}), 404
@app.route('/health', methods=['GET'])
def health_check():
"""Simple health check endpoint"""
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat()
})
@app.route('/api/health/folders', methods=['GET'])
def health_check_folders():
"""Check if required folders exist and are writable"""
try:
upload_folder = app.config.get('UPLOAD_FOLDER', 'uploads')
output_folder = app.config.get('OUTPUT_FOLDER', 'output')
# Test if folders exist
upload_exists = os.path.exists(upload_folder)
output_exists = os.path.exists(output_folder)
# Test if we can create directories (if they don't exist)
upload_writable = False
output_writable = False
try:
os.makedirs(upload_folder, exist_ok=True)
upload_writable = True
except Exception as e:
upload_error = str(e)
try:
os.makedirs(output_folder, exist_ok=True)
output_writable = True
except Exception as e:
output_error = str(e)
# Test writing a file
test_file_path = os.path.join(upload_folder, 'test_write.tmp')
can_write_files = False
try:
with open(test_file_path, 'w') as f:
f.write('test')
os.remove(test_file_path)
can_write_files = True
except Exception as e:
write_error = str(e)
return jsonify({
'status': 'success',
'folders': {
'upload_folder': upload_folder,
'upload_exists': upload_exists,
'upload_writable': upload_writable,
'output_folder': output_folder,
'output_exists': output_exists,
'output_writable': output_writable,
'can_write_files': can_write_files
},
'errors': {
'upload_error': locals().get('upload_error'),
'output_error': locals().get('output_error'),
'write_error': locals().get('write_error')
}
})
except Exception as e:
import traceback
return jsonify({
'status': 'error',
'message': str(e),
'traceback': traceback.format_exc()
}), 500
@app.route('/api/progress/<session_id>', methods=['GET'])
def get_progress(session_id):
"""Get current progress for a session"""
if session_id not in progress_tracker:
return jsonify({'status': 'error', 'message': 'Session not found'}), 404
return jsonify({
'status': 'success',
'progress': progress_tracker[session_id]
})
@app.route('/api/start_analysis', methods=['POST'])
@auth.require_auth
def start_analysis():
"""Start analysis and return session ID immediately"""
import threading
try:
# Check if file is in request
if 'file' not in request.files:
return jsonify({'status': 'error', 'message': 'No file part'}), 400
file = request.files['file']
# Check if file was selected
if file.filename == '':
return jsonify({'status': 'error', 'message': 'No selected file'}), 400
# Get parameters
profile = request.form.get('profile', 'general').lower()
brand = request.form.get('brand', 'general').lower()
output_mode = request.form.get('mode', 'json').lower()
model = request.form.get('model', 'profile')
reference_asset = request.form.get('reference_asset', '')
# Use profile if provided, otherwise fall back to brand
if profile and profile != 'general':
brand = profile.split('_')[0] if '_' in profile else profile
print(f"Starting analysis with profile: {profile}, brand: {brand}, mode: {output_mode}")
print(f"DEBUG: Raw mode parameter from request: '{request.form.get('mode')}'")
print(f"DEBUG: Processed output_mode: '{output_mode}'")
# Create unique session ID and save file
session_id = datetime.now().strftime('%Y%m%d_%H%M%S')
session_folder = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
os.makedirs(session_folder, exist_ok=True)
file_path = os.path.join(session_folder, file.filename)
file.save(file_path)
# Initialize progress tracking with estimated total checks
# We'll update this with the actual number once we determine the profile
estimated_checks = 25 # Reasonable estimate for most profiles
progress_tracker[session_id] = {
'total_checks': estimated_checks,
'completed_checks': 0,
'current_check': 'Initializing',
'current_check_display': 'Initializing Analysis',
'stage': 'setup',
'percentage': 0,
'session_id': session_id,
'status': 'started'
}
# Start analysis in background thread with explicit parameters
def run_analysis(session_id, file_path, filename, brand, profile, output_mode, reference_asset):
print(f"Background thread started for session: {session_id}")
print(f"Parameters: brand={brand}, profile={profile}, mode={output_mode}")
try:
# Force reload QC apps to ensure they're available
if not qc_apps:
load_qc_apps()
# Use the explicitly passed parameters
analysis_brand = brand
analysis_profile = profile
analysis_mode = output_mode
analysis_reference_asset = reference_asset
print(f"DEBUG: analysis_mode = '{analysis_mode}'")
# Write debug info to file for easier debugging
with open('debug_mode.txt', 'a') as f:
f.write(f"Session {session_id}: analysis_mode = '{analysis_mode}'\n")
# Validate brand
if not analysis_brand or analysis_brand.strip() == '':
analysis_brand = 'general'
# Validate output mode
print(f"DEBUG: Before validation, analysis_mode = '{analysis_mode}'")
if analysis_mode not in ['json', 'html']:
print(f"DEBUG: analysis_mode '{analysis_mode}' not valid, defaulting to 'json'")
analysis_mode = 'json'
else:
print(f"DEBUG: analysis_mode '{analysis_mode}' is valid")
print(f"DEBUG: After validation, analysis_mode = '{analysis_mode}'")
# Use the directly specified profile (no triage needed)
suggested_profile = analysis_profile if analysis_profile and analysis_profile.strip() else 'general'
print(f"Using specified profile: {suggested_profile}")
# Update progress to show starting QC analysis
progress_tracker[session_id].update({
'stage': 'qc_analysis',
'current_check': 'initializing',
'current_check_display': 'Preparing Quality Analysis',
'completed_checks': 0,
'percentage': 5
})
# STEP 1: Run Quality Control Analysis
print(f"Step 1: Running QC analysis with profile '{suggested_profile}'")
# Get the profile configuration
profile_config = get_profile(suggested_profile)
if not profile_config:
raise Exception(f'Profile {suggested_profile} not found')
# Get enabled checks from profile
enabled_checks = profile_config.get_enabled_checks()
profile_weights = profile_config.get_check_weights()
# Filter to only include checks that exist in qc_apps
enabled_checks = [check for check in enabled_checks if check in qc_apps]
if not enabled_checks:
raise Exception(f'No enabled checks found for profile {suggested_profile}')
# Update progress tracker with total checks
progress_tracker[session_id].update({
'total_checks': len(enabled_checks),
'stage': 'qc_analysis',
'percentage': 10
})
# Run QC checks in parallel batches
check_results = process_checks_in_batches(
enabled_checks, qc_apps, profile_config, profile_weights,
file_path, analysis_reference_asset, brand_db, progress_tracker,
session_id, batch_size=15
)
# STEP 4: Calculate Overall Score
print(f"Step 4: Calculating overall score")
total_weighted_score = 0
total_weight = 0
completed_checks = 0
failed_checks = 0
for check_name, result in check_results.items():
weight = result.get('weight', 0.1)
total_weight += weight
if result['status'] == 'success':
completed_checks += 1
score = result.get('score')
if score is not None:
total_weighted_score += score * weight
else:
failed_checks += 1
# Calculate overall score
# For profiles with total_weight = 10.0 (like General Check), use direct weighted score
# For profiles with total_weight = 1.0, multiply by 10 to scale to 100
if total_weight >= 10.0:
overall_score = total_weighted_score # Already scaled correctly
else:
overall_score = (total_weighted_score * 10) # Scale to 100-point system
# STEP 5: Prepare Combined Response
print(f"Step 5: Preparing response")
# Create comprehensive response with all data
result_data = {
'status': 'success',
'session_id': session_id,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'filename': file.filename,
'profile': suggested_profile,
'profile_id': suggested_profile,
'profile_name': profile_config.name,
'model': 'Profile-based selection',
'results': check_results,
'triage_analysis': {
'status': 'skipped',
'results': {'primary_format': 'unknown', 'specific_type': 'user_selected', 'confidence_score': 10, 'recommended_qc_profile': suggested_profile},
'raw_response': 'Triage skipped - using user-selected profile directly'
},
'profile_selection': {
'selected_profile': suggested_profile,
'profile_source': 'user_selected',
'brand': analysis_brand,
'format_suffix': suggested_profile,
'reference_asset': analysis_reference_asset if analysis_reference_asset else None,
'reference_asset_used': bool(analysis_reference_asset)
},
'qc_analysis': {
'profile_used': suggested_profile,
'total_checks': len(enabled_checks),
'completed_checks': completed_checks,
'failed_checks': failed_checks,
'check_results': check_results
},
'summary': {
'overall_score': round(overall_score, 1),
'profile': profile_config.name,
'checks_count': completed_checks,
'total_checks': len(enabled_checks),
'total_weighted_score': total_weighted_score,
'total_weight': total_weight,
'grade': determine_grade(overall_score)
}
}
print(f"Analysis completed successfully")
# Save results to file
try:
print(f"DEBUG: Saving file with mode: '{analysis_mode}'")
output_path = save_results_to_file(result_data, file.filename, analysis_mode, session_id, file_path)
result_data['output_file'] = {
'path': output_path,
'filename': os.path.basename(output_path),
'url': f'{request.environ.get("SCRIPT_NAME", "")}/output/{os.path.basename(output_path)}'
}
print(f"Results saved to: {output_path}")
except Exception as save_error:
print(f"Error saving results to file: {save_error}")
# Store result in progress tracker
print(f"Analysis result status: {result_data.get('status')}")
if session_id in progress_tracker:
progress_tracker[session_id]['result'] = result_data
progress_tracker[session_id]['status'] = 'completed'
progress_tracker[session_id]['stage'] = 'complete'
progress_tracker[session_id]['percentage'] = 100
print(f"Results stored in progress tracker for session: {session_id}")
else:
print(f"ERROR: Session {session_id} not found in progress tracker!")
except Exception as e:
print(f"ERROR in background thread for session {session_id}: {str(e)}")
print(f"Exception type: {type(e)}")
import traceback
print(f"Traceback: {traceback.format_exc()}")
if session_id in progress_tracker:
progress_tracker[session_id]['status'] = 'error'
progress_tracker[session_id]['stage'] = 'error'
progress_tracker[session_id]['error'] = str(e)
progress_tracker[session_id]['current_check'] = 'Error'
progress_tracker[session_id]['current_check_display'] = 'Analysis Failed'
# Start background thread with explicit parameters
threading.Thread(
target=run_analysis,
args=(session_id, file_path, file.filename, brand, profile, output_mode, reference_asset),
daemon=True
).start()
# Return session ID immediately
return jsonify({
'status': 'success',
'session_id': session_id,
'message': 'Analysis started'
})
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/output/<filename>', methods=['GET'])
def serve_output_file(filename):
"""Serve saved output files"""
try:
file_path = os.path.join(app.config['OUTPUT_FOLDER'], filename)
if os.path.exists(file_path):
if filename.endswith('.html'):
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return Response(content, mimetype='text/html')
elif filename.endswith('.json'):
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return Response(content, mimetype='application/json')
else:
return Response(open(file_path, 'rb').read(), mimetype='application/octet-stream')
else:
return jsonify({'error': 'File not found'}), 404
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/output_files', methods=['GET'])
def list_output_files():
"""List all saved output files sorted by creation date (newest first)"""
try:
files = []
if os.path.exists(app.config['OUTPUT_FOLDER']):
for filename in os.listdir(app.config['OUTPUT_FOLDER']):
if filename.endswith(('.html', '.json')):
file_path = os.path.join(app.config['OUTPUT_FOLDER'], filename)
file_stats = os.stat(file_path)
files.append({
'filename': filename,
'size': file_stats.st_size,
'created': datetime.fromtimestamp(file_stats.st_ctime).strftime('%Y-%m-%d %H:%M:%S'),
'created_timestamp': file_stats.st_ctime, # Add timestamp for sorting
'url': f'{request.environ.get("SCRIPT_NAME", "")}/output/{filename}'
})
# Sort files by creation time (newest first)
files.sort(key=lambda x: x['created_timestamp'], reverse=True)
# Remove the timestamp field from response (not needed by frontend)
for file in files:
del file['created_timestamp']
return jsonify({'files': files})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/profiles', methods=['GET'])
def get_available_profiles():
"""Get all available profiles grouped by type"""
from profile_config import get_profile_summary
# Force reload profiles to ensure they're up to date
from profile_config import load_profiles
load_profiles()
profiles_summary = get_profile_summary()
# Group profiles by type
triage_profiles = {}
format_profiles = {}
for profile_id, profile_info in profiles_summary.items():
if '_triage' in profile_id:
brand = profile_id.replace('_triage', '')
triage_profiles[brand] = {
'id': profile_id,
'name': profile_info['name'],
'description': profile_info['description']
}
elif any(suffix in profile_id for suffix in ['_print', '_digital', '_ooh', '_packaging', '_event', '_indoor']):
format_profiles[profile_id] = {
'id': profile_id,
'name': profile_info['name'],
'description': profile_info['description'],
'enabled_checks': profile_info['enabled_checks'],
'total_checks': profile_info['total_checks']
}
return jsonify({
'status': 'success',
'triage_profiles': triage_profiles,
'format_profiles': format_profiles,
'all_profiles': profiles_summary
})
@app.route('/api/run_check', methods=['POST'])
def run_check():
"""API endpoint to run a single QC check"""
data = request.json
# Validate inputs
if not data or 'check_name' not in data or 'file_path' not in data:
return jsonify({'status': 'error', 'message': 'Missing required parameters'}), 400
check_name = data['check_name']
file_path = data['file_path']
model_name = data.get('model_name', 'Gemini') # Default to Gemini if not specified
output_mode = data.get('mode', 'json').lower() # Get output mode, default to JSON
# Validate check exists
if check_name not in qc_apps:
return jsonify({'status': 'error', 'message': f'QC check "{check_name}" not found'}), 404
# Validate file exists
if not os.path.exists(file_path):
return jsonify({'status': 'error', 'message': 'File not found'}), 404
try:
# Get the prompt
prompt = qc_apps[check_name]['prompt']
# Get reference image if available
reference_path = qc_apps[check_name].get('reference_image')
# Run the QC check with reference image if available
result = run_visual_qc(
prompt=prompt,
asset_path=file_path,
reference_path=reference_path,
model_name=model_name
)
# Extract score from result if possible
score = None
try:
# Use our extraction function to get score from JSON blocks
json_data = extract_json_from_response(result['response'])
if 'score' in json_data:
score = json_data.get('score')
print(f"Extracted score from JSON block: {score}")
# If we still don't have a score, look for any score in text
if score is None:
# Try to find a score pattern in the text
score_pattern = r'["\']score["\']\s*:\s*(\d+)'
score_match = re.search(score_pattern, result['response'])
if score_match:
score = int(score_match.group(1))
print(f"Extracted score from regex: {score}")
else:
# Look for visual evidence of actual scores in text like "score: 7", "score is 8", "score of 9 out of 10"
descriptive_score_pattern = r'score(?:\s+is|\s*:\s*|\s+of\s+)(?:\s*)(\d+)(?:\s*out\s*of\s*10)?'
descriptive_match = re.search(descriptive_score_pattern, result['response'].lower())
if descriptive_match:
score = int(descriptive_match.group(1))
print(f"Extracted score from descriptive text: {score}")
else:
# Try to determine score from pass/fail status (legacy mode)
result_text = result.get('response', '').upper()
if "PASS" in result_text:
score = 10 # Pass = 10/10
print("Detected PASS keyword, setting score to 10")
elif "FAIL" in result_text:
score = 3 # Fail = 3/10
print("Detected FAIL keyword, setting score to 3")
else:
print(f"Could not extract score, using default of 5")
score = 5 # Default middle score
except Exception as parse_error:
print(f"Error parsing score from response: {parse_error}")
score = 5 # Default to middle score
# Add the score to the result
result['score'] = score if score is not None else 5 # Default to middle score if extraction fails
# Process result for JSON mode
if output_mode == 'json':
# For JSON mode, we update the response to be ONLY the JSON part
json_data = extract_json_from_response(result['response'])
if json_data:
# If we found JSON in the response, replace the full text with just the extracted JSON
result['original_response'] = result['response'] # Save original for debugging
result['response'] = json.dumps(json_data, indent=2) # Pretty print the JSON
# Add metadata but don't save to file
output_json = {
"timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"check_name": check_name,
"display_name": qc_apps[check_name]['display_name'],
"model": model_name,
"file_analyzed": os.path.basename(file_path),
"result": result,
"score": result['score'],
"has_reference": reference_path is not None
}
return jsonify(output_json)
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e),
'traceback': traceback.format_exc()
}), 500
@app.route('/api/triage_file', methods=['POST'])
def api_triage_file():
"""API endpoint to perform file type triage detection"""
try:
# Force reload QC apps to ensure triage app is available
if not qc_apps:
load_qc_apps()
# Check if file is in request
if 'file' not in request.files:
return jsonify({'status': 'error', 'message': 'No file part'}), 400
file = request.files['file']
# Check if file was selected
if file.filename == '':
return jsonify({'status': 'error', 'message': 'No selected file'}), 400
# Get the brand/profile base name for triage routing
brand = request.form.get('brand', 'default')
# Create unique session ID and save file
session_id = datetime.now().strftime('%Y%m%d_%H%M%S')
session_folder = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
os.makedirs(session_folder, exist_ok=True)
file_path = os.path.join(session_folder, file.filename)
file.save(file_path)
# Run file type triage detection
if 'file_type_triage' not in qc_apps:
return jsonify({'status': 'error', 'message': 'File type triage app not available'}), 500
# Get the triage prompt
prompt = qc_apps['file_type_triage']['prompt']
# Run the triage check
result = run_visual_qc(
prompt=prompt,
asset_path=file_path,
model_name='Gemini' # Use Gemini for triage
)
# Extract the triage results
triage_data = extract_json_from_response(result['response'])
if not triage_data:
return jsonify({
'status': 'error',
'message': 'Could not extract triage results',
'raw_response': result['response']
}), 500
# Generate the suggested profile name
format_suffix = triage_data.get('recommended_qc_profile', '_digital')
suggested_profile = f"{brand}{format_suffix}"
# Return triage results
return jsonify({
'status': 'success',
'session_id': session_id,
'file_path': file_path,
'filename': file.filename,
'triage_results': triage_data,
'suggested_profile': suggested_profile,
'brand': brand,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e),
'traceback': traceback.format_exc()
}), 500
@app.route('/api/process_triaged_file', methods=['POST'])
@auth.require_auth
def api_process_triaged_file():
"""API endpoint to process a file that has already been triaged"""
try:
data = request.json
# Validate required parameters
if not data or 'session_id' not in data or 'profile' not in data:
return jsonify({'status': 'error', 'message': 'Missing required parameters: session_id and profile'}), 400
session_id = data['session_id']
profile_name = data['profile']
output_mode = data.get('mode', 'json').lower()
# Reconstruct file path from session
session_folder = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
if not os.path.exists(session_folder):
return jsonify({'status': 'error', 'message': 'Session not found'}), 404
# Find the file in the session folder
files = os.listdir(session_folder)
if not files:
return jsonify({'status': 'error', 'message': 'No file found in session'}), 404
filename = files[0] # Use the first file found
file_path = os.path.join(session_folder, filename)
# Validate profile exists
if profile_name not in PROFILES:
available_profiles = list(PROFILES.keys())
return jsonify({
'status': 'error',
'message': f'Invalid profile: {profile_name}. Available profiles: {available_profiles}'
}), 400
# Get profile and process checks
profile = get_profile(profile_name)
selected_checks = profile.get_enabled_checks()
profile_weights = profile.get_check_weights()
if not selected_checks:
return jsonify({'status': 'error', 'message': 'No QC checks available for the selected profile'}), 400
# Process each check (same logic as original process_file)
check_results = {}
overall_weighted_score = 0
total_weight = 0
for i, check_name in enumerate(selected_checks):
print(f"Processing check {i+1}/{len(selected_checks)}: {check_name}")
# Skip if check is not available
if check_name not in qc_apps:
check_results[check_name] = {
'status': 'error',
'message': f'QC check not found'
}
continue
# Get LLM preference for this check
ai_model = profile.get_check_llm(check_name)
# Get the prompt and reference image
prompt = qc_apps[check_name]['prompt']
reference_path = qc_apps[check_name].get('reference_image')
try:
# Run the QC check
result = run_visual_qc(
prompt=prompt,
asset_path=file_path,
reference_path=reference_path,
model_name=ai_model
)
# Extract score from result
score = extract_score_from_result(result, profile, check_name)
result['score'] = score
# Process result for JSON mode
if output_mode == 'json':
json_data = extract_json_from_response(result['response'])
if json_data:
result['original_response'] = result['response']
result['response'] = json.dumps(json_data, indent=2)
# Calculate weighted score
weight = profile_weights.get(check_name, 1.0 / len(selected_checks))
weighted_score = result['score'] * weight
overall_weighted_score += weighted_score
total_weight += weight
# Store result
check_results[check_name] = {
'status': 'success',
'display_name': qc_apps[check_name]['display_name'],
'score': result['score'],
'weight': weight,
'weighted_score': weighted_score,
'has_reference': reference_path is not None,
'result': result
}
except Exception as e:
check_results[check_name] = {
'status': 'error',
'message': str(e),
'traceback': traceback.format_exc()
}
# Calculate overall score
overall_score = 0
if total_weight > 0:
# Special case for Unilever key visual profile - show percentage of 120
if profile_name == 'unilever_key_visual':
# For Unilever profile, calculate as percentage of 120
# Maximum possible score is 10 * 1.2 = 12, so scale to 120
max_possible_score = 10 * total_weight # 10 * 1.2 = 12
overall_score = round((overall_weighted_score / max_possible_score) * 120, 1)
else:
# Maximum possible score is 10 * total_weight, so normalize to 100%
max_possible_score = 10 * total_weight
overall_score = round((overall_weighted_score / max_possible_score) * 100, 1)
else:
successful_checks = [r for r in check_results.values() if r.get('status') == 'success']
if successful_checks:
sum_scores = sum(r.get('score', 0) for r in successful_checks)
overall_score = round((sum_scores / len(successful_checks)) * 10, 1)
# Generate report data
report_data = {
'session_id': session_id,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'filename': filename,
'profile': profile_name,
'profile_id': profile_name, # Store the profile ID for HTML detection
'profile_name': profile.name,
'model': 'Profile-based selection',
'results': check_results,
'summary': {
'overall_score': overall_score,
'profile': profile.name,
'checks_count': len([r for r in check_results.values() if r.get('status') == 'success']),
'total_checks': len(check_results),
'total_weighted_score': overall_weighted_score,
'total_weight': total_weight
}
}
# Determine overall grade
grade = determine_grade(overall_score)
report_data['summary']['grade'] = grade
# Save results to file regardless of output mode
output_path = save_results_to_file(report_data, filename, output_mode, session_id, file_path)
# Return results based on output mode
if output_mode == 'html':
html_response = generate_html_response(report_data, filename)
# Add file path info to the response
return html_response
else:
# For JSON mode, return the data with file path info
report_data['output_file'] = {
'path': output_path,
'filename': os.path.basename(output_path),
'url': f'{request.environ.get("SCRIPT_NAME", "")}/output/{os.path.basename(output_path)}'
}
return jsonify(report_data)
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e),
'traceback': traceback.format_exc()
}), 500
@app.route('/api/process_file', methods=['POST'])
@auth.require_auth
def api_process_file():
"""API endpoint to process a file with specified profiles and checks"""
try:
# Force reload QC apps to ensure they're available for this request
if not qc_apps:
load_qc_apps()
print(f"API: Loaded {len(qc_apps)} QC apps for processing")
# Check if file is in request
if 'file' not in request.files:
return jsonify({'status': 'error', 'message': 'No file part'}), 400
file = request.files['file']
# Check if file was selected
if file.filename == '':
return jsonify({'status': 'error', 'message': 'No selected file'}), 400
# Get output mode (json or html)
output_mode = request.form.get('mode', 'json').lower()
if output_mode not in ['json', 'html']:
output_mode = 'json' # Default to JSON if invalid mode
# Get selected profiles from request (can be single or multiple)
# Support both single profile parameter and multiple profiles array
selected_profiles = []
if 'profiles[]' in request.form:
selected_profiles = request.form.getlist('profiles[]')
elif 'profile' in request.form:
selected_profiles = [request.form.get('profile')]
if not selected_profiles:
selected_profiles = ['default'] # Use default if none specified
# Validate profiles
for profile_id in selected_profiles:
if profile_id != 'custom' and profile_id not in PROFILES:
available_profiles = list(PROFILES.keys())
return jsonify({
'status': 'error',
'message': f'Invalid profile: {profile_id}. Available profiles: {available_profiles}'
}), 400
# Track all selected checks and weights from all profiles
all_selected_checks = set()
combined_profile_weights = {}
selected_profile_names = []
custom_selection = False
# Process each selected profile
for profile_id in selected_profiles:
if profile_id == 'custom':
# Custom selection
custom_checks = request.form.getlist('checks')
if not custom_checks:
return jsonify({'status': 'error', 'message': 'Custom profile selected but no checks were specified'}), 400
all_selected_checks.update(custom_checks)
custom_selection = True
else:
# Standard profile
profile = get_profile(profile_id)
selected_profile_names.append(profile.name)
profile_checks = profile.get_enabled_checks()
all_selected_checks.update(profile_checks)
# Add profile weights (we'll normalize later)
profile_weights = profile.get_check_weights()
for check, weight in profile_weights.items():
if check in combined_profile_weights:
# If a check is in multiple profiles, use the higher weight
combined_profile_weights[check] = max(combined_profile_weights[check], weight)
else:
combined_profile_weights[check] = weight
# Convert to list for consistency with the rest of the code
selected_checks = list(all_selected_checks)
profile_weights = combined_profile_weights
if not selected_checks:
return jsonify({'status': 'error', 'message': 'No QC checks available for the selected profile'}), 400
# Create unique session ID and save file
session_id = datetime.now().strftime('%Y%m%d_%H%M%S')
# Create session folder and save file
session_folder = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
os.makedirs(session_folder, exist_ok=True)
file_path = os.path.join(session_folder, file.filename)
file.save(file_path)
# Process each check
check_results = {}
overall_weighted_score = 0
total_weight = 0
# Track progress for client
progress = {
'total_checks': len(selected_checks),
'completed_checks': 0,
'current_check': ''
}
for i, check_name in enumerate(selected_checks):
# Update progress
progress['current_check'] = check_name
progress['completed_checks'] = i
print(f"API Progress: {i+1}/{len(selected_checks)} - Processing {check_name}")
# Skip if check is not available
if check_name not in qc_apps:
check_results[check_name] = {
'status': 'error',
'message': f'QC check not found'
}
continue
# Determine which LLM to use based on the selected profiles
# Default to Gemini
ai_model = 'Gemini'
# Check each profile's LLM preference for this check
for profile_id in selected_profiles:
if profile_id != 'custom': # Skip custom profile
check_llm_map = get_check_llm_map(profile_id)
if check_name in check_llm_map:
# If any profile uses OpenAI for this check, prioritize OpenAI
if check_llm_map[check_name] == 'OpenAI':
ai_model = 'OpenAI'
break
# Get the prompt
prompt = qc_apps[check_name]['prompt']
# Get reference image if available
reference_path = qc_apps[check_name].get('reference_image')
try:
# Run the QC check
result = run_visual_qc(
prompt=prompt,
asset_path=file_path,
reference_path=reference_path,
model_name=ai_model
)
# Extract score from result
score = None
try:
# Use our extraction function to get score from JSON blocks
json_data = extract_json_from_response(result['response'])
if 'score' in json_data:
score = json_data.get('score')
print(f"Extracted score from JSON block: {score}")
# If we still don't have a score, look for any score in text
if score is None:
# Try to find a score pattern in the text
score_pattern = r'["\']score["\']\s*:\s*(\d+)'
score_match = re.search(score_pattern, result['response'])
if score_match:
score = int(score_match.group(1))
print(f"Extracted score from regex: {score}")
else:
# Look for descriptive scores in text
descriptive_score_pattern = r'score(?:\s+is|\s*:\s*|\s+of\s+)(?:\s*)(\d+)(?:\s*out\s*of\s*10)?'
descriptive_match = re.search(descriptive_score_pattern, result['response'].lower())
if descriptive_match:
score = int(descriptive_match.group(1))
print(f"Extracted score from descriptive text: {score}")
else:
# Try to determine score from pass/fail status (legacy mode)
result_text = result.get('response', '').upper()
if "PASS" in result_text:
score = 10 # Pass = 10/10
print("Detected PASS keyword, setting score to 10")
elif "FAIL" in result_text:
score = 3 # Fail = 3/10
print("Detected FAIL keyword, setting score to 3")
else:
score = 5 # Default middle score
print(f"Could not extract score, using default of 5")
except Exception as parse_error:
print(f"Error parsing score from response: {parse_error}")
score = 5 # Default to middle score
# Add the score to the result
result['score'] = score if score is not None else 5
# Process result for JSON mode
if output_mode == 'json':
# For JSON mode, we update the response to be ONLY the JSON part
json_data = extract_json_from_response(result['response'])
if json_data:
# If we found JSON in the response, replace the full text with just the extracted JSON
result['original_response'] = result['response'] # Save original for debugging
result['response'] = json.dumps(json_data, indent=2) # Pretty print the JSON
# Calculate weighted score
weight = profile_weights.get(check_name, 0)
if weight == 0 and ('default' in selected_profiles or len(selected_profiles) == 0):
weight = 1.0 / len(selected_checks)
weighted_score = result['score'] * weight
overall_weighted_score += weighted_score
total_weight += weight
# Store result
check_results[check_name] = {
'status': 'success',
'display_name': qc_apps[check_name]['display_name'],
'score': result['score'],
'weight': weight,
'weighted_score': weighted_score,
'has_reference': reference_path is not None,
'result': result
}
# Update progress after successful check
progress['completed_checks'] = i + 1
except Exception as e:
check_results[check_name] = {
'status': 'error',
'message': str(e),
'traceback': traceback.format_exc()
}
# Calculate overall score
overall_score = 0
if total_weight > 0:
# Special case for Unilever key visual profile - show percentage of 120
if len(selected_profiles) == 1 and selected_profiles[0] == 'unilever_key_visual':
# For Unilever profile, calculate as percentage of 120
# Maximum possible score is 10 * 1.2 = 12, so scale to 120
max_possible_score = 10 * total_weight # 10 * 1.2 = 12
overall_score = round((overall_weighted_score / max_possible_score) * 120, 1)
else:
# Maximum possible score is 10 * total_weight, so normalize to 100%
max_possible_score = 10 * total_weight
overall_score = round((overall_weighted_score / max_possible_score) * 100, 1)
else:
# Simple average if no weights
successful_checks = [r for r in check_results.values() if r.get('status') == 'success']
if successful_checks:
sum_scores = sum(r.get('score', 0) for r in successful_checks)
overall_score = round((sum_scores / len(successful_checks)) * 10, 1)
# Generate report data
report_data = {
'session_id': session_id,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'filename': file.filename,
'profiles': selected_profiles,
'profile_id': selected_profiles[0] if len(selected_profiles) == 1 else None, # Store single profile ID for HTML detection
'profile_name': ", ".join(selected_profile_names) or "Multiple Profiles",
'model': 'Profile-based selection',
'results': check_results,
'summary': {
'overall_score': overall_score,
'profile': ", ".join(selected_profile_names) or "Multiple Profiles",
'checks_count': len([r for r in check_results.values() if r.get('status') == 'success']),
'total_checks': len(check_results),
'total_weighted_score': overall_weighted_score,
'total_weight': total_weight
}
}
# Determine overall grade based on score (Pass/Fail)
avg_individual_score = overall_score / 10 # Convert to 1-10 scale
grade = 'Pass' if avg_individual_score >= 6 else 'Fail'
# Add grade to summary
report_data['summary']['grade'] = grade
# Save results to file regardless of output mode
output_path = save_results_to_file(report_data, file.filename, output_mode, session_id, file_path)
# Return data based on output mode
if output_mode == 'html':
# Create a more interactive HTML view with formatted results
# Define a function to get color based on score
def get_score_result(score):
if score >= 6:
return "Pass", "#28a745" # Green for pass
else:
return "Fail", "#dc3545" # Red for fail
# Build HTML for each check result
check_results_html = ""
for check_name, check_data in report_data['results'].items():
if check_data['status'] == 'success':
display_name = check_data.get('display_name', check_name)
score = check_data.get('score', 0)
result_text, score_color = get_score_result(score)
# Extract response text (strip JSON blocks for cleaner display)
response_text = check_data['response']
# Remove JSON code blocks for cleaner reading
import re
response_text = re.sub(r'```json.*?```', '', response_text, flags=re.DOTALL)
response_text = response_text.strip()
check_results_html += f"""
<div class="check-result">
<h3>{display_name} <span class="score" style="color: {score_color}">{result_text}</span></h3>
<div class="response">{response_text.replace(chr(10), '<br>')}</div>
</div>
<hr>
"""
# Get summary score result
overall_score = report_data['summary']['overall_score']
overall_result, overall_color = get_score_result(overall_score/10) # Normalize to 0-10 scale
html_content = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Visual AI QC Results for {file.filename}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; padding: 0; background-color: #f8f9fa; }}
.container {{ max-width: 1200px; margin: 0 auto; background-color: white; padding: 20px; border-radius: 8px; box-shadow: 0 0 10px rgba(0,0,0,0.1); }}
h1, h2, h3 {{ color: #333; }}
h1 {{ margin-bottom: 5px; }}
.summary {{ background-color: #e9ecef; padding: 15px; border-radius: 5px; margin: 20px 0; }}
.score {{ font-weight: normal; margin-left: 10px; }}
.check-result {{ margin-bottom: 20px; }}
.response {{ margin-top: 10px; line-height: 1.5; }}
hr {{ border: 0; height: 1px; background-color: #ddd; margin: 30px 0; }}
.grade {{ font-size: 1.2em; font-weight: bold; }}
pre {{ background-color: #f5f5f5; padding: 15px; border-radius: 5px; overflow-x: auto; }}
.json-toggle {{ cursor: pointer; color: #007bff; text-decoration: underline; }}
.json-view {{ display: none; margin-top: 20px; }}
</style>
</head>
<body>
<div class="container">
<h1>Visual AI QC Results</h1>
<p>File analyzed: <strong>{file.filename}</strong> | Timestamp: {report_data['timestamp']}</p>
<div class="summary">
<h2>Summary</h2>
<p>Overall Result: <span style="color: {overall_color}; font-weight: bold; font-size: 1.2em;">{overall_result}</span></p>
<p>Status: <span class="grade">{report_data['summary']['grade']}</span></p>
<p>Profile: {report_data['profile_name']}</p>
<p>Total Checks: {report_data['summary']['checks_count']}</p>
</div>
<h2>Detailed Results</h2>
{check_results_html}
<p class="json-toggle" onclick="document.getElementById('json-data').style.display = document.getElementById('json-data').style.display === 'none' ? 'block' : 'none';">
Show/Hide Raw JSON Data
</p>
<div id="json-data" class="json-view">
<pre>{json.dumps(report_data, indent=2)}</pre>
</div>
</div>
</body>
</html>
"""
# For HTML mode, redirect to the saved file
return Response(html_content, mimetype='text/html')
else:
# For JSON mode, return the data with file path info
report_data['output_file'] = {
'path': output_path,
'filename': os.path.basename(output_path),
'url': f'{request.environ.get("SCRIPT_NAME", "")}/output/{os.path.basename(output_path)}'
}
return jsonify(report_data)
except Exception as e:
# Include progress information in error response
error_progress = progress.get('completed_checks', 0) if 'progress' in locals() else 0
total_checks = progress.get('total_checks', 0) if 'progress' in locals() else 0
if not total_checks and 'selected_checks' in locals():
total_checks = len(selected_checks)
error_data = {
'status': 'error',
'message': str(e),
'traceback': traceback.format_exc(),
'progress': {
'total_checks': total_checks,
'completed_checks': error_progress,
'percentage': (error_progress / total_checks * 100) if total_checks else 0
}
}
# Get output mode if possible, default to JSON if not defined or in case of error
output_mode = request.form.get('mode', 'json').lower() if 'mode' in request.form else 'json'
if output_mode == 'html':
# Create a more user-friendly error page
html_content = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Visual AI QC Error</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; padding: 0; background-color: #f8f9fa; }}
.container {{ max-width: 1200px; margin: 0 auto; background-color: white; padding: 20px; border-radius: 8px; box-shadow: 0 0 10px rgba(0,0,0,0.1); }}
h1 {{ color: #dc3545; margin-bottom: 5px; }}
.error-box {{ background-color: #f8d7da; border: 1px solid #f5c6cb; color: #721c24; padding: 15px; border-radius: 5px; margin: 20px 0; }}
.progress-box {{ background-color: #cce5ff; border: 1px solid #b8daff; color: #004085; padding: 15px; border-radius: 5px; margin: 20px 0; }}
pre {{ background-color: #f5f5f5; padding: 15px; border-radius: 5px; overflow-x: auto; }}
.json-toggle {{ cursor: pointer; color: #007bff; text-decoration: underline; }}
.json-view {{ display: none; margin-top: 20px; }}
.progress-bar {{ height: 20px; background-color: #e9ecef; border-radius: 5px; margin-top: 10px; }}
.progress-bar-fill {{ height: 100%; background-color: #007bff; border-radius: 5px; width: {error_data['progress']['percentage']}%; }}
</style>
</head>
<body>
<div class="container">
<h1>Error Processing Request</h1>
<div class="error-box">
<h3>Error Message</h3>
<p>{error_data['message']}</p>
</div>
<div class="progress-box">
<h3>Processing Progress</h3>
<p>Completed {error_data['progress']['completed_checks']} of {error_data['progress']['total_checks']} checks</p>
<div class="progress-bar">
<div class="progress-bar-fill"></div>
</div>
</div>
<p class="json-toggle" onclick="document.getElementById('error-data').style.display = document.getElementById('error-data').style.display === 'none' ? 'block' : 'none';">
Show/Hide Technical Details
</p>
<div id="error-data" class="json-view">
<pre>{json.dumps(error_data, indent=2)}</pre>
</div>
</div>
</body>
</html>
"""
return Response(html_content, mimetype='text/html'), 500
else:
# Default JSON mode
return jsonify(error_data), 500
@app.route('/analyze', methods=['POST'])
@app.route('/api/analyze', methods=['POST'])
@auth.require_auth
def api_analyze_with_triage():
"""
Smart analysis endpoint that combines triage and profile-based QC.
This endpoint:
1. Runs triage to determine file type
2. Automatically selects appropriate profile based on brand + file type
3. Runs full QC analysis with selected profile
4. Returns combined results showing both triage and QC analysis
Parameters:
- file: Image file to analyze
- brand: Brand name (diageo, unilever, general) - defaults to 'general'
- mode: Output mode (json, html) - defaults to 'json'
- return_file: Whether to return file info (true/false) - defaults to 'false'
"""
try:
# Force reload QC apps to ensure they're available
if not qc_apps:
load_qc_apps()
# Check if file is in request
if 'file' not in request.files:
return jsonify({'status': 'error', 'message': 'No file part'}), 400
file = request.files['file']
# Check if file was selected
if file.filename == '':
return jsonify({'status': 'error', 'message': 'No selected file'}), 400
# Get parameters
brand = request.form.get('brand', 'general').lower()
profile = request.form.get('profile', '').lower()
output_mode = request.form.get('mode', 'json').lower()
return_file = request.form.get('return_file', 'false').lower() == 'true'
# Validate brand - allow any brand name now that users can create custom profiles
# The old validation was too restrictive for custom profiles
if not brand or brand.strip() == '':
brand = 'general'
# Validate output mode
if output_mode not in ['json', 'html']:
output_mode = 'json'
# Create unique session ID and save file
session_id = datetime.now().strftime('%Y%m%d_%H%M%S')
session_folder = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
os.makedirs(session_folder, exist_ok=True)
file_path = os.path.join(session_folder, file.filename)
file.save(file_path)
# Initialize progress tracking for this session
progress_tracker[session_id] = {
'total_checks': 0,
'completed_checks': 0,
'current_check': 'Initializing',
'current_check_display': 'Initializing Analysis',
'stage': 'setup',
'percentage': 0,
'session_id': session_id
}
# STEP 1: Run Triage Analysis
print(f"Step 1: Running triage analysis for {file.filename}")
# Update progress
progress_tracker[session_id].update({
'stage': 'triage',
'current_check': 'file_type_triage',
'current_check_display': 'File Type Detection',
'percentage': 10
})
if 'file_type_triage' not in qc_apps:
return jsonify({'status': 'error', 'message': 'File type triage app not available'}), 500
# Get the triage prompt and run triage
triage_prompt = qc_apps['file_type_triage']['prompt']
triage_result = run_visual_qc(
prompt=triage_prompt,
asset_path=file_path,
model_name='Gemini'
)
# Extract triage results
triage_data = extract_json_from_response(triage_result['response'])
if not triage_data:
return jsonify({
'status': 'error',
'message': 'Could not extract triage results',
'raw_triage_response': triage_result['response']
}), 500
# STEP 2: Determine Appropriate Profile
print(f"Step 2: Determining profile based on {'direct profile parameter' if profile else 'triage results'}")
if profile and profile.strip():
# Use the directly specified profile
suggested_profile = profile
print(f"Using directly specified profile: {suggested_profile}")
else:
# Get format suffix from triage results
format_suffix = triage_data.get('recommended_qc_profile', '_digital')
# Construct profile name
suggested_profile = f"{brand}{format_suffix}"
# Check if the suggested profile exists, fallback to main brand profile if not
if suggested_profile not in PROFILES:
suggested_profile = brand
if suggested_profile not in PROFILES:
suggested_profile = 'general' # Final fallback
print(f"Profile constructed from triage: {suggested_profile}")
print(f"Final selected profile: {suggested_profile}")
# STEP 3: Run Full QC Analysis with Selected Profile
print(f"Step 3: Running QC analysis with profile '{suggested_profile}'")
# Get the profile configuration
profile_config = get_profile(suggested_profile)
if not profile_config:
return jsonify({
'status': 'error',
'message': f'Profile {suggested_profile} not found'
}), 400
# Get enabled checks from profile using Profile object methods
enabled_checks = profile_config.get_enabled_checks()
profile_weights = profile_config.get_check_weights()
# Filter to only include checks that exist in qc_apps
enabled_checks = [check for check in enabled_checks if check in qc_apps]
if not enabled_checks:
return jsonify({
'status': 'error',
'message': f'No enabled checks found for profile {suggested_profile}'
}), 400
# Run QC checks in parallel batches
# Update progress tracker with total checks
progress_tracker[session_id].update({
'total_checks': len(enabled_checks),
'stage': 'qc_analysis',
'percentage': 20
})
# Get reference asset from form for this endpoint
reference_asset = request.form.get('reference_asset', '')
# Use the parallel processing function
check_results = process_checks_in_batches_with_triage(
enabled_checks, qc_apps, profile_config, profile_weights,
file_path, reference_asset, brand_db, progress_tracker,
session_id, batch_size=15, base_percentage=20, percentage_range=70
)
# STEP 4: Calculate Overall Score
print(f"Step 4: Calculating overall score")
total_weighted_score = 0
total_weight = 0
completed_checks = 0
failed_checks = 0
for check_name, result in check_results.items():
weight = result.get('weight', 0.1)
total_weight += weight
if result['status'] == 'completed':
completed_checks += 1
score = result.get('score')
if score is not None:
total_weighted_score += score * weight
else:
failed_checks += 1
# Calculate overall score - sum of weighted scores scaled to 100
# For profiles with total_weight = 10.0 (like General Check), use direct weighted score
# For profiles with total_weight = 1.0, multiply by 10 to scale to 100
if total_weight >= 10.0:
overall_score = total_weighted_score # Already scaled correctly
elif total_weight > 0:
overall_score = (total_weighted_score * 10) # Scale to 100-point system
else:
overall_score = 0
# STEP 5: Prepare Combined Response
print(f"Step 5: Preparing response")
# Update progress to completion
progress_tracker[session_id].update({
'stage': 'complete',
'current_check': 'Finalizing',
'current_check_display': 'Finalizing Report',
'percentage': 100,
'completed_checks': len(enabled_checks)
})
# Since triage is skipped, set default triage data
triage_data = {
'primary_format': 'user_specified',
'specific_type': 'profile_based_analysis',
'confidence_score': 10,
'format_indicators': 'User selected profile directly',
'secondary_format': '',
'recommended_qc_profile': suggested_profile
}
triage_result = {'response': 'Triage skipped - using user-selected profile directly'}
# Build comprehensive response
response_data = {
'status': 'success',
'session_id': session_id,
'filename': file.filename,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
# Triage Results Section
'triage_analysis': {
'primary_format': triage_data.get('primary_format', 'unknown'),
'specific_type': triage_data.get('specific_type', 'unknown'),
'confidence_score': triage_data.get('confidence_score', 0),
'format_indicators': triage_data.get('format_indicators', ''),
'secondary_format': triage_data.get('secondary_format', ''),
'recommended_qc_profile': triage_data.get('recommended_qc_profile', ''),
'full_triage_response': triage_result['response']
},
# Profile Selection Section
'profile_selection': {
'requested_brand': brand,
'suggested_profile': suggested_profile,
'profile_used': suggested_profile,
'profile_name': profile_config.name,
'profile_description': profile_config.description,
'reference_asset': reference_asset if reference_asset else None,
'reference_asset_used': bool(reference_asset)
},
# QC Analysis Results Section
'qc_analysis': {
'overall_score': round(overall_score, 2),
'total_checks': len(enabled_checks),
'completed_checks': completed_checks,
'failed_checks': failed_checks,
'total_weight': round(total_weight, 2),
'check_results': check_results
},
# Summary Section
'summary': {
'file_type_detected': f"{triage_data.get('primary_format', 'unknown')} - {triage_data.get('specific_type', 'unknown')}",
'profile_applied': profile_config.name,
'overall_score': round(overall_score, 2),
'score_percentage': f"{round(overall_score, 1)}%",
'checks_passed': completed_checks,
'checks_failed': failed_checks,
'confidence': triage_data.get('confidence_score', 0)
}
}
# Add file info if requested
if return_file:
response_data['file_info'] = {
'path': file_path,
'size': os.path.getsize(file_path),
'session_folder': session_folder
}
# Auto-save HTML report to output directory (regardless of output_mode)
try:
# Generate comprehensive HTML report using the same format as the web UI
html_report_content = generate_comprehensive_html_report(response_data, file.filename, file_path)
# Create output filename
safe_filename = re.sub(r'[^a-zA-Z0-9.-]', '_', file.filename)
output_filename = f"{session_id}_{safe_filename}_report.html"
output_path = os.path.join(app.config['OUTPUT_FOLDER'], output_filename)
# Save HTML report to output directory
with open(output_path, 'w', encoding='utf-8') as f:
f.write(html_report_content)
print(f"HTML report auto-saved to: {output_path}")
# Add output file info to response
response_data['output_file'] = {
'path': output_path,
'filename': output_filename,
'auto_saved': True
}
except Exception as e:
print(f"Error auto-saving HTML report: {str(e)}")
# Don't fail the entire request if auto-save fails
# Return appropriate format
if output_mode == 'html':
# Create HTML response with both triage and QC results
triage_html = f"""
<div class="triage-section">
<h2>📋 File Type Analysis (Triage)</h2>
<div class="triage-results">
<div class="result-item">
<strong>Detected Format:</strong> {triage_data.get('primary_format', 'unknown').title()} - {triage_data.get('specific_type', 'unknown')}
</div>
<div class="result-item">
<strong>Confidence:</strong> {triage_data.get('confidence_score', 0)}/10
</div>
<div class="result-item">
<strong>Key Indicators:</strong> {triage_data.get('format_indicators', 'N/A')}
</div>
<div class="result-item">
<strong>Recommended Profile:</strong> {triage_data.get('recommended_qc_profile', 'N/A')}
</div>
</div>
</div>
"""
profile_html = f"""
<div class="profile-section">
<h2>🎯 Profile Selection</h2>
<div class="profile-results">
<div class="result-item">
<strong>Brand:</strong> {brand.title()}
</div>
<div class="result-item">
<strong>Profile Used:</strong> {profile_config.name}
</div>
<div class="result-item">
<strong>Description:</strong> {profile_config.description}
</div>
</div>
</div>
"""
# Generate QC results HTML
check_results_html = ""
for check_name, result in check_results.items():
score = result.get('score', 0) if result.get('score') is not None else 0
status_class = "pass" if result['status'] == 'completed' and score >= 6 else "fail"
result_display = "Pass" if score >= 6 else "Fail" if score > 0 else "N/A"
check_results_html += f"""
<div class="check-result {status_class}">
<h3>{check_name.replace('_', ' ').title()}</h3>
<div class="score">Result: {result_display}</div>
<div class="weight">Weight: {result.get('weight', 0.1)}</div>
<div class="response">{result.get('response', 'No response')[:500]}...</div>
</div>
"""
html_content = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Visual AI QC Analysis Results</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; background-color: #f8f9fa; }}
.container {{ max-width: 1200px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }}
.header {{ text-align: center; margin-bottom: 30px; border-bottom: 2px solid #007bff; padding-bottom: 20px; }}
.summary {{ background: #e3f2fd; padding: 20px; border-radius: 8px; margin-bottom: 30px; }}
.triage-section, .profile-section {{ background: #f0f8f0; padding: 20px; border-radius: 8px; margin-bottom: 20px; }}
.result-item {{ margin: 10px 0; padding: 10px; background: white; border-left: 4px solid #007bff; }}
.check-result {{ margin: 15px 0; padding: 15px; border-radius: 8px; border-left: 4px solid #ccc; }}
.check-result.pass {{ border-left-color: #28a745; background-color: #d4edda; }}
.check-result.fail {{ border-left-color: #dc3545; background-color: #f8d7da; }}
.score {{ font-weight: bold; color: #007bff; }}
.json-toggle {{ cursor: pointer; color: #007bff; text-decoration: underline; margin: 20px 0; }}
.json-view {{ background: #f8f9fa; padding: 15px; border-radius: 5px; overflow-x: auto; display: none; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🤖 Visual AI QC Analysis Results</h1>
<p>File: {file.filename} | Analyzed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
</div>
<div class="summary">
<h2>📊 Summary</h2>
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 15px;">
<div><strong>Overall Result:</strong> {'Pass' if overall_score/10 >= 6 else 'Fail'}</div>
<div><strong>File Type:</strong> {triage_data.get('primary_format', 'unknown').title()}</div>
<div><strong>Profile Used:</strong> {profile_config.name}</div>
<div><strong>Checks Performed:</strong> {completed_checks}/{len(enabled_checks)}</div>
</div>
</div>
{triage_html}
{profile_html}
<div class="qc-section">
<h2>🔍 Quality Control Analysis</h2>
{check_results_html}
</div>
<p class="json-toggle" onclick="document.getElementById('json-data').style.display = document.getElementById('json-data').style.display === 'none' ? 'block' : 'none';">
Show/Hide Raw JSON Data
</p>
<div id="json-data" class="json-view">
<pre>{json.dumps(response_data, indent=2)}</pre>
</div>
</div>
</body>
</html>
"""
return Response(html_content, mimetype='text/html')
else:
# Add session_id to response for progress tracking
response_data['session_id'] = session_id
# Return JSON response
return jsonify(response_data)
except Exception as e:
# Update progress to error state if session exists
if 'session_id' in locals() and session_id in progress_tracker:
progress_tracker[session_id].update({
'stage': 'error',
'current_check': 'Error',
'current_check_display': 'Analysis Failed',
'percentage': 0
})
return jsonify({
'status': 'error',
'message': str(e),
'traceback': traceback.format_exc()
}), 500
finally:
# Schedule cleanup of progress tracker after 5 minutes
import threading
import time
def cleanup_progress():
time.sleep(300) # 5 minutes
if 'session_id' in locals() and session_id in progress_tracker:
del progress_tracker[session_id]
if 'session_id' in locals():
threading.Thread(target=cleanup_progress, daemon=True).start()
@app.route('/api/profiles', methods=['POST'])
@auth.require_auth
def create_profile():
"""Create a new QC profile"""
try:
data = request.get_json()
if not data:
return jsonify({'status': 'error', 'message': 'No data provided'}), 400
profile_name = data.get('name', '').strip()
if not profile_name:
return jsonify({'status': 'error', 'message': 'Profile name is required'}), 400
# Create profile filename (sanitize name)
import re
safe_name = re.sub(r'[^a-zA-Z0-9_-]', '_', profile_name.lower())
profile_filename = f"{safe_name}.json"
profile_path = os.path.join('profiles', profile_filename)
# Check if profile already exists
if os.path.exists(profile_path):
return jsonify({'status': 'error', 'message': 'Profile already exists'}), 400
# Create profile data structure
profile_data = {
"name": profile_name,
"description": data.get('description', ''),
"pass_threshold": data.get('pass_threshold', 85),
"checks": data.get('checks', {})
}
# Save profile to file
with open(profile_path, 'w') as f:
json.dump(profile_data, f, indent=2)
return jsonify({
'status': 'success',
'message': f'Profile "{profile_name}" created successfully',
'profile_id': safe_name,
'profile_path': profile_path
})
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/profiles/<profile_id>', methods=['PUT'])
@auth.require_auth
def update_profile(profile_id):
"""Update an existing QC profile"""
try:
data = request.get_json()
if not data:
return jsonify({'status': 'error', 'message': 'No data provided'}), 400
profile_filename = f"{profile_id}.json"
profile_path = os.path.join('profiles', profile_filename)
# Check if profile exists
if not os.path.exists(profile_path):
return jsonify({'status': 'error', 'message': 'Profile not found'}), 404
# Load existing profile
with open(profile_path, 'r') as f:
existing_profile = json.load(f)
# Update profile data
existing_profile.update({
"name": data.get('name', existing_profile.get('name')),
"description": data.get('description', existing_profile.get('description', '')),
"pass_threshold": data.get('pass_threshold', existing_profile.get('pass_threshold', 85)),
"checks": data.get('checks', existing_profile.get('checks', {}))
})
# Save updated profile
with open(profile_path, 'w') as f:
json.dump(existing_profile, f, indent=2)
return jsonify({
'status': 'success',
'message': f'Profile "{existing_profile["name"]}" updated successfully',
'profile_id': profile_id
})
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/profiles/<profile_id>', methods=['DELETE'])
@auth.require_auth
def delete_profile(profile_id):
"""Delete a QC profile"""
try:
profile_filename = f"{profile_id}.json"
profile_path = os.path.join('profiles', profile_filename)
# Check if profile exists
if not os.path.exists(profile_path):
return jsonify({'status': 'error', 'message': 'Profile not found'}), 404
# Load profile to get name for response
with open(profile_path, 'r') as f:
profile_data = json.load(f)
profile_name = profile_data.get('name', profile_id)
# Delete profile file
os.remove(profile_path)
return jsonify({
'status': 'success',
'message': f'Profile "{profile_name}" deleted successfully'
})
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/brand_guidelines', methods=['GET'])
def get_brand_guidelines():
"""Get all brand guidelines or guidelines for a specific brand"""
try:
brand_name = request.args.get('brand')
if brand_name:
guidelines = brand_db.get_brand_guidelines(brand_name)
else:
guidelines = brand_db.get_all_guidelines()
if brand_name:
return jsonify({
'status': 'success',
'guidelines': guidelines,
'brands': brand_db.get_all_brands()
})
else:
# Return full database structure that frontend expects
return jsonify({
'status': 'success',
'brands': brand_db.db['brands'],
'files': brand_db.db['files']
})
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/brand_guidelines', methods=['POST'])
@auth.require_auth
def upload_brand_guideline():
"""Upload a new brand guideline file"""
try:
# Check if file is in request
if 'file' not in request.files:
return jsonify({'status': 'error', 'message': 'No file provided'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'status': 'error', 'message': 'No file selected'}), 400
# Get brand name and other metadata
brand_name = request.form.get('brand_name', '').strip()
description = request.form.get('description', '').strip()
tags = request.form.get('tags', '').strip().split(',') if request.form.get('tags') else []
if not brand_name:
return jsonify({'status': 'error', 'message': 'Brand name is required'}), 400
# Save uploaded file temporarily
temp_filename = f"temp_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{file.filename}"
temp_path = os.path.join(app.config['UPLOAD_FOLDER'], temp_filename)
file.save(temp_path)
try:
# Add to brand guidelines database
file_record = brand_db.add_brand_guideline(
brand_name=brand_name,
file_path=temp_path,
description=description,
tags=[tag.strip() for tag in tags if tag.strip()]
)
# Remove temporary file
os.remove(temp_path)
return jsonify({
'status': 'success',
'message': f'Brand guideline uploaded successfully for {brand_name}',
'file_record': file_record
})
except Exception as e:
# Clean up temp file on error
if os.path.exists(temp_path):
os.remove(temp_path)
raise e
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/brand_guidelines/<file_id>', methods=['DELETE'])
@auth.require_auth
def delete_brand_guideline(file_id):
"""Delete a brand guideline file"""
try:
success = brand_db.delete_guideline(file_id)
if success:
return jsonify({
'status': 'success',
'message': 'Brand guideline deleted successfully'
})
else:
return jsonify({
'status': 'error',
'message': 'Brand guideline not found'
}), 404
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/detect_brand', methods=['POST'])
def detect_brand_from_file():
"""Detect brand from uploaded file using AI analysis"""
try:
if 'file' not in request.files:
return jsonify({'status': 'error', 'message': 'No file provided'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'status': 'error', 'message': 'No file selected'}), 400
# Save file temporarily for analysis
temp_filename = f"brand_detect_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{file.filename}"
temp_path = os.path.join(app.config['UPLOAD_FOLDER'], temp_filename)
file.save(temp_path)
try:
# Get available brands
available_brands = brand_db.get_all_brands()
# Create brand detection prompt - works whether brands are available or not
if available_brands:
brands_list = ', '.join(available_brands)
brand_detection_prompt = f"""
Analyze this image and determine:
1. Which brand this belongs to from the following options: {brands_list}
2. Whether this is a Key Visual or POS (Point of Sale) material
3. Your confidence level (0-100%)
Look for brand logos, colors, typography, and other brand elements.
If the brand is not in the provided list, still attempt to identify it.
Respond with JSON format:
{{
"detected_brand": "brand_name or null if uncertain",
"file_type": "Key Visual" or "POS",
"confidence": confidence_percentage,
"reasoning": "explanation of detection"
}}
"""
else:
brand_detection_prompt = """
Analyze this image and determine:
1. What brand this belongs to (identify any visible brand names, logos, or distinctive brand elements)
2. Whether this is a Key Visual or POS (Point of Sale) material
3. Your confidence level (0-100%)
Look for brand logos, colors, typography, and other brand elements.
Even if no brand guidelines are available, attempt to identify the brand from visual elements.
Respond with JSON format:
{
"detected_brand": "brand_name or null if uncertain",
"file_type": "Key Visual" or "POS",
"confidence": confidence_percentage,
"reasoning": "explanation of detection"
}
"""
# Run AI analysis
result = run_visual_qc(
prompt=brand_detection_prompt,
asset_path=temp_path,
model_name="Gemini"
)
# Extract JSON from response
detection_data = extract_json_from_response(result['response'])
# Clean up temp file
os.remove(temp_path)
return jsonify({
'status': 'success',
'detection': {
'detected_brand': detection_data.get('detected_brand'),
'file_type': detection_data.get('file_type', 'unknown'),
'confidence': detection_data.get('confidence', 0),
'reasoning': detection_data.get('reasoning', ''),
'available_brands': available_brands
}
})
except Exception as e:
# Clean up temp file on error
if os.path.exists(temp_path):
os.remove(temp_path)
raise e
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/qc-apps', methods=['GET'])
def get_qc_apps():
"""Get list of all QC applications with their metadata"""
try:
qc_apps_data = {}
# Build QC apps data from loaded QC checks
for check_name in qc_apps.keys():
# Get display name
display_name = check_name.replace('_', ' ').title()
# Default weights - can be customized
qc_apps_data[check_name] = {
'display_name': display_name,
'enabled': True,
'required_weight': 0.1, # Default weight
'optional_weight': 0.0,
'description': f'{display_name} quality check'
}
return jsonify({
'status': 'success',
'qc_apps': qc_apps_data,
'total_apps': len(qc_apps_data)
})
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/generate_report', methods=['POST'])
def api_generate_report():
"""Generate HTML report from existing session data"""
try:
# Get parameters
session_id = request.form.get('session_id')
if not session_id:
return jsonify({'status': 'error', 'message': 'Session ID required'}), 400
session_folder = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
if not os.path.exists(session_folder):
return jsonify({'status': 'error', 'message': 'Session not found'}), 404
# Find the analysis result file
result_files = [f for f in os.listdir(session_folder) if f.endswith('_results.json')]
if not result_files:
return jsonify({'status': 'error', 'message': 'No analysis results found for session'}), 404
# Load the most recent results file
result_file = sorted(result_files)[-1]
result_path = os.path.join(session_folder, result_file)
with open(result_path, 'r') as f:
report_data = json.load(f)
# Find the original file
files = [f for f in os.listdir(session_folder) if not f.endswith('_results.json')]
if not files:
return jsonify({'status': 'error', 'message': 'Original file not found'}), 404
original_file = files[0]
file_path = os.path.join(session_folder, original_file)
# Generate HTML using existing function
html_content = generate_html_content(report_data, original_file, file_path)
return html_content, 200, {'Content-Type': 'text/html'}
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
# Authentication endpoints
@app.route('/auth/login', methods=['POST'])
def auth_login():
"""Process authentication tokens from MSAL popup."""
try:
# Get token from request
data = request.get_json()
if not data or 'token' not in data:
return jsonify({
'success': False,
'error': 'Token is required',
'authenticated': False
}), 400
token = data['token']
# Validate and set authentication token - returns response with cookie set
return auth.set_auth_token(token)
except Exception as e:
return jsonify({
'success': False,
'error': f'Authentication failed: {str(e)}',
'authenticated': False
}), 500
@app.route('/auth/logout', methods=['POST'])
def auth_logout():
"""Clear authentication session."""
try:
return auth.clear_auth_token()
except Exception as e:
return jsonify({
'success': False,
'error': f'Logout failed: {str(e)}'
}), 500
@app.route('/auth/status', methods=['GET'])
def auth_status():
"""Get current authentication status."""
try:
status = auth.get_auth_status()
return jsonify(status)
except Exception as e:
return jsonify({
'authenticated': False,
'error': f'Status check failed: {str(e)}'
}), 500
@app.route('/api/debug/test_file_upload', methods=['POST'])
@auth.require_auth
def debug_test_file_upload():
"""Debug endpoint to test file upload handling"""
try:
# Check if file is in request
if 'file' not in request.files:
return jsonify({'status': 'error', 'message': 'No file part', 'files_in_request': list(request.files.keys())}), 400
file = request.files['file']
# Check if file was selected
if file.filename == '':
return jsonify({'status': 'error', 'message': 'No selected file'}), 400
# Get parameters
profile = request.form.get('profile', 'general').lower()
# Create session ID
session_id = datetime.now().strftime('%Y%m%d_%H%M%S')
# Check upload folder configuration
upload_folder = app.config.get('UPLOAD_FOLDER', 'uploads')
return jsonify({
'status': 'success',
'message': 'File upload test successful',
'session_id': session_id,
'filename': file.filename,
'profile': profile,
'upload_folder': upload_folder,
'upload_folder_exists': os.path.exists(upload_folder),
'form_data': dict(request.form),
'files': list(request.files.keys())
})
except Exception as e:
import traceback
return jsonify({
'status': 'error',
'message': str(e),
'traceback': traceback.format_exc()
}), 500
# Initialize application
if not qc_apps:
load_qc_apps()
print(f"Initialized app with {len(qc_apps)} QC apps")
print(f"Brand Guidelines DB initialized: {len(brand_db.get_all_brands())} brands")
# When run directly
if __name__ == "__main__":
import argparse
# Get default port from environment, fallback to 7183
default_port = int(os.environ.get('PORT', 7183))
default_debug = debug_mode # Use the debug setting from environment
parser = argparse.ArgumentParser(description='Run Visual AI QC API server')
parser.add_argument('--host', type=str, default='0.0.0.0', help='Host to bind to')
parser.add_argument('--port', type=int, default=default_port, help=f'Port to listen on (default: {default_port} from environment)')
parser.add_argument('--debug', action='store_true', default=default_debug, help=f'Run in debug mode (default: {default_debug} from environment)')
args = parser.parse_args()
print(f"Environment: {current_environment}")
print(f"Starting Flask API server on {args.host}:{args.port}")
print(f"Debug mode: {args.debug}")
app.run(host=args.host, port=args.port, debug=args.debug)