forge/backend/app/services/image_upscaler.py

385 lines
15 KiB
Python

"""Image Upscaler Service - Topaz Labs API
Available Models:
- proteus: General enhancement with fine-tuning parameters (default)
- artemis: Detail enhancement and noise reduction
- gaia: Specialized for HD/4K upscaling
- iris: Noise and compression artifact reduction
- nyx: Low light and high ISO recovery
- rhea: Detail recovery for older/degraded images
- theia: High-fidelity upscaling
Output Options:
- Scale: 2x, 4x, 6x, 8x (up to 16K)
- Output formats: png, jpg, tiff
- Face enhancement: auto-detect and enhance faces
- Noise reduction: 0-100
- Sharpening: 0-100
- Grain recovery: preserve film grain
"""
import httpx
import os
from uuid import uuid4
from datetime import datetime
import asyncio
from typing import Optional, Dict, Any
from app.database import SessionLocal
from app.models.job import Job
from app.models.asset import Asset
from app.config import settings
import logging
logger = logging.getLogger(__name__)
# Topaz enhancement models with their specialties
TOPAZ_MODELS = {
"Standard V2": {
"name": "Standard V2",
"description": "General-purpose model balancing detail, sharpness, and noise reduction",
"parameters": ["face_enhancement"],
"best_for": "General purpose"
},
"High Fidelity V2": {
"name": "High Fidelity V2",
"description": "Ideal for high-quality images, preserving intricate details",
"parameters": ["face_enhancement"],
"best_for": "High quality sources"
},
"Low Resolution V2": {
"name": "Low Resolution V2",
"description": "Designed for enhancing clarity and detail in low-resolution images",
"parameters": ["face_enhancement"],
"best_for": "Low resolution / compressed"
},
"CGI": {
"name": "CGI",
"description": "Optimized for computer-generated imagery and digital illustrations",
"parameters": ["face_enhancement"],
"best_for": "Digital art, 3D renders"
},
"Text Refine": {
"name": "Text Refine",
"description": "Optimized for images containing text",
"parameters": [],
"best_for": "Documents, screenshots"
},
"Enhance Generative": {
"name": "Enhance Generative",
"description": "Generative model for high quality and creative detail (slower)",
"parameters": ["face_enhancement"],
"best_for": "High quality creative upscaling"
}
}
async def upscale(job_id: str):
"""Upscale image using Topaz Labs API
Input parameters:
- scale: Upscale factor (2, 4, 6, 8)
- model: Enhancement model (see TOPAZ_MODELS)
- output_format: 'png', 'jpg', 'tiff' (default: png)
- face_enhancement: Boolean to enable face detection and enhancement
- noise_reduction: 0-100, amount of noise removal
- sharpening: 0-100, output sharpening level
- compression_recovery: 0-100, recover compression artifacts
- detail_enhancement: 0-100, enhance fine details
- preserve_grain: Boolean to preserve film grain
- output_quality: 1-100 for jpg output (default: 95)
"""
db = SessionLocal()
try:
job = db.query(Job).filter(Job.id == job_id).first()
if not job:
return
input_data = job.input_data
input_asset_ids = job.input_asset_ids
if not input_asset_ids:
raise ValueError("No input asset provided")
# Get input asset
input_asset = db.query(Asset).filter(Asset.id == input_asset_ids[0]).first()
if not input_asset:
raise ValueError("Input asset not found")
# Extract parameters
scale = input_data.get("scale", 2)
model = input_data.get("model", "auto")
output_format = input_data.get("output_format", "png")
face_enhancement = input_data.get("face_enhancement", False)
noise_reduction = input_data.get("noise_reduction")
sharpening = input_data.get("sharpening")
compression_recovery = input_data.get("compression_recovery")
detail_enhancement = input_data.get("detail_enhancement")
preserve_grain = input_data.get("preserve_grain", False)
output_quality = input_data.get("output_quality", 95)
job.progress = 10
job.api_provider = "topaz"
job.api_model = model
db.commit()
# Read input image
with open(input_asset.file_path, "rb") as f:
image_data = f.read()
# Ensure dimensions are set - extract if missing
if not input_asset.width or not input_asset.height:
from PIL import Image
import io
img = Image.open(io.BytesIO(image_data))
original_width, original_height = img.size
# Update asset with correct dimensions
input_asset.width = original_width
input_asset.height = original_height
db.commit()
else:
original_width = input_asset.width
original_height = input_asset.height
logger.info(f"Topaz Image Setup: {original_width}x{original_height}, Scale: {scale}, Model: {model}")
# Calculate output dimensions
output_width = original_width * scale
output_height = original_height * scale
job.progress = 20
db.commit()
# Build enhancement parameters matching simple working PHP version
enhance_params: Dict[str, Any] = {
"output_height": str(output_height),
"output_format": output_format if output_format in ["jpeg", "jpg", "png", "tiff", "tif"] else "png",
"crop_to_fill": "true" if input_data.get("crop_to_fill") else "false",
"face_enhancement": "true" if input_data.get("face_enhancement") else "false"
}
# Handle 'auto' model: simplest is to OMIT parameter if auto
if model.lower() != "auto":
enhance_params["model"] = model
# Strict PHP parity for reliability:
# process.php ONLY sent: image, output_height, output_format, crop_to_fill, face_enhancement, model.
# It did NOT send denoise, sharpen, creativity, etc.
# To avoid 422 errors or timeouts, we omit them until verified.
# Call Topaz API
async with httpx.AsyncClient(timeout=600) as client:
# Start async enhancement
response = await client.post(
"https://api.topazlabs.com/image/v1/enhance/async",
headers={
"X-API-Key": settings.topaz_api_key,
"Accept": "application/json"
},
files={"image": (input_asset.original_filename, image_data, input_asset.mime_type)},
data=enhance_params
)
response.raise_for_status()
result = response.json()
request_id = result.get("id") or result.get("requestId") or result.get("process_id")
job.progress = 40
job.api_request_id = request_id
db.commit()
# Poll for completion
output_url = None
polling_interval = 2
max_attempts = 180
status_data = {}
for i in range(max_attempts):
await asyncio.sleep(polling_interval)
try:
status_response = await client.get(
f"https://api.topazlabs.com/image/v1/status/{request_id}",
headers={"X-API-Key": settings.topaz_api_key}
)
if status_response.status_code != 200:
logger.warning(f"Topaz Status Check Failed: {status_response.text}")
continue
status_data = status_response.json()
status = status_data.get("status", "")
# Topaz uses different status values and field names
topaz_status = status.lower() if status else ""
# Log status occasionally
if i % 5 == 0:
logger.info(f"Topaz Job {request_id} Status: {topaz_status} (Attempt {i}/{max_attempts})")
if topaz_status == "completed":
logger.info(f"Topaz job completed, fetching download URL...")
break
elif topaz_status == "failed":
error_msg = status_data.get("error") or "Unknown error"
raise ValueError(f"Topaz enhancement failed: {error_msg}")
# Update progress
# Start at 40, go up to 85.
progress_increment = (85 - 40) / max_attempts
current_progress = 40 + (i * progress_increment)
if i % 2 == 0:
job.progress = int(current_progress)
db.commit()
except Exception as loop_e:
logger.error(f"Error in polling loop: {loop_e}")
continue
if topaz_status != "completed":
raise TimeoutError("Topaz upscaling timed out or did not complete.")
# Call the download endpoint to get the download URL
logger.info(f"Calling download endpoint for request_id: {request_id}")
download_response = await client.get(
f"https://api.topazlabs.com/image/v1/download/{request_id}",
headers={"X-API-Key": settings.topaz_api_key}
)
download_response.raise_for_status()
download_data = download_response.json()
output_url = download_data.get("download_url")
if not output_url:
raise ValueError(f"No download_url in response: {download_data}")
logger.info(f"Topaz download URL received: {output_url[:100] if output_url else 'None'}")
# Download result
img_response = await client.get(output_url)
upscaled_data = img_response.content
logger.info(f"Downloaded upscaled image: {len(upscaled_data)} bytes")
job.progress = 90
db.commit()
# Determine output extension
ext_map = {"png": ".png", "jpg": ".jpg", "jpeg": ".jpg", "tiff": ".tiff"}
ext = ext_map.get(output_format, ".png")
mime_map = {"png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg", "tiff": "image/tiff"}
mime = mime_map.get(output_format, "image/png")
# Save output
# Save output
base_name = os.path.splitext(input_asset.original_filename)[0]
# Clean base name: replace spaces with underscores
clean_base_name = base_name.replace(" ", "_")
# Clean model name: remove spaces, lowercase, etc - User wants specific format "Proteus" etc.
# actually user said "Model name in original case or uppercase".
# The model var comes from input_data.get("model").
# Let's keep it as is but replace spaces with underscores if any.
clean_model = model.replace(" ", "_")
filename = f"{clean_base_name}_{scale}X_{clean_model}{ext}"
# Ensure unique filename if needed in future, but for now this naming convention is requested
# If we really need uniqueness, maybe append a short hash, but user asked specifically for this format
# Let's check if we should add a short suffix to avoid collisions if they run it twice
# or rely on duplicate handling?
# User request: "like 2X-Auto or 2X-CGI or 2X-Standard-V2"
# So we prioritize readability.
# filename = f"upscaled_{scale}x_{model}_{uuid4()}{ext}" <-- OLD
storage_path = os.path.join(settings.storage_path, "images")
os.makedirs(storage_path, exist_ok=True)
file_path = os.path.join(storage_path, filename)
with open(file_path, "wb") as f:
f.write(upscaled_data)
# Create output asset
output_asset = Asset(
user_id=job.user_id,
project_id=job.project_id,
original_filename=filename,
stored_filename=filename,
file_path=file_path,
file_type="image",
mime_type=mime,
file_size_bytes=len(upscaled_data),
width=output_width,
height=output_height,
source_module="image_upscaler",
source_job_id=job.id,
parent_asset_id=input_asset.id,
asset_metadata={
"scale": scale,
"model": model,
"face_enhancement": face_enhancement,
"noise_reduction": noise_reduction,
"sharpening": sharpening,
"original_dimensions": f"{original_width}x{original_height}",
"output_dimensions": f"{output_width}x{output_height}"
}
)
db.add(output_asset)
db.commit()
db.refresh(output_asset)
job.output_asset_ids = [output_asset.id]
job.output_data = {"asset_id": str(output_asset.id), "file_path": file_path}
logger.info(f"✓ Topaz upscale completed: Asset {output_asset.id} created")
# Log Usage
try:
from app.utils.logging import log_model_usage
# Topaz typically charges per megapixel or image
# We seeded it as 'cost_per_image' ($0.20 buffer) for 'topaz' provider
# Calculate duration
duration_ms = 0
if job.started_at:
duration_ms = int((datetime.utcnow() - job.started_at).total_seconds() * 1000)
log_model_usage(
db=db,
job_id=str(job.id),
user_id=str(job.user_id),
module="image_upscaler",
action="upscale",
provider="topaz",
model=model, # e.g. "Proteus"
usage_stats={
"images": 1,
"processing_time_ms": duration_ms
},
request_metadata={
"input_file": input_asset.original_filename,
"scale": scale,
"original_dims": f"{original_width}x{original_height}",
"output_dims": f"{output_width}x{output_height}"
},
response_metadata={
"output_file": filename,
"topaz_request_id": request_id
}
)
except Exception as log_e:
logger.error(f"Failed to log usage stats: {log_e}")
job.progress = 100
job.status = "completed"
job.completed_at = datetime.utcnow()
db.commit()
except Exception as e:
job.status = "failed"
job.error_message = str(e)
db.commit()
finally:
db.close()
def get_available_models() -> Dict[str, Any]:
"""Get all available Topaz upscaling models and their capabilities"""
return TOPAZ_MODELS