- Accept .xlsx/.xls uploads alongside PDFs in campaigns module - New parse_campaign_excel() in services.py using openpyxl - Converts all sheets to structured text (headers + rows) for LLM use - Upload form now accepts both PDF and Excel files - Added openpyxl to requirements.txt Workflow: upload campaign presentation (PDF) + media plan (Excel with has_pricing checked) for the same campaign ID. The price check will use the Excel data to validate actual prices per country. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
256 lines
8.6 KiB
Python
256 lines
8.6 KiB
Python
"""
|
|
Campaign PDF Parsing Service.
|
|
|
|
Handles parsing of campaign presentation PDFs using LlamaParse,
|
|
extracting both text content and page images for QC reference.
|
|
"""
|
|
import os
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def parse_campaign_pdf(presentation_id: int, app=None):
    """
    Parse a campaign presentation PDF and store extracted content.

    Extracts text and page images from the PDF using LlamaParse and
    updates the CampaignPresentation record with the parsed content.
    The record's status is set to 'parsing' while work is in progress,
    then to 'ready' on success, or to 'error' (with error_message set
    to the exception text) when anything in the parse step fails.

    This function expects to be called within an app context
    (the caller is responsible for wrapping with app.app_context()).

    Args:
        presentation_id: ID of the CampaignPresentation record to parse
        app: Flask app instance (required when called from background thread).
            Not referenced inside this function; kept so existing callers
            that pass it keep working.

    Returns:
        None. If no record exists for presentation_id, an error is logged
        and nothing else happens.
    """
    from core.models.database import db
    from core.models.campaign_presentation import CampaignPresentation

    presentation = CampaignPresentation.query.get(presentation_id)
    if not presentation:
        logger.error(f"CampaignPresentation {presentation_id} not found")
        return

    try:
        # Update status to parsing
        presentation.status = 'parsing'
        db.session.commit()

        pdf_path = presentation.pdf_path
        # Guard the empty case explicitly: os.path.isfile(None) raises a
        # TypeError whose message says nothing useful to the user.
        if not pdf_path or not os.path.isfile(pdf_path):
            raise FileNotFoundError(f"PDF not found at {pdf_path}")

        # Page images live in a 'pages' folder next to the PDF
        campaign_dir = os.path.dirname(pdf_path)
        pages_dir = os.path.join(campaign_dir, 'pages')
        os.makedirs(pages_dir, exist_ok=True)

        # Parse PDF
        logger.info(f"Parsing campaign PDF: {pdf_path}")
        result = _parse_pdf(pdf_path, pages_dir)

        # Update record with parsed content
        presentation.parsed_content = result['extracted_text']
        presentation.page_images_dir = pages_dir
        presentation.parsed_at = datetime.utcnow()
        presentation.status = 'ready'
        presentation.error_message = None
        db.session.commit()

        logger.info(
            f"Campaign PDF parsed successfully: {presentation.campaign_id} "
            f"({len(result['extracted_text'])} chars, "
            f"{result['image_count']} page images)"
        )

    except Exception as e:
        logger.error(f"Failed to parse campaign PDF: {e}", exc_info=True)
        # If one of the commits above is what failed, the session is left in
        # a failed-transaction state and would reject the commit below.
        # Rolling back first discards the half-applied changes so the error
        # status can actually be persisted.
        db.session.rollback()
        presentation.status = 'error'
        presentation.error_message = str(e)
        db.session.commit()
|
|
|
|
|
|
def _parse_pdf(pdf_path: str, pages_dir: str) -> dict:
    """
    Run LlamaParse over a PDF to collect its text and its page images.

    Follows the same LlamaParse approach as
    modules/hm_qc/checks/legacy/HM_parse.py, except that page images are
    written to disk rather than held in memory.

    Args:
        pdf_path: Path to the PDF file
        pages_dir: Directory that receives extracted_text.txt and the
            page_NNN.jpg images

    Returns:
        dict with 'extracted_text' (text of all returned documents joined
        by newlines) and 'image_count' (number of page images saved as JPEG)

    Raises:
        RuntimeError: if the text pass returns no documents
    """
    # Deferred imports: these packages are only needed while parsing
    import nest_asyncio
    nest_asyncio.apply()
    from llama_parse import LlamaParse
    from PIL import Image

    # --- Pass 1: plain text ---
    text_instruction = (
        "Extract all text from the PDF. "
        "Include all typography specifications, font names, sizes, positioning rules, "
        "copy text, and any guidelines or instructions."
    )
    text_parser = LlamaParse(
        result_type="text",
        add_page_breaks=False,
        parsing_instruction=text_instruction,
        premium_mode=False,
    )

    docs = text_parser.load_data(pdf_path)
    if not docs:
        raise RuntimeError("No text found or PDF is empty")

    extracted_text = "\n".join([doc.text for doc in docs])

    # Keep a copy of the raw text next to the page images
    with open(os.path.join(pages_dir, "extracted_text.txt"), "w", encoding="utf-8") as out:
        out.write(extracted_text)

    # --- Pass 2: page images (multimodal) ---
    # Best effort: any failure here is logged and the text result still returned.
    saved = 0
    try:
        image_instruction = (
            "Generate page images of the PDF document, "
            "including all visual layouts, mockups, and graphic examples."
        )
        image_parser = LlamaParse(
            result_type="markdown",
            add_page_breaks=False,
            parsing_instruction=image_instruction,
            use_vendor_multimodal_model=True,
            vendor_multimodal_model_name="openai-gpt4o",
            premium_mode=False,
        )

        json_result = image_parser.get_json_result(pdf_path)
        downloaded = image_parser.get_images(json_result, download_path=pages_dir)

        # Normalise every downloaded image to page_001.jpg, page_002.jpg, ...
        for i, info in enumerate(downloaded):
            source = info["path"]
            target = os.path.join(pages_dir, f"page_{i+1:03d}.jpg")
            try:
                if not os.path.exists(source):
                    continue

                # Opening with PIL validates the file; re-encode it as JPEG
                with Image.open(source) as picture:
                    picture.convert('RGB').save(target, 'JPEG', quality=90)

                # Drop the download unless it already is the target file
                if os.path.abspath(source) != os.path.abspath(target):
                    os.remove(source)

                saved += 1
            except Exception as e:
                logger.warning(f"Failed to process page image {i+1}: {e}")

    except Exception as e:
        logger.warning(f"Multimodal parsing failed (text still available): {e}")

    return {'extracted_text': extracted_text, 'image_count': saved}
