presenton/servers/fastapi/document_processor/loader.py
2025-05-10 19:57:24 +05:45

152 lines
5 KiB
Python

import mimetypes
import os
from typing import List, Tuple
from fastapi import HTTPException
from langchain_community.document_loaders import TextLoader, PDFPlumberLoader
from langchain_core.documents import Document
from langchain_text_splitters import CharacterTextSplitter, MarkdownTextSplitter
from pptx import Presentation
from docx import Document as DocxDocument
from image_processor.utils import get_page_images_from_pdf_async
PDF_MIME_TYPES = ["application/pdf"]
TEXT_MIME_TYPES = ["text/plain"]
POWERPOINT_TYPES = [
"application/vnd.openxmlformats-officedocument.presentationml.presentation"
]
WORD_TYPES = [
"application/msword",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
]
SPREADSHEET_TYPES = ["text/csv", "application/csv"]
UPLOAD_ACCEPTED_DOCUMENTS = (
PDF_MIME_TYPES + TEXT_MIME_TYPES + POWERPOINT_TYPES + WORD_TYPES
)
class DocumentsLoader:
def __init__(self, documents: List[str]):
self._document_paths = documents
self._documents: List[Document] = []
self._splitted_documents: List[Document] = []
self._images: List[List[str]] = []
self._markdown_splitter = MarkdownTextSplitter(chunk_size=500, chunk_overlap=50)
self._text_splitter = CharacterTextSplitter(
separator="/n", chunk_size=500, chunk_overlap=50
)
@property
def documents(self):
return self._documents
@property
def splitted_documents(self):
return self._splitted_documents
@property
def images(self):
return self._images
async def load_documents(
self,
temp_dir: str,
split_documents: bool = False,
load_markdown: bool = True,
load_images: bool = False,
):
documents: List[Document] = []
images: List[str] = []
splitted_documents: List[Document] = []
for file_path in self._document_paths:
if not os.path.exists(file_path):
raise HTTPException(
status_code=404, detail=f"File {file_path} not found"
)
docs = []
imgs = []
mime_type = mimetypes.guess_type(file_path)[0]
if mime_type in PDF_MIME_TYPES:
docs, imgs = await self.load_pdf(
file_path, load_markdown, load_images, temp_dir
)
elif mime_type in TEXT_MIME_TYPES:
docs = self.load_text(file_path)
elif mime_type in POWERPOINT_TYPES:
docs = self.load_powerpoint(file_path)
elif mime_type in WORD_TYPES:
docs = self.load_msword(file_path)
documents.extend(docs)
images.append(imgs)
if split_documents:
splitted_documents.extend(self.split_documents(docs, mime_type))
self._documents = documents
self._splitted_documents = splitted_documents
self._images = images
def split_documents(self, documents: List[Document], mime_type):
return self._text_splitter.split_documents(documents)
def clip_longer_documents(self, documents: List[Document], clip_after: int = 1200):
for document in documents:
document.page_content = document.page_content[:clip_after]
return documents
async def load_pdf(
self,
file_path: str,
load_markdown: bool,
load_images: bool,
temp_dir: str,
) -> Tuple[List[Document], List[str]]:
image_paths = []
documents: List[Document] = []
if load_markdown:
loader = PDFPlumberLoader(file_path)
docs = loader.load()
pdf_document = Document(page_content="")
pdf_document.metadata = docs[0].metadata
for doc in docs:
pdf_document.page_content += doc.page_content
documents.append(pdf_document)
if load_images:
image_paths = await get_page_images_from_pdf_async(file_path, temp_dir)
return documents, image_paths
async def decompose_pdf_to_markdown(self, document_path: str) -> str:
raise Exception("Not Implemented")
def load_text(self, file_path: str) -> List[Document]:
loader = TextLoader(file_path)
return loader.load()
def load_msword(self, file_path: str) -> List[Document]:
document = DocxDocument(file_path)
text = "\n".join([paragraph.text for paragraph in document.paragraphs])
return [Document(page_content=text)]
def load_powerpoint(self, file_path: str) -> List[Document]:
presentation = Presentation(file_path)
extracted_text = ""
for index, slide in enumerate(presentation.slides):
extracted_text += f"# Slide {index + 1}\n"
for shape in slide.shapes:
if shape.has_text_frame:
for paragraph in shape.text_frame.paragraphs:
extracted_text += f"{paragraph.text}\n"
extracted_text += "\n"
extracted_text += "\n\n"
return [Document(page_content=extracted_text)]