"markdown" format requires PyMuPDF >= 1.24.0, not available on server. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
207 lines
6.7 KiB
Python
207 lines
6.7 KiB
Python
"""Document parsing service.
|
|
|
|
Uses PyMuPDF for PDF, python-pptx for PPTX, python-docx for DOCX.
|
|
Optionally extracts text from embedded images via Gemini vision.
|
|
"""
|
|
import asyncio
|
|
import base64
|
|
import os
|
|
from typing import List, Optional
|
|
|
|
|
|
class DoclingService:
    """Convert PDF, PowerPoint and DOCX files into markdown text.

    PDF and PowerPoint parsing is delegated to the module-level helpers
    ``_parse_pdf_with_pymupdf`` and ``_parse_pptx_text``; DOCX parsing is
    implemented on this class with python-docx.
    """

    def parse_to_markdown(self, file_path: str) -> str:
        """Parse a PDF or PowerPoint file to markdown.

        The parser is selected from the lower-cased file extension.

        Args:
            file_path: Path of the document to parse.

        Returns:
            The markdown text, or ``""`` when the extension is neither
            ``.pdf``, ``.pptx`` nor ``.ppt``.
        """
        ext = os.path.splitext(file_path)[1].lower()
        if ext == ".pdf":
            return _parse_pdf_with_pymupdf(file_path)
        # NOTE(review): ".ppt" is routed to the python-pptx based parser too;
        # confirm that legacy binary .ppt files are actually readable by it.
        if ext in (".pptx", ".ppt"):
            return _parse_pptx_text(file_path)
        return ""

    def parse_docx_structured(self, file_path: str) -> str:
        """Parse a DOCX file to markdown, keeping headings and tables.

        Args:
            file_path: Path of the ``.docx`` file.

        Returns:
            The markdown text produced by ``_parse_docx_with_python_docx``.
        """
        return self._parse_docx_with_python_docx(file_path)

    @staticmethod
    def _heading_level(style_name: str) -> int:
        """Return the markdown heading level (1-6) for a heading style name.

        The first decimal digit in *style_name* is taken as the level and
        clamped to the range markdown can express; a name without a digit
        maps to level 1.
        """
        for ch in style_name:
            # isdecimal() (unlike isdigit()) only accepts characters that
            # int() can actually convert.
            if ch.isdecimal():
                return max(1, min(int(ch), 6))
        return 1

    def _parse_docx_with_python_docx(self, file_path: str) -> str:
        """Extract markdown from a DOCX file using python-docx.

        Walks the direct children of the document body in order, so that
        paragraphs and tables keep their relative position. Heading-styled
        paragraphs become ``#`` headings, tables become markdown tables, and
        one bullet per embedded image is appended at the end.

        Args:
            file_path: Path of the ``.docx`` file.

        Returns:
            The collected blocks joined by blank lines.
        """
        from docx import Document

        doc = Document(file_path)

        # Index the python-docx wrappers once by the identity of their XML
        # element, instead of re-scanning doc.paragraphs / doc.tables for
        # every body element (which is quadratic in document size). The
        # wrappers stored as values keep the elements alive, so the ids stay
        # valid for the whole loop.
        paragraphs = {id(para._element): para for para in doc.paragraphs}
        tables = {id(table._element): table for table in doc.tables}

        parts: List[str] = []

        for element in doc.element.body:
            raw_tag = element.tag
            if not isinstance(raw_tag, str):
                # Non-element nodes (e.g. XML comments) have no string tag.
                continue
            # Drop the "{namespace}" prefix, leaving the local tag name.
            tag = raw_tag.rpartition("}")[2]

            if tag == "p":
                para = paragraphs.get(id(element))
                if para is None:
                    continue
                text = para.text.strip()
                if not text:
                    continue
                style_name = (para.style.name or "").lower() if para.style else ""
                if "heading" in style_name:
                    level = self._heading_level(style_name)
                    parts.append(f"{'#' * level} {text}")
                else:
                    parts.append(text)

            elif tag == "tbl":
                tbl = tables.get(id(element))
                if tbl is None:
                    continue
                md_table = _table_to_markdown(tbl)
                if md_table:
                    parts.append(md_table)

        embedded_images = self._extract_docx_images(doc)
        if embedded_images:
            parts.append("\n## Embedded Images\n")
            for desc in embedded_images:
                parts.append(f"- {desc}")

        return "\n\n".join(parts)

    def _extract_docx_images(self, doc) -> List[str]:
        """Return one placeholder string per image relationship in *doc*.

        A relationship counts as an image when its ``reltype`` contains
        ``"image"``. The scan is best-effort: any error is reported on stdout
        and whatever was collected so far is returned.
        """
        descriptions: List[str] = []
        try:
            for rel in doc.part.rels.values():
                if "image" in rel.reltype:
                    descriptions.append("[Embedded image]")
        except Exception as e:
            # Image discovery must never abort text extraction, but the
            # failure should at least be visible.
            print(f"[DoclingService] Embedded image scan failed: {e}")
        return descriptions
