179 lines
6.1 KiB
Python
179 lines
6.1 KiB
Python
from typing import List
|
|
from fastapi import APIRouter, Depends, File, UploadFile, HTTPException, Query, Header
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlmodel import select
|
|
|
|
from models.image_prompt import ImagePrompt
|
|
from models.sql.image_asset import ImageAsset
|
|
from services.database import get_async_session
|
|
from services.image_generation_service import ImageGenerationService
|
|
from utils.asset_directory_utils import get_images_directory
|
|
from utils.get_env import get_pexels_api_key_env, get_pixabay_api_key_env
|
|
from utils.image_provider import get_selected_image_provider
|
|
from enums.image_provider import ImageProvider
|
|
import os
|
|
import uuid
|
|
from utils.file_utils import get_file_name_with_random_uuid
|
|
|
|
IMAGES_ROUTER = APIRouter(prefix="/images", tags=["Images"])
|
|
|
|
|
|
def _normalize_stock_provider(provider: str | None) -> str:
|
|
normalized_provider = (provider or "").strip().lower()
|
|
if normalized_provider in {"pixels", "pixel", "pexel"}:
|
|
normalized_provider = "pexels"
|
|
|
|
if normalized_provider:
|
|
if normalized_provider in {"pexels", "pixabay"}:
|
|
return normalized_provider
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="provider must be either 'pexels' or 'pixabay'",
|
|
)
|
|
|
|
selected_provider = get_selected_image_provider()
|
|
if selected_provider == ImageProvider.PIXABAY:
|
|
return "pixabay"
|
|
return "pexels"
|
|
|
|
|
|
@IMAGES_ROUTER.get("/search", response_model=List[str])
|
|
async def search_stock_images(
|
|
query: str,
|
|
limit: int = Query(default=12, ge=1, le=30),
|
|
provider: str | None = Query(default=None),
|
|
strict_api_key: bool = Query(default=False),
|
|
x_provider_api_key: str | None = Header(default=None, alias="X-Provider-Api-Key"),
|
|
):
|
|
normalized_provider = _normalize_stock_provider(provider)
|
|
|
|
image_generation_service = ImageGenerationService(get_images_directory())
|
|
|
|
if normalized_provider == "pexels":
|
|
api_key = (x_provider_api_key or get_pexels_api_key_env() or "").strip()
|
|
if strict_api_key and not api_key:
|
|
raise HTTPException(status_code=401, detail="Pexels API key is required")
|
|
|
|
# Pexels can return cached public responses for common queries.
|
|
# Use a nonce query in strict mode to force a real auth check.
|
|
if strict_api_key:
|
|
validation_query = f"__presenton_auth_check_{uuid.uuid4().hex}"
|
|
await image_generation_service.get_image_from_pexels(
|
|
validation_query,
|
|
api_key=api_key,
|
|
limit=1,
|
|
)
|
|
|
|
images = await image_generation_service.get_image_from_pexels(
|
|
query,
|
|
api_key=api_key,
|
|
limit=limit,
|
|
)
|
|
if isinstance(images, str):
|
|
return [images] if images else []
|
|
return images
|
|
|
|
api_key = (x_provider_api_key or get_pixabay_api_key_env() or "").strip()
|
|
if strict_api_key and not api_key:
|
|
raise HTTPException(status_code=401, detail="Pixabay API key is required")
|
|
|
|
images = await image_generation_service.get_image_from_pixabay(
|
|
query,
|
|
api_key=api_key,
|
|
limit=limit,
|
|
)
|
|
if isinstance(images, str):
|
|
return [images] if images else []
|
|
return images
|
|
|
|
|
|
@IMAGES_ROUTER.get("/generate")
|
|
async def generate_image(
|
|
prompt: str, sql_session: AsyncSession = Depends(get_async_session)
|
|
):
|
|
images_directory = get_images_directory()
|
|
image_prompt = ImagePrompt(prompt=prompt)
|
|
image_generation_service = ImageGenerationService(images_directory)
|
|
|
|
image = await image_generation_service.generate_image(image_prompt)
|
|
if not isinstance(image, ImageAsset):
|
|
return image
|
|
|
|
sql_session.add(image)
|
|
await sql_session.commit()
|
|
|
|
return image.path
|
|
|
|
|
|
@IMAGES_ROUTER.get("/generated", response_model=List[ImageAsset])
|
|
async def get_generated_images(sql_session: AsyncSession = Depends(get_async_session)):
|
|
try:
|
|
images_result = await sql_session.scalars(
|
|
select(ImageAsset)
|
|
.where(ImageAsset.is_uploaded == False)
|
|
.order_by(ImageAsset.created_at.desc())
|
|
)
|
|
return list(images_result)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=500, detail=f"Failed to retrieve generated images: {str(e)}"
|
|
)
|
|
|
|
|
|
@IMAGES_ROUTER.post("/upload")
|
|
async def upload_image(
|
|
file: UploadFile = File(...), sql_session: AsyncSession = Depends(get_async_session)
|
|
):
|
|
try:
|
|
new_filename = get_file_name_with_random_uuid(file)
|
|
image_path = os.path.join(
|
|
get_images_directory(), os.path.basename(new_filename)
|
|
)
|
|
|
|
with open(image_path, "wb") as f:
|
|
f.write(await file.read())
|
|
|
|
image_asset = ImageAsset(path=image_path, is_uploaded=True)
|
|
|
|
sql_session.add(image_asset)
|
|
await sql_session.commit()
|
|
# Refresh to ensure all defaults are loaded
|
|
await sql_session.refresh(image_asset)
|
|
|
|
return image_asset
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to upload image: {str(e)}")
|
|
|
|
|
|
@IMAGES_ROUTER.get("/uploaded", response_model=List[ImageAsset])
|
|
async def get_uploaded_images(sql_session: AsyncSession = Depends(get_async_session)):
|
|
try:
|
|
images_result = await sql_session.scalars(
|
|
select(ImageAsset)
|
|
.where(ImageAsset.is_uploaded == True)
|
|
.order_by(ImageAsset.created_at.desc())
|
|
)
|
|
return list(images_result)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=500, detail=f"Failed to retrieve uploaded images: {str(e)}"
|
|
)
|
|
|
|
|
|
@IMAGES_ROUTER.delete("/{id}", status_code=204)
|
|
async def delete_uploaded_image_by_id(
|
|
id: uuid.UUID, sql_session: AsyncSession = Depends(get_async_session)
|
|
):
|
|
try:
|
|
# Fetch the asset to get its actual file path
|
|
image = await sql_session.get(ImageAsset, id)
|
|
if not image:
|
|
raise HTTPException(status_code=404, detail="Image not found")
|
|
|
|
os.remove(image.path)
|
|
|
|
await sql_session.delete(image)
|
|
await sql_session.commit()
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to delete image: {str(e)}")
|