265 lines
No EOL
9.4 KiB
Python
265 lines
No EOL
9.4 KiB
Python
from fastapi import APIRouter, Depends, HTTPException
|
|
from motor.motor_asyncio import AsyncIOMotorDatabase
|
|
from typing import List
|
|
from bson import ObjectId
|
|
from datetime import datetime
|
|
import uuid
|
|
from urllib.parse import unquote
|
|
|
|
from ...config.database import get_database
|
|
from ...core.auth import get_current_active_user, get_current_admin_user
|
|
from ...models.user import UserInDB
|
|
from ...models.index import Index, IndexCreate, IndexInDB
|
|
from ...services.rag_service import rag_service
|
|
from ...services.document_processor import document_processor
|
|
|
|
# Router that collects the index-management endpoints defined in this module.
router = APIRouter()
|
|
|
|
@router.post("/create", response_model=dict)
async def create_index(
    index_data: IndexCreate,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Create a new document index and grant its creator access to it.

    Args:
        index_data: Payload describing the index. Its ``name`` seeds the
            generated ``index_id`` and its fields are stored on the record.
        current_user: The authenticated active user; the new ``index_id`` is
            added to this user's ``index_access``.
        db: Database handle; the ``indices`` and ``users`` collections are
            written.

    Returns:
        A dict with a confirmation message, the generated ``index_id``, the
        index name and the stringified database ``_id`` of the new record.
    """
    # Read the clock once: created_at and updated_at are then identical on a
    # fresh record, and the date embedded in the ID agrees with the stored
    # (UTC) timestamps instead of depending on the server's local timezone.
    now = datetime.utcnow()

    # ID layout: <lower-cased name, spaces as dashes>-<YYYY-MM-DD>-<8 hex chars>
    slug = index_data.name.lower().replace(' ', '-')
    index_id = f"{slug}-{now.strftime('%Y-%m-%d')}-{str(uuid.uuid4())[:8]}"

    # Build the index record from the request payload plus server-side fields
    index_dict = index_data.dict()
    index_dict["index_id"] = index_id
    index_dict["status"] = "active"
    index_dict["document_count"] = 0
    index_dict["settings"] = {}
    index_dict["created_at"] = now
    index_dict["updated_at"] = now

    # Insert into database
    result = await db.indices.insert_one(index_dict)

    # Grant access to the creator ($addToSet keeps the list free of duplicates)
    await db.users.update_one(
        {"_id": current_user.id},
        {"$addToSet": {"index_access": index_id}}
    )

    return {
        "message": "Index created successfully",
        "index_id": index_id,
        "name": index_data.name,
        "id": str(result.inserted_id)
    }
|
|
|
|
@router.get("", response_model=List[dict])
async def get_user_indices(
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """List the active indices visible to the calling user.

    Admins receive every index whose status is ``"active"``. Other users
    receive only the active indices whose ``index_id`` appears in their
    ``index_access``; a user with no access entries gets an empty list.

    Returns:
        A list of dicts (id, index_id, name, description, document_count,
        created_at, updated_at, status). ``document_count`` is counted from
        the ``documents`` collection at request time rather than read from
        the stored index record.
    """
    # Treat a missing/None access list as "no access"
    granted_ids = current_user.index_access or []
    is_admin = current_user.role.value == "admin"

    # Pick the query: everything active for admins, the granted subset for
    # regular users, and nothing at all when a regular user has no grants.
    if is_admin:
        query = {"status": "active"}
    elif granted_ids:
        query = {
            "index_id": {"$in": granted_ids},
            "status": "active"
        }
    else:
        return []

    visible = []
    async for raw_index in db.indices.find(query):
        entry = IndexInDB(**raw_index)

        # Belt-and-braces access check for non-admin users
        if not is_admin and entry.index_id not in granted_ids:
            continue

        # Live document count for this index
        live_count = await db.documents.count_documents({"index_id": entry.index_id})

        visible.append({
            "id": str(entry.id),
            "index_id": entry.index_id,
            "name": entry.name,
            "description": entry.description,
            "document_count": live_count,
            "created_at": entry.created_at,
            "updated_at": entry.updated_at,
            "status": entry.status
        })

    return visible
|
|
|
|
@router.get("/{index_id}", response_model=dict)
async def get_index_details(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Get details of a specific index, including its documents.

    Args:
        index_id: Index identifier from the URL path (percent-decoded before use).
        current_user: The authenticated active user.
        db: Database handle; the ``indices`` and ``documents`` collections
            are read.

    Returns:
        A dict with the index fields (id, index_id, name, description,
        created_at, updated_at, status, settings), a document count taken
        from the ``documents`` collection, and a ``documents`` list with one
        summary dict per document.

    Raises:
        HTTPException: 403 if a non-admin user has no access to the index;
            404 if no index with that ``index_id`` exists.
    """
    # Decode URL-encoded index_id
    decoded_index_id = unquote(index_id)

    # index_access may be None for users without any grants; treat that as an
    # empty list so the membership test yields a 403 instead of a TypeError.
    allowed_ids = current_user.index_access or []

    # Check if user has access to this index
    if current_user.role.value != "admin" and decoded_index_id not in allowed_ids:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    index = await db.indices.find_one({"index_id": decoded_index_id})
    if not index:
        raise HTTPException(status_code=404, detail="Index not found")

    index_obj = IndexInDB(**index)

    # Get document count
    document_count = await db.documents.count_documents({"index_id": decoded_index_id})

    # Get documents
    documents = await document_processor.get_documents_by_index(decoded_index_id, db)

    return {
        "id": str(index_obj.id),
        "index_id": index_obj.index_id,
        "name": index_obj.name,
        "description": index_obj.description,
        "document_count": document_count,
        "created_at": index_obj.created_at,
        "updated_at": index_obj.updated_at,
        "status": index_obj.status,
        "settings": index_obj.settings,
        "documents": [
            {
                "id": str(doc.id),
                "filename": doc.filename,
                "original_filename": doc.original_filename,
                "processing_status": doc.processing_status,
                "created_at": doc.created_at
            }
            for doc in documents
        ]
    }
|
|
|
|
# Strong references to in-flight reprocessing tasks. The event loop only keeps
# weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes. Each task removes itself when done.
_REBUILD_TASKS = set()


@router.post("/{index_id}/rebuild")
async def rebuild_index(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Rebuild the vector index by reprocessing every document in it.

    For each document: existing embeddings are deleted (when the document has
    ``vector_ids``), derived fields are cleared and statuses reset to
    ``"pending"``, and a background reprocessing task is scheduled. The
    endpoint returns once the tasks are scheduled; it does not wait for them.

    Args:
        index_id: Index identifier from the URL path (percent-decoded before use).
        current_user: The authenticated active user.
        db: Database handle; ``documents`` and ``indices`` are updated.

    Returns:
        A dict with a status message and the number of documents queued.

    Raises:
        HTTPException: 403 if a non-admin user has no access to the index;
            400 if the index has no documents.
    """
    # Decode URL-encoded index_id
    decoded_index_id = unquote(index_id)

    # index_access may be None for users without any grants; treat that as an
    # empty list so the membership test yields a 403 instead of a TypeError.
    allowed_ids = current_user.index_access or []

    # Check if user has access to this index
    if current_user.role.value != "admin" and decoded_index_id not in allowed_ids:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    # Get all documents for this index
    documents = await document_processor.get_documents_by_index(decoded_index_id, db)

    if not documents:
        raise HTTPException(status_code=400, detail="No documents found for this index")

    # NOTE: Index rebuilding is now handled by reprocessing documents through LlamaProcessor
    # Clear existing vectors and reprocess all documents in this index
    # (imports are kept function-local, as in the original code)
    import asyncio
    from ...services.llama_processor import llama_processor

    reprocessed_count = 0
    for doc in documents:
        # Best-effort per document: a failure is reported and the loop moves
        # on, so one bad document does not abort the whole rebuild.
        try:
            print(f"Rebuilding document {doc.id}: {doc.original_filename}")

            # Step 1: Clear existing vectors from ChromaDB
            if doc.vector_ids:
                print(f" - Clearing {len(doc.vector_ids)} existing vectors")
                await llama_processor.delete_document_embeddings(
                    str(doc.id),
                    decoded_index_id
                )

            # Step 2: Clear document metadata and reset status
            await db.documents.update_one(
                {"_id": doc.id},
                {
                    "$set": {
                        "processing_status": "pending",
                        "embedding_status": "pending",
                        "summary_status": "pending",
                        "updated_at": datetime.utcnow()
                    },
                    "$unset": {
                        "parsed_text": "",
                        "text_chunks": "",
                        "chunk_count": "",
                        "vector_ids": "",
                        "contract_summary": "",
                        "summary_created_at": ""
                    }
                }
            )

            # Step 3: Start reprocessing in the background, holding a
            # reference until the task completes.
            task = asyncio.create_task(llama_processor._process_document_async(doc, db))
            _REBUILD_TASKS.add(task)
            task.add_done_callback(_REBUILD_TASKS.discard)
            reprocessed_count += 1

        except Exception as e:
            print(f"Error queueing document {doc.id} for reprocessing: {e}")

    # Update index timestamp
    await db.indices.update_one(
        {"index_id": decoded_index_id},
        {"$set": {"updated_at": datetime.utcnow()}}
    )

    return {
        "message": f"Index rebuild started - {reprocessed_count} documents queued for reprocessing",
        "document_count": reprocessed_count
    }
|
|
|
|
@router.delete("/{index_id}")
async def delete_index(
    index_id: str,
    current_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Delete an index and its documents (guarded by the admin-user dependency).

    Steps, in order: look the index up, ask the RAG service to remove its
    vector data, delete each document of the index, mark the index record as
    ``"deleted"`` (the record itself is kept), and pull the index from every
    user's ``index_access``.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 404 if no index with that ``index_id`` exists.
    """
    # The path parameter may be percent-encoded
    target_id = unquote(index_id)
    by_index_id = {"index_id": target_id}

    existing = await db.indices.find_one(by_index_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Index not found")

    # Vector-store cleanup; a failure here is only reported, not fatal
    vector_cleanup = await rag_service.delete_index_complete(target_id)
    if not vector_cleanup["success"]:
        print(f"Warning during index deletion: {vector_cleanup['message']}")

    # Remove every document that belongs to this index
    for stored_doc in await document_processor.get_documents_by_index(target_id, db):
        await document_processor.delete_document(str(stored_doc.id), db)

    # Note: Cache clearing removed - caching is disabled for data freshness

    # Soft-delete: flag the index record instead of removing it
    soft_delete = {"$set": {"status": "deleted", "updated_at": datetime.utcnow()}}
    await db.indices.update_one(by_index_id, soft_delete)

    # Revoke the index from all users that had it
    await db.users.update_many(
        {"index_access": target_id},
        {"$pull": {"index_access": target_id}}
    )

    return {"message": "Index deleted successfully"}