ppt-tool/backend/services/docling_service.py
Vadym Samoilenko 6157fcbc4e Fix PyMuPDF: replace get_text("markdown") with get_text("text")
"markdown" format requires PyMuPDF >= 1.24.0, not available on server.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 17:59:22 +00:00

207 lines
6.7 KiB
Python

"""Document parsing service.
Uses PyMuPDF for PDF, python-pptx for PPTX, python-docx for DOCX.
Optionally extracts text from embedded images via Gemini vision.
"""
import asyncio
import base64
import os
from typing import List, Optional
class DoclingService:
    """Parse PDF, PowerPoint and Word documents into markdown-style text."""

    def parse_to_markdown(self, file_path: str) -> str:
        """Parse a PDF or PowerPoint file to markdown.

        The parser is chosen from the lower-cased file extension.

        Args:
            file_path: Path of the document to parse.

        Returns:
            The extracted markdown, or an empty string when the extension
            is not ``.pdf``, ``.pptx`` or ``.ppt``.
        """
        ext = os.path.splitext(file_path)[1].lower()
        if ext == ".pdf":
            return _parse_pdf_with_pymupdf(file_path)
        if ext in (".pptx", ".ppt"):
            # NOTE(review): legacy binary ``.ppt`` files are routed to the
            # python-pptx based parser as well -- confirm it can open them.
            return _parse_pptx_text(file_path)
        return ""

    def parse_docx_structured(self, file_path: str) -> str:
        """Parse DOCX with python-docx for better table/structure handling."""
        return self._parse_docx_with_python_docx(file_path)

    @staticmethod
    def _heading_level(style_name: str) -> int:
        """Return the markdown heading level encoded in a heading style name.

        The first decimal digit found in *style_name* is used and clamped
        to the 1-6 range that markdown headings support.  Names without a
        decimal digit map to level 1.
        """
        for ch in style_name:
            # isdecimal() (unlike isdigit()) only accepts characters that
            # int() can convert, so characters such as "²" cannot raise.
            if ch.isdecimal():
                return min(max(int(ch), 1), 6)
        return 1

    def _parse_docx_with_python_docx(self, file_path: str) -> str:
        """Extract text from DOCX using python-docx with proper table handling.

        Walks the direct children of the document body in order, so
        paragraphs and tables keep their relative position.  Paragraphs
        whose style name contains "heading" become markdown headings,
        tables are rendered through ``_table_to_markdown``, and a trailing
        "Embedded Images" section lists one placeholder per image
        relationship.

        Args:
            file_path: Path of the ``.docx`` file.

        Returns:
            The collected blocks joined by blank lines.
        """
        from docx import Document
        from docx.table import Table
        from docx.text.paragraph import Paragraph

        doc = Document(file_path)
        parts: List[str] = []
        for element in doc.element.body:
            # Strip the "{namespace}" prefix to get the local tag name.
            tag = element.tag.split("}")[-1] if "}" in element.tag else element.tag
            if tag == "p":
                # Wrap the element directly instead of scanning
                # doc.paragraphs for it; that list is rebuilt on every
                # access, which made the loop quadratic.
                para = Paragraph(element, doc)
                text = para.text.strip()
                if not text:
                    continue
                style_name = (para.style.name or "").lower() if para.style else ""
                if "heading" in style_name:
                    level = self._heading_level(style_name)
                    parts.append(f"{'#' * level} {text}")
                else:
                    parts.append(text)
            elif tag == "tbl":
                md_table = _table_to_markdown(Table(element, doc))
                if md_table:
                    parts.append(md_table)

        embedded_images = self._extract_docx_images(doc)
        if embedded_images:
            parts.append("\n## Embedded Images\n")
            parts.extend(f"- {desc}" for desc in embedded_images)
        return "\n\n".join(parts)

    def _extract_docx_images(self, doc) -> List[str]:
        """Return one ``"[Embedded image]"`` placeholder per image relationship.

        Best effort: any error while walking ``doc.part.rels`` is swallowed
        and the placeholders collected up to that point are returned.
        """
        descriptions: List[str] = []
        try:
            for rel in doc.part.rels.values():
                if "image" in rel.reltype:
                    descriptions.append("[Embedded image]")
        except Exception:
            # Deliberately ignored: image discovery must never fail the parse.
            pass
        return descriptions
def _parse_pdf_with_pymupdf(file_path: str) -> str:
    """Return the plain text of a PDF, one markdown section per page.

    Each page that yields non-blank text becomes a ``## Page N`` section
    (N counts from 1 and includes skipped blank pages); sections are
    joined by blank lines.
    """
    import fitz  # PyMuPDF

    sections = []
    with fitz.open(file_path) as pdf:
        for number, page in enumerate(pdf, start=1):
            body = page.get_text("text").strip()
            if not body:
                # Pages without extractable text produce no section.
                continue
            sections.append(f"## Page {number}\n\n{body}")
    return "\n\n".join(sections)
def _parse_pptx_text(file_path: str) -> str:
    """Extract text from PPTX using python-pptx (no ML, no torch).

    Every slide that yields content becomes a ``## Slide N`` section (N
    counts from 1).  Text frames contribute their stripped text, tables
    are rendered through ``_pptx_table_to_markdown``, and shapes nested
    inside group shapes are visited as well.

    Args:
        file_path: Path of the presentation file.

    Returns:
        The slide sections joined by blank lines; an empty string when no
        slide yields any text.
    """
    from pptx import Presentation

    prs = Presentation(file_path)
    parts = []
    for slide_num, slide in enumerate(prs.slides, 1):
        slide_texts = []
        for shape in _iter_pptx_shapes(slide.shapes):
            if shape.has_text_frame:
                text = shape.text_frame.text.strip()
                if text:
                    slide_texts.append(text)
            elif shape.has_table:
                md = _pptx_table_to_markdown(shape.table)
                if md:
                    slide_texts.append(md)
        if slide_texts:
            parts.append(f"## Slide {slide_num}\n\n" + "\n\n".join(slide_texts))
    return "\n\n".join(parts)


def _iter_pptx_shapes(shapes):
    """Yield the leaf shapes of *shapes* in order, descending into groups.

    A shape is treated as a group when it has a ``shapes`` attribute
    (python-pptx group shapes are expected to expose their children that
    way -- verify against the installed version); the group itself is
    not yielded, only its descendants.
    """
    for shape in shapes:
        children = getattr(shape, "shapes", None)
        if children is None:
            yield shape
        else:
            yield from _iter_pptx_shapes(children)
def _pptx_table_to_markdown(table) -> str:
    """Render a table object as a pipe-delimited markdown table.

    The first row is used as the header.  Cell text has its whitespace
    (including line breaks) collapsed to single spaces, because a raw
    newline would split the markdown row, and ``|`` is escaped as ``\\|``.
    All rows are padded with empty cells to the width of the widest row,
    so no cell is dropped when a row is longer than the header.

    Args:
        table: Object exposing ``rows``; each row exposes ``cells`` and
            each cell exposes ``text``.

    Returns:
        The markdown table, or an empty string when the table has no rows.
    """

    def _cell_text(cell) -> str:
        return " ".join(cell.text.split()).replace("|", "\\|")

    rows = [[_cell_text(cell) for cell in row.cells] for row in table.rows]
    if not rows:
        return ""

    width = max(len(row) for row in rows)

    def _render(row) -> str:
        return "| " + " | ".join(row + [""] * (width - len(row))) + " |"

    lines = [
        _render(rows[0]),
        "| " + " | ".join(["---"] * width) + " |",
    ]
    lines.extend(_render(row) for row in rows[1:])
    return "\n".join(lines)
def _find_paragraph_by_element(doc, element):
    """Return the entry of ``doc.paragraphs`` whose ``_element`` is *element*.

    Matching is by object identity; ``None`` is returned when no
    paragraph wraps the given element.
    """
    matches = (
        candidate for candidate in doc.paragraphs if candidate._element is element
    )
    return next(matches, None)
def _find_table_by_element(doc, element):
    """Return the entry of ``doc.tables`` whose ``_element`` is *element*.

    Matching is by object identity; ``None`` is returned when no table
    wraps the given element.
    """
    matches = (
        candidate for candidate in doc.tables if candidate._element is element
    )
    return next(matches, None)
def _table_to_markdown(table) -> str:
    """Render a DOCX table object as a pipe-delimited markdown table.

    The first row is used as the header.  Cell text has its whitespace
    (including line breaks) collapsed to single spaces, because a raw
    newline would split the markdown row, and ``|`` is escaped as ``\\|``.

    A cell that is the same underlying cell as its left neighbour is
    rendered empty, so the content of a horizontally merged cell appears
    once.  The comparison is by identity of the cell's ``_tc`` element
    (falling back to the cell object itself) rather than by text, so
    adjacent cells that merely hold equal text keep their values.
    NOTE(review): this relies on python-docx returning the same cell for
    every grid column a merge spans -- confirm for the installed version.

    All rows are padded with empty cells to the width of the widest row,
    so no cell is dropped when a row is longer than the header.

    Args:
        table: Object exposing ``rows``; each row exposes ``cells`` and
            each cell exposes ``text``.

    Returns:
        The markdown table, or an empty string when the table has no rows.
    """
    rows = []
    for row in table.rows:
        rendered = []
        previous_tc = None
        for cell in row.cells:
            tc = getattr(cell, "_tc", cell)
            if previous_tc is not None and tc is previous_tc:
                # Continuation of a horizontal merge: keep the column, drop
                # the repeated text.
                rendered.append("")
            else:
                rendered.append(" ".join(cell.text.split()).replace("|", "\\|"))
            previous_tc = tc
        rows.append(rendered)

    if not rows:
        return ""

    width = max(len(row) for row in rows)

    def _render(row) -> str:
        return "| " + " | ".join(row + [""] * (width - len(row))) + " |"

    lines = [
        _render(rows[0]),
        "| " + " | ".join(["---"] * width) + " |",
    ]
    lines.extend(_render(row) for row in rows[1:])
    return "\n".join(lines)
async def extract_text_from_image_via_vision(image_bytes: bytes, mime_type: str = "image/png") -> Optional[str]:
    """Use Gemini vision to extract text from an image.

    The request is skipped when the ``GOOGLE_API_KEY`` environment
    variable is unset or empty.  The blocking SDK call is run in a worker
    thread via ``asyncio.to_thread``.

    Args:
        image_bytes: Raw image content; sent base64-encoded.
        mime_type: MIME type reported for the image.

    Returns:
        The stripped text of the model reply, or ``None`` when no API key
        is configured, the reply is empty, the reply is the "No text
        found" sentinel requested by the prompt, or any error occurs
        (errors are printed, never raised).
    """
    # Cheap guard first: without a key there is no point importing the SDK.
    api_key = os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        return None
    try:
        import google.genai as genai

        # The client is built without arguments -- presumably the SDK reads
        # the key from the environment itself; TODO confirm.
        client = genai.Client()
        b64 = base64.b64encode(image_bytes).decode("utf-8")
        response = await asyncio.to_thread(
            client.models.generate_content,
            model="gemini-2.5-flash",
            contents=[
                {
                    "parts": [
                        {"text": "Extract all text from this image. Return only the extracted text, nothing else. If no text is found, return 'No text found'."},
                        {"inline_data": {"mime_type": mime_type, "data": b64}},
                    ]
                }
            ],
        )
        text = response.text.strip() if response.text else None
        if text and not _is_no_text_reply(text):
            return text
        return None
    except Exception as e:
        # Best-effort feature: report the failure and let the caller carry on.
        print(f"[DoclingService] Vision text extraction failed: {e}")
        return None


def _is_no_text_reply(text: str) -> bool:
    """Return True when *text* is the "No text found" sentinel.

    Surrounding whitespace, quotes, backticks and ``.``/``!`` are ignored
    and the comparison is case-insensitive, so replies such as
    ``"No text found."`` are recognised too.
    """
    return text.strip(" \t\r\n.!'\"`").casefold() == "no text found"