303 lines
No EOL
12 KiB
Python
303 lines
No EOL
12 KiB
Python
import os
|
|
import json
|
|
import asyncio
|
|
from typing import Dict, Any, List, Optional
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
from llama_index.core import VectorStoreIndex, StorageContext, Settings
|
|
from llama_index.vector_stores.chroma import ChromaVectorStore
|
|
from llama_index.embeddings.openai import OpenAIEmbedding
|
|
from llama_index.llms.openai import OpenAI
|
|
import chromadb
|
|
|
|
from ..config.settings import settings
|
|
from ..models.index import IndexInDB
|
|
from ..core.chroma_client import chroma_singleton
|
|
|
|
class RAGService:
    """Query and delete vector indices stored under ``settings.indices_dir``.

    Each index is identified by an ``index_id`` and is backed by two things:

    * a directory ``<indices_dir>/<index_id>``, and
    * a ChromaDB collection named ``index_<index_id>`` that lives in the
      shared ChromaDB store at ``<indices_dir>/chroma_db``.

    Constructing the service assigns the process-wide LlamaIndex
    ``Settings.llm`` and ``Settings.embed_model`` and creates ``indices_dir``
    if it does not exist yet.
    """

    # Sub-directory of ``indices_dir`` that holds the shared ChromaDB store.
    # It sits next to the per-index directories, so it must never be accepted
    # as an index id (deleting it would remove every collection).
    _CHROMA_DIR_NAME = "chroma_db"

    # Embedding model used both at construction time and for every query.
    _EMBEDDING_MODEL_NAME = "text-embedding-3-small"

    # Maximum number of characters of a source node's text returned to callers.
    _SNIPPET_LENGTH = 200

    def __init__(self):
        self.openai_api_key = settings.openai_api_key
        self.llamaparse_api_key = settings.llamaparse_api_key
        self.indices_dir = Path(settings.indices_dir)
        self.upload_dir = Path(settings.upload_dir)

        # Configure the global LlamaIndex settings.
        Settings.llm = OpenAI(
            model="gpt-4o",
            api_key=self.openai_api_key,
            temperature=0.1,
        )
        Settings.embed_model = self._build_embed_model()

        # Ensure directories exist.
        self.indices_dir.mkdir(parents=True, exist_ok=True)

    # NOTE: Index creation, document loading, LlamaParse processing, document
    # reading and adding documents to an index are now handled by
    # LlamaProcessor; this class no longer provides methods for them.

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _build_embed_model(self):
        """Return a new ``OpenAIEmbedding`` for ``_EMBEDDING_MODEL_NAME``."""
        return OpenAIEmbedding(
            model=self._EMBEDDING_MODEL_NAME,
            api_key=self.openai_api_key,
        )

    def _index_dir(self, index_id: str) -> Optional[Path]:
        """Return the directory for ``index_id``, or ``None`` if the id is unsafe.

        An id is rejected when it is empty, is ``.`` / ``..``, is the reserved
        ChromaDB directory name, or contains a path separator (or is an
        absolute path).  Without this check ``indices_dir / index_id`` could
        resolve to ``indices_dir`` itself or to a location outside it, which
        would be catastrophic when handed to ``shutil.rmtree``.
        """
        if not index_id or index_id in (".", "..", self._CHROMA_DIR_NAME):
            return None
        # A single path component is its own ``name``; anything containing a
        # separator, drive or root is not.
        if Path(index_id).name != index_id:
            return None
        return self.indices_dir / index_id

    @classmethod
    def _snippet(cls, text: str) -> str:
        """Return ``text`` truncated to ``_SNIPPET_LENGTH`` chars plus ``...``."""
        if len(text) > cls._SNIPPET_LENGTH:
            return text[:cls._SNIPPET_LENGTH] + "..."
        return text

    @staticmethod
    def _peek_dimension_line(peek_result) -> str:
        """Build the diagnostic line describing the stored vector dimensions.

        ``peek_result`` is the mapping returned by ``collection.peek``.  The
        embeddings are inspected with explicit ``None`` / ``len`` checks rather
        than truthiness so that array-like values (whose truth value is
        ambiguous) do not raise.
        """
        embeddings = peek_result.get("embeddings") if peek_result else None
        if embeddings is None or len(embeddings) == 0:
            return " - No embeddings found in peek result"
        first = embeddings[0]
        has_values = first is not None and len(first) > 0
        stored_dim = len(first) if has_values else "None"
        return f" - Stored vector dimensions: {stored_dim}"

    def _print_embedding_diagnostics(self, embedding_model) -> None:
        """Print the embedding configuration and the size of a test embedding.

        Calls ``embedding_model.get_text_embedding("test")`` once; any failure
        is printed and otherwise ignored.
        """
        print("🔍 DEBUG - Embedding Model Configuration:")
        print(f" - Model: {self._EMBEDDING_MODEL_NAME}")
        print(f" - API Key present: {bool(self.openai_api_key)}")
        print(f" - Settings.embed_model: {type(Settings.embed_model).__name__}")
        try:
            test_embedding = embedding_model.get_text_embedding("test")
            print(f" - Test embedding dimensions: {len(test_embedding)}")
        except Exception as e:
            print(f" - ERROR getting test embedding: {e}")

    def _print_collection_diagnostics(self, chroma_collection, collection_name: str) -> None:
        """Print name, count, metadata and stored vector size of a collection.

        This is diagnostics only: any error is printed and swallowed so that
        inspecting the collection can never abort the actual query.
        """
        try:
            collection_metadata = chroma_collection.metadata
            collection_count = chroma_collection.count()
            print("🔍 DEBUG - ChromaDB Collection:")
            print(f" - Collection name: {collection_name}")
            print(f" - Document count: {collection_count}")
            print(f" - Collection metadata: {collection_metadata}")

            # Peek at one stored vector to check its dimensions.
            if collection_count > 0:
                peek_result = chroma_collection.peek(limit=1)
                print(self._peek_dimension_line(peek_result))
        except Exception as e:
            print(f" - ERROR accessing collection: {e}")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_chroma_client(self):
        """Get or create the ChromaDB client using the shared singleton."""
        chroma_db_path = str(self.indices_dir / self._CHROMA_DIR_NAME)
        return chroma_singleton.get_client(chroma_db_path)

    async def query_index(
        self,
        index_id: str,
        query: str,
        top_k: int = 10,
        *,
        debug: bool = True,
    ) -> Dict[str, Any]:
        """Query an existing index.

        Args:
            index_id: Identifier of the index; its collection is
                ``index_<index_id>``.
            query: Natural-language query text.
            top_k: Number of most similar nodes to retrieve.
            debug: When true (the default, matching the previous behaviour)
                print diagnostics to stdout.  Note that the diagnostics issue
                two extra embedding requests (one for ``"test"``, one for the
                query text); pass ``debug=False`` to skip them.

        Returns:
            On success a dict with ``success=True``, ``response`` (the answer
            as a string), ``sources`` (filename / score / text snippet per
            source node), ``query_time`` in seconds and a ``debug`` sub-dict
            echoing the inputs.  On any failure a dict with ``success=False``
            and a ``message``; this method does not raise.
        """
        # NOTE(review): everything below is synchronous (Chroma access,
        # embedding and LLM calls), so this coroutine appears to block the
        # event loop for the duration of the query - confirm whether callers
        # rely on that before moving the work to a thread or to ``aquery``.
        try:
            index_dir = self._index_dir(index_id)
            if index_dir is None or not index_dir.exists():
                return {
                    "success": False,
                    "message": f"Index {index_id} not found",
                }

            # Ensure a consistent embedding model before querying.
            embedding_model = self._build_embed_model()
            Settings.embed_model = embedding_model
            if debug:
                self._print_embedding_diagnostics(embedding_model)

            # Load the collection (consistent collection naming).  A missing
            # collection is a real error and aborts the query.
            chroma_client = self.get_chroma_client()
            collection_name = f"index_{index_id}"
            try:
                chroma_collection = chroma_client.get_collection(name=collection_name)
            except Exception as e:
                print(f" - ERROR accessing collection: {e}")
                raise
            if debug:
                self._print_collection_diagnostics(chroma_collection, collection_name)

            vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
            storage_context = StorageContext.from_defaults(vector_store=vector_store)

            # Load the index on top of the existing vectors.
            index = VectorStoreIndex.from_vector_store(
                vector_store=vector_store,
                storage_context=storage_context,
            )

            query_engine = index.as_query_engine(
                similarity_top_k=top_k,
                response_mode="compact",
            )

            if debug:
                print("🔍 DEBUG - Query Execution:")
                print(f" - Query: '{query}'")
                print(f" - Top K: {top_k}")
                print(f" - Current Settings.embed_model: {type(Settings.embed_model).__name__}")

                # Embed the query once up front purely to report its size.
                try:
                    query_embedding = Settings.embed_model.get_text_embedding(query)
                    print(f" - Query embedding dimensions: {len(query_embedding)}")
                except Exception as e:
                    print(f" - ERROR getting query embedding: {e}")

            # Execute the query.
            start_time = datetime.now()
            if debug:
                print(f" - Starting query execution at {start_time}")
            try:
                response = query_engine.query(query)
            except Exception as e:
                print(f" - ERROR during query execution: {e}")
                print(f" - Error type: {type(e).__name__}")
                raise
            if debug:
                print(" - Query executed successfully")
            end_time = datetime.now()

            # Extract source information (responses without source nodes
            # simply yield an empty list).
            source_info = [
                {
                    "filename": node.metadata.get('filename', 'Unknown'),
                    "score": node.score,
                    "text_snippet": self._snippet(node.text),
                }
                for node in (getattr(response, 'source_nodes', None) or [])
            ]

            return {
                "success": True,
                "response": str(response),
                "sources": source_info,
                "query_time": (end_time - start_time).total_seconds(),
                "debug": {
                    "query": query,
                    "index_id": index_id,
                    "top_k": top_k,
                    "source_count": len(source_info),
                },
            }

        except Exception as e:
            # Boundary handler: callers get a failure dict, never an exception.
            print(f"Error querying index: {e}")
            return {
                "success": False,
                "message": f"Error querying index: {str(e)}",
            }

    async def delete_index(self, index_id: str) -> bool:
        """Delete the on-disk directory of an index.

        Returns:
            ``True`` if the directory existed and was removed; ``False`` if
            the id is unsafe (see ``_index_dir``), the directory does not
            exist, or removal failed.  This method does not raise.
        """
        # Imported locally: the module's import block does not provide shutil.
        import shutil

        try:
            index_dir = self._index_dir(index_id)
            if index_dir is None or not index_dir.exists():
                return False
            shutil.rmtree(index_dir)
            return True
        except Exception as e:
            print(f"Error deleting index: {e}")
            return False

    async def delete_index_complete(self, index_id: str) -> Dict[str, Any]:
        """Delete an index's files, its ChromaDB collection and leftover rows.

        The three steps are best-effort and independent: a failure in one is
        reported in ``details`` but does not stop the others.

        Returns:
            ``{"success": True, "message": ..., "details": {...}}`` where
            ``details`` holds ``files_deleted``, ``collection_deleted`` and
            ``metadata_cleaned``.  ``success`` only means that no unexpected
            exception occurred - inspect ``details`` to see what was actually
            removed.  On an unexpected exception, ``success`` is ``False`` and
            ``message`` describes the error.
        """
        try:
            # Delete vector index files.
            file_success = await self.delete_index(index_id)

            # Delete the ChromaDB collection.
            chroma_client = self.get_chroma_client()
            collection_name = f"index_{index_id}"

            collection_deleted = False
            try:
                chroma_client.delete_collection(collection_name)
                collection_deleted = True
                print(f"Successfully deleted ChromaDB collection: {collection_name}")
            except Exception as e:
                print(f"Warning: Could not delete ChromaDB collection {collection_name}: {e}")

            # Clean up orphaned metadata in the shared ChromaDB database.
            metadata_cleaned = self._cleanup_chromadb_metadata(index_id)

            return {
                "success": True,
                "message": "Index completely deleted",
                "details": {
                    "files_deleted": file_success,
                    "collection_deleted": collection_deleted,
                    "metadata_cleaned": metadata_cleaned,
                },
            }

        except Exception as e:
            return {
                "success": False,
                "message": f"Error during complete index deletion: {str(e)}",
            }

    def _cleanup_chromadb_metadata(self, index_id: str) -> bool:
        """Remove leftover rows for one index from the ChromaDB SQLite file.

        Opens ``<indices_dir>/chroma_db/chroma.sqlite3`` directly, looks up the
        collection ``index_<index_id>`` and deletes its ``embedding_metadata``,
        ``embeddings`` and ``collections`` rows in a single transaction.

        Returns:
            ``True`` if the rows were deleted or no such collection exists;
            ``False`` if the database could not be opened or a statement
            failed (the transaction is rolled back in that case).
        """
        # NOTE(review): the table and column names used here (for example
        # ``embeddings.collection_id``) are assumed to match the internal
        # SQLite schema of the installed ChromaDB version, which is not a
        # public API - verify against the deployed version.
        import sqlite3
        from contextlib import closing

        chroma_db_path = str(self.indices_dir / self._CHROMA_DIR_NAME / "chroma.sqlite3")
        collection_name = f"index_{index_id}"

        try:
            # ``with conn`` only scopes the transaction (commit / rollback);
            # ``closing`` is what actually releases the connection afterwards.
            with closing(sqlite3.connect(chroma_db_path)) as conn, conn:
                cursor = conn.cursor()

                # Get the collection_id first.
                cursor.execute("""
                    SELECT id FROM collections WHERE name = ?
                """, (collection_name,))
                collection_result = cursor.fetchone()

                if collection_result is None:
                    print(f"No collection found with name {collection_name}")
                    return True  # Not an error if the collection doesn't exist

                collection_id = collection_result[0]

                # Delete embedding metadata for this specific collection.
                cursor.execute("""
                    DELETE FROM embedding_metadata
                    WHERE id IN (
                        SELECT em.id FROM embedding_metadata em
                        JOIN embeddings e ON em.id = e.id
                        WHERE e.collection_id = ?
                    )
                """, (collection_id,))
                metadata_count = cursor.rowcount

                # Delete embeddings for this collection.
                cursor.execute("""
                    DELETE FROM embeddings
                    WHERE collection_id = ?
                """, (collection_id,))
                embedding_count = cursor.rowcount

                # Delete the collection record.
                cursor.execute("""
                    DELETE FROM collections
                    WHERE id = ?
                """, (collection_id,))

                print(f"Cleaned up ChromaDB metadata for {collection_name}: {metadata_count} metadata entries, {embedding_count} embeddings")
                return True

        except Exception as e:
            print(f"Warning: Could not clean up ChromaDB metadata for {collection_name}: {e}")
            return False

    def index_exists(self, index_id: str) -> bool:
        """Return whether a directory exists for ``index_id``.

        Unsafe ids (empty, ``.``/``..``, the reserved ChromaDB directory name,
        or anything containing a path separator) are reported as not existing.
        """
        index_dir = self._index_dir(index_id)
        return index_dir is not None and index_dir.exists()
|
|
|
|
# Global RAG service instance, created at import time. Constructing it
# assigns the process-wide LlamaIndex ``Settings.llm`` / ``Settings.embed_model``
# and creates the indices directory (see ``RAGService.__init__``).
rag_service = RAGService()