contract-query/backend/app/api/v1/indices.py
2025-08-14 15:03:33 -05:00

265 lines
No EOL
9.4 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from motor.motor_asyncio import AsyncIOMotorDatabase
from typing import List
from bson import ObjectId
from datetime import datetime
import uuid
from urllib.parse import unquote
from ...config.database import get_database
from ...core.auth import get_current_active_user, get_current_admin_user
from ...models.user import UserInDB
from ...models.index import Index, IndexCreate, IndexInDB
from ...services.rag_service import rag_service
from ...services.document_processor import document_processor
# Router that the index endpoints defined in this module are registered on.
router = APIRouter()
@router.post("/create", response_model=dict)
async def create_index(
    index_data: IndexCreate,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Create a new document index and grant the creator access to it.

    Args:
        index_data: Payload describing the index; its ``name`` seeds the
            generated ``index_id``.
        current_user: Authenticated active user; becomes the first user with
            access to the new index.
        db: Database handle used for the ``indices`` and ``users`` collections.

    Returns:
        A dict with a confirmation ``message``, the generated ``index_id``,
        the index ``name`` and the stringified database ``id``.
    """
    # Take a single UTC timestamp so the date embedded in the ID, created_at
    # and updated_at all agree (previously the ID used local time and the two
    # stored timestamps were read from the clock separately).
    now = datetime.utcnow()

    # ID layout: <lower-cased-name-with-dashes>-<YYYY-MM-DD>-<8 hex chars>
    slug = index_data.name.lower().replace(" ", "-")
    index_id = f"{slug}-{now.strftime('%Y-%m-%d')}-{uuid.uuid4().hex[:8]}"

    # Build the stored record from the request payload plus server-side fields.
    index_dict = index_data.dict()
    index_dict.update(
        {
            "index_id": index_id,
            "status": "active",
            "document_count": 0,
            "settings": {},
            "created_at": now,
            "updated_at": now,
        }
    )
    result = await db.indices.insert_one(index_dict)

    # Grant access to the creator; $addToSet keeps the list free of duplicates.
    await db.users.update_one(
        {"_id": current_user.id},
        {"$addToSet": {"index_access": index_id}},
    )

    return {
        "message": "Index created successfully",
        "index_id": index_id,
        "name": index_data.name,
        "id": str(result.inserted_id),
    }
@router.get("", response_model=List[dict])
async def get_user_indices(
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """List the active indices the current user may see.

    Admins receive every active index. Other users receive only the active
    indices whose ``index_id`` appears in their ``index_access`` list, and an
    empty list when they have no access entries at all.

    Returns:
        A list of dicts, one per index, where ``document_count`` is counted
        live from the ``documents`` collection rather than read from the
        stored index record.
    """
    # Treat a missing/None access list as "no access".
    allowed_ids = current_user.index_access or []
    is_admin = current_user.role.value == "admin"

    if is_admin:
        selector = {"status": "active"}
    elif allowed_ids:
        selector = {
            "index_id": {"$in": allowed_ids},
            "status": "active"
        }
    else:
        # Non-admin without any granted index: nothing to query.
        return []

    visible = []
    async for raw_index in db.indices.find(selector):
        record = IndexInDB(**raw_index)

        # Defensive re-check of the access list for non-admin users.
        if not is_admin and record.index_id not in allowed_ids:
            continue

        # Count documents at request time instead of trusting the stored value.
        live_count = await db.documents.count_documents(
            {"index_id": record.index_id}
        )
        visible.append({
            "id": str(record.id),
            "index_id": record.index_id,
            "name": record.name,
            "description": record.description,
            "document_count": live_count,
            "created_at": record.created_at,
            "updated_at": record.updated_at,
            "status": record.status
        })

    return visible
@router.get("/{index_id}", response_model=dict)
async def get_index_details(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Return the metadata and document listing of a single index.

    Args:
        index_id: Index identifier taken from the URL path; it is
            percent-decoded before use.
        current_user: Authenticated active user making the request.
        db: Database handle used for the ``indices`` and ``documents``
            collections.

    Returns:
        A dict with the index fields, a live ``document_count`` and a
        ``documents`` list summarising each document in the index.

    Raises:
        HTTPException: 403 when a non-admin user has no access to the index,
            404 when no index with that ID exists.
    """
    # Decode URL-encoded index_id.
    # NOTE(review): the framework normally decodes path parameters already;
    # the extra decode is kept for compatibility - confirm it is still needed.
    decoded_index_id = unquote(index_id)

    # index_access may be None for users without any grants; treat that as an
    # empty list so the membership test cannot raise TypeError.
    is_admin = current_user.role.value == "admin"
    accessible_ids = current_user.index_access or []
    if not is_admin and decoded_index_id not in accessible_ids:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    index = await db.indices.find_one({"index_id": decoded_index_id})
    if not index:
        raise HTTPException(status_code=404, detail="Index not found")
    index_obj = IndexInDB(**index)

    # Count and fetch the documents at request time.
    document_count = await db.documents.count_documents({"index_id": decoded_index_id})
    documents = await document_processor.get_documents_by_index(decoded_index_id, db)

    return {
        "id": str(index_obj.id),
        "index_id": index_obj.index_id,
        "name": index_obj.name,
        "description": index_obj.description,
        "document_count": document_count,
        "created_at": index_obj.created_at,
        "updated_at": index_obj.updated_at,
        "status": index_obj.status,
        "settings": index_obj.settings,
        "documents": [
            {
                "id": str(doc.id),
                "filename": doc.filename,
                "original_filename": doc.original_filename,
                "processing_status": doc.processing_status,
                "created_at": doc.created_at
            }
            for doc in documents
        ]
    }
# Strong references to in-flight reprocessing tasks. The event loop itself only
# keeps weak references to tasks, so a task nothing else refers to can be
# garbage-collected before it completes. Each task removes itself when done.
_rebuild_tasks = set()


@router.post("/{index_id}/rebuild")
async def rebuild_index(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Rebuild the vector index by reprocessing every document in it.

    For each document the existing embeddings are deleted (when the document
    has ``vector_ids``), its derived fields are cleared and its statuses reset
    to ``pending``, and a background reprocessing task is started. A failure
    on one document is logged and does not stop the remaining documents.

    Args:
        index_id: Index identifier taken from the URL path; it is
            percent-decoded before use.
        current_user: Authenticated active user making the request.
        db: Database handle used for the ``documents`` and ``indices``
            collections.

    Returns:
        A dict with a ``message`` and ``document_count``, the number of
        documents successfully queued (may be lower than the number of
        documents in the index if some failed to queue).

    Raises:
        HTTPException: 403 when a non-admin user has no access to the index,
            400 when the index has no documents.
    """
    # Decode URL-encoded index_id
    decoded_index_id = unquote(index_id)

    # index_access may be None for users without any grants; treat that as an
    # empty list so the membership test cannot raise TypeError.
    is_admin = current_user.role.value == "admin"
    accessible_ids = current_user.index_access or []
    if not is_admin and decoded_index_id not in accessible_ids:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    # Get all documents for this index
    documents = await document_processor.get_documents_by_index(decoded_index_id, db)
    if not documents:
        raise HTTPException(status_code=400, detail="No documents found for this index")

    # NOTE: Index rebuilding is now handled by reprocessing documents through LlamaProcessor
    # Clear existing vectors and reprocess all documents in this index
    import asyncio
    from ...services.llama_processor import llama_processor

    reprocessed_count = 0
    for doc in documents:
        try:
            print(f"Rebuilding document {doc.id}: {doc.original_filename}")

            # Step 1: Clear existing vectors from ChromaDB
            if doc.vector_ids:
                print(f" - Clearing {len(doc.vector_ids)} existing vectors")
                await llama_processor.delete_document_embeddings(
                    str(doc.id),
                    decoded_index_id
                )

            # Step 2: Clear document metadata and reset status
            await db.documents.update_one(
                {"_id": doc.id},
                {
                    "$set": {
                        "processing_status": "pending",
                        "embedding_status": "pending",
                        "summary_status": "pending",
                        "updated_at": datetime.utcnow()
                    },
                    "$unset": {
                        "parsed_text": "",
                        "text_chunks": "",
                        "chunk_count": "",
                        "vector_ids": "",
                        "contract_summary": "",
                        "summary_created_at": ""
                    }
                }
            )

            # Step 3: Start reprocessing in the background and keep a strong
            # reference until the task finishes.
            task = asyncio.create_task(
                llama_processor._process_document_async(doc, db)
            )
            _rebuild_tasks.add(task)
            task.add_done_callback(_rebuild_tasks.discard)
            reprocessed_count += 1
        except Exception as e:
            # Best effort: report the failure and continue with the next document.
            print(f"Error queueing document {doc.id} for reprocessing: {e}")

    # Update index timestamp
    await db.indices.update_one(
        {"index_id": decoded_index_id},
        {"$set": {"updated_at": datetime.utcnow()}}
    )

    return {
        "message": f"Index rebuild started - {reprocessed_count} documents queued for reprocessing",
        "document_count": reprocessed_count
    }
@router.delete("/{index_id}")
async def delete_index(
    index_id: str,
    current_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Delete an index (admin only).

    Removes the vector index, deletes every document belonging to the index,
    marks the index record as ``deleted`` (the record itself is kept) and
    pulls the index ID from every user's ``index_access`` list.

    Raises:
        HTTPException: 404 when no index with that ID exists.
    """
    # The path value is percent-decoded before it is used as the lookup key.
    target_id = unquote(index_id)
    selector = {"index_id": target_id}

    if not await db.indices.find_one(selector):
        raise HTTPException(status_code=404, detail="Index not found")

    # Vector store cleanup; an unsuccessful result is only reported, not fatal.
    deletion_result = await rag_service.delete_index_complete(target_id)
    if not deletion_result["success"]:
        print(f"Warning during index deletion: {deletion_result['message']}")

    # Remove each document that belongs to the index.
    for document in await document_processor.get_documents_by_index(target_id, db):
        await document_processor.delete_document(str(document.id), db)

    # Note: no cache clearing here - caching is disabled for data freshness.

    # Soft-delete the index record.
    soft_delete = {"$set": {"status": "deleted", "updated_at": datetime.utcnow()}}
    await db.indices.update_one(selector, soft_delete)

    # Revoke the index from every user that had it.
    await db.users.update_many(
        {"index_access": target_id},
        {"$pull": {"index_access": target_id}}
    )

    return {"message": "Index deleted successfully"}