|
|
|
|
|
|
def _parse_pdf_with_pymupdf(file_path: str) -> str:
    """Return the plain text of a PDF as markdown, one section per page.

    Each page with non-blank text becomes a ``## Page N`` section (pages are
    numbered from 1); blank pages are skipped. Sections are separated by a
    blank line.
    """
    import fitz  # PyMuPDF

    with fitz.open(file_path) as pdf:
        numbered_text = (
            (number, page.get_text("text").strip())
            for number, page in enumerate(pdf, 1)
        )
        # Materialise the sections while the document is still open.
        sections = [
            f"## Page {number}\n\n{body}"
            for number, body in numbered_text
            if body
        ]

    return "\n\n".join(sections)
|
|
|
|
|
|
def _parse_pptx_text(file_path: str) -> str:
|
|
"""Extract text from PPTX using python-pptx (no ML, no torch)."""
|
|
from pptx import Presentation
|
|
|
|
prs = Presentation(file_path)
|
|
parts = []
|
|
|
|
for slide_num, slide in enumerate(prs.slides, 1):
|
|
slide_texts = []
|
|
for shape in slide.shapes:
|
|
if shape.has_text_frame:
|
|
text = shape.text_frame.text.strip()
|
|
if text:
|
|
slide_texts.append(text)
|
|
elif shape.has_table:
|
|
md = _pptx_table_to_markdown(shape.table)
|
|
if md:
|
|
slide_texts.append(md)
|
|
|
|
if slide_texts:
|
|
parts.append(f"## Slide {slide_num}\n\n" + "\n\n".join(slide_texts))
|
|
|
|
return "\n\n".join(parts)
|
|
|
|
|
|
def _pptx_table_to_markdown(table) -> str:
|
|
rows = [[cell.text.strip().replace("|", "\\|") for cell in row.cells] for row in table.rows]
|
|
if not rows:
|
|
return ""
|
|
header = rows[0]
|
|
lines = [
|
|
"| " + " | ".join(header) + " |",
|
|
"| " + " | ".join(["---"] * len(header)) + " |",
|
|
]
|
|
for row in rows[1:]:
|
|
padded = row + [""] * (len(header) - len(row))
|
|
lines.append("| " + " | ".join(padded[: len(header)]) + " |")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _find_paragraph_by_element(doc, element):
|
|
for para in doc.paragraphs:
|
|
if para._element is element:
|
|
return para
|
|
return None
|
|
|
|
|
|
def _find_table_by_element(doc, element):
|
|
for table in doc.tables:
|
|
if table._element is element:
|
|
return table
|
|
return None
|
|
|
|
|
|
def _table_to_markdown(table) -> str:
|
|
rows = []
|
|
for row in table.rows:
|
|
cells = [cell.text.strip().replace("|", "\\|") for cell in row.cells]
|
|
rows.append(cells)
|
|
|
|
if not rows:
|
|
return ""
|
|
|
|
clean_rows = []
|
|
for row_cells in rows:
|
|
clean = []
|
|
for i, cell_text in enumerate(row_cells):
|
|
if i > 0 and cell_text == row_cells[i - 1]:
|
|
clean.append("")
|
|
else:
|
|
clean.append(cell_text)
|
|
clean_rows.append(clean)
|
|
|
|
lines = []
|
|
if clean_rows:
|
|
header = clean_rows[0]
|
|
lines.append("| " + " | ".join(header) + " |")
|
|
lines.append("| " + " | ".join(["---"] * len(header)) + " |")
|
|
for row in clean_rows[1:]:
|
|
padded = row + [""] * (len(header) - len(row))
|
|
lines.append("| " + " | ".join(padded[: len(header)]) + " |")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
async def extract_text_from_image_via_vision(image_bytes: bytes, mime_type: str = "image/png") -> Optional[str]:
|
|
"""Use Gemini vision to extract text from an image."""
|
|
try:
|
|
import google.genai as genai
|
|
|
|
api_key = os.environ.get("GOOGLE_API_KEY")
|
|
if not api_key:
|
|
return None
|
|
|
|
client = genai.Client()
|
|
b64 = base64.b64encode(image_bytes).decode("utf-8")
|
|
|
|
response = await asyncio.to_thread(
|
|
client.models.generate_content,
|
|
model="gemini-2.5-flash",
|
|
contents=[
|
|
{
|
|
"parts": [
|
|
{"text": "Extract all text from this image. Return only the extracted text, nothing else. If no text is found, return 'No text found'."},
|
|
{"inline_data": {"mime_type": mime_type, "data": b64}},
|
|
]
|
|
}
|
|
],
|
|
)
|
|
text = response.text.strip() if response.text else None
|
|
if text and text.lower() != "no text found":
|
|
return text
|
|
return None
|
|
except Exception as e:
|
|
print(f"[DoclingService] Vision text extraction failed: {e}")
|
|
return None
|