chore(fastapi): remove langchain

This commit is contained in:
sauravniraula 2025-06-28 22:08:01 +05:45
parent 8ee5a4f53a
commit 99583238bf
No known key found for this signature in database
GPG key ID: 60FCC1B5A5E83326
9 changed files with 68 additions and 146 deletions

View file

@ -1,5 +1,3 @@
import asyncio
from typing import List
import uuid
from api.models import LogMetadata
from api.routers.presentation.models import (
@ -37,7 +35,7 @@ class DecomposeDocumentsHandler:
file_path = TEMP_FILE_SERVICE.create_temp_file_path(
f"{str(uuid.uuid4())}.txt", self.temp_dir
)
parsed_doc = parsed_doc.page_content.replace("<br>", "\n")
parsed_doc = parsed_doc.replace("<br>", "\n")
with open(file_path, "w") as text_file:
text_file.write(parsed_doc)
document_paths.append(file_path)

View file

@ -1,5 +1,4 @@
import uuid
import re
from api.models import LogMetadata
from api.routers.presentation.models import GenerateOutlinesRequest

View file

@ -27,10 +27,7 @@ from ppt_config_generator.ppt_outlines_generator import generate_ppt_content
from ppt_generator.generator import generate_presentation
from ppt_generator.models.llm_models import (
LLM_CONTENT_TYPE_MAPPING,
LLMPresentationModel,
)
from langchain_core.output_parsers import JsonOutputParser
from ppt_generator.models.slide_model import SlideModel

View file

@ -31,7 +31,6 @@ from ppt_generator.models.llm_models import (
)
from ppt_generator.models.slide_model import SlideModel
from api.services.instances import TEMP_FILE_SERVICE
from langchain_core.output_parsers import JsonOutputParser
from ppt_generator.slide_generator import get_slide_content_from_type_and_outline

View file

@ -1,11 +1,10 @@
import asyncio
import mimetypes
import os
from typing import List, Tuple
from fastapi import HTTPException
from langchain_community.document_loaders import TextLoader, PDFPlumberLoader
from langchain_core.documents import Document
from langchain_text_splitters import CharacterTextSplitter, MarkdownTextSplitter
from pptx import Presentation
import pdfplumber
from docx import Document as DocxDocument
from image_processor.utils import get_page_images_from_pdf_async
@ -30,23 +29,13 @@ class DocumentsLoader:
def __init__(self, documents: List[str]):
self._document_paths = documents
self._documents: List[Document] = []
self._splitted_documents: List[Document] = []
self._documents: List[str] = []
self._images: List[List[str]] = []
self._markdown_splitter = MarkdownTextSplitter(chunk_size=500, chunk_overlap=50)
self._text_splitter = CharacterTextSplitter(
separator="/n", chunk_size=500, chunk_overlap=50
)
@property
def documents(self):
return self._documents
@property
def splitted_documents(self):
return self._splitted_documents
@property
def images(self):
return self._images
@ -54,90 +43,69 @@ class DocumentsLoader:
async def load_documents(
    self,
    temp_dir: str,
    load_text: bool = True,
    load_images: bool = False,
):
    """Load every configured document as plain text (and optionally images).

    Results are stored on the instance: ``self._documents`` receives one
    string per input path and ``self._images`` one list of image paths per
    input path, both in the order of ``self._document_paths``.

    Args:
        temp_dir: Directory handed to the PDF loader for extracted images.
        load_text: Forwarded to ``load_pdf``; whether PDF text is extracted.
        load_images: Forwarded to ``load_pdf``; whether page images are
            rendered.

    Raises:
        HTTPException: 404 when one of the document paths does not exist.
    """
    documents: List[str] = []
    # One list of image paths per document (empty for non-PDF inputs).
    images: List[List[str]] = []

    for file_path in self._document_paths:
        if not os.path.exists(file_path):
            raise HTTPException(
                status_code=404, detail=f"File {file_path} not found"
            )

        # Defaults are kept when the MIME type matches none of the known
        # groups, so every input path still yields one entry in each list.
        document = ""
        imgs = []
        mime_type = mimetypes.guess_type(file_path)[0]

        if mime_type in PDF_MIME_TYPES:
            document, imgs = await self.load_pdf(
                file_path, load_text, load_images, temp_dir
            )
        elif mime_type in TEXT_MIME_TYPES:
            document = await self.load_text(file_path)
        elif mime_type in POWERPOINT_TYPES:
            # Synchronous parser: run it in a worker thread so the event
            # loop is not blocked, matching the PDF and text paths.
            document = await asyncio.to_thread(self.load_powerpoint, file_path)
        elif mime_type in WORD_TYPES:
            document = await asyncio.to_thread(self.load_msword, file_path)

        documents.append(document)
        images.append(imgs)

    self._documents = documents
    self._images = images
def split_documents(self, documents: List[Document], mime_type):
return self._text_splitter.split_documents(documents)
def clip_longer_documents(self, documents: List[Document], clip_after: int = 1200):
for document in documents:
document.page_content = document.page_content[:clip_after]
return documents
async def load_pdf(
    self,
    file_path: str,
    load_text: bool,
    load_images: bool,
    temp_dir: str,
) -> Tuple[str, List[str]]:
    """Extract the text and/or page images of a PDF.

    Args:
        file_path: Path of the PDF to read.
        load_text: When true, the text of all pages is extracted.
        load_images: When true, page images are produced via
            ``get_page_images_from_pdf_async``.
        temp_dir: Directory passed to the image extraction helper.

    Returns:
        A ``(text, image_paths)`` tuple; ``text`` is ``""`` and
        ``image_paths`` is ``[]`` when the corresponding flag is false.
    """

    def _extract_text() -> str:
        # Open, iterate and close the PDF inside one worker-thread call so
        # no blocking parsing happens on the event loop.
        with pdfplumber.open(file_path) as pdf:
            # `or ""` guards against a page yielding None instead of a
            # string (NOTE(review): reported for older pdfplumber releases
            # on pages without text; confirm against the pinned version).
            return "".join(page.extract_text() or "" for page in pdf.pages)

    document: str = ""
    image_paths = []

    if load_text:
        document = await asyncio.to_thread(_extract_text)

    if load_images:
        image_paths = await get_page_images_from_pdf_async(file_path, temp_dir)

    return document, image_paths
async def decompose_pdf_to_markdown(self, document_path: str) -> str:
    """Placeholder for PDF-to-markdown conversion; not implemented.

    Raises:
        NotImplementedError: Always. It is a subclass of ``Exception``, so
            callers catching ``Exception`` keep working.
    """
    raise NotImplementedError("Not Implemented")
async def load_text(self, file_path: str) -> str:
    """Read a text file and return its full content.

    The file is opened, read and closed inside a worker thread so that no
    blocking file I/O runs on the event loop.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The decoded file content.
    """

    def _read() -> str:
        # Decode explicitly as UTF-8 instead of the platform locale, and
        # replace undecodable bytes rather than failing the whole load.
        with open(file_path, "r", encoding="utf-8", errors="replace") as file:
            return file.read()

    return await asyncio.to_thread(_read)
def load_msword(self, file_path: str) -> List[Document]:
def load_msword(self, file_path: str) -> str:
document = DocxDocument(file_path)
text = "\n".join([paragraph.text for paragraph in document.paragraphs])
return [Document(page_content=text)]
return text
def load_powerpoint(self, file_path: str) -> List[Document]:
def load_powerpoint(self, file_path: str) -> str:
presentation = Presentation(file_path)
extracted_text = ""
@ -149,4 +117,4 @@ class DocumentsLoader:
extracted_text += f"{paragraph.text}\n"
extracted_text += "\n"
extracted_text += "\n\n"
return [Document(page_content=extracted_text)]
return extracted_text

View file

@ -1,37 +1,34 @@
import json
import os
from langchain_core.vectorstores import InMemoryVectorStore
from fastembed import TextEmbedding
from langchain_core.documents import Document
from api.utils.utils import get_resource
from langchain_community.embeddings.fastembed import FastEmbedEmbeddings
# Pyinstaller
import fastembed
def get_icons_vectorstore():
    """Embed the names of all "bold" icons and persist them as JSON.

    Reads ``assets/icons.json``, keeps the icon names whose last
    ``-``-separated segment is ``bold``, embeds them with fastembed's
    ``TextEmbedding`` and writes the result to
    ``assets/icons_vectorstore.json``.

    Returns:
        A dict with ``"documents"`` (the icon names) and ``"embeddings"``
        (one list of floats per name, in the same order).
    """
    vector_store_path = get_resource("assets/icons_vectorstore.json")
    embedding_model = TextEmbedding()

    # TODO(review): loading a previously written store is disabled, so the
    # embeddings are recomputed and the file rewritten on every call.

    with open(get_resource("assets/icons.json"), "r") as f:
        icons = json.load(f)

    bold_icon_names = [
        icon["name"]
        for icon in icons["icons"]
        if icon["name"].split("-")[-1] == "bold"
    ]

    # `embed` produces its vectors lazily; materialise them as plain lists
    # of floats so they can be JSON-serialised and reused by the caller.
    embeddings = [
        [float(value) for value in vector]
        for vector in embedding_model.embed(bold_icon_names)
    ]

    documents_and_embeddings = {
        "documents": bold_icon_names,
        "embeddings": embeddings,
    }

    with open(vector_store_path, "w") as f:
        json.dump(documents_and_embeddings, f)

    return documents_and_embeddings

View file

@ -1,7 +1,5 @@
import asyncio
from typing import List
from langchain_core.documents import Document
from langchain_text_splitters import CharacterTextSplitter
from openai.types.chat.chat_completion import ChatCompletion
from api.utils.model_utils import get_llm_client, get_nano_model
@ -23,16 +21,13 @@ Maintain as much information as possible.
"""
async def generate_document_summary(documents: List[Document]):
async def generate_document_summary(documents: List[str]):
client = get_llm_client()
model = get_nano_model()
text_splitter = CharacterTextSplitter(chunk_size=200000, chunk_overlap=0)
coroutines = []
for document in documents:
text = document.page_content
truncated_text = text_splitter.split_text(text)[0]
truncated_text = document[:200000]
coroutine = client.chat.completions.create(
model=model,
messages=[

View file

@ -44,14 +44,6 @@ Jinja2==3.1.6
jiter==0.9.0
jsonpatch==1.33
jsonpointer==3.0.0
langchain==0.3.25
langchain-community==0.3.24
langchain-core==0.3.65
langchain-google-genai==2.1.4
langchain-ollama==0.3.3
langchain-openai==0.3.16
langchain-text-splitters==0.3.8
langsmith==0.3.45
loguru==0.7.3
lxml==5.4.0
markdown-it-py==3.0.0

View file

@ -1,58 +1,35 @@
import os
from typing import Optional
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI
# search_tool = DuckDuckGoSearchRun(
# api_wrapper=DuckDuckGoSearchAPIWrapper(max_results=50)
# )
def get_prompt_template():
    """Build the chat messages used to request a research report.

    Returns:
        A new list of two message dicts (``role`` / ``content``): a system
        message with the instructions and a user message whose content
        still contains the ``{prompt}``, ``{language}`` and
        ``{search_results}`` placeholders for the caller to fill in.
    """
    return [
        {
            "role": "system",
            "content": """
Use provided prompt and search results to create an elaborate and up-to-date research report in mentioned language.
# Steps
1. Analyze the prompt and search results.
2. Extract topic of the report.
3. Generate a report in markdown format.
# Notes
- If language is not mentioned, use language from prompt.
- Format of report should be like *Research Report*.
- Ignore formatting if mentioned in prompt.
""",
        },
        {
            # "human" is the LangChain role name; plain chat-completions
            # message dicts use "user" for the end-user turn.
            "role": "user",
            "content": """
- Prompt: {prompt}
- Language: {language}
- Search Results: {search_results}
""",
        },
    ]
# Entry point for producing a research report for `query` in `language`.
# NOTE(review): the search call and the chain invocation below are commented
# out, so the only live return is the fixed placeholder string at the end.
async def get_report(query: str, language: Optional[str]):
model = (
ChatOpenAI(model="gpt-4.1-nano")
if os.getenv("LLM") == "openai"
else ChatGoogleGenerativeAI(model="gemini-2.0-flash")
)
chain = prompt_template | model
# search_results = await search_tool.ainvoke(query)
# response = await chain.ainvoke(
# {
# "prompt": query,
# "language": language,
# "search_results": search_results,
# }
# )
# return response.content
# Placeholder result until report generation is wired up again.
return "Research Report coming soon"