Parallelize LlamaParse document processing with asyncio.gather
Parse documents concurrently (up to 10 at a time via semaphore) instead of serially. Each coroutine uses its own DB session for per-document status updates, while a shared lock serializes job progress increments on the main session to avoid session-sharing issues. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
919e8185fa
commit
8a9a24ebe6
1 changed files with 70 additions and 36 deletions
|
|
@ -1,3 +1,4 @@
|
|||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
|
|
@ -9,6 +10,8 @@ from app.services.storage_service import storage_service
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MAX_CONCURRENT_PARSES = 10
|
||||
|
||||
# Distillation prompt templates per agent type
|
||||
DISTILLATION_PROMPTS = {
|
||||
"legal": """You are a compliance documentation specialist. Below is raw reference material about legal compliance, advertising standards, financial promotions, and disclaimers relevant to Barclays marketing materials.
|
||||
|
|
@ -112,49 +115,80 @@ class KnowledgeBaseService:
|
|||
await session.commit()
|
||||
return
|
||||
|
||||
# Parse each document
|
||||
# Parse documents in parallel (up to MAX_CONCURRENT_PARSES at a time)
|
||||
semaphore = asyncio.Semaphore(MAX_CONCURRENT_PARSES)
|
||||
progress_lock = asyncio.Lock()
|
||||
parsed_count = 0
|
||||
combined_parts = []
|
||||
source_doc_ids = []
|
||||
|
||||
for doc in docs:
|
||||
try:
|
||||
# Load file data from storage
|
||||
file_data = await storage_service.get_file(doc.file_storage_key)
|
||||
if not file_data:
|
||||
logger.error(f"[KB_SERVICE] File not found: {doc.file_storage_key}")
|
||||
await repo.update_source_document_parse_status(
|
||||
doc.id, "error", parse_error="File not found in storage."
|
||||
)
|
||||
await session.commit()
|
||||
continue
|
||||
async def _parse_doc(doc):
|
||||
nonlocal parsed_count
|
||||
async with semaphore:
|
||||
try:
|
||||
async with async_session_factory() as doc_session:
|
||||
doc_repo = KnowledgeBaseRepository(doc_session)
|
||||
|
||||
# Parse with LlamaParse
|
||||
await repo.update_source_document_parse_status(doc.id, "parsing")
|
||||
await session.commit()
|
||||
# Load file data from storage
|
||||
file_data = await storage_service.get_file(doc.file_storage_key)
|
||||
if not file_data:
|
||||
logger.error(f"[KB_SERVICE] File not found: {doc.file_storage_key}")
|
||||
await doc_repo.update_source_document_parse_status(
|
||||
doc.id, "error", parse_error="File not found in storage."
|
||||
)
|
||||
await doc_session.commit()
|
||||
async with progress_lock:
|
||||
parsed_count += 1
|
||||
await repo.update_processing_job(job_id, parsed_documents=parsed_count)
|
||||
await session.commit()
|
||||
return None
|
||||
|
||||
markdown = await self.llamaparse.parse_document(file_data, doc.filename)
|
||||
# Parse with LlamaParse
|
||||
await doc_repo.update_source_document_parse_status(doc.id, "parsing")
|
||||
await doc_session.commit()
|
||||
|
||||
# Update parse status
|
||||
await repo.update_source_document_parse_status(
|
||||
doc.id, "parsed", parsed_markdown=markdown
|
||||
)
|
||||
parsed_count += 1
|
||||
await repo.update_processing_job(job_id, parsed_documents=parsed_count)
|
||||
await session.commit()
|
||||
markdown = await self.llamaparse.parse_document(file_data, doc.filename)
|
||||
|
||||
if markdown.strip():
|
||||
combined_parts.append(f"# {doc.filename}\n\n{markdown}")
|
||||
source_doc_ids.append(str(doc.id))
|
||||
async with async_session_factory() as doc_session:
|
||||
doc_repo = KnowledgeBaseRepository(doc_session)
|
||||
await doc_repo.update_source_document_parse_status(
|
||||
doc.id, "parsed", parsed_markdown=markdown
|
||||
)
|
||||
await doc_session.commit()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[KB_SERVICE] Error parsing {doc.filename}: {e}")
|
||||
await repo.update_source_document_parse_status(
|
||||
doc.id, "error", parse_error=str(e)
|
||||
)
|
||||
parsed_count += 1
|
||||
await repo.update_processing_job(job_id, parsed_documents=parsed_count)
|
||||
await session.commit()
|
||||
async with progress_lock:
|
||||
parsed_count += 1
|
||||
await repo.update_processing_job(job_id, parsed_documents=parsed_count)
|
||||
await session.commit()
|
||||
|
||||
if markdown.strip():
|
||||
return (doc.filename, str(doc.id), markdown)
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[KB_SERVICE] Error parsing {doc.filename}: {e}")
|
||||
try:
|
||||
async with async_session_factory() as doc_session:
|
||||
doc_repo = KnowledgeBaseRepository(doc_session)
|
||||
await doc_repo.update_source_document_parse_status(
|
||||
doc.id, "error", parse_error=str(e)
|
||||
)
|
||||
await doc_session.commit()
|
||||
except Exception:
|
||||
logger.error(f"[KB_SERVICE] Failed to update error status for {doc.filename}")
|
||||
async with progress_lock:
|
||||
parsed_count += 1
|
||||
await repo.update_processing_job(job_id, parsed_documents=parsed_count)
|
||||
await session.commit()
|
||||
return None
|
||||
|
||||
results = await asyncio.gather(*[_parse_doc(doc) for doc in docs])
|
||||
combined_parts = [
|
||||
f"# {filename}\n\n{markdown}"
|
||||
for filename, doc_id, markdown in (r for r in results if r is not None)
|
||||
]
|
||||
source_doc_ids = [
|
||||
doc_id
|
||||
for _, doc_id, _ in (r for r in results if r is not None)
|
||||
]
|
||||
|
||||
if not combined_parts:
|
||||
await repo.update_processing_job(
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue