160 lines
No EOL
5.1 KiB
Python
Executable file
160 lines
No EOL
5.1 KiB
Python
Executable file
import os
|
|
import shutil
|
|
import re
|
|
import logging
|
|
from PIL import Image # Added for PIL image handling
|
|
|
|
# Optional: If llama_parse does any async under the hood and you're in Jupyter
|
|
# or some environment with an existing event loop, nest_asyncio can help.
|
|
import nest_asyncio
|
|
nest_asyncio.apply()
|
|
|
|
from llama_parse import LlamaParse
|
|
|
|
# SECURITY(review): an API key is committed here in plain text. It should be rotated and
# supplied through the environment or a secret store, then removed from the source.
# Until that happens, setdefault keeps the old fallback behaviour while no longer
# overwriting a key that the deployment environment has already provided.
os.environ.setdefault('LLAMA_CLOUD_API_KEY', 'llx-BmHqsgAhrUWpNJDhl25POaxe0WvwwyiwHcRpACKbJch50Lu2')
|
|
|
|
def _parse_pdf(pdf_path: str, working_dir: str):
    """
    Parse a PDF with LlamaParse into text and PIL images (internal, synchronous).

    Steps:
      1) Run a text parser over the PDF and write the joined text to
         ``<working_dir>/extracted_text.txt``.
      2) Run a second, multimodal parser, fetch its images into ``working_dir``,
         move the first image file to ``parsed_test_image.jpg`` and load every
         image into memory as a PIL image.
      3) Return the text, the text-file path and the loaded images.

    Args:
        pdf_path: Path of the PDF handed to both parsers.
        working_dir: Directory that receives the text file and the image files.
            It is created when it does not exist yet.

    Returns:
        dict with the keys
            "text_file": path of the written text file,
            "images": list of PIL images (possibly empty),
            "extracted_text": text of all returned documents joined by newlines.

    Raises:
        RuntimeError: If the text parser returns no documents.
        Errors raised by the parsers or by file operations are not caught here.
    """
    # Create the output directory before any parser call, so a missing directory
    # cannot make the function fail only after the text has already been extracted.
    # An empty string is left alone: os.path.join treats it as the current directory.
    if working_dir:
        os.makedirs(working_dir, exist_ok=True)

    # --- 1) TEXT PARSER (synchronous) ---
    parser_text = LlamaParse(
        result_type="text",
        add_page_breaks=False,
        parsing_instruction=(
            "Extract all text from the PDF. Ignore extra metadata or slug information."
        ),
        premium_mode=False,
    )

    documents = parser_text.load_data(pdf_path)
    if not documents:
        raise RuntimeError("No text found or PDF is empty")

    extracted_text = "\n".join(doc.text for doc in documents)

    text_file_path = os.path.join(working_dir, "extracted_text.txt")
    with open(text_file_path, "w", encoding="utf-8") as f:
        f.write(extracted_text)

    # --- 2) IMAGE GENERATION AND PIL HANDLING ---
    parser_multimodal = LlamaParse(
        result_type="markdown",
        add_page_breaks=False,
        parsing_instruction=(
            "Generate page images of the PDF document, no special text instructions."
        ),
        use_vendor_multimodal_model=True,
        vendor_multimodal_model_name="openai-gpt4o",
        premium_mode=False,
    )

    md_json_objs = parser_multimodal.get_json_result(pdf_path)
    image_dicts = parser_multimodal.get_images(md_json_objs, download_path=working_dir)

    # Each returned entry is expected to carry the image location under "path".
    image_paths = [img_info["path"] for img_info in image_dicts]
    pil_images = []

    if image_paths:
        # Give the first image a fixed, predictable file name.
        # os.replace overwrites an existing destination on every platform, whereas
        # os.rename raises on Windows when the destination already exists.
        renamed_image_path = os.path.join(working_dir, "parsed_test_image.jpg")
        os.replace(image_paths[0], renamed_image_path)
        image_paths[0] = renamed_image_path

    # Load images as PIL objects. A file that cannot be opened is logged and
    # skipped, so one bad image does not discard the rest.
    for img_path in image_paths:
        try:
            with Image.open(img_path) as img:
                # Copy so the image stays usable after the file handle is closed.
                pil_images.append(img.copy())
        except Exception as e:
            # Same message as before; logging.exception also records the traceback.
            logging.exception("Error loading image %s: %s", img_path, e)

    return {
        "text_file": text_file_path,
        "images": pil_images,
        "extracted_text": extracted_text,
    }
|
|
|
|
|
|
def run_check(config: dict, context: dict, check_id: str):
    """
    QC check that parses a PDF and stores its text and PIL images in ``context``.

    Args:
        config: Check configuration. Reads "input_file" (path of the PDF) and
            "working_dir" (output directory, default "working").
        context: Shared mutable mapping. ``context[check_id]`` is set to a dict
            holding "filename" and "input_file_path"; after a successful parse it
            also holds "extracted_text", "parsed_image" (first image or None),
            "all_images" and "text_file_path". After a failed parse it holds
            "error" and "partial_extraction" instead.
        check_id: Key under which this check stores its data in ``context``.

    Returns:
        On success ``{"status": "passed", "details": {...}}``, otherwise
        ``{"status": "error", "error_message": "..."}``. Failures are reported
        through the returned dict rather than raised.

    Note:
        Existing contents of ``working_dir`` are deleted before parsing. The
        input PDF itself, or a directory holding it, is never deleted, even
        when it lives inside ``working_dir``.
    """
    input_file = config.get("input_file")
    working_dir = config.get("working_dir", "working")

    if not input_file or not os.path.isfile(input_file):
        return {
            "status": "error",
            "error_message": f"PDF file '{input_file}' not provided or does not exist."
        }

    # Validate the extension BEFORE the destructive cleanup below, so that a
    # rejected input does not cost the caller the contents of the working directory.
    if not re.search(r"\.pdf$", input_file, re.IGNORECASE):
        return {
            "status": "error",
            "error_message": f"Input file '{input_file}' does not appear to be a PDF."
        }

    # Both the path as given and its symlink-resolved form are protected from cleanup.
    protected = {
        os.path.normcase(os.path.abspath(input_file)),
        os.path.normcase(os.path.realpath(input_file)),
    }

    def _shelters_input(entry_path):
        """Return True if entry_path is the input PDF or a directory holding it."""
        roots = [os.path.normcase(os.path.abspath(entry_path))]
        if not os.path.islink(entry_path):
            roots.append(os.path.normcase(os.path.realpath(entry_path)))
        return any(
            target == root or target.startswith(root + os.sep)
            for root in roots
            for target in protected
        )

    try:
        if os.path.exists(working_dir):
            for item in os.listdir(working_dir):
                item_path = os.path.join(working_dir, item)
                if _shelters_input(item_path):
                    # Deleting this entry would delete the PDF we are about to parse.
                    continue
                if os.path.isfile(item_path) or os.path.islink(item_path):
                    os.remove(item_path)
                else:
                    shutil.rmtree(item_path)
        else:
            os.makedirs(working_dir)
    except Exception as e:
        # Broad on purpose: this check reports problems in its result dict.
        return {
            "status": "error",
            "error_message": f"Failed to prepare working directory '{working_dir}': {e}"
        }

    try:
        filename = os.path.basename(input_file)
        context[check_id] = {
            "filename": filename,
            "input_file_path": input_file
        }
    except Exception as e:
        return {
            "status": "error",
            "error_message": f"Failed to write input filename to context: {e}"
        }

    try:
        parse_results = _parse_pdf(input_file, working_dir)

        context[check_id].update({
            "extracted_text": parse_results["extracted_text"],
            "parsed_image": parse_results["images"][0] if parse_results["images"] else None,
            "all_images": parse_results["images"],
            "text_file_path": parse_results["text_file"]
        })

        return {
            "status": "passed",
            "details": {
                "message": "PDF parsed successfully.",
                "working_dir": working_dir,
                "text_file": parse_results["text_file"],
                "image_count": len(parse_results["images"])
            }
        }

    except Exception as e:
        # Record the failure next to the filename data already stored in context.
        context[check_id].update({
            "error": str(e),
            "partial_extraction": True
        })
        return {
            "status": "error",
            "error_message": f"Failed to parse PDF '{input_file}': {e}"
        }