|
|
|
|
|
|
def parse_campaign_excel(presentation_id: int):
    """
    Parse a campaign Excel file (media plan / price sheet) and store as text.

    Extracts all worksheets into a structured text format that can be used
    by QC checks for price validation. Each worksheet becomes a section:

        === Sheet: <title> ===
        Headers: <row 1 cells joined by " | ">
        <each later non-empty row, cells joined by " | ">

    Sections are separated by a blank line. Rows in which every cell is
    empty are skipped; empty cells inside a row become empty strings.
    Chart-only sheets carry no cell grid and are skipped.

    The record's status is set to 'parsing' while work is in progress,
    then to 'ready' on success, or to 'error' (with error_message set to
    the exception text) when anything in the parse step fails.

    NOTE(review): openpyxl targets the .xlsx family; a legacy .xls upload
    is expected to fail in load_workbook and end up in the 'error' state —
    confirm against the upload form's accepted extensions.

    Args:
        presentation_id: ID of the CampaignPresentation record to parse

    Returns:
        None. If no record exists for presentation_id, an error is logged
        and nothing else happens.
    """
    from core.models.database import db
    from core.models.campaign_presentation import CampaignPresentation
    import openpyxl

    presentation = CampaignPresentation.query.get(presentation_id)
    if not presentation:
        logger.error(f"CampaignPresentation {presentation_id} not found")
        return

    try:
        presentation.status = 'parsing'
        db.session.commit()

        # The Excel upload's path is read from the record's pdf_path field
        file_path = presentation.pdf_path
        # Guard the empty case explicitly: os.path.isfile(None) raises a
        # TypeError whose message says nothing useful to the user.
        if not file_path or not os.path.isfile(file_path):
            raise FileNotFoundError(f"File not found at {file_path}")

        logger.info(f"Parsing campaign Excel: {file_path}")

        # data_only=True yields the cached results of formulas rather than
        # the formula strings themselves.
        wb = openpyxl.load_workbook(file_path, data_only=True)
        all_text = []

        # Iterate wb.worksheets rather than wb.sheetnames: the latter also
        # lists chartsheets, which have no max_row / iter_rows and would
        # abort the whole parse with an AttributeError.
        for ws in wb.worksheets:
            if ws.max_row is None or ws.max_row == 0:
                continue

            sheet_text = [f"=== Sheet: {ws.title} ==="]

            rows = ws.iter_rows(min_row=1, max_row=ws.max_row, values_only=True)
            for row_idx, row in enumerate(rows, 1):
                if all(cell is None for cell in row):
                    continue
                line = " | ".join('' if c is None else str(c) for c in row)
                # Label the first row so the LLM can tell headers from data
                if row_idx == 1:
                    sheet_text.append("Headers: " + line)
                else:
                    sheet_text.append(line)

            all_text.append("\n".join(sheet_text))

        extracted_text = "\n\n".join(all_text)

        presentation.parsed_content = extracted_text
        presentation.parsed_at = datetime.utcnow()
        presentation.status = 'ready'
        presentation.error_message = None
        db.session.commit()

        logger.info(
            f"Campaign Excel parsed: {presentation.campaign_id} "
            f"({len(extracted_text)} chars, {len(wb.sheetnames)} sheets)"
        )

    except Exception as e:
        logger.error(f"Failed to parse campaign Excel: {e}", exc_info=True)
        # If one of the commits above is what failed, the session is left in
        # a failed-transaction state and would reject the commit below.
        # Rolling back first discards the half-applied changes so the error
        # status can actually be persisted.
        db.session.rollback()
        presentation.status = 'error'
        presentation.error_message = str(e)
        db.session.commit()
|
|
|
|
|
|
def get_page_image_paths(page_images_dir: str) -> list:
    """
    Get the page image paths from a campaign's pages directory, in page order.

    Only entries whose name starts with 'page_' and ends with .jpg, .jpeg
    or .png (case-insensitive) are returned. Entries named page_<number>
    are ordered by the numeric page value, so page_1000.jpg follows
    page_999.jpg instead of landing between page_100.jpg and page_101.jpg
    as a plain string sort would place it. Entries whose suffix is not a
    number come after all numbered pages, in alphabetical order.

    Args:
        page_images_dir: Path to the pages directory

    Returns:
        List of image file paths (page_images_dir joined with each file
        name); empty if the directory is unset or does not exist
    """
    if not page_images_dir or not os.path.isdir(page_images_dir):
        return []

    def _page_order(filename):
        # Text between the 'page_' prefix and the extension, e.g. '007'
        stem = os.path.splitext(filename)[0][len('page_'):]
        if stem.isdecimal():
            return (0, int(stem), filename)
        return (1, 0, filename)

    page_files = [
        f for f in os.listdir(page_images_dir)
        if f.startswith('page_') and f.lower().endswith(('.jpg', '.jpeg', '.png'))
    ]

    return [
        os.path.join(page_images_dir, f)
        for f in sorted(page_files, key=_page_order)
    ]
|