feat(knowledge-base): smart resume for interrupted processing jobs

On server restart, stale active jobs are automatically resumed rather
than failed. Docs already parsed in a prior run are skipped (resume from
cache), docs stuck at 'parsing' are reset to 'pending' and re-parsed.

- Repository: add get_all_stale_active_jobs() and reset_stuck_parsing_docs()
- Service: skip already-parsed docs in _parse_doc(), reset stuck docs on start
- Main: recover stale jobs via asyncio.create_task() in lifespan startup

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-05-15 10:20:35 +01:00
parent a40e3d4052
commit 1982d5d76e
3 changed files with 63 additions and 0 deletions

View file

@ -83,6 +83,32 @@ async def lifespan(app: FastAPI):
knowledge_base_service = KnowledgeBaseService(llamaparse_service, gemini_service, reference_docs)
app.state.knowledge_base_service = knowledge_base_service
print("Knowledge Base pipeline ready!")
if db_available:
import asyncio as _asyncio
from app.models.database import async_session_factory as _session_factory
from app.repositories.knowledge_base_repository import KnowledgeBaseRepository as _KBRepo
try:
async with _session_factory() as _session:
_repo = _KBRepo(_session)
_stale = await _repo.get_all_stale_active_jobs()
if _stale:
print(f"Resuming {len(_stale)} interrupted processing job(s)...")
for _job in _stale:
_kb = await _repo.get_knowledge_base(_job.knowledge_base_id)
if _kb:
_asyncio.create_task(
knowledge_base_service.process_documents(
kb_id=_job.knowledge_base_id,
job_id=_job.id,
agent_key=_kb.agent_key,
user_id=_job.triggered_by_id,
user_name=_job.triggered_by_name,
)
)
print(f" Resumed: {_kb.display_name} (job {_job.id}, was {_job.status})")
except Exception as _e:
logger.warning(f"Startup job recovery failed: {_e}")
else:
print("LLAMA_CLOUD_API_KEY not set - Knowledge Base processing pipeline disabled")

View file

@ -314,3 +314,29 @@ class KnowledgeBaseRepository:
)
result = await self.session.execute(query)
return result.scalar() > 0
async def get_all_stale_active_jobs(self, stale_minutes: int = 5) -> list[ProcessingJob]:
"""Get all active jobs across all KBs older than stale_minutes (for startup recovery)."""
cutoff = datetime.now(timezone.utc) - timedelta(minutes=stale_minutes)
active_statuses = ["pending", "parsing_documents", "distilling"]
query = (
select(ProcessingJob)
.where(ProcessingJob.status.in_(active_statuses))
.where(ProcessingJob.created_at < cutoff)
)
result = await self.session.execute(query)
return list(result.scalars().all())
async def reset_stuck_parsing_docs(self, kb_id: uuid.UUID) -> int:
"""Reset docs stuck at 'parsing' back to 'pending' so they can be re-parsed."""
query = (
select(SourceDocument)
.where(SourceDocument.knowledge_base_id == kb_id)
.where(SourceDocument.parse_status == "parsing")
)
result = await self.session.execute(query)
docs = result.scalars().all()
for doc in docs:
doc.parse_status = "pending"
await self.session.flush()
return len(docs)

View file

@ -162,6 +162,8 @@ class KnowledgeBaseService:
# Update job status to parsing
await repo.update_processing_job(job_id, status="parsing_documents")
# Reset docs stuck at "parsing" from a previous interrupted run
await repo.reset_stuck_parsing_docs(kb_id)
await session.commit()
# Get all source documents for this KB
@ -182,6 +184,15 @@ class KnowledgeBaseService:
async def _parse_doc(doc):
nonlocal parsed_count
# Resume: skip docs already parsed in a previous interrupted run
if doc.parse_status in ("parsed", "partial"):
async with progress_lock:
parsed_count += 1
await repo.update_processing_job(job_id, parsed_documents=parsed_count)
await session.commit()
if doc.parsed_markdown and doc.parsed_markdown.strip():
return (doc.filename, str(doc.id), doc.parsed_markdown)
return None
async with semaphore:
try:
async with async_session_factory() as doc_session: