diff --git a/backend/app/services/knowledge_base_service.py b/backend/app/services/knowledge_base_service.py index f48ad71..cf4a859 100644 --- a/backend/app/services/knowledge_base_service.py +++ b/backend/app/services/knowledge_base_service.py @@ -1,3 +1,4 @@ +import asyncio import logging import uuid from datetime import datetime, timezone @@ -9,6 +10,8 @@ from app.services.storage_service import storage_service logger = logging.getLogger(__name__) +MAX_CONCURRENT_PARSES = 10 + # Distillation prompt templates per agent type DISTILLATION_PROMPTS = { "legal": """You are a compliance documentation specialist. Below is raw reference material about legal compliance, advertising standards, financial promotions, and disclaimers relevant to Barclays marketing materials. @@ -112,49 +115,80 @@ class KnowledgeBaseService: await session.commit() return - # Parse each document + # Parse documents in parallel (up to MAX_CONCURRENT_PARSES at a time) + semaphore = asyncio.Semaphore(MAX_CONCURRENT_PARSES) + progress_lock = asyncio.Lock() parsed_count = 0 - combined_parts = [] - source_doc_ids = [] - for doc in docs: - try: - # Load file data from storage - file_data = await storage_service.get_file(doc.file_storage_key) - if not file_data: - logger.error(f"[KB_SERVICE] File not found: {doc.file_storage_key}") - await repo.update_source_document_parse_status( - doc.id, "error", parse_error="File not found in storage." - ) - await session.commit() - continue + async def _parse_doc(doc): + nonlocal parsed_count + async with semaphore: + try: + async with async_session_factory() as doc_session: + doc_repo = KnowledgeBaseRepository(doc_session) - # Parse with LlamaParse - await repo.update_source_document_parse_status(doc.id, "parsing") - await session.commit() + # Load file data from storage + file_data = await storage_service.get_file(doc.file_storage_key) + if not file_data: + logger.error(f"[KB_SERVICE] File not found: {doc.file_storage_key}") + await doc_repo.update_source_document_parse_status( + doc.id, "error", parse_error="File not found in storage." + ) + await doc_session.commit() + async with progress_lock: + parsed_count += 1 + await repo.update_processing_job(job_id, parsed_documents=parsed_count) + await session.commit() + return None - markdown = await self.llamaparse.parse_document(file_data, doc.filename) + # Parse with LlamaParse + await doc_repo.update_source_document_parse_status(doc.id, "parsing") + await doc_session.commit() - # Update parse status - await repo.update_source_document_parse_status( - doc.id, "parsed", parsed_markdown=markdown - ) - parsed_count += 1 - await repo.update_processing_job(job_id, parsed_documents=parsed_count) - await session.commit() + markdown = await self.llamaparse.parse_document(file_data, doc.filename) - if markdown.strip(): - combined_parts.append(f"# {doc.filename}\n\n{markdown}") - source_doc_ids.append(str(doc.id)) + async with async_session_factory() as doc_session: + doc_repo = KnowledgeBaseRepository(doc_session) + await doc_repo.update_source_document_parse_status( + doc.id, "parsed", parsed_markdown=markdown + ) + await doc_session.commit() - except Exception as e: - logger.error(f"[KB_SERVICE] Error parsing {doc.filename}: {e}") - await repo.update_source_document_parse_status( - doc.id, "error", parse_error=str(e) - ) - parsed_count += 1 - await repo.update_processing_job(job_id, parsed_documents=parsed_count) - await session.commit() + async with progress_lock: + parsed_count += 1 + await repo.update_processing_job(job_id, parsed_documents=parsed_count) + await session.commit() + + if markdown.strip(): + return (doc.filename, str(doc.id), markdown) + return None + + except Exception as e: + logger.error(f"[KB_SERVICE] Error parsing {doc.filename}: {e}") + try: + async with async_session_factory() as doc_session: + doc_repo = KnowledgeBaseRepository(doc_session) + await doc_repo.update_source_document_parse_status( + doc.id, "error", parse_error=str(e) + ) + await doc_session.commit() + except Exception: + logger.error(f"[KB_SERVICE] Failed to update error status for {doc.filename}") + async with progress_lock: + parsed_count += 1 + await repo.update_processing_job(job_id, parsed_documents=parsed_count) + await session.commit() + return None + + results = await asyncio.gather(*[_parse_doc(doc) for doc in docs]) + combined_parts = [ + f"# {filename}\n\n{markdown}" + for filename, doc_id, markdown in (r for r in results if r is not None) + ] + source_doc_ids = [ + doc_id + for _, doc_id, _ in (r for r in results if r is not None) + ] if not combined_parts: await repo.update_processing_job(