import logging
import time
from datetime import datetime
from typing import Dict, Any, List
from urllib.parse import unquote

from bson import ObjectId
from fastapi import APIRouter, Depends, HTTPException
from motor.motor_asyncio import AsyncIOMotorDatabase

from ...config.database import get_database
from ...core.auth import get_current_active_user
# Cache import removed - caching disabled for data freshness
from ...models.user import UserInDB
from ...models.chat import ChatQuery, ChatResponse, ChatMessageCreate, ChatMessageInDB
from ...services.rag_service import rag_service
from ...services.llama_processor import llama_processor
from ...services.chat_context_service import chat_context_service

logger = logging.getLogger(__name__)

router = APIRouter()


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def _ensure_index_access(current_user: UserInDB, index_id: str) -> None:
    """Raise 403 unless the user is an admin or has *index_id* in ``index_access``."""
    if current_user.role.value != "admin" and index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this index")


def _completed_docs_filter(index_id: str) -> Dict[str, Any]:
    """Mongo filter: documents in *index_id* that finished parsing and embedding."""
    return {
        "index_id": index_id,
        "processing_status": "completed",
        "embedding_status": "completed",
    }


def _in_progress_docs_filter(index_id: str) -> Dict[str, Any]:
    """Mongo filter: documents in *index_id* still pending or processing in either stage."""
    return {
        "index_id": index_id,
        "$or": [
            {"processing_status": "processing"},
            {"embedding_status": "processing"},
            {"processing_status": "pending"},
            {"embedding_status": "pending"},
        ],
    }


def _failed_docs_filter(index_id: str) -> Dict[str, Any]:
    """Mongo filter: documents in *index_id* that failed in either stage."""
    return {
        "index_id": index_id,
        "$or": [
            {"processing_status": "failed"},
            {"embedding_status": "failed"},
        ],
    }


# ---------------------------------------------------------------------------
# POST /query
# ---------------------------------------------------------------------------

async def _ensure_index_queryable(index_id: str, db: AsyncIOMotorDatabase) -> None:
    """Raise an HTTPException unless *index_id* exists and has documents worth querying.

    Returns normally when at least one document is fully completed. If none
    are completed but some documents have non-empty ``parsed_text`` and none
    are in progress or failed, logs a warning and returns so the query is
    still attempted.

    Raises:
        HTTPException(404): the index document does not exist.
        HTTPException(400): no documents, documents still processing, documents
            failed, or no document has any parsed text.
    """
    if not await db.indices.find_one({"index_id": index_id}):
        raise HTTPException(status_code=404, detail=f"Index '{index_id}' not found")

    if await db.documents.count_documents(_completed_docs_filter(index_id)) > 0:
        return

    total_docs = await db.documents.count_documents({"index_id": index_id})
    if total_docs == 0:
        raise HTTPException(
            status_code=400,
            detail=f"Index '{index_id}' has no documents. Please upload documents first.",
        )

    processing_docs = await db.documents.count_documents(_in_progress_docs_filter(index_id))
    failed_docs = await db.documents.count_documents(_failed_docs_filter(index_id))

    if processing_docs > 0:
        raise HTTPException(
            status_code=400,
            detail=f"Index '{index_id}' has {processing_docs} documents still processing. Please wait for processing to complete.",
        )
    if failed_docs > 0:
        raise HTTPException(
            status_code=400,
            detail=f"Index '{index_id}' has {failed_docs} documents that failed to process. Please check the admin panel and reprocess the documents.",
        )

    # Documents exist but their status is unclear: fall back to checking for
    # parsed text. "$nin" is used because a dict literal with two "$ne" keys
    # keeps only the last one, which would let null parsed_text match.
    docs_with_text = await db.documents.count_documents({
        "index_id": index_id,
        "parsed_text": {"$exists": True, "$nin": [None, ""]},
    })
    if docs_with_text == 0:
        raise HTTPException(
            status_code=400,
            detail=f"Index '{index_id}' has documents but none have been processed successfully. Please check the admin panel.",
        )

    logger.warning(
        "Index %s has documents with unclear processing status but %d have parsed text",
        index_id,
        docs_with_text,
    )


async def _query_error_to_http(
    exc: Exception, index_id: str, db: AsyncIOMotorDatabase
) -> HTTPException:
    """Map an unexpected retrieval/generation error to an HTTPException.

    Errors whose message mentions "does not exist" or "Collection" are treated
    as a missing vector-store collection and mapped to 404. The detail depends
    on whether the index has any documents. Anything else becomes a 500.
    """
    message = str(exc)
    if "does not exist" in message or "Collection" in message:
        total_docs = await db.documents.count_documents({"index_id": index_id})
        if total_docs == 0:
            return HTTPException(
                status_code=404,
                detail=f"No documents found in index '{index_id}'. Please upload documents first.",
            )
        return HTTPException(
            status_code=404,
            detail=f"Vector database not ready for index '{index_id}'. The documents may still be processing. Please wait and try again.",
        )
    return HTTPException(status_code=500, detail=f"Error processing query: {message}")


@router.post("/query", response_model=ChatResponse)
async def chat_query(
    query_data: ChatQuery,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Process a chat query against a document index.

    Checks access and index readiness, then retrieves the top 10 matching
    chunks from ``llama_processor``. It asks ``chat_context_service`` for a
    history-aware answer, stores the exchange (best effort), and returns a
    ``ChatResponse``.

    Raises:
        HTTPException(403): caller has no access to the index.
        HTTPException(404): index not found, or its vector collection is unavailable.
        HTTPException(400): index has no usable documents.
        HTTPException(500): any other retrieval or generation failure.
    """
    start_time = time.perf_counter()
    index_id = query_data.index_id

    _ensure_index_access(current_user, index_id)
    await _ensure_index_queryable(index_id, db)

    try:
        vector_results = await llama_processor.query_documents(
            query_data.query, index_id, top_k=10
        )
        context_chunks = [hit["content"] for hit in vector_results]

        # Generate a contextual response that takes conversation history into account.
        ai_result = await chat_context_service.generate_contextual_response(
            query_data.query,
            index_id,
            str(current_user.id),
            db,
            context_chunks,
        )
        response_text = ai_result["response"]
    except HTTPException:
        # Keep HTTP errors raised deliberately by downstream services intact.
        raise
    except Exception as e:
        logger.exception("Error in chat query for index %s", index_id)
        raise await _query_error_to_http(e, index_id, db) from e

    response_time = time.perf_counter() - start_time
    debug_info = {
        "sources": vector_results,
        "context_used": ai_result.get("context_used"),
        "context_messages_count": ai_result.get("context_messages_count", 0),
        "cached": False,
        "response_time": response_time,
    }

    response = ChatResponse(
        response=response_text,
        debug_info=debug_info,
        cached=False,
        response_time=response_time,
    )

    await _save_chat_message(current_user.id, query_data, response, db)
    return response


# ---------------------------------------------------------------------------
# History and context endpoints
# ---------------------------------------------------------------------------

def _serialize_history_message(doc: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a raw ``chat_messages`` document into the history API shape.

    ``user_timestamp``/``assistant_timestamp`` fall back to ``created_at`` when
    absent, e.g. for messages stored before those fields existed.
    """
    message = ChatMessageInDB(**doc)
    return {
        "id": str(message.id),
        "query": message.query,
        "response": message.response,
        "created_at": message.created_at,
        "user_timestamp": doc.get("user_timestamp", message.created_at),
        "assistant_timestamp": doc.get("assistant_timestamp", message.created_at),
        "response_time": message.response_time,
        "cached": message.cached,
        "sources": message.sources,
        "context_used": message.context_used,
    }


@router.get("/history/{index_id}")
async def get_chat_history(
    index_id: str,
    limit: int = 50,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Get the caller's chat history for a specific index.

    Returns the newest ``limit`` messages that were not soft-deleted, ordered
    oldest-first. Sorting descending before applying the limit guarantees the
    latest messages are included once history grows beyond ``limit``.
    """
    decoded_index_id = unquote(index_id)
    _ensure_index_access(current_user, decoded_index_id)

    cursor = db.chat_messages.find({
        "user_id": current_user.id,
        "index_id": decoded_index_id,
        "deleted_by_user": {"$ne": True}
    }).sort("created_at", -1).limit(limit)

    newest_first = [doc async for doc in cursor]
    messages = [_serialize_history_message(doc) for doc in reversed(newest_first)]
    return {"messages": messages}


@router.delete("/history/{index_id}")
async def clear_chat_history(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Soft-delete the caller's chat history for a specific index.

    Messages are flagged ``deleted_by_user`` rather than removed. The response
    reports how many messages were newly flagged.
    """
    decoded_index_id = unquote(index_id)
    _ensure_index_access(current_user, decoded_index_id)

    result = await db.chat_messages.update_many(
        {
            "user_id": current_user.id,
            "index_id": decoded_index_id,
            "deleted_by_user": {"$ne": True}
        },
        {
            "$set": {
                "deleted_by_user": True,
                "updated_at": datetime.utcnow()
            }
        }
    )

    # Note: Cache clearing removed - caching is disabled for data freshness
    return {"message": f"Cleared {result.modified_count} chat messages"}


@router.get("/context/{index_id}")
async def get_conversation_context(
    index_id: str,
    limit: int = 5,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Get conversation context for debugging/display.

    Returns the context messages from ``chat_context_service``, their count,
    and the same messages formatted as the AI prompt would see them.
    """
    decoded_index_id = unquote(index_id)
    _ensure_index_access(current_user, decoded_index_id)

    context_messages = await chat_context_service.get_conversation_context(
        str(current_user.id), decoded_index_id, db, limit
    )

    return {
        "context_messages": context_messages,
        "count": len(context_messages),
        "formatted_context": chat_context_service.format_context_for_ai(context_messages)
    }


# ---------------------------------------------------------------------------
# Index readiness
# ---------------------------------------------------------------------------

def _not_ready_reason(
    total_docs: int,
    processed_docs: int,
    processing_docs: int,
    failed_docs: int,
    collection_exists: Any,
    collection_count: int,
) -> str:
    """Explain why an index is not ready; returns "" when no rule applies."""
    if total_docs == 0:
        return "No documents uploaded"
    if processed_docs == 0:
        if processing_docs > 0:
            return f"{processing_docs} documents still processing"
        if failed_docs > 0:
            return f"All {failed_docs} documents failed to process"
        return "No documents have been processed successfully"
    if not collection_exists:
        return "Vector database collection not found"
    if collection_count == 0:
        return "Vector database collection is empty"
    return ""


@router.get("/index-status/{index_id}")
async def get_index_chat_status(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Check if an index is ready for chat queries.

    The index is ready when at least one document is fully processed and the
    vector collection reported by ``llama_processor.get_collection_info``
    exists and is non-empty. The response always includes the underlying
    document and collection statistics.
    """
    decoded_index_id = unquote(index_id)
    _ensure_index_access(current_user, decoded_index_id)

    index = await db.indices.find_one({"index_id": decoded_index_id})
    if not index:
        return {
            "ready": False,
            "reason": "Index not found",
            "details": {
                "index_exists": False,
                "total_documents": 0,
                "processed_documents": 0,
                "failed_documents": 0,
                "processing_documents": 0
            }
        }

    total_docs = await db.documents.count_documents({"index_id": decoded_index_id})
    processed_docs = await db.documents.count_documents(_completed_docs_filter(decoded_index_id))
    failed_docs = await db.documents.count_documents(_failed_docs_filter(decoded_index_id))
    processing_docs = await db.documents.count_documents(_in_progress_docs_filter(decoded_index_id))

    # Vector-store (ChromaDB) collection state; keys are read defensively
    # because "count" may be absent, e.g. when the collection does not exist.
    collection_info = llama_processor.get_collection_info(decoded_index_id)
    collection_exists = collection_info.get("exists", False)
    collection_count = collection_info.get("count") or 0

    ready = processed_docs > 0 and collection_exists and collection_count > 0
    if ready:
        reason = "Index ready for queries"
    else:
        reason = _not_ready_reason(
            total_docs, processed_docs, processing_docs, failed_docs,
            collection_exists, collection_count,
        )

    return {
        "ready": ready,
        "reason": reason,
        "details": {
            "index_exists": True,
            "index_name": index.get("name"),
            "total_documents": total_docs,
            "processed_documents": processed_docs,
            "failed_documents": failed_docs,
            "processing_documents": processing_docs,
            "collection_exists": collection_exists,
            "collection_count": collection_count,
            "collection_error": collection_info.get("error")
        }
    }


# ---------------------------------------------------------------------------
# Persistence
# ---------------------------------------------------------------------------

async def _save_chat_message(
    user_id: Any,
    query_data: ChatQuery,
    response: ChatResponse,
    db: AsyncIOMotorDatabase
) -> None:
    """Persist one query/response exchange to ``chat_messages`` (best effort).

    Stores the validated ``ChatMessageCreate`` fields plus debug info, sources,
    context, timing and separate user/assistant timestamps. Timestamps come
    from ``datetime.utcnow()``, i.e. naive UTC. Failures are logged and
    swallowed so a storage error never fails the chat request.
    """
    try:
        current_time = datetime.utcnow()
        message_data = ChatMessageCreate(
            user_id=user_id,
            index_id=query_data.index_id,
            query=query_data.query,
            response=response.response,
            created_at=current_time,
            updated_at=current_time
        )

        message_dict = message_data.dict()
        message_dict.update({
            "debug_info": response.debug_info,
            "response_time": response.response_time,
            "cached": response.cached,
            "sources": response.debug_info.get("sources", []),
            "context_used": response.debug_info.get("context_used"),
            # Set explicitly in case the model does not declare these fields.
            "created_at": current_time,
            "updated_at": current_time,
            # Separate timestamps for the user message and the assistant reply.
            "user_timestamp": current_time,
            "assistant_timestamp": current_time,
            "deleted_by_user": False,
        })

        await db.chat_messages.insert_one(message_dict)
    except Exception:
        # Deliberately best-effort: don't fail the request if saving fails.
        logger.exception("Error saving chat message")