406 lines
No EOL
15 KiB
Python
406 lines
No EOL
15 KiB
Python
from fastapi import APIRouter, Depends, HTTPException
|
|
from motor.motor_asyncio import AsyncIOMotorDatabase
|
|
from typing import Dict, Any, List
|
|
import time
|
|
from datetime import datetime
|
|
from bson import ObjectId
|
|
from urllib.parse import unquote
|
|
|
|
from ...config.database import get_database
|
|
from ...core.auth import get_current_active_user
|
|
# Cache import removed - caching disabled for data freshness
|
|
from ...models.user import UserInDB
|
|
from ...models.chat import ChatQuery, ChatResponse, ChatMessageCreate, ChatMessageInDB
|
|
from ...services.rag_service import rag_service
|
|
from ...services.llama_processor import llama_processor
|
|
from ...services.chat_context_service import chat_context_service
|
|
|
|
# Router on which this module's chat endpoints are registered through the
# @router.post / @router.get / @router.delete decorators below.
router = APIRouter()
|
|
|
|
async def _ensure_index_is_queryable(index_id: str, db: AsyncIOMotorDatabase) -> None:
    """Raise an HTTP 400 error unless the index has documents worth querying.

    Returns silently when at least one document has both ``processing_status``
    and ``embedding_status`` equal to ``"completed"``, or, failing that, when
    no document is pending/processing/failed but at least one has non-empty
    ``parsed_text`` (a warning is printed in that case).

    Raises:
        HTTPException: 400 when the index has no documents, still has
            documents pending/processing, only has failed documents, or has
            no document with parsed text.
    """
    fully_processed = await db.documents.count_documents({
        "index_id": index_id,
        "processing_status": "completed",
        "embedding_status": "completed"
    })
    if fully_processed > 0:
        return

    # Nothing is fully processed; find out why so the error is actionable.
    total_docs = await db.documents.count_documents({"index_id": index_id})
    if total_docs == 0:
        raise HTTPException(
            status_code=400,
            detail=f"Index '{index_id}' has no documents. Please upload documents first."
        )

    processing_docs = await db.documents.count_documents({
        "index_id": index_id,
        "$or": [
            {"processing_status": "processing"},
            {"embedding_status": "processing"},
            {"processing_status": "pending"},
            {"embedding_status": "pending"}
        ]
    })
    if processing_docs > 0:
        raise HTTPException(
            status_code=400,
            detail=f"Index '{index_id}' has {processing_docs} documents still processing. Please wait for processing to complete."
        )

    # Only counted once we know nothing is still in flight.
    failed_docs = await db.documents.count_documents({
        "index_id": index_id,
        "$or": [
            {"processing_status": "failed"},
            {"embedding_status": "failed"}
        ]
    })
    if failed_docs > 0:
        raise HTTPException(
            status_code=400,
            detail=f"Index '{index_id}' has {failed_docs} documents that failed to process. Please check the admin panel and reprocess the documents."
        )

    # Documents exist but their status is unclear; accept the index only if
    # some document actually carries text. "$nin" is used because a dict
    # literal cannot hold two "$ne" keys (the second silently replaces the
    # first, which would let parsed_text=None count as text).
    docs_with_text = await db.documents.count_documents({
        "index_id": index_id,
        "parsed_text": {"$exists": True, "$nin": [None, ""]}
    })
    if docs_with_text == 0:
        raise HTTPException(
            status_code=400,
            detail=f"Index '{index_id}' has documents but none have been processed successfully. Please check the admin panel."
        )

    print(f"Warning: Index {index_id} has documents with unclear processing status but {docs_with_text} have parsed text")


@router.post("/query", response_model=ChatResponse)
async def chat_query(
    query_data: ChatQuery,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Process a chat query against a document index.

    Verifies access and index readiness, retrieves the 10 best-matching
    chunks from the vector store, asks the chat context service for a
    response that takes conversation history into account, stores the
    exchange, and returns it together with debug information.

    Raises:
        HTTPException: 403 when a non-admin user lacks access to the index;
            404 when the index or its vector collection is missing;
            400 when the index has no usable documents;
            500 for any other failure while answering the query.
    """
    start_time = time.time()

    # Check if user has access to this index
    if current_user.role.value != "admin" and query_data.index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    # Check if index exists in database
    index = await db.indices.find_one({"index_id": query_data.index_id})
    if not index:
        raise HTTPException(
            status_code=404,
            detail=f"Index '{query_data.index_id}' not found"
        )

    # Check that the index has documents that can be queried
    await _ensure_index_is_queryable(query_data.index_id, db)

    try:
        # Query vector store for relevant chunks
        vector_results = await llama_processor.query_documents(
            query_data.query, query_data.index_id, top_k=10
        )

        # Extract context chunks
        context_chunks = [hit["content"] for hit in vector_results]

        # Generate contextual response with conversation history
        ai_result = await chat_context_service.generate_contextual_response(
            query_data.query,
            query_data.index_id,
            str(current_user.id),
            db,
            context_chunks
        )
        response_text = ai_result["response"]
    except HTTPException:
        # An HTTP error raised on purpose further down must reach the client
        # unchanged instead of being rewrapped as a 500.
        raise
    except Exception as e:
        print(f"Error in chat query: {e}")
        # Handle specific ChromaDB errors more gracefully
        if "does not exist" in str(e) or "Collection" in str(e):
            # Check document count again
            total_docs = await db.documents.count_documents({"index_id": query_data.index_id})
            if total_docs == 0:
                raise HTTPException(
                    status_code=404,
                    detail=f"No documents found in index '{query_data.index_id}'. Please upload documents first."
                ) from e
            raise HTTPException(
                status_code=404,
                detail=f"Vector database not ready for index '{query_data.index_id}'. The documents may still be processing. Please wait and try again."
            ) from e
        raise HTTPException(
            status_code=500,
            detail=f"Error processing query: {str(e)}"
        ) from e

    # Prepare response
    response_time = time.time() - start_time
    debug_info = {
        "sources": vector_results,
        "context_used": ai_result.get("context_used"),
        "context_messages_count": ai_result.get("context_messages_count", 0),
        "cached": False,
        "response_time": response_time
    }

    response = ChatResponse(
        response=response_text,
        debug_info=debug_info,
        cached=False,
        response_time=response_time
    )

    # Save chat message to database
    await _save_chat_message(
        current_user.id, query_data, response, db
    )

    return response
|
|
|
|
@router.get("/history/{index_id}")
async def get_chat_history(
    index_id: str,
    limit: int = 50,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Get chat history for a specific index.

    Returns the current user's most recent ``limit`` messages for the index,
    excluding soft-deleted ones, ordered chronologically (oldest first).

    Args:
        index_id: Index identifier from the URL path; it is URL-decoded
            before use.
        limit: Maximum number of messages to return.

    Returns:
        A dict ``{"messages": [...]}`` with one entry per stored exchange.

    Raises:
        HTTPException: 403 when a non-admin user lacks access to the index.
    """
    # Decode URL-encoded index_id
    decoded_index_id = unquote(index_id)

    # Check if user has access to this index
    if current_user.role.value != "admin" and decoded_index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    # Fetch newest-first so that `limit` keeps the latest messages; sorting
    # ascending before limiting would return the oldest ones and hide every
    # message beyond the first `limit`.
    cursor = db.chat_messages.find({
        "user_id": current_user.id,
        "index_id": decoded_index_id,
        "deleted_by_user": {"$ne": True}
    }).sort("created_at", -1).limit(limit)

    newest_first = [msg async for msg in cursor]

    messages = []
    # Reverse to present the selected messages oldest first.
    for msg in reversed(newest_first):
        message = ChatMessageInDB(**msg)
        # Use separate timestamps if available, otherwise use created_at
        user_time = msg.get("user_timestamp", message.created_at)
        assistant_time = msg.get("assistant_timestamp", message.created_at)

        messages.append({
            "id": str(message.id),
            "query": message.query,
            "response": message.response,
            "created_at": message.created_at,
            "user_timestamp": user_time,
            "assistant_timestamp": assistant_time,
            "response_time": message.response_time,
            "cached": message.cached,
            "sources": message.sources,
            "context_used": message.context_used
        })

    return {"messages": messages}
|
|
|
|
@router.delete("/history/{index_id}")
async def clear_chat_history(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Soft-delete the current user's chat history for one index.

    Messages are not removed; each not-yet-deleted message of the user in the
    index gets ``deleted_by_user`` set to True and a new ``updated_at``.

    Returns:
        A dict whose ``message`` reports how many messages were modified.

    Raises:
        HTTPException: 403 when a non-admin user lacks access to the index.
    """
    # The path parameter may arrive URL-encoded.
    target_index = unquote(index_id)

    is_admin = current_user.role.value == "admin"
    if not is_admin and target_index not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    # Only touch messages that are not already flagged as deleted.
    selector = {
        "user_id": current_user.id,
        "index_id": target_index,
        "deleted_by_user": {"$ne": True}
    }
    soft_delete = {
        "$set": {
            "deleted_by_user": True,
            "updated_at": datetime.utcnow()
        }
    }
    outcome = await db.chat_messages.update_many(selector, soft_delete)

    # Note: no cache to clear - caching is disabled for data freshness
    return {"message": f"Cleared {outcome.modified_count} chat messages"}
|
|
|
|
@router.get("/context/{index_id}")
async def get_conversation_context(
    index_id: str,
    limit: int = 5,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Return the conversation context for an index, for debugging/display.

    The context is obtained from the chat context service for the current
    user and returned raw, counted, and formatted by the same service.

    Raises:
        HTTPException: 403 when a non-admin user lacks access to the index.
    """
    # The path parameter may arrive URL-encoded.
    target_index = unquote(index_id)

    is_admin = current_user.role.value == "admin"
    if not is_admin and target_index not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    history = await chat_context_service.get_conversation_context(
        str(current_user.id), target_index, db, limit
    )
    history_size = len(history)
    rendered = chat_context_service.format_context_for_ai(history)

    return {
        "context_messages": history,
        "count": history_size,
        "formatted_context": rendered
    }
|
|
|
|
@router.get("/index-status/{index_id}")
async def get_index_chat_status(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Check if an index is ready for chat queries.

    An index is reported ready when at least one document has completed both
    processing and embedding, and the vector collection reported by
    ``llama_processor.get_collection_info`` exists and is non-empty.

    Returns:
        A dict with ``ready`` (bool), a human-readable ``reason`` and a
        ``details`` dict of document and collection statistics.

    Raises:
        HTTPException: 403 when a non-admin user lacks access to the index.
    """
    # Decode URL-encoded index_id
    decoded_index_id = unquote(index_id)

    # Check if user has access to this index
    if current_user.role.value != "admin" and decoded_index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    # Check if index exists in database
    index = await db.indices.find_one({"index_id": decoded_index_id})
    if not index:
        return {
            "ready": False,
            "reason": "Index not found",
            "details": {
                "index_exists": False,
                "total_documents": 0,
                "processed_documents": 0,
                "failed_documents": 0,
                "processing_documents": 0
            }
        }

    # Get document statistics
    total_docs = await db.documents.count_documents({"index_id": decoded_index_id})
    processed_docs = await db.documents.count_documents({
        "index_id": decoded_index_id,
        "processing_status": "completed",
        "embedding_status": "completed"
    })
    failed_docs = await db.documents.count_documents({
        "index_id": decoded_index_id,
        "$or": [
            {"processing_status": "failed"},
            {"embedding_status": "failed"}
        ]
    })
    processing_docs = await db.documents.count_documents({
        "index_id": decoded_index_id,
        "$or": [
            {"processing_status": "processing"},
            {"embedding_status": "processing"},
            {"processing_status": "pending"},
            {"embedding_status": "pending"}
        ]
    })

    # Check ChromaDB collection. Read the fields once with .get so that a
    # result lacking "exists"/"count" (or carrying None) is treated as a
    # missing/empty collection instead of raising.
    collection_info = llama_processor.get_collection_info(decoded_index_id)
    collection_exists = bool(collection_info.get("exists", False))
    collection_count = collection_info.get("count", 0) or 0

    # Determine if ready
    ready = processed_docs > 0 and collection_exists and collection_count > 0

    if ready:
        reason = "Index ready for queries"
    elif total_docs == 0:
        reason = "No documents uploaded"
    elif processed_docs == 0:
        if processing_docs > 0:
            reason = f"{processing_docs} documents still processing"
        elif failed_docs > 0:
            reason = f"All {failed_docs} documents failed to process"
        else:
            reason = "No documents have been processed successfully"
    elif not collection_exists:
        reason = "Vector database collection not found"
    else:
        reason = "Vector database collection is empty"

    return {
        "ready": ready,
        "reason": reason,
        "details": {
            "index_exists": True,
            "index_name": index["name"],
            "total_documents": total_docs,
            "processed_documents": processed_docs,
            "failed_documents": failed_docs,
            "processing_documents": processing_docs,
            "collection_exists": collection_exists,
            "collection_count": collection_count,
            "collection_error": collection_info.get("error")
        }
    }
|
|
|
|
async def _save_chat_message(
    user_id,
    query_data: ChatQuery,
    response: ChatResponse,
    db: AsyncIOMotorDatabase
):
    """Persist one query/response exchange in ``db.chat_messages``.

    Best effort: any exception is printed and swallowed so that a storage
    failure never fails the request that produced the response.
    """
    try:
        now = datetime.utcnow()

        record = ChatMessageCreate(
            user_id=user_id,
            index_id=query_data.index_id,
            query=query_data.query,
            response=response.response,
            created_at=now,
            updated_at=now
        ).dict()

        debug = response.debug_info
        record.update({
            "debug_info": debug,
            "response_time": response.response_time,
            "cached": response.cached,
            "sources": debug.get("sources", []),
            "context_used": debug.get("context_used"),
            "created_at": now,
            "updated_at": now,
            # The user message and the assistant reply share the save time.
            "user_timestamp": now,
            "assistant_timestamp": now,
            "deleted_by_user": False
        })

        await db.chat_messages.insert_one(record)
    except Exception as err:
        # Don't fail the request if we can't save the message
        print(f"Error saving chat message: {err}")