contract-query/backend/app/api/v1/chat.py
2025-08-14 15:03:33 -05:00

406 lines
No EOL
15 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from motor.motor_asyncio import AsyncIOMotorDatabase
from typing import Dict, Any, List
import time
from datetime import datetime
from bson import ObjectId
from urllib.parse import unquote
from ...config.database import get_database
from ...core.auth import get_current_active_user
# Cache import removed - caching disabled for data freshness
from ...models.user import UserInDB
from ...models.chat import ChatQuery, ChatResponse, ChatMessageCreate, ChatMessageInDB
from ...services.rag_service import rag_service
from ...services.llama_processor import llama_processor
from ...services.chat_context_service import chat_context_service
# Router that the chat endpoints defined below register themselves on.
router = APIRouter()
async def _ensure_index_ready_for_query(db: AsyncIOMotorDatabase, index_id: str) -> None:
    """Raise an HTTPException unless the index exists and has queryable documents.

    Returns normally when at least one document in the index has both
    ``processing_status`` and ``embedding_status`` set to ``"completed"``, or,
    as a fallback, when at least one document carries non-empty ``parsed_text``.

    Raises:
        HTTPException: 404 when the index is unknown; 400 when the index has no
            documents, documents still pending/processing, failed documents, or
            no document with usable parsed text.
    """
    index = await db.indices.find_one({"index_id": index_id})
    if not index:
        raise HTTPException(
            status_code=404,
            detail=f"Index '{index_id}' not found"
        )

    processed_docs = await db.documents.count_documents({
        "index_id": index_id,
        "processing_status": "completed",
        "embedding_status": "completed"
    })
    if processed_docs > 0:
        return

    # Nothing fully processed: work out why so the caller gets a useful message.
    total_docs = await db.documents.count_documents({"index_id": index_id})
    if total_docs == 0:
        raise HTTPException(
            status_code=400,
            detail=f"Index '{index_id}' has no documents. Please upload documents first."
        )

    processing_docs = await db.documents.count_documents({
        "index_id": index_id,
        "$or": [
            {"processing_status": "processing"},
            {"embedding_status": "processing"},
            {"processing_status": "pending"},
            {"embedding_status": "pending"}
        ]
    })
    failed_docs = await db.documents.count_documents({
        "index_id": index_id,
        "$or": [
            {"processing_status": "failed"},
            {"embedding_status": "failed"}
        ]
    })
    if processing_docs > 0:
        raise HTTPException(
            status_code=400,
            detail=f"Index '{index_id}' has {processing_docs} documents still processing. Please wait for processing to complete."
        )
    if failed_docs > 0:
        raise HTTPException(
            status_code=400,
            detail=f"Index '{index_id}' has {failed_docs} documents that failed to process. Please check the admin panel and reprocess the documents."
        )

    # Documents exist but their status is unclear; accept them if any has parsed text.
    # "$nin" is used because a dict literal cannot hold two "$ne" keys: the
    # second one silently replaces the first, which would drop the null check.
    docs_with_text = await db.documents.count_documents({
        "index_id": index_id,
        "parsed_text": {"$exists": True, "$nin": [None, ""]}
    })
    if docs_with_text == 0:
        raise HTTPException(
            status_code=400,
            detail=f"Index '{index_id}' has documents but none have been processed successfully. Please check the admin panel."
        )
    print(f"Warning: Index {index_id} has documents with unclear processing status but {docs_with_text} have parsed text")


@router.post("/query", response_model=ChatResponse)
async def chat_query(
    query_data: ChatQuery,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Process a chat query against a document index

    Checks the caller's access to the index, verifies the index has queryable
    documents, retrieves the top 10 matching chunks from the vector store,
    generates a response that takes the conversation history into account,
    stores the exchange, and returns it.

    Raises:
        HTTPException: 403 when a non-admin user lacks access to the index;
            404 when the index or its vector collection is missing; 400 when
            the index has no queryable documents; 500 on any other failure
            while querying or generating the response.
    """
    start_time = time.time()

    # Admins may query any index; everyone else needs an explicit grant.
    if current_user.role.value != "admin" and query_data.index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    await _ensure_index_ready_for_query(db, query_data.index_id)

    # Query vector store for relevant chunks and generate the answer
    try:
        vector_results = await llama_processor.query_documents(
            query_data.query, query_data.index_id, top_k=10
        )
        context_chunks = [result["content"] for result in vector_results]
        ai_result = await chat_context_service.generate_contextual_response(
            query_data.query,
            query_data.index_id,
            str(current_user.id),
            db,
            context_chunks
        )
        response_text = ai_result["response"]
    except HTTPException:
        # Already a well-formed HTTP error; do not rewrap it as a 500.
        raise
    except Exception as e:
        print(f"Error in chat query: {e}")
        # Handle specific ChromaDB errors more gracefully
        if "does not exist" in str(e) or "Collection" in str(e):
            total_docs = await db.documents.count_documents({"index_id": query_data.index_id})
            if total_docs == 0:
                raise HTTPException(
                    status_code=404,
                    detail=f"No documents found in index '{query_data.index_id}'. Please upload documents first."
                ) from e
            raise HTTPException(
                status_code=404,
                detail=f"Vector database not ready for index '{query_data.index_id}'. The documents may still be processing. Please wait and try again."
            ) from e
        raise HTTPException(
            status_code=500,
            detail=f"Error processing query: {str(e)}"
        ) from e

    # Prepare response (caching is disabled, so "cached" is always False)
    response_time = time.time() - start_time
    debug_info = {
        "sources": vector_results,
        "context_used": ai_result.get("context_used"),
        "context_messages_count": ai_result.get("context_messages_count", 0),
        "cached": False,
        "response_time": response_time
    }
    response = ChatResponse(
        response=response_text,
        debug_info=debug_info,
        cached=False,
        response_time=response_time
    )

    # Persisting is best-effort: _save_chat_message swallows its own errors.
    await _save_chat_message(
        current_user.id, query_data, response, db
    )
    return response
@router.get("/history/{index_id}")
async def get_chat_history(
    index_id: str,
    limit: int = 50,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Get chat history for a specific index

    Returns the current user's most recent ``limit`` messages for the index that
    are not soft-deleted, ordered chronologically (oldest first).

    Raises:
        HTTPException: 403 when a non-admin user lacks access to the index.
    """
    # Decode URL-encoded index_id
    decoded_index_id = unquote(index_id)

    # Check if user has access to this index
    if current_user.role.value != "admin" and decoded_index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    # Fetch newest-first so the limit keeps the latest messages. Sorting
    # ascending before limiting would return only the oldest ones and hide
    # everything written after the first `limit` messages.
    cursor = db.chat_messages.find({
        "user_id": current_user.id,
        "index_id": decoded_index_id,
        "deleted_by_user": {"$ne": True}
    }).sort("created_at", -1).limit(limit)

    messages = []
    async for msg in cursor:
        message = ChatMessageInDB(**msg)
        # Use separate timestamps if available, otherwise use created_at
        user_time = msg.get("user_timestamp", message.created_at)
        assistant_time = msg.get("assistant_timestamp", message.created_at)
        messages.append({
            "id": str(message.id),
            "query": message.query,
            "response": message.response,
            "created_at": message.created_at,
            "user_timestamp": user_time,
            "assistant_timestamp": assistant_time,
            "response_time": message.response_time,
            "cached": message.cached,
            "sources": message.sources,
            "context_used": message.context_used
        })

    # Restore chronological order (oldest first) for the response.
    messages.reverse()
    return {"messages": messages}
@router.delete("/history/{index_id}")
async def clear_chat_history(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Soft-delete the current user's chat history for one index.

    Messages are flagged with ``deleted_by_user`` rather than removed.

    Raises:
        HTTPException: 403 when a non-admin user lacks access to the index.
    """
    target_index = unquote(index_id)  # path value may arrive URL-encoded

    is_admin = current_user.role.value == "admin"
    if not is_admin and target_index not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    # Only touch messages that are not already flagged, so the reported
    # count reflects what this call actually cleared.
    not_yet_deleted = {
        "user_id": current_user.id,
        "index_id": target_index,
        "deleted_by_user": {"$ne": True},
    }
    mark_deleted = {
        "$set": {
            "deleted_by_user": True,
            "updated_at": datetime.utcnow(),
        }
    }
    outcome = await db.chat_messages.update_many(not_yet_deleted, mark_deleted)

    # No cache to invalidate: caching is disabled for data freshness.
    return {"message": f"Cleared {outcome.modified_count} chat messages"}
@router.get("/context/{index_id}")
async def get_conversation_context(
    index_id: str,
    limit: int = 5,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Return the conversation context for an index, for debugging/display.

    The response carries the context messages, their count, and the same
    messages as formatted by ``chat_context_service.format_context_for_ai``.

    Raises:
        HTTPException: 403 when a non-admin user lacks access to the index.
    """
    target_index = unquote(index_id)  # path value may arrive URL-encoded

    is_admin = current_user.role.value == "admin"
    if not is_admin and target_index not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    history = await chat_context_service.get_conversation_context(
        str(current_user.id), target_index, db, limit
    )
    payload = {
        "context_messages": history,
        "count": len(history),
        "formatted_context": chat_context_service.format_context_for_ai(history),
    }
    return payload
@router.get("/index-status/{index_id}")
async def get_index_chat_status(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Check if an index is ready for chat queries

    An index is reported ready when it has at least one document whose
    processing and embedding both completed, and its vector collection exists
    and is non-empty. Otherwise ``reason`` explains what is missing.

    Returns:
        A dict with ``ready``, ``reason`` and a ``details`` dict of document
        and collection statistics.

    Raises:
        HTTPException: 403 when a non-admin user lacks access to the index.
    """
    # Decode URL-encoded index_id
    decoded_index_id = unquote(index_id)

    # Check if user has access to this index
    if current_user.role.value != "admin" and decoded_index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this index")

    # Check if index exists in database
    index = await db.indices.find_one({"index_id": decoded_index_id})
    if not index:
        return {
            "ready": False,
            "reason": "Index not found",
            "details": {
                "index_exists": False,
                "total_documents": 0,
                "processed_documents": 0,
                "failed_documents": 0,
                "processing_documents": 0
            }
        }

    # Get document statistics
    total_docs = await db.documents.count_documents({"index_id": decoded_index_id})
    processed_docs = await db.documents.count_documents({
        "index_id": decoded_index_id,
        "processing_status": "completed",
        "embedding_status": "completed"
    })
    failed_docs = await db.documents.count_documents({
        "index_id": decoded_index_id,
        "$or": [
            {"processing_status": "failed"},
            {"embedding_status": "failed"}
        ]
    })
    processing_docs = await db.documents.count_documents({
        "index_id": decoded_index_id,
        "$or": [
            {"processing_status": "processing"},
            {"embedding_status": "processing"},
            {"processing_status": "pending"},
            {"embedding_status": "pending"}
        ]
    })

    # Check ChromaDB collection. Read the payload defensively: a status
    # endpoint should report a partial/error payload, not fail with KeyError.
    collection_info = llama_processor.get_collection_info(decoded_index_id)
    collection_exists = collection_info.get("exists", False)
    collection_count = collection_info.get("count") or 0

    # Determine if ready
    ready = processed_docs > 0 and bool(collection_exists) and collection_count > 0

    reason = ""
    if total_docs == 0:
        reason = "No documents uploaded"
    elif processed_docs == 0:
        if processing_docs > 0:
            reason = f"{processing_docs} documents still processing"
        elif failed_docs > 0:
            reason = f"All {failed_docs} documents failed to process"
        else:
            reason = "No documents have been processed successfully"
    elif not collection_exists:
        reason = "Vector database collection not found"
    elif collection_count == 0:
        reason = "Vector database collection is empty"

    return {
        "ready": ready,
        "reason": reason if not ready else "Index ready for queries",
        "details": {
            "index_exists": True,
            "index_name": index.get("name"),
            "total_documents": total_docs,
            "processed_documents": processed_docs,
            "failed_documents": failed_docs,
            "processing_documents": processing_docs,
            "collection_exists": collection_exists,
            "collection_count": collection_info.get("count", 0),
            "collection_error": collection_info.get("error")
        }
    }
async def _save_chat_message(
    user_id,
    query_data: ChatQuery,
    response: ChatResponse,
    db: AsyncIOMotorDatabase
):
    """Persist one query/response exchange to ``db.chat_messages``.

    Best-effort: any failure is printed and swallowed so that a storage
    problem never fails the chat request itself.
    """
    try:
        now = datetime.utcnow()
        record = ChatMessageCreate(
            user_id=user_id,
            index_id=query_data.index_id,
            query=query_data.query,
            response=response.response,
            created_at=now,
            updated_at=now,
        ).dict()

        debug = response.debug_info
        record.update({
            "debug_info": debug,
            "response_time": response.response_time,
            "cached": response.cached,
            "sources": debug.get("sources", []),
            "context_used": debug.get("context_used"),
            # Written explicitly so the stored timestamps always equal `now`.
            "created_at": now,
            "updated_at": now,
            # The user message and the assistant reply share one timestamp here.
            "user_timestamp": now,
            "assistant_timestamp": now,
            "deleted_by_user": False,
        })
        await db.chat_messages.insert_one(record)
    except Exception as exc:
        # Don't fail the request if we can't save the message
        print(f"Error saving chat message: {exc}")