diff --git a/backend/app/main.py b/backend/app/main.py
index 4cd1595..79de12e 100755
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -83,6 +83,39 @@ async def lifespan(app: FastAPI):
         knowledge_base_service = KnowledgeBaseService(llamaparse_service, gemini_service, reference_docs)
         app.state.knowledge_base_service = knowledge_base_service
         print("Knowledge Base pipeline ready!")
+
+        if db_available:
+            import asyncio as _asyncio
+            from app.models.database import async_session_factory as _session_factory
+            from app.repositories.knowledge_base_repository import KnowledgeBaseRepository as _KBRepo
+            # The event loop only keeps weak references to tasks; hold strong
+            # references here so resumed jobs cannot be garbage-collected mid-run.
+            _recovery_tasks = set()
+            app.state.kb_recovery_tasks = _recovery_tasks
+            try:
+                async with _session_factory() as _session:
+                    _repo = _KBRepo(_session)
+                    _stale = await _repo.get_all_stale_active_jobs()
+                    if _stale:
+                        print(f"Resuming {len(_stale)} interrupted processing job(s)...")
+                    for _job in _stale:
+                        _kb = await _repo.get_knowledge_base(_job.knowledge_base_id)
+                        if _kb:
+                            _task = _asyncio.create_task(
+                                knowledge_base_service.process_documents(
+                                    kb_id=_job.knowledge_base_id,
+                                    job_id=_job.id,
+                                    agent_key=_kb.agent_key,
+                                    user_id=_job.triggered_by_id,
+                                    user_name=_job.triggered_by_name,
+                                )
+                            )
+                            _recovery_tasks.add(_task)
+                            # Drop the reference once the job finishes.
+                            _task.add_done_callback(_recovery_tasks.discard)
+                            print(f" Resumed: {_kb.display_name} (job {_job.id}, was {_job.status})")
+            except Exception as _e:
+                logger.warning(f"Startup job recovery failed: {_e}")
     else:
         print("LLAMA_CLOUD_API_KEY not set - Knowledge Base processing pipeline disabled")
 
diff --git a/backend/app/repositories/knowledge_base_repository.py b/backend/app/repositories/knowledge_base_repository.py
index cef9787..8402d9e 100644
--- a/backend/app/repositories/knowledge_base_repository.py
+++ b/backend/app/repositories/knowledge_base_repository.py
@@ -314,3 +314,37 @@ class KnowledgeBaseRepository:
         )
         result = await self.session.execute(query)
         return result.scalar() > 0
+
+    async def get_all_stale_active_jobs(self, stale_minutes: int = 5) -> list[ProcessingJob]:
+        """Get all active jobs across all KBs older than stale_minutes (for startup recovery).
+
+        A job is active while its status is "pending", "parsing_documents" or
+        "distilling"; only jobs created more than stale_minutes ago are returned.
+        """
+        cutoff = datetime.now(timezone.utc) - timedelta(minutes=stale_minutes)
+        active_statuses = ["pending", "parsing_documents", "distilling"]
+        query = (
+            select(ProcessingJob)
+            .where(ProcessingJob.status.in_(active_statuses))
+            .where(ProcessingJob.created_at < cutoff)
+        )
+        result = await self.session.execute(query)
+        return list(result.scalars().all())
+
+    async def reset_stuck_parsing_docs(self, kb_id: uuid.UUID) -> int:
+        """Reset docs stuck at 'parsing' back to 'pending' so they can be re-parsed.
+
+        Changes are flushed to the session but not committed here.
+        Returns the number of documents that were reset.
+        """
+        query = (
+            select(SourceDocument)
+            .where(SourceDocument.knowledge_base_id == kb_id)
+            .where(SourceDocument.parse_status == "parsing")
+        )
+        result = await self.session.execute(query)
+        docs = result.scalars().all()
+        for doc in docs:
+            doc.parse_status = "pending"
+        await self.session.flush()
+        return len(docs)
diff --git a/backend/app/services/knowledge_base_service.py b/backend/app/services/knowledge_base_service.py
index 7863184..6a3d89b 100644
--- a/backend/app/services/knowledge_base_service.py
+++ b/backend/app/services/knowledge_base_service.py
@@ -162,6 +162,8 @@ class KnowledgeBaseService:
 
                 # Update job status to parsing
                 await repo.update_processing_job(job_id, status="parsing_documents")
+                # Reset docs stuck at "parsing" from a previous interrupted run
+                await repo.reset_stuck_parsing_docs(kb_id)
                 await session.commit()
 
                 # Get all source documents for this KB
@@ -182,6 +184,15 @@ class KnowledgeBaseService:
 
                 async def _parse_doc(doc):
                     nonlocal parsed_count
+                    # Resume: skip docs already parsed in a previous interrupted run
+                    if doc.parse_status in ("parsed", "partial"):
+                        async with progress_lock:
+                            parsed_count += 1
+                            await repo.update_processing_job(job_id, parsed_documents=parsed_count)
+                            await session.commit()
+                        if doc.parsed_markdown and doc.parsed_markdown.strip():
+                            return (doc.filename, str(doc.id), doc.parsed_markdown)
+                        return None
                     async with semaphore:
                         try:
                             async with async_session_factory() as doc_session: