"""Admin API routes: user management, index management, document ingestion
and processing, system statistics, and RAG queries.

Every route depends on ``get_current_admin_user``; each handler is documented
as "admin only".
"""

import asyncio
import logging
import shutil
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Coroutine, List, Optional, Set
from urllib.parse import unquote

from bson import ObjectId
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
from motor.motor_asyncio import AsyncIOMotorDatabase

from ...config.database import get_database
from ...config.settings import settings
from ...core.auth import get_current_admin_user
from ...models.document import DocumentInDB
from ...models.index import IndexInDB
from ...models.user import UserInDB, UserRole, UserUpdate
from ...services.llama_processor import llama_processor

logger = logging.getLogger(__name__)

router = APIRouter()

# Status values tracked for both ``processing_status`` and ``embedding_status``.
_DOCUMENT_STATUSES = ("pending", "processing", "completed", "failed")

# Strong references to fire-and-forget tasks. The event loop keeps only weak
# references to tasks, so an unreferenced task can be garbage-collected before
# it finishes. Each task removes itself from this set when it completes.
_background_tasks: Set[asyncio.Task] = set()


def _spawn_background(coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
    """Schedule *coro* on the running loop and keep it alive until it is done."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def _parse_object_id(value: str, what: str) -> ObjectId:
    """Convert a path parameter to an ``ObjectId`` or raise HTTP 400.

    ``ObjectId(value)`` raises ``bson.errors.InvalidId`` on malformed input,
    which would otherwise escape the handler as an unhandled 500 error.
    """
    if not ObjectId.is_valid(value):
        raise HTTPException(status_code=400, detail=f"Invalid {what} id")
    return ObjectId(value)


# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------


@router.get("/users", response_model=List[dict])
async def get_all_users(
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Get all users (admin only).

    Each record is validated through ``UserInDB``. Only the listed public
    fields are returned; the password hash is never included.
    """
    users = []
    async for raw_user in db.users.find({}):
        user_obj = UserInDB(**raw_user)
        users.append({
            "id": str(user_obj.id),
            "email": user_obj.email,
            "role": user_obj.role,
            "is_active": user_obj.is_active,
            "index_access": user_obj.index_access,
            "created_at": user_obj.created_at,
            "updated_at": user_obj.updated_at,
        })
    return users


@router.post("/users")
async def create_user(
    user_data: dict,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Create a new user (admin only).

    ``user_data`` must contain ``email`` and ``password``. ``role`` defaults
    to ``"user"`` and ``is_active`` to ``True``. New users start with no
    index access.

    Raises:
        HTTPException(400): email/password missing, or email already registered.
        HTTPException(500): any other failure while creating the user.
    """
    email = user_data.get("email")
    password = user_data.get("password")
    if not email or not password:
        # Previously a missing key surfaced as a 500 "Error creating user: 'email'".
        raise HTTPException(status_code=400, detail="email and password are required")

    try:
        from ...core.security import get_password_hash

        # Check if email already exists
        existing_user = await db.users.find_one({"email": email})
        if existing_user:
            raise HTTPException(status_code=400, detail="Email already registered")

        now = datetime.utcnow()
        new_user = {
            "email": email,
            "hashed_password": get_password_hash(password),
            "role": user_data.get("role", "user"),
            "is_active": user_data.get("is_active", True),
            "index_access": [],
            "created_at": now,
            "updated_at": now,
        }

        result = await db.users.insert_one(new_user)
        return {
            "message": "User created successfully",
            "user_id": str(result.inserted_id),
            "email": email,
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error creating user: {str(e)}") from e


@router.put("/users/{user_id}")
async def update_user(
    user_id: str,
    user_update: UserUpdate,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Update a user's email, role, active flag and/or password (admin only).

    Only fields that are not ``None`` in ``user_update`` are written. A new
    password is hashed before storage.

    Raises:
        HTTPException(400): malformed ``user_id``, or email already used by another user.
        HTTPException(404): no user with ``user_id``.
    """
    oid = _parse_object_id(user_id, "user")

    user = await db.users.find_one({"_id": oid})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    update_data = {}

    if user_update.email is not None:
        # Reject the new email if any *other* user already has it.
        existing = await db.users.find_one({
            "email": user_update.email,
            "_id": {"$ne": oid},
        })
        if existing:
            raise HTTPException(status_code=400, detail="Email already in use")
        update_data["email"] = user_update.email

    if user_update.role is not None:
        update_data["role"] = user_update.role

    if user_update.is_active is not None:
        update_data["is_active"] = user_update.is_active

    if user_update.password is not None:
        from ...core.security import get_password_hash
        update_data["hashed_password"] = get_password_hash(user_update.password)

    if update_data:
        update_data["updated_at"] = datetime.utcnow()
        await db.users.update_one({"_id": oid}, {"$set": update_data})

    return {"message": "User updated successfully"}


@router.delete("/users/{user_id}")
async def delete_user(
    user_id: str,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Delete a user (admin only). An admin cannot delete their own account.

    Raises:
        HTTPException(400): self-deletion attempt or malformed ``user_id``.
        HTTPException(404): no user with ``user_id``.
    """
    # Don't allow admin to delete themselves
    if str(admin_user.id) == user_id:
        raise HTTPException(status_code=400, detail="Cannot delete your own account")

    oid = _parse_object_id(user_id, "user")

    user = await db.users.find_one({"_id": oid})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    await db.users.delete_one({"_id": oid})

    return {"message": "User deleted successfully"}


@router.post("/users/{user_id}/grant-index-access")
async def grant_index_access(
    user_id: str,
    request_data: dict,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Grant user access to an index.

    ``request_data["index_id"]`` is added to the user's ``index_access`` with
    ``$addToSet``, so granting twice does not create duplicates.
    """
    index_id = request_data.get("index_id")
    if not index_id:
        raise HTTPException(status_code=400, detail="index_id is required")

    oid = _parse_object_id(user_id, "user")

    user = await db.users.find_one({"_id": oid})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    index = await db.indices.find_one({"index_id": index_id})
    if not index:
        raise HTTPException(status_code=404, detail="Index not found")

    await db.users.update_one({"_id": oid}, {"$addToSet": {"index_access": index_id}})

    return {"message": "Index access granted successfully"}


@router.post("/users/{user_id}/revoke-index-access")
async def revoke_index_access(
    user_id: str,
    request_data: dict,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Revoke user access to an index (``$pull`` from ``index_access``).

    The index itself is not required to still exist, so access to a deleted
    index can still be revoked.
    """
    index_id = request_data.get("index_id")
    if not index_id:
        raise HTTPException(status_code=400, detail="index_id is required")

    oid = _parse_object_id(user_id, "user")

    user = await db.users.find_one({"_id": oid})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    await db.users.update_one({"_id": oid}, {"$pull": {"index_access": index_id}})

    return {"message": "Index access revoked successfully"}


@router.post("/grant-all-indices/{user_id}")
async def grant_all_indices_access(
    user_id: str,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Grant user access to all active indices.

    Note: this *replaces* ``index_access`` with the current list of active
    index ids (``$set``); it does not merge with previous grants.
    """
    oid = _parse_object_id(user_id, "user")

    user = await db.users.find_one({"_id": oid})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    indices = [
        index["index_id"]
        async for index in db.indices.find({"status": "active"})
    ]

    await db.users.update_one({"_id": oid}, {"$set": {"index_access": indices}})

    return {
        "message": "Access granted to all indices",
        "index_count": len(indices),
    }


# ---------------------------------------------------------------------------
# Statistics
# ---------------------------------------------------------------------------


@router.get("/stats")
async def get_system_stats(
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Get system statistics: user, active-index, document and chat-message counts."""
    total_users = await db.users.count_documents({})
    active_users = await db.users.count_documents({"is_active": True})
    admin_users = await db.users.count_documents({"role": UserRole.ADMIN})

    total_indices = await db.indices.count_documents({"status": "active"})

    total_documents = await db.documents.count_documents({})
    processing_counts = {
        status: await db.documents.count_documents({"processing_status": status})
        for status in _DOCUMENT_STATUSES
    }

    total_messages = await db.chat_messages.count_documents({})

    return {
        "users": {
            "total": total_users,
            "active": active_users,
            "admins": admin_users,
        },
        "indices": {
            "total": total_indices,
        },
        "documents": {
            "total": total_documents,
            **processing_counts,
        },
        "chat_messages": {
            "total": total_messages,
        },
    }


# ---------------------------------------------------------------------------
# Documents
# ---------------------------------------------------------------------------


@router.post("/documents/upload-single")
async def upload_single_document(
    file: UploadFile = File(...),
    index_id: str = Form(...),
    custom_name: Optional[str] = Form(None),
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Upload a single document for processing (admin only).

    Delegates to ``llama_processor.process_single_file``. The returned status
    is whatever processing status that call reports.
    """
    index = await db.indices.find_one({"index_id": index_id})
    if not index:
        raise HTTPException(status_code=404, detail="Index not found")

    document = await llama_processor.process_single_file(
        file, index_id, admin_user, db, custom_name
    )

    return {
        "message": "Document uploaded and processing started",
        "document_id": str(document.id),
        "filename": document.original_filename,
        "status": document.processing_status,
    }


@router.post("/documents/upload-multiple")
async def upload_multiple_documents(
    files: List[UploadFile] = File(...),
    index_id: str = Form(...),
    base_name: str = Form(...),
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Upload multiple documents for processing (admin only)."""
    index = await db.indices.find_one({"index_id": index_id})
    if not index:
        raise HTTPException(status_code=404, detail="Index not found")

    documents = await llama_processor.process_multiple_files(
        files, index_id, admin_user, db, base_name
    )

    return {
        "message": f"Successfully uploaded {len(documents)} documents",
        "documents": [
            {
                "id": str(doc.id),
                "filename": doc.original_filename,
                "status": doc.processing_status,
            }
            for doc in documents
        ],
    }


@router.get("/documents/processing-status")
async def get_processing_status(
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Get overall processing/embedding status counts (admin only).

    ``unclear_status_count`` counts documents missing either status field.
    ``total_documents`` is the sum of the four known processing statuses.
    """
    statuses = {
        status: await db.documents.count_documents({"processing_status": status})
        for status in _DOCUMENT_STATUSES
    }
    embedding_statuses = {
        status: await db.documents.count_documents({"embedding_status": status})
        for status in _DOCUMENT_STATUSES
    }

    unclear_docs = await db.documents.count_documents({
        "$or": [
            {"processing_status": {"$exists": False}},
            {"embedding_status": {"$exists": False}},
        ]
    })

    return {
        "processing_status": statuses,
        "embedding_status": embedding_statuses,
        "unclear_status_count": unclear_docs,
        "total_documents": sum(statuses.values()),
    }


@router.get("/documents/{index_id}")
async def get_index_documents(
    index_id: str,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Get all documents for an index (admin only). ``index_id`` may be URL-encoded."""
    decoded_index_id = unquote(index_id)

    index = await db.indices.find_one({"index_id": decoded_index_id})
    if not index:
        raise HTTPException(status_code=404, detail="Index not found")

    documents = [
        {
            "id": str(doc["_id"]),
            "filename": doc["original_filename"],
            "file_size": doc["file_size"],
            # Status fields can be absent (get_processing_status counts such
            # documents as "unclear"), so read them leniently instead of
            # failing the whole listing with a KeyError.
            "processing_status": doc.get("processing_status"),
            "embedding_status": doc.get("embedding_status"),
            "chunk_count": doc.get("chunk_count", 0),
            "created_at": doc["created_at"],
            "updated_at": doc["updated_at"],
        }
        async for doc in db.documents.find({"index_id": decoded_index_id})
    ]

    return {
        "index_id": decoded_index_id,
        "index_name": index["name"],
        "documents": documents,
        "total_documents": len(documents),
    }


@router.post("/documents/{document_id}/reprocess")
async def reprocess_document(
    document_id: str,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Reset a document's processing state and reprocess it in the background (admin only)."""
    oid = _parse_object_id(document_id, "document")

    document = await db.documents.find_one({"_id": oid})
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    reset_fields = {
        "processing_status": "pending",
        "embedding_status": "pending",
        "parsed_text": None,
        "text_chunks": None,
        "chunk_count": 0,
        "vector_ids": None,
        "updated_at": datetime.utcnow(),
    }
    await db.documents.update_one({"_id": oid}, {"$set": reset_fields})

    # Build the processor's snapshot from the *reset* state, not the
    # pre-reset document, so it does not see the stale status/text/chunks
    # that were just cleared. (process_pending_documents already builds
    # DocumentInDB from documents in this reset state.)
    doc_obj = DocumentInDB(**{**document, **reset_fields})

    _spawn_background(llama_processor._process_document_async(doc_obj, db))

    return {
        "message": "Document reprocessing started",
        "document_id": document_id,
        "status": "pending",
    }


@router.delete("/documents/{document_id}")
async def delete_document(
    document_id: str,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Delete a document via ``document_processor.delete_document`` (admin only)."""
    oid = _parse_object_id(document_id, "document")

    document = await db.documents.find_one({"_id": oid})
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    # Imported locally, as in the original (possibly to avoid an import cycle).
    from ...services.document_processor import document_processor

    success = await document_processor.delete_document(document_id, db)
    if not success:
        raise HTTPException(status_code=500, detail="Failed to delete document")

    return {
        "message": "Document deleted successfully",
        "document_id": document_id,
    }


# ---------------------------------------------------------------------------
# Chat
# ---------------------------------------------------------------------------


@router.post("/chat/query")
async def admin_chat_query(
    query: str = Form(...),
    index_id: str = Form(...),
    top_k: int = Form(5),
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Query documents using RAG (admin only).

    Retrieves ``top_k`` results from the index, passes their ``content`` to
    the generator, and returns the answer together with the raw results.
    """
    index = await db.indices.find_one({"index_id": index_id})
    if not index:
        raise HTTPException(status_code=404, detail="Index not found")

    try:
        results = await llama_processor.query_documents(query, index_id, top_k)
        context_chunks = [result["content"] for result in results]
        response = await llama_processor.generate_response(query, context_chunks, index_id)

        return {
            "query": query,
            "response": response,
            "sources": results,
            "index_id": index_id,
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing query: {str(e)}",
        ) from e


# ---------------------------------------------------------------------------
# Indices
# ---------------------------------------------------------------------------


@router.get("/indices")
async def get_all_indices(
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Get all active indices with live document counts (admin only)."""
    indices = []
    async for index in db.indices.find({"status": "active"}):
        doc_count = await db.documents.count_documents({"index_id": index["index_id"]})
        indices.append({
            "id": str(index["_id"]),
            "index_id": index["index_id"],
            "name": index["name"],
            "description": index.get("description", ""),
            "document_count": doc_count,
            "created_by": str(index["created_by"]),
            "created_at": index["created_at"],
            "chunk_size": index.get("chunk_size", 1000),
            "chunk_overlap": index.get("chunk_overlap", 200),
        })

    return {
        "indices": indices,
        "total": len(indices),
    }


@router.post("/indices/create")
async def create_index(
    name: str = Form(...),
    description: Optional[str] = Form(None),
    chunk_size: int = Form(1000),
    chunk_overlap: int = Form(200),
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Create a new active index with a random UUID4 ``index_id`` (admin only)."""
    index_id = str(uuid.uuid4())

    now = datetime.utcnow()
    index_data = {
        "index_id": index_id,
        "name": name,
        "description": description,
        "created_by": admin_user.id,
        "created_at": now,
        "updated_at": now,
        "status": "active",
        "document_count": 0,
        "chunk_size": chunk_size,
        "chunk_overlap": chunk_overlap,
        "embedding_model": "text-embedding-ada-002",
        "settings": {},
    }

    result = await db.indices.insert_one(index_data)

    return {
        "message": "Index created successfully",
        "index_id": index_id,
        "name": name,
        "id": str(result.inserted_id),
    }


@router.delete("/indices/{index_id}")
async def delete_index(
    index_id: str,
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Delete an index and everything attached to it (admin only).

    Steps, in order:

    1. For each document, delete its embeddings and its uploaded file.
    2. Delete the document records and the index's chat messages.
    3. Delete the index record.
    4. Run ``rag_service.delete_index_complete``.
    5. Remove the index upload directory.

    File-system and vector-store cleanup problems are logged rather than
    fatal. Any other exception becomes HTTP 500.
    """
    decoded_index_id = unquote(index_id)

    index = await db.indices.find_one({"index_id": decoded_index_id})
    if not index:
        raise HTTPException(status_code=404, detail="Index not found")

    try:
        document_count = 0
        async for doc in db.documents.find({"index_id": decoded_index_id}):
            document_count += 1

            await llama_processor.delete_document_embeddings(
                str(doc["_id"]), decoded_index_id
            )

            # A missing file_path must not abort the deletion midway
            # (embeddings for earlier documents are already gone by now).
            raw_path = doc.get("file_path")
            if raw_path:
                file_path = Path(raw_path)
                if file_path.exists():
                    try:
                        file_path.unlink()
                    except OSError:
                        logger.exception("Error deleting file %s", file_path)

        await db.documents.delete_many({"index_id": decoded_index_id})
        chat_result = await db.chat_messages.delete_many({"index_id": decoded_index_id})
        await db.indices.delete_one({"index_id": decoded_index_id})

        # Complete ChromaDB cleanup for the index.
        from ...services.rag_service import rag_service
        cleanup_result = await rag_service.delete_index_complete(decoded_index_id)
        if not cleanup_result["success"]:
            logger.warning(
                "Warning during complete index cleanup: %s", cleanup_result["message"]
            )

        # Note: Cache clearing removed - caching is disabled for data freshness

        index_dir = Path(settings.upload_dir) / decoded_index_id
        if index_dir.exists():
            try:
                shutil.rmtree(index_dir)
            except OSError:
                logger.exception("Error deleting index directory %s", index_dir)

        return {
            "message": "Index deleted successfully",
            "index_id": decoded_index_id,
            "documents_deleted": document_count,
            "chat_messages_deleted": chat_result.deleted_count,
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error deleting index: {str(e)}",
        ) from e


@router.post("/documents/process-pending")
async def process_pending_documents(
    admin_user: UserInDB = Depends(get_current_admin_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Queue every pending or failed document for background processing (admin only).

    A document qualifies if either its processing or its embedding status is
    ``pending`` or ``failed``. Documents that fail ``DocumentInDB`` validation
    are logged and skipped.
    """
    cursor = db.documents.find({
        "$or": [
            {"processing_status": "pending"},
            {"processing_status": "failed"},
            {"embedding_status": "pending"},
            {"embedding_status": "failed"},
        ]
    })

    processed_count = 0
    async for doc in cursor:
        try:
            document = DocumentInDB(**doc)
            _spawn_background(llama_processor._process_document_async(document, db))
        except Exception:
            logger.exception("Error queueing document %s for processing", doc["_id"])
        else:
            processed_count += 1

    return {
        "message": f"Queued {processed_count} documents for processing",
        "count": processed_count,
    }