Oliver-ai-bot_2.0/backend/app/tasks/knowledge_processing.py
Vadym Samoilenko 44a512c41f Phase 1 Complete: Dual-bot architecture, knowledge base, access control
- Remove notebook mode, add RAG + Personal Assistant dual-bot setup
- Add knowledge base management (upload, URL scraping, document processing)
- Add user feature access control (allowed_features, features_override)
- Update admin dashboard with knowledge base tab
- Redesign login page, sidebar, and profile
- Add Celery tasks for async document processing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 21:26:40 +00:00

109 lines
3.8 KiB
Python

"""
Celery Task for processing Knowledge Base documents uploaded via admin panel.
Reads the uploaded file from disk, runs it through DocumentProcessor
(same pipeline as SharePoint sync), and updates the DB record.
"""
import asyncio
import logging
import os
from datetime import datetime
from typing import Any, Dict
from uuid import UUID
from sqlalchemy import select
from app.core.document_processor import DocumentProcessor, DocumentProcessingError
from app.database import AsyncSessionLocal
from app.models.knowledge_document import KnowledgeDocument, DocumentStatus
from celery_app import celery_app
# Module-level logger, named after this module via __name__; used by the
# Celery task and its async implementation below.
logger = logging.getLogger(__name__)
@celery_app.task(
    name="app.tasks.knowledge_processing.process_knowledge_document",
    bind=True,
    max_retries=2,
    default_retry_delay=30,
)
def process_knowledge_document(self, document_id: str, file_path: str) -> Dict[str, Any]:
    """Celery entry point that processes one uploaded knowledge base document.

    Celery workers call tasks synchronously, so this wrapper drives the async
    pipeline (``_async_process_knowledge_document``) to completion on a fresh
    event loop via ``asyncio.run``.

    Args:
        document_id: UUID string of the KnowledgeDocument record.
        file_path: Path to the uploaded temp file on disk.

    Returns:
        The result dict produced by the async implementation.
    """
    # NOTE(review): max_retries / default_retry_delay only matter if the task
    # calls self.retry() or declares autoretry_for; neither happens here, so a
    # failed run is not retried. Enabling retries would also require keeping
    # the temp file, which the async implementation deletes after each attempt.
    pipeline = _async_process_knowledge_document(document_id, file_path)
    return asyncio.run(pipeline)
def _remove_temp_file(file_path: str) -> None:
    """Best-effort delete of the uploaded temp file; failures are logged, not raised."""
    try:
        os.remove(file_path)
    except FileNotFoundError:
        # Already gone (or never written): nothing to clean up.
        pass
    except OSError as e:
        logger.warning("Failed to remove temp file %s: %s", file_path, e)


async def _async_process_knowledge_document(document_id: str, file_path: str) -> Dict[str, Any]:
    """Async implementation of process_knowledge_document.

    Loads the KnowledgeDocument row, marks it PROCESSING, feeds the file bytes
    through DocumentProcessor (same pipeline as SharePoint sync), and records
    either COMPLETED (with the vector count) or FAILED (with a truncated error
    message). The temp file at ``file_path`` is removed on every exit path,
    including when the record does not exist.

    Args:
        document_id: UUID string of the KnowledgeDocument record.
        file_path: Path to the uploaded temp file on disk.

    Returns:
        ``{"document_id", "vector_count", "status": "completed"}`` on success,
        ``{"error", "document_id"}`` when processing fails, or
        ``{"error": "document_not_found"}`` when the record is missing.

    Raises:
        ValueError: if ``document_id`` is not a valid UUID string.
        Database errors raised while loading the record, marking it
        PROCESSING, or persisting the FAILED status propagate to the caller.
    """
    try:
        async with AsyncSessionLocal() as session:
            # Load document record
            result = await session.execute(
                select(KnowledgeDocument).where(KnowledgeDocument.id == UUID(document_id))
            )
            doc = result.scalar_one_or_none()
            if not doc:
                logger.error("KnowledgeDocument %s not found", document_id)
                return {"error": "document_not_found"}

            # Snapshot the fields the processor needs before the first commit.
            # Whether AsyncSessionLocal expires instances on commit isn't visible
            # here; if it does, reading them afterwards would need an implicit
            # lazy load, which async sessions reject.
            file_name = doc.file_name
            file_type = doc.file_type
            document_key = doc.document_key
            department_id = str(doc.department_id) if doc.department_id else None
            region_code = doc.region_code

            doc.status = DocumentStatus.PROCESSING
            await session.commit()

            try:
                with open(file_path, "rb") as f:
                    file_bytes = f.read()

                # Same pipeline as SharePoint sync
                processor = DocumentProcessor()
                vector_count = await processor.process_document(
                    file_bytes=file_bytes,
                    file_name=file_name,
                    file_type=file_type,
                    sharepoint_id=document_key,  # Maps to sharepoint_id in Qdrant
                    file_url="",  # No URL for uploaded files
                    source_id="knowledge_base",  # Distinguishes from SharePoint docs
                    department_id=department_id,
                    region_code=region_code,
                )

                doc.status = DocumentStatus.COMPLETED
                doc.vector_count = vector_count
                # Naive UTC timestamp, kept as-is; presumably the column is
                # timezone-naive -- confirm before switching to aware datetimes.
                doc.processed_at = datetime.utcnow()
                doc.error_message = None
                await session.commit()
            except Exception as exc:
                logger.exception("Failed to process knowledge document %s: %s", document_id, exc)
                # The failure may have come from the session itself (e.g. the
                # COMPLETED commit above); such a session refuses further
                # commits until rolled back. Rolling back also discards any
                # half-applied COMPLETED fields before FAILED is recorded.
                await session.rollback()
                doc.status = DocumentStatus.FAILED
                doc.error_message = str(exc)[:2000]
                await session.commit()
                return {"error": str(exc), "document_id": document_id}

            logger.info(
                "Knowledge document %s processed: %d vectors",
                document_id, vector_count,
            )
            return {
                "document_id": document_id,
                "vector_count": vector_count,
                "status": "completed",
            }
    finally:
        # Runs on every path -- success, failure, "not found", and errors raised
        # before processing started -- so uploaded temp files never leak.
        _remove_temp_file(file_path)