hm_qc/checks/HM_parse.py
2025-09-30 10:37:12 -05:00

160 lines
No EOL
5.1 KiB
Python
Executable file

import os
import shutil
import re
import logging
from PIL import Image # Added for PIL image handling
# Optional: If llama_parse does any async under the hood and you're in Jupyter
# or some environment with an existing event loop, nest_asyncio can help.
import nest_asyncio
nest_asyncio.apply()
from llama_parse import LlamaParse
# SECURITY: the LlamaCloud API key must come from the process environment
# (or a secrets manager), never from source control. A key was previously
# hard-coded on this line; treat that key as leaked and rotate it.
# Nothing is assigned here, so a key provided by the deployment is no longer
# overwritten at import time.
if not os.environ.get("LLAMA_CLOUD_API_KEY"):
    logging.warning(
        "LLAMA_CLOUD_API_KEY is not set; set it in the environment before "
        "running PDF parsing."
    )
def _parse_pdf(pdf_path: str, working_dir: str):
    """
    Internal synchronous helper that runs one PDF through LlamaParse twice.

    Pass 1 extracts the text and saves it as ``extracted_text.txt`` inside
    ``working_dir``. Pass 2 requests images through the multimodal parser,
    downloads them into ``working_dir``, renames the first one to
    ``parsed_test_image.jpg`` and loads every image into memory with PIL.

    Returns a dict with the keys ``text_file`` (path of the saved text),
    ``images`` (list of in-memory PIL images) and ``extracted_text``.
    Raises RuntimeError if the text pass returns no documents.
    """
    # --- Pass 1: plain-text extraction ---
    text_parser = LlamaParse(
        result_type="text",
        add_page_breaks=False,
        parsing_instruction="Extract all text from the PDF. Ignore extra metadata or slug information.",
        premium_mode=False,
    )
    docs = text_parser.load_data(pdf_path)
    if not docs:
        raise RuntimeError("No text found or PDF is empty")

    extracted_text = "\n".join([doc.text for doc in docs])
    text_file_path = os.path.join(working_dir, "extracted_text.txt")
    with open(text_file_path, "w", encoding="utf-8") as out:
        out.write(extracted_text)

    # --- Pass 2: image download via the multimodal parser ---
    image_parser = LlamaParse(
        result_type="markdown",
        add_page_breaks=False,
        parsing_instruction="Generate page images of the PDF document, no special text instructions.",
        use_vendor_multimodal_model=True,
        vendor_multimodal_model_name="openai-gpt4o",
        premium_mode=False,
    )
    json_result = image_parser.get_json_result(pdf_path)
    downloaded = image_parser.get_images(json_result, download_path=working_dir)
    paths = [entry["path"] for entry in downloaded]

    # The first downloaded image is moved to a fixed, predictable filename.
    if paths:
        fixed_name = os.path.join(working_dir, "parsed_test_image.jpg")
        os.rename(paths[0], fixed_name)
        paths[0] = fixed_name

    loaded = []
    for path in paths:
        try:
            with Image.open(path) as handle:
                # copy() keeps the image usable after the file handle is
                # closed when the with-block exits.
                loaded.append(handle.copy())
        except Exception as e:
            # Best effort: an unreadable image is logged and skipped.
            logging.error(f"Error loading image {path}: {e}")

    return {
        "text_file": text_file_path,
        "images": loaded,
        "extracted_text": extracted_text
    }
def _prepare_working_dir(working_dir: str, keep_path: str) -> None:
    """Create ``working_dir`` or empty it, leaving ``keep_path`` untouched."""
    if not os.path.exists(working_dir):
        os.makedirs(working_dir)
        return

    keep_real = os.path.realpath(keep_path)
    for item in os.listdir(working_dir):
        item_path = os.path.join(working_dir, item)
        item_real = os.path.realpath(item_path)
        # Never delete the input itself, nor a directory that contains it;
        # otherwise the file to be parsed would vanish before parsing starts.
        if keep_real == item_real or keep_real.startswith(item_real + os.sep):
            continue
        if os.path.isfile(item_path) or os.path.islink(item_path):
            os.remove(item_path)
        else:
            shutil.rmtree(item_path)


def run_check(config: dict, context: dict, check_id: str):
    """
    QC check that parses a PDF and stores its text and PIL images in ``context``.

    Args:
        config: reads ``input_file`` (path of the PDF to parse) and
            ``working_dir`` (output directory, defaults to ``"working"``).
        context: shared dict; this check's results are stored under
            ``context[check_id]``.
        check_id: key used for this check's entry in ``context``.

    Returns:
        ``{"status": "passed", "details": {...}}`` on success, otherwise
        ``{"status": "error", "error_message": "..."}``. The failures handled
        below are reported through the return value rather than raised.

    Side effects:
        Once the input has been validated, ``working_dir`` is created or
        emptied (the input file itself is never deleted), and the parser
        writes its outputs there.
    """
    input_file = config.get("input_file")
    working_dir = config.get("working_dir", "working")

    if not input_file or not os.path.isfile(input_file):
        return {
            "status": "error",
            "error_message": f"PDF file '{input_file}' not provided or does not exist."
        }

    # Validate the extension BEFORE touching the working directory, so that a
    # rejected input does not wipe the outputs of a previous run.
    if not str(input_file).lower().endswith(".pdf"):
        return {
            "status": "error",
            "error_message": f"Input file '{input_file}' does not appear to be a PDF."
        }

    try:
        _prepare_working_dir(working_dir, input_file)
    except Exception as e:
        return {
            "status": "error",
            "error_message": f"Failed to prepare working directory '{working_dir}': {e}"
        }

    try:
        filename = os.path.basename(input_file)
        context[check_id] = {
            "filename": filename,
            "input_file_path": input_file
        }
    except Exception as e:
        return {
            "status": "error",
            "error_message": f"Failed to write input filename to context: {e}"
        }

    try:
        parse_results = _parse_pdf(input_file, working_dir)
        images = parse_results["images"]
        context[check_id].update({
            "extracted_text": parse_results["extracted_text"],
            # First image (if any) is exposed separately for convenience.
            "parsed_image": images[0] if images else None,
            "all_images": images,
            "text_file_path": parse_results["text_file"]
        })
        return {
            "status": "passed",
            "details": {
                "message": "PDF parsed successfully.",
                "working_dir": working_dir,
                "text_file": parse_results["text_file"],
                "image_count": len(images)
            }
        }
    except Exception as e:
        # Top-level boundary of the check: keep the traceback in the log and
        # report the failure through the return value instead of raising.
        logging.exception("Failed to parse PDF '%s'", input_file)
        context[check_id].update({
            "error": str(e),
            "partial_extraction": True
        })
        return {
            "status": "error",
            "error_message": f"Failed to parse PDF '{input_file}': {e}"
        }