# presenton/servers/fastapi/services/documents_loader.py
#
# File viewer metadata: 221 lines, 7.5 KiB, Python

import asyncio
import logging
import os
import tempfile
from pathlib import Path
from typing import Any, List, Optional, Tuple
import pdfplumber
from fastapi import HTTPException
from constants.documents import (
IMAGE_EXTENSIONS,
OFFICE_EXTENSIONS,
PDF_EXTENSIONS,
TEXT_EXTENSIONS,
)
from services.document_conversion_service import (
DocumentConversionError,
DocumentConversionService,
)
from services.liteparse_service import LiteParseError, LiteParseService
from utils.ocr_language import presentation_language_to_ocr_code
LOGGER = logging.getLogger(__name__)

# Optional fallback converter (primarily useful on Windows).
# The import is best-effort: any failure while importing the module simply
# disables the fallback, so the catch is deliberately broad. The reason is
# logged at debug level so a missing fallback can still be diagnosed.
try:
    from services.lightweight_document_service import (
        DocumentService as DocumentServiceCls,
    )
except Exception:
    DocumentServiceCls = None
    LOGGER.debug(
        "[DocumentsLoader] Fallback document service unavailable",
        exc_info=True,
    )
class DocumentsLoader:
    """Load a list of files as text and, for PDFs, optional page images.

    Each file is dispatched on its lower-cased extension:

    * PDF extensions     -> parsed with LiteParse, pages optionally rendered.
    * text extensions    -> read verbatim as UTF-8.
    * office extensions  -> converted to PDF, then parsed with LiteParse.
    * image extensions   -> converted to PNG, then parsed with LiteParse.
    * anything else      -> handed to LiteParse directly.

    Results are exposed through the ``documents`` and ``images`` properties
    after ``load_documents`` has completed.
    """

    # Timeout (seconds) handed to LiteParseService and to the conversion calls.
    DECOMPOSE_TIMEOUT_SECONDS = 600

    def __init__(
        self,
        file_paths: List[str],
        presentation_language: Optional[str] = None,
    ):
        """Store the paths to load and build the parsing/conversion services.

        Args:
            file_paths: Paths of the files to load, in output order.
            presentation_language: Mapped to an OCR language code via
                ``presentation_language_to_ocr_code``.
        """
        self._file_paths = file_paths
        self._ocr_language = presentation_language_to_ocr_code(presentation_language)

        self.liteparse_service = LiteParseService(
            timeout_seconds=self.DECOMPOSE_TIMEOUT_SECONDS
        )
        self.document_conversion_service = DocumentConversionService()
        # The fallback parser is optional; it is None when its import failed.
        self.document_service: Any = (
            DocumentServiceCls() if DocumentServiceCls is not None else None
        )

        self._documents: List[str] = []
        self._images: List[List[str]] = []

    @property
    def documents(self) -> List[str]:
        """Text of each loaded file, in the order of the input paths."""
        return self._documents

    @property
    def images(self) -> List[List[str]]:
        """Page-image paths per loaded file (empty list for non-PDF files)."""
        return self._images

    async def load_documents(
        self,
        temp_dir: Optional[str] = None,
        load_text: bool = True,
        load_images: bool = False,
    ):
        """Load every configured file and store the results on the instance.

        If load_images is True, temp_dir must be provided.

        ``load_text`` and ``load_images`` are only forwarded to the PDF
        loader; all other file types are always loaded as text and never
        produce images.

        When several PDFs are rendered in one call, the first one writes its
        page images directly into ``temp_dir`` and each further PDF writes
        into its own ``pdf_pages_<n>`` subdirectory, so that identically
        named page files cannot overwrite each other.

        Raises:
            HTTPException: 404 if a path does not exist, 400 if page images
                are requested for a PDF without ``temp_dir``, 500 if parsing
                fails.
        """
        documents: List[str] = []
        images: List[List[str]] = []
        # Number of PDFs that have already been assigned an image directory.
        rendered_pdfs = 0

        for file_path in self._file_paths:
            if not os.path.exists(file_path):
                raise HTTPException(
                    status_code=404, detail=f"File {file_path} not found"
                )

            document = ""
            imgs: List[str] = []
            extension = Path(file_path).suffix.lower()
            LOGGER.info(
                "[DocumentsLoader] Processing file=%s extension=%s",
                file_path,
                extension,
            )

            if extension in PDF_EXTENSIONS:
                image_dir = temp_dir
                if load_images and temp_dir is not None:
                    image_dir = self._page_image_dir(temp_dir, rendered_pdfs)
                    rendered_pdfs += 1
                document, imgs = await self.load_pdf(
                    file_path, load_text, load_images, image_dir
                )
            elif extension in TEXT_EXTENSIONS:
                document = await self.load_text(file_path)
            elif extension in OFFICE_EXTENSIONS:
                document = await asyncio.to_thread(
                    self.load_office_document,
                    file_path,
                    temp_dir,
                )
            elif extension in IMAGE_EXTENSIONS:
                document = await asyncio.to_thread(
                    self.load_image,
                    file_path,
                    temp_dir,
                )
            else:
                document = await asyncio.to_thread(
                    self._parse_with_liteparse, file_path
                )

            documents.append(document)
            images.append(imgs)

        # Publish only after every file succeeded, so a failure part-way
        # through leaves the previously stored results untouched.
        self._documents = documents
        self._images = images

    @staticmethod
    def _page_image_dir(temp_dir: str, pdf_index: int) -> str:
        """Return the directory that the ``pdf_index``-th rendered PDF writes to.

        Index 0 maps to ``temp_dir`` itself; later indexes map to a
        ``pdf_pages_<index>`` subdirectory, which is created if missing.
        """
        if pdf_index == 0:
            return temp_dir
        image_dir = os.path.join(temp_dir, f"pdf_pages_{pdf_index}")
        os.makedirs(image_dir, exist_ok=True)
        return image_dir

    async def load_pdf(
        self,
        file_path: str,
        load_text: bool,
        load_images: bool,
        temp_dir: Optional[str] = None,
    ) -> Tuple[str, List[str]]:
        """Parse a PDF and/or render its pages to images.

        Returns:
            ``(document, image_paths)``; ``document`` is ``""`` when
            ``load_text`` is false and ``image_paths`` is empty when
            ``load_images`` is false.

        Raises:
            HTTPException: 400 if ``load_images`` is true and ``temp_dir`` is
                None; 500 if parsing fails.
        """
        image_paths: List[str] = []
        document: str = ""

        if load_text:
            document = await asyncio.to_thread(self._parse_with_liteparse, file_path)

        if load_images:
            if temp_dir is None:
                raise HTTPException(
                    status_code=400,
                    detail="temp_dir is required when load_images is true",
                )
            image_paths = await self.get_page_images_from_pdf_async(
                file_path, temp_dir
            )

        return document, image_paths

    async def load_text(self, file_path: str) -> str:
        """Return the UTF-8 decoded content of ``file_path``.

        Both opening and reading happen in a worker thread so that no file
        I/O runs on the event loop.
        """
        return await asyncio.to_thread(Path(file_path).read_text, encoding="utf-8")

    def _convert_and_parse(
        self,
        convert: Any,
        file_path: str,
        temp_dir: Optional[str],
        scratch_prefix: str,
    ) -> str:
        """Convert ``file_path`` with ``convert`` and parse the converted file.

        The converted file is written to ``temp_dir`` when one is given;
        otherwise to a temporary directory that is removed once parsing has
        finished. Exceptions raised by ``convert`` itself are not handled
        here and propagate to the caller.
        """
        if temp_dir:
            converted_path = convert(
                file_path,
                temp_dir,
                timeout_seconds=self.DECOMPOSE_TIMEOUT_SECONDS,
            )
            return self._parse_with_liteparse(converted_path)

        with tempfile.TemporaryDirectory(prefix=scratch_prefix) as conversion_dir:
            converted_path = convert(
                file_path,
                conversion_dir,
                timeout_seconds=self.DECOMPOSE_TIMEOUT_SECONDS,
            )
            # Parse inside the ``with`` block: the converted file disappears
            # together with the temporary directory.
            return self._parse_with_liteparse(converted_path)

    def load_office_document(self, file_path: str, temp_dir: Optional[str] = None) -> str:
        """Convert an office document to PDF and return its parsed text.

        Blocking; ``load_documents`` runs it in a worker thread.
        """
        return self._convert_and_parse(
            self.document_conversion_service.convert_office_to_pdf,
            file_path,
            temp_dir,
            "office-convert-",
        )

    def load_image(self, file_path: str, temp_dir: Optional[str] = None) -> str:
        """Convert an image to PNG and return its parsed text (OCR enabled).

        Blocking; ``load_documents`` runs it in a worker thread.
        """
        return self._convert_and_parse(
            self.document_conversion_service.convert_image_to_png,
            file_path,
            temp_dir,
            "image-convert-",
        )

    def _parse_with_liteparse(self, file_path: str) -> str:
        """Parse ``file_path`` with LiteParse, using the fallback parser on failure.

        Raises:
            HTTPException: 500 when LiteParse fails and the fallback parser is
                unavailable or fails as well. The original LiteParse error is
                attached as the cause.
        """
        try:
            LOGGER.info("[DocumentsLoader] LiteParse start file=%s", file_path)
            return self.liteparse_service.parse_to_markdown(
                file_path,
                ocr_enabled=True,
                ocr_language=self._ocr_language,
            )
        except (LiteParseError, DocumentConversionError) as exc:
            LOGGER.warning(
                "[DocumentsLoader] Primary parse failed file=%s error=%s",
                file_path,
                exc,
            )
            if self.document_service is not None:
                try:
                    LOGGER.info(
                        "[DocumentsLoader] Trying fallback parser file=%s", file_path
                    )
                    return self.document_service.parse_to_markdown(file_path)
                except Exception:
                    # Best-effort fallback: log it and report the primary
                    # failure below instead of the fallback's error.
                    LOGGER.exception(
                        "[DocumentsLoader] Fallback parser failed file=%s",
                        file_path,
                    )
            raise HTTPException(
                status_code=500,
                detail=f"Failed to parse document {os.path.basename(file_path)}: {exc}",
            ) from exc

    @classmethod
    def get_page_images_from_pdf(cls, file_path: str, temp_dir: str) -> List[str]:
        """Render every page of a PDF to ``temp_dir/page_<number>.png``.

        Existing files with the same names in ``temp_dir`` are overwritten, so
        callers rendering several PDFs must use a separate directory per PDF.

        Returns:
            The written image paths in page order.
        """
        with pdfplumber.open(file_path) as pdf:
            images: List[str] = []
            for page in pdf.pages:
                img = page.to_image(resolution=150)
                image_path = os.path.join(temp_dir, f"page_{page.page_number}.png")
                img.save(image_path)
                images.append(image_path)
            return images

    @classmethod
    async def get_page_images_from_pdf_async(
        cls, file_path: str, temp_dir: str
    ) -> List[str]:
        """Run ``get_page_images_from_pdf`` in a worker thread."""
        return await asyncio.to_thread(
            cls.get_page_images_from_pdf, file_path, temp_dir
        )