699 lines
No EOL
23 KiB
Python
699 lines
No EOL
23 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
|
|
from motor.motor_asyncio import AsyncIOMotorDatabase
|
|
from typing import List, Optional
|
|
from bson import ObjectId
|
|
from datetime import datetime
|
|
from urllib.parse import unquote
|
|
|
|
from ...config.database import get_database
|
|
from ...config.settings import settings
|
|
from ...core.auth import get_current_admin_user
|
|
from ...models.user import UserInDB, UserUpdate, UserRole
|
|
from ...models.document import DocumentInDB
|
|
from ...models.index import IndexInDB
|
|
from ...services.llama_processor import llama_processor
|
|
|
|
router = APIRouter()
|
|
|
|
@router.get("/users", response_model=List[dict])
async def get_all_users(
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """List every user account as a plain dictionary (admin only).

    Each raw document from the ``users`` collection is validated through
    ``UserInDB`` and then reduced to a fixed set of fields; the password
    hash is not part of the output.

    Returns:
        A list with one summary dict per stored user.
    """

    def _summarize(raw_user: dict) -> dict:
        # Validate through the model first so malformed records fail loudly.
        account = UserInDB(**raw_user)
        return {
            "id": str(account.id),
            "email": account.email,
            "role": account.role,
            "is_active": account.is_active,
            "index_access": account.index_access,
            "created_at": account.created_at,
            "updated_at": account.updated_at
        }

    return [_summarize(raw_user) async for raw_user in db.users.find({})]
|
|
|
|
@router.post("/users")
async def create_user(
    user_data: dict,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Create a new user (admin only).

    Args:
        user_data: Request body. ``email`` and ``password`` are required;
            ``role`` defaults to ``"user"`` and ``is_active`` to ``True``.
        admin_user: Caller resolved by the admin auth dependency.
        db: Database handle resolved by the database dependency.

    Returns:
        A dict with a confirmation message, the new user's id and email.

    Raises:
        HTTPException: 400 when a required field is missing/empty or the
            email is already registered; 500 for any other failure.
    """
    # Reject incomplete payloads up front: indexing a missing key below would
    # raise KeyError and be reported as a misleading 500.
    missing_fields = [
        field for field in ("email", "password") if not user_data.get(field)
    ]
    if missing_fields:
        raise HTTPException(
            status_code=400,
            detail=f"Missing required field(s): {', '.join(missing_fields)}"
        )

    try:
        from ...core.security import get_password_hash

        email = user_data["email"]

        # Check if email already exists
        if await db.users.find_one({"email": email}):
            raise HTTPException(status_code=400, detail="Email already registered")

        # One timestamp so created_at and updated_at agree on a fresh record.
        now = datetime.utcnow()
        new_user = {
            "email": email,
            "hashed_password": get_password_hash(user_data["password"]),
            "role": user_data.get("role", "user"),
            "is_active": user_data.get("is_active", True),
            "index_access": [],
            "created_at": now,
            "updated_at": now
        }

        result = await db.users.insert_one(new_user)

        return {
            "message": "User created successfully",
            "user_id": str(result.inserted_id),
            "email": email
        }

    except HTTPException:
        # Deliberate HTTP errors must keep their own status code.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error creating user: {str(e)}"
        ) from e
|
|
|
|
@router.put("/users/{user_id}")
async def update_user(
    user_id: str,
    user_update: UserUpdate,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Update a user (admin only).

    Only the fields of ``user_update`` that are not ``None`` are written.
    ``updated_at`` is refreshed whenever at least one field changes.

    Args:
        user_id: Hex string of the target user's ``_id``.
        user_update: Partial update; may carry email, role, is_active, password.
        admin_user: Caller resolved by the admin auth dependency.
        db: Database handle resolved by the database dependency.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 400 for a malformed ``user_id`` or an email already in
            use by another user; 404 when the user does not exist.
    """
    # ObjectId() raises InvalidId on malformed input, which would otherwise
    # escape as an unhandled 500.
    if not ObjectId.is_valid(user_id):
        raise HTTPException(status_code=400, detail="Invalid user ID")
    user_oid = ObjectId(user_id)

    # Check if user exists
    user = await db.users.find_one({"_id": user_oid})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Prepare update data
    update_data = {}
    if user_update.email is not None:
        # The email must not belong to any *other* account.
        existing = await db.users.find_one({
            "email": user_update.email,
            "_id": {"$ne": user_oid}
        })
        if existing:
            raise HTTPException(status_code=400, detail="Email already in use")
        update_data["email"] = user_update.email

    if user_update.role is not None:
        update_data["role"] = user_update.role

    if user_update.is_active is not None:
        update_data["is_active"] = user_update.is_active

    if user_update.password is not None:
        from ...core.security import get_password_hash
        update_data["hashed_password"] = get_password_hash(user_update.password)

    if update_data:
        update_data["updated_at"] = datetime.utcnow()
        await db.users.update_one(
            {"_id": user_oid},
            {"$set": update_data}
        )

    return {"message": "User updated successfully"}
|
|
|
|
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: str,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Delete a user (admin only).

    Args:
        user_id: Hex string of the target user's ``_id``.
        admin_user: Caller resolved by the admin auth dependency.
        db: Database handle resolved by the database dependency.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 400 when the admin targets their own account or the
            id is malformed; 404 when the user does not exist.
    """
    # Don't allow admin to delete themselves
    if str(admin_user.id) == user_id:
        raise HTTPException(status_code=400, detail="Cannot delete your own account")

    # ObjectId() raises InvalidId on malformed input, which would otherwise
    # escape as an unhandled 500.
    if not ObjectId.is_valid(user_id):
        raise HTTPException(status_code=400, detail="Invalid user ID")
    user_oid = ObjectId(user_id)

    # Check if user exists
    user = await db.users.find_one({"_id": user_oid})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Delete user
    await db.users.delete_one({"_id": user_oid})

    return {"message": "User deleted successfully"}
|
|
|
|
@router.post("/users/{user_id}/grant-index-access")
async def grant_index_access(
    user_id: str,
    request_data: dict,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Grant user access to an index.

    Args:
        user_id: Hex string of the target user's ``_id``.
        request_data: Request body; must contain ``index_id``.
        admin_user: Caller resolved by the admin auth dependency.
        db: Database handle resolved by the database dependency.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 400 when ``index_id`` is absent or ``user_id`` is
            malformed; 404 when the user or the index does not exist.
    """
    index_id = request_data.get("index_id")
    if not index_id:
        raise HTTPException(status_code=400, detail="index_id is required")

    # ObjectId() raises InvalidId on malformed input, which would otherwise
    # escape as an unhandled 500.
    if not ObjectId.is_valid(user_id):
        raise HTTPException(status_code=400, detail="Invalid user ID")
    user_oid = ObjectId(user_id)

    # Check if user exists
    user = await db.users.find_one({"_id": user_oid})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Check if index exists
    index = await db.indices.find_one({"index_id": index_id})
    if not index:
        raise HTTPException(status_code=404, detail="Index not found")

    # $addToSet keeps the list free of duplicates when access is granted twice.
    await db.users.update_one(
        {"_id": user_oid},
        {"$addToSet": {"index_access": index_id}}
    )

    return {"message": "Index access granted successfully"}
|
|
|
|
@router.post("/users/{user_id}/revoke-index-access")
async def revoke_index_access(
    user_id: str,
    request_data: dict,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Revoke user access to an index.

    The index itself is not looked up, so access to an index that no longer
    exists can still be revoked.

    Args:
        user_id: Hex string of the target user's ``_id``.
        request_data: Request body; must contain ``index_id``.
        admin_user: Caller resolved by the admin auth dependency.
        db: Database handle resolved by the database dependency.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 400 when ``index_id`` is absent or ``user_id`` is
            malformed; 404 when the user does not exist.
    """
    index_id = request_data.get("index_id")
    if not index_id:
        raise HTTPException(status_code=400, detail="index_id is required")

    # ObjectId() raises InvalidId on malformed input, which would otherwise
    # escape as an unhandled 500.
    if not ObjectId.is_valid(user_id):
        raise HTTPException(status_code=400, detail="Invalid user ID")
    user_oid = ObjectId(user_id)

    # Check if user exists
    user = await db.users.find_one({"_id": user_oid})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Revoke access
    await db.users.update_one(
        {"_id": user_oid},
        {"$pull": {"index_access": index_id}}
    )

    return {"message": "Index access revoked successfully"}
|
|
|
|
@router.post("/grant-all-indices/{user_id}")
async def grant_all_indices_access(
    user_id: str,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Grant user access to all indices.

    The user's ``index_access`` list is replaced (``$set``) with the ids of
    every index whose status is ``"active"``.

    Args:
        user_id: Hex string of the target user's ``_id``.
        admin_user: Caller resolved by the admin auth dependency.
        db: Database handle resolved by the database dependency.

    Returns:
        A dict with a confirmation message and the number of indices granted.

    Raises:
        HTTPException: 400 for a malformed ``user_id``; 404 when the user
            does not exist.
    """
    # ObjectId() raises InvalidId on malformed input, which would otherwise
    # escape as an unhandled 500.
    if not ObjectId.is_valid(user_id):
        raise HTTPException(status_code=400, detail="Invalid user ID")
    user_oid = ObjectId(user_id)

    # Check if user exists
    user = await db.users.find_one({"_id": user_oid})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Get all active indices
    indices = [
        index["index_id"]
        async for index in db.indices.find({"status": "active"})
    ]

    # Grant access to all indices
    await db.users.update_one(
        {"_id": user_oid},
        {"$set": {"index_access": indices}}
    )

    return {
        "message": "Access granted to all indices",
        "index_count": len(indices)
    }
|
|
|
|
@router.get("/stats")
async def get_system_stats(
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Report headline counts for users, indices, documents and chat messages.

    Returns:
        A nested dict with the sections ``users``, ``indices``, ``documents``
        and ``chat_messages``. Only indices with status ``"active"`` are
        counted; documents are broken down by ``processing_status``.
    """
    user_section = {
        "total": await db.users.count_documents({}),
        "active": await db.users.count_documents({"is_active": True}),
        "admins": await db.users.count_documents({"role": UserRole.ADMIN})
    }

    index_section = {
        "total": await db.indices.count_documents({"status": "active"})
    }

    # Overall total first, then one count per processing state.
    document_section = {"total": await db.documents.count_documents({})}
    for state in ("pending", "processing", "completed", "failed"):
        document_section[state] = await db.documents.count_documents(
            {"processing_status": state}
        )

    message_section = {"total": await db.chat_messages.count_documents({})}

    return {
        "users": user_section,
        "indices": index_section,
        "documents": document_section,
        "chat_messages": message_section
    }
|
|
|
|
@router.post("/documents/upload-single")
async def upload_single_document(
    file: UploadFile = File(...),
    index_id: str = Form(...),
    custom_name: Optional[str] = Form(None),
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Accept one uploaded file and hand it to the document processor (admin only).

    Responds with 404 when no index record matches ``index_id``; otherwise
    returns the id, original filename and processing status of the document
    produced by ``llama_processor.process_single_file``.
    """
    target_index = await db.indices.find_one({"index_id": index_id})
    if not target_index:
        raise HTTPException(status_code=404, detail="Index not found")

    stored = await llama_processor.process_single_file(
        file, index_id, admin_user, db, custom_name
    )

    summary = {"message": "Document uploaded and processing started"}
    summary["document_id"] = str(stored.id)
    summary["filename"] = stored.original_filename
    summary["status"] = stored.processing_status
    return summary
|
|
|
|
@router.post("/documents/upload-multiple")
async def upload_multiple_documents(
    files: List[UploadFile] = File(...),
    index_id: str = Form(...),
    base_name: str = Form(...),
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Accept a batch of uploaded files for one index (admin only).

    Responds with 404 when no index record matches ``index_id``; otherwise
    forwards the batch to ``llama_processor.process_multiple_files`` and
    returns one summary entry per document it yields.
    """
    target_index = await db.indices.find_one({"index_id": index_id})
    if not target_index:
        raise HTTPException(status_code=404, detail="Index not found")

    stored_docs = await llama_processor.process_multiple_files(
        files, index_id, admin_user, db, base_name
    )

    def _entry(item) -> dict:
        return {
            "id": str(item.id),
            "filename": item.original_filename,
            "status": item.processing_status
        }

    return {
        "message": f"Successfully uploaded {len(stored_docs)} documents",
        "documents": [_entry(item) for item in stored_docs]
    }
|
|
|
|
@router.get("/documents/processing-status")
async def get_processing_status(
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Summarize how many documents sit in each pipeline state (admin only).

    Returns:
        A dict with per-state counts for ``processing_status`` and
        ``embedding_status``, the number of documents missing either field,
        and ``total_documents`` (the sum of the processing-status counts only).
    """
    tracked_states = ("pending", "processing", "completed", "failed")

    # All processing counts are collected before any embedding count.
    processing_counts = {
        state: await db.documents.count_documents({"processing_status": state})
        for state in tracked_states
    }
    embedding_counts = {
        state: await db.documents.count_documents({"embedding_status": state})
        for state in tracked_states
    }

    # Records lacking either status field cannot be classified above.
    missing_field_filter = {
        "$or": [
            {"processing_status": {"$exists": False}},
            {"embedding_status": {"$exists": False}}
        ]
    }
    unclassified = await db.documents.count_documents(missing_field_filter)

    return {
        "processing_status": processing_counts,
        "embedding_status": embedding_counts,
        "unclear_status_count": unclassified,
        "total_documents": sum(processing_counts.values())
    }
|
|
|
|
@router.get("/documents/{index_id}")
async def get_index_documents(
    index_id: str,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """List the documents stored under one index (admin only).

    ``index_id`` is percent-decoded before it is used for lookups. Responds
    with 404 when no index record matches the decoded id.

    Returns:
        A dict with the decoded index id, the index name, one summary per
        document and the number of documents.
    """
    lookup_id = unquote(index_id)

    owner_index = await db.indices.find_one({"index_id": lookup_id})
    if not owner_index:
        raise HTTPException(status_code=404, detail="Index not found")

    def _describe(record: dict) -> dict:
        # chunk_count is the only field with a fallback value.
        return {
            "id": str(record["_id"]),
            "filename": record["original_filename"],
            "file_size": record["file_size"],
            "processing_status": record["processing_status"],
            "embedding_status": record["embedding_status"],
            "chunk_count": record.get("chunk_count", 0),
            "created_at": record["created_at"],
            "updated_at": record["updated_at"]
        }

    listing = [
        _describe(record)
        async for record in db.documents.find({"index_id": lookup_id})
    ]

    return {
        "index_id": lookup_id,
        "index_name": owner_index["name"],
        "documents": listing,
        "total_documents": len(listing)
    }
|
|
|
|
@router.post("/documents/{document_id}/reprocess")
async def reprocess_document(
    document_id: str,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Reprocess a document (admin only).

    Resets the stored processing/embedding state of the document to
    ``"pending"``, clears its parsed text, chunks and vector ids, and
    schedules ``llama_processor._process_document_async`` as a background task.

    Args:
        document_id: Hex string of the document's ``_id``.
        admin_user: Caller resolved by the admin auth dependency.
        db: Database handle resolved by the database dependency.

    Returns:
        A dict with a confirmation message, the document id and ``"pending"``.

    Raises:
        HTTPException: 400 for a malformed ``document_id``; 404 when the
            document does not exist.
    """
    import asyncio

    # ObjectId() raises InvalidId on malformed input, which would otherwise
    # escape as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")
    document_oid = ObjectId(document_id)

    # Get document
    document = await db.documents.find_one({"_id": document_oid})
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    # Build the model BEFORE touching the database: if the stored record
    # fails validation we abort without leaving it stuck in "pending" with
    # no task scheduled. The model is built from the pre-reset snapshot,
    # exactly as before.
    doc_obj = DocumentInDB(**document)

    # Reset processing status
    await db.documents.update_one(
        {"_id": document_oid},
        {"$set": {
            "processing_status": "pending",
            "embedding_status": "pending",
            "parsed_text": None,
            "text_chunks": None,
            "chunk_count": 0,
            "vector_ids": None,
            "updated_at": datetime.utcnow()
        }}
    )

    # Start reprocessing
    # NOTE(review): the task handle is not retained. The asyncio docs advise
    # keeping a reference so a running task cannot be garbage-collected;
    # consider a module-level registry or FastAPI BackgroundTasks.
    asyncio.create_task(llama_processor._process_document_async(doc_obj, db))

    return {
        "message": "Document reprocessing started",
        "document_id": document_id,
        "status": "pending"
    }
|
|
|
|
@router.delete("/documents/{document_id}")
async def delete_document(
    document_id: str,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Delete a document and its embeddings (admin only).

    The actual cleanup is delegated to ``document_processor.delete_document``.

    Args:
        document_id: Hex string of the document's ``_id``.
        admin_user: Caller resolved by the admin auth dependency.
        db: Database handle resolved by the database dependency.

    Returns:
        A dict with a confirmation message and the document id.

    Raises:
        HTTPException: 400 for a malformed ``document_id``; 404 when the
            document does not exist; 500 when the processor reports failure.
    """
    # ObjectId() raises InvalidId on malformed input, which would otherwise
    # escape as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")

    # Get document
    document = await db.documents.find_one({"_id": ObjectId(document_id)})
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    # Use the enhanced document processor for complete cleanup
    from ...services.document_processor import document_processor
    success = await document_processor.delete_document(document_id, db)

    if not success:
        raise HTTPException(status_code=500, detail="Failed to delete document")

    return {
        "message": "Document deleted successfully",
        "document_id": document_id
    }
|
|
|
|
@router.post("/chat/query")
async def admin_chat_query(
    query: str = Form(...),
    index_id: str = Form(...),
    top_k: int = Form(5),
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Query documents using RAG (admin only).

    Retrieves up to ``top_k`` results for ``query`` from the given index,
    passes their ``content`` to ``llama_processor.generate_response`` and
    returns the answer together with the retrieved sources.

    Args:
        query: The question to answer.
        index_id: Identifier of the index to search.
        top_k: Number of results requested from the vector store (default 5).
        admin_user: Caller resolved by the admin auth dependency.
        db: Database handle resolved by the database dependency.

    Returns:
        A dict with the query, the generated response, the sources and the
        index id.

    Raises:
        HTTPException: 404 when the index does not exist; 500 when retrieval
            or generation fails. An HTTPException raised inside the pipeline
            is propagated unchanged.
    """
    # Verify index exists
    index = await db.indices.find_one({"index_id": index_id})
    if not index:
        raise HTTPException(status_code=404, detail="Index not found")

    try:
        # Query vector store
        results = await llama_processor.query_documents(
            query, index_id, top_k
        )

        # Extract context chunks
        context_chunks = [result["content"] for result in results]

        # Generate response
        response = await llama_processor.generate_response(
            query, context_chunks, index_id
        )

        return {
            "query": query,
            "response": response,
            "sources": results,
            "index_id": index_id
        }

    except HTTPException:
        # Same convention as create_user: a deliberate HTTP error keeps its
        # own status code instead of being rewritten to a 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing query: {str(e)}"
        ) from e
|
|
|
|
@router.get("/indices")
async def get_all_indices(
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """List every index whose status is ``"active"`` (admin only).

    For each index a separate count query determines how many documents
    reference it.

    Returns:
        A dict with the list of index summaries and their total number.
    """
    summaries = []

    async for record in db.indices.find({"status": "active"}):
        # The document count is queried live, one query per index.
        attached_docs = await db.documents.count_documents(
            {"index_id": record["index_id"]}
        )

        summary = {
            "id": str(record["_id"]),
            "index_id": record["index_id"],
            "name": record["name"],
            "description": record.get("description", ""),
            "document_count": attached_docs,
            "created_by": str(record["created_by"]),
            "created_at": record["created_at"],
            "chunk_size": record.get("chunk_size", 1000),
            "chunk_overlap": record.get("chunk_overlap", 200)
        }
        summaries.append(summary)

    return {"indices": summaries, "total": len(summaries)}
|
|
|
|
@router.post("/indices/create")
async def create_index(
    name: str = Form(...),
    description: Optional[str] = Form(None),
    chunk_size: int = Form(1000),
    chunk_overlap: int = Form(200),
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Register a new, empty index under a freshly generated UUID (admin only).

    The record is stored with status ``"active"``, a document count of zero,
    the calling admin as creator and ``"text-embedding-ada-002"`` as the
    embedding model name.

    Returns:
        A dict with a confirmation message, the generated ``index_id``, the
        name and the database id of the inserted record.
    """
    import uuid

    generated_id = str(uuid.uuid4())

    record = {
        "index_id": generated_id,
        "name": name,
        "description": description,
        "created_by": admin_user.id,
        "created_at": datetime.utcnow(),
        "updated_at": datetime.utcnow(),
        "status": "active",
        "document_count": 0,
        "chunk_size": chunk_size,
        "chunk_overlap": chunk_overlap,
        "embedding_model": "text-embedding-ada-002",
        "settings": {}
    }

    inserted = await db.indices.insert_one(record)

    reply = {"message": "Index created successfully"}
    reply["index_id"] = generated_id
    reply["name"] = name
    reply["id"] = str(inserted.inserted_id)
    return reply
|
|
|
|
@router.delete("/indices/{index_id}")
async def delete_index(
    index_id: str,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Delete an index and all its documents (admin only).

    Steps, in order: remove each document's embeddings and stored file,
    delete the document records, delete the index's chat messages, delete
    the index record, run ``rag_service.delete_index_complete``, and remove
    the index's upload directory. File and directory removal are best-effort:
    failures are printed and do not abort the request.

    Args:
        index_id: Index identifier from the path; percent-decoded before use.
        admin_user: Caller resolved by the admin auth dependency.
        db: Database handle resolved by the database dependency.

    Returns:
        A dict with a confirmation message, the decoded index id and the
        numbers of documents and chat messages deleted.

    Raises:
        HTTPException: 404 when the index does not exist; 500 when any
            non-best-effort step fails.
    """
    # Imported once here instead of on every loop iteration.
    import shutil
    from pathlib import Path

    # Decode URL-encoded index_id
    decoded_index_id = unquote(index_id)

    # Check if index exists
    index = await db.indices.find_one({"index_id": decoded_index_id})
    if not index:
        raise HTTPException(status_code=404, detail="Index not found")

    try:
        document_count = 0

        async for doc in db.documents.find({"index_id": decoded_index_id}):
            document_count += 1

            # Delete embeddings from vector store
            await llama_processor.delete_document_embeddings(
                str(doc["_id"]), decoded_index_id
            )

            # A record without a stored path has no file to remove. Indexing
            # the key directly would raise KeyError and abort the deletion
            # halfway through.
            stored_path = doc.get("file_path")
            if not stored_path:
                continue

            file_path = Path(stored_path)
            if file_path.exists():
                try:
                    file_path.unlink()
                except Exception as e:
                    print(f"Error deleting file {file_path}: {e}")

        # Delete all documents for this index
        await db.documents.delete_many({"index_id": decoded_index_id})

        # Delete all chat messages for this index
        chat_result = await db.chat_messages.delete_many({"index_id": decoded_index_id})

        # Delete the index record
        await db.indices.delete_one({"index_id": decoded_index_id})

        # Use complete ChromaDB cleanup instead of manual deletion
        from ...services.rag_service import rag_service
        cleanup_result = await rag_service.delete_index_complete(decoded_index_id)
        if not cleanup_result["success"]:
            print(f"Warning during complete index cleanup: {cleanup_result['message']}")

        # Note: Cache clearing removed - caching is disabled for data freshness

        # Delete index upload directory
        index_dir = Path(settings.upload_dir) / decoded_index_id
        if index_dir.exists():
            try:
                shutil.rmtree(index_dir)
            except Exception as e:
                print(f"Error deleting index directory {index_dir}: {e}")

        return {
            "message": "Index deleted successfully",
            "index_id": decoded_index_id,
            "documents_deleted": document_count,
            "chat_messages_deleted": chat_result.deleted_count
        }

    except HTTPException:
        # Same convention as create_user: a deliberate HTTP error keeps its
        # own status code instead of being rewritten to a 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error deleting index: {str(e)}"
        ) from e
|
|
|
|
@router.post("/documents/process-pending")
async def process_pending_documents(
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Schedule processing for every document that is pending or failed (admin only).

    A document qualifies when its ``processing_status`` or its
    ``embedding_status`` is ``"pending"`` or ``"failed"``. Each qualifying
    record is turned into a ``DocumentInDB`` and handed to
    ``llama_processor._process_document_async`` as a background task; records
    that cannot be queued are reported via ``print`` and skipped.

    Returns:
        A dict with a message and the number of documents queued.
    """
    import asyncio

    # Expands to: processing pending/failed, then embedding pending/failed.
    retry_filter = {
        "$or": [
            {field: state}
            for field in ("processing_status", "embedding_status")
            for state in ("pending", "failed")
        ]
    }

    queued = 0
    async for raw in db.documents.find(retry_filter):
        try:
            candidate = DocumentInDB(**raw)
            asyncio.create_task(
                llama_processor._process_document_async(candidate, db)
            )
        except Exception as e:
            print(f"Error queueing document {raw['_id']} for processing: {e}")
        else:
            queued += 1

    return {
        "message": f"Queued {queued} documents for processing",
        "count": queued
    }