feat(fastapi): adds icons finder service to search icons using fastembed-vectorstore

This commit is contained in:
sauravniraula 2025-07-15 23:56:26 +05:45
parent a99db53bba
commit ea3ff5be47
No known key found for this signature in database
GPG key ID: 60FCC1B5A5E83326
9 changed files with 577498 additions and 577389 deletions

View file

@ -7,5 +7,6 @@ from api.lifespan import app_lifespan
APP = FastAPI(lifespan=app_lifespan)
APP.mount("/static", StaticFiles(directory="static"), name="static")
# APP.mount("/static/app-data", StaticFiles(directory=get_app_data_directory_env()))
APP.include_router(API_V1_PPT_ROUTER)

View file

@ -59,4 +59,4 @@ async def stream_outlines(presentation_id: str):
key="presentation", value=presentation.model_dump_json()
).to_string()
return StreamingResponse(inner())
return StreamingResponse(inner(), media_type="text/event-stream")

View file

@ -19,7 +19,9 @@ from utils.llm_calls.generate_document_summary import generate_document_summary
from utils.llm_calls.generate_presentation_structure import (
generate_presentation_structure,
)
from utils.llm_calls.generate_slide_content import get_slide_content_from_type_and_outline
from utils.llm_calls.generate_slide_content import (
get_slide_content_from_type_and_outline,
)
PRESENTATION_ROUTER = APIRouter(prefix="/presentation", tags=["Presentation"])
@ -171,4 +173,4 @@ async def stream_presentation(presentation_id: str):
value=response.model_dump(mode="json"),
).to_string()
return StreamingResponse(inner())
return StreamingResponse(inner(), media_type="text/event-stream")

File diff suppressed because it is too large Load diff

View file

@ -16,6 +16,7 @@ email_validator==2.2.0
fastapi==0.116.1
fastapi-cli==0.0.8
fastapi-cloud-cli==0.1.4
fastembed_vectorstore==0.1.6
frozenlist==1.7.0
google-auth==2.40.3
google-genai==1.25.0
@ -33,6 +34,7 @@ MarkupSafe==3.0.2
mdurl==0.1.2
multidict==6.6.3
openai==1.95.1
pathvalidate==3.3.1
pdfminer.six==20250506
pdfplumber==0.11.7
pillow==11.3.0

View file

@ -0,0 +1,32 @@
import json
import os
from fastembed_vectorstore import FastembedVectorstore, FastembedEmbeddingModel
class IconFinderService:
def __init__(self):
self.vector_store = self.get_icons_vectorstore()
def get_icons_vectorstore(self):
vector_store_path = "assets/icons_vectorstore.json"
embedding_model = FastembedEmbeddingModel.BGESmallENV15
if os.path.exists(vector_store_path):
return FastembedVectorstore.load(embedding_model, vector_store_path)
vector_store = FastembedVectorstore(embedding_model)
with open("assets/icons.json", "r") as f:
icons = json.load(f)
documents = []
for each in icons["icons"]:
if each["name"].split("-")[-1] == "bold":
documents.append(f"{each['name']}||{each['tags']}")
vector_store.embed_documents(documents)
vector_store.save(vector_store_path)
return vector_store
def search_icons(self, query: str, k: int = 1):
result = self.vector_store.search(query, k)
return [ f"/static/icons/bold/{result[0].split("||")[0]}.png" for result in result]

View file

@ -5,13 +5,13 @@ import aiohttp
from google import genai
from google.genai.types import GenerateContentConfig
from models.image_prompt import ImagePrompt
from utils.download_helpers import download_file
from utils.get_env import get_pexels_api_key_env
from utils.llm_provider import (
get_llm_client,
is_google_selected,
is_openai_selected,
)
from utils.randomizers import get_random_uuid
class ImageGenerationService:
@ -35,7 +35,7 @@ class ImageGenerationService:
async def generate_image(self, prompt: ImagePrompt) -> str:
if not self.image_gen_func:
print("No image generation function found. Using placeholder image.")
return "static/images/placeholder.jpg"
return "/static/images/placeholder.jpg"
image_prompt = prompt.get_image_prompt(not self.use_pexels)
print(f"Request - Generating Image for {image_prompt}")
@ -48,7 +48,7 @@ class ImageGenerationService:
except Exception as e:
print(f"Error generating image: {e}")
return "static/images/placeholder.jpg"
return "/static/images/placeholder.jpg"
async def generate_image_openai(prompt: str, output_directory: str) -> str:
client = get_llm_client()
@ -60,13 +60,7 @@ class ImageGenerationService:
size="1024x1024",
)
image_url = result.data[0].url
async with aiohttp.ClientSession() as session:
async with session.get(image_url) as response:
image_bytes = await response.read()
image_path = os.path.join(output_directory, f"{get_random_uuid()}.jpg")
with open(image_path, "wb") as f:
f.write(image_bytes)
return image_path
return await download_file(image_url, output_directory)
async def generate_image_google(prompt: str, output_directory: str) -> str:
client = genai.Client()
@ -95,6 +89,4 @@ class ImageGenerationService:
)
data = await response.json()
image_url = data["photos"][0]["src"]["large"]
image_path = os.path.join(output_directory, f"{str(uuid.uuid4())}.jpg")
await download_file(image_url, image_path)
return image_path
return await download_file(image_url, output_directory)

View file

@ -0,0 +1,80 @@
import asyncio
import os
import mimetypes
from typing import List, Optional
from urllib.parse import urlparse
import aiohttp
from utils.randomizers import get_random_uuid
async def download_file(
url: str, save_directory: str, headers: Optional[dict] = None
) -> Optional[str]:
try:
os.makedirs(save_directory, exist_ok=True)
parsed_url = urlparse(url)
filename = os.path.basename(parsed_url.path)
if not filename or "." not in filename:
async with aiohttp.ClientSession() as session:
async with session.head(url, headers=headers) as response:
if response.status == 200:
content_disposition = response.headers.get(
"Content-Disposition", ""
)
if "filename=" in content_disposition:
filename = content_disposition.split("filename=")[1].strip(
"\"'"
)
else:
content_type = response.headers.get("Content-Type", "")
if content_type:
extension = mimetypes.guess_extension(
content_type.split(";")[0]
)
if extension:
filename = f"{get_random_uuid()}{extension}"
filename = filename or get_random_uuid()
save_path = os.path.join(save_directory, filename)
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status == 200:
with open(save_path, "wb") as file:
async for chunk in response.content.iter_chunked(8192):
file.write(chunk)
print(f"File downloaded successfully: {save_path}")
return save_path
else:
print(f"Failed to download file. HTTP status: {response.status}")
return None
except Exception as e:
print(f"Error downloading file from {url}: {e}")
return None
async def download_files(
urls: List[str], save_directory: str, headers: Optional[dict] = None
) -> List[Optional[str]]:
print(f"Starting download of {len(urls)} files to {save_directory}")
coroutines = [download_file(url, save_directory, headers) for url in urls]
results = await asyncio.gather(*coroutines, return_exceptions=True)
final_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Exception during download of {urls[i]}: {result}")
final_results.append(None)
else:
final_results.append(result)
successful_downloads = sum(1 for result in final_results if result is not None)
print(
f"Download completed: {successful_downloads}/{len(urls)} files downloaded successfully"
)
return final_results

View file

@ -53,7 +53,7 @@ const startServers = async () => {
const fastApiProcess = spawn(
"python",
[isDev ? "server_autoreload.py" : "server.py", "--port", fastapiPort.toString()],
["server.py", "--port", fastapiPort.toString(), "--reload", isDev],
{
cwd: fastapiDir,
stdio: "inherit",