contract-query/backend/app/services/rag_service.py
2025-08-14 15:03:33 -05:00

303 lines
No EOL
12 KiB
Python

import asyncio
import json
import os
import shutil
import sqlite3
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import chromadb
from llama_index.core import Settings, StorageContext, VectorStoreIndex
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.vector_stores.chroma import ChromaVectorStore

from ..config.settings import settings
from ..core.chroma_client import chroma_singleton
from ..models.index import IndexInDB
class RAGService:
    """Query and delete vector indices stored in a shared ChromaDB database.

    Each index is identified by an ``index_id``; it owns a directory
    ``<indices_dir>/<index_id>`` and a ChromaDB collection named
    ``index_<index_id>``.

    NOTE: Index creation, document loading, LlamaParse processing, document
    reading and adding documents to an index are handled by LlamaProcessor
    and are intentionally not implemented here.
    """

    # Model names live in one place so __init__ and query_index cannot drift.
    LLM_MODEL = "gpt-4o"
    EMBED_MODEL = "text-embedding-3-small"
    # Sub-directory of ``indices_dir`` holding the shared ChromaDB files.
    CHROMA_DIR_NAME = "chroma_db"
    # Maximum number of characters of a source node echoed back to callers.
    SNIPPET_LENGTH = 200

    def __init__(self):
        """Read configuration, set the global LlamaIndex models, create dirs."""
        self.openai_api_key = settings.openai_api_key
        self.llamaparse_api_key = settings.llamaparse_api_key
        self.indices_dir = Path(settings.indices_dir)
        self.upload_dir = Path(settings.upload_dir)

        # Configure the process-wide LlamaIndex settings.
        Settings.llm = OpenAI(
            model=self.LLM_MODEL,
            api_key=self.openai_api_key,
            temperature=0.1,
        )
        Settings.embed_model = self._new_embed_model()

        # Ensure the indices directory exists.
        self.indices_dir.mkdir(parents=True, exist_ok=True)

    def _new_embed_model(self):
        """Build the OpenAI embedding model used for querying."""
        return OpenAIEmbedding(
            model=self.EMBED_MODEL,
            api_key=self.openai_api_key,
        )

    def _index_dir(self, index_id: str) -> Optional[Path]:
        """Return the directory for ``index_id``, or ``None`` if the id is unsafe.

        An id is accepted only when it is a single, non-empty path component
        that is not ``.``/``..`` and not the shared ChromaDB directory name.
        Without this, ``""`` would resolve to ``indices_dir`` itself and
        ``".."`` to its parent, which matters because ``delete_index``
        removes the resulting directory recursively.
        """
        if not isinstance(index_id, str):
            return None
        if index_id in ("", ".", "..", self.CHROMA_DIR_NAME):
            return None
        if Path(index_id).name != index_id:
            # Contains a separator, drive or root: not a plain directory name.
            return None
        return self.indices_dir / index_id

    @classmethod
    def _snippet(cls, text: str) -> str:
        """Return ``text`` truncated to SNIPPET_LENGTH chars plus an ellipsis."""
        if len(text) > cls.SNIPPET_LENGTH:
            return text[:cls.SNIPPET_LENGTH] + "..."
        return text

    @staticmethod
    def _debug_print_collection(chroma_collection, collection_name: str) -> None:
        """Print diagnostic details about a collection (best effort).

        This is purely informational, so any failure is reported and
        swallowed instead of aborting the query that triggered it.
        """
        try:
            collection_count = chroma_collection.count()
            print("🔍 DEBUG - ChromaDB Collection:")
            print(f" - Collection name: {collection_name}")
            print(f" - Document count: {collection_count}")
            print(f" - Collection metadata: {chroma_collection.metadata}")
            if collection_count <= 0:
                return

            # Peek at one stored vector to report its dimensionality.
            peek_result = chroma_collection.peek(limit=1)
            embeddings = peek_result.get("embeddings") if peek_result else None
            # Use len() rather than truthiness: the embeddings may be array
            # objects whose truth value is ambiguous.
            if embeddings is not None and len(embeddings) > 0:
                first = embeddings[0]
                has_values = first is not None and len(first) > 0
                stored_dim = len(first) if has_values else "None"
                print(f" - Stored vector dimensions: {stored_dim}")
            else:
                print(" - No embeddings found in peek result")
        except Exception as e:
            print(f" - ERROR inspecting collection: {e}")

    def get_chroma_client(self):
        """Get or create ChromaDB client using shared singleton"""
        chroma_db_path = str(self.indices_dir / self.CHROMA_DIR_NAME)
        return chroma_singleton.get_client(chroma_db_path)

    async def query_index(
        self,
        index_id: str,
        query: str,
        top_k: int = 10
    ) -> Dict[str, Any]:
        """Query an existing index.

        Args:
            index_id: Identifier of the index to query.
            query: Natural-language query text.
            top_k: Number of most similar nodes to retrieve.

        Returns:
            On success: ``{"success": True, "response", "sources",
            "query_time", "debug"}``. On any failure (unknown index, missing
            collection, query error): ``{"success": False, "message"}``.
            Exceptions are reported in the result, never propagated.
        """
        try:
            index_dir = self._index_dir(index_id)
            if index_dir is None or not index_dir.exists():
                return {
                    "success": False,
                    "message": f"Index {index_id} not found"
                }

            # Ensure consistent embedding model before querying
            embedding_model = self._new_embed_model()
            Settings.embed_model = embedding_model

            # DEBUG: Log embedding model details
            print("🔍 DEBUG - Embedding Model Configuration:")
            print(f" - Model: {self.EMBED_MODEL}")
            print(f" - API Key present: {bool(self.openai_api_key)}")
            print(f" - Settings.embed_model: {type(Settings.embed_model).__name__}")

            # NOTE(review): this probe and the query-embedding probe below each
            # cost one extra embeddings request per query purely for logging;
            # consider removing them once dimension issues are diagnosed.
            try:
                # Async variant so the event loop is not blocked on the network.
                test_embedding = await embedding_model.aget_text_embedding("test")
                print(f" - Test embedding dimensions: {len(test_embedding)}")
            except Exception as e:
                print(f" - ERROR getting test embedding: {e}")

            # Load index (use consistent collection naming)
            chroma_client = self.get_chroma_client()
            collection_name = f"index_{index_id}"
            try:
                chroma_collection = chroma_client.get_collection(name=collection_name)
            except Exception as e:
                print(f" - ERROR accessing collection: {e}")
                raise

            # Diagnostics only; must never fail the query.
            self._debug_print_collection(chroma_collection, collection_name)

            vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
            storage_context = StorageContext.from_defaults(vector_store=vector_store)

            # Load the index
            index = VectorStoreIndex.from_vector_store(
                vector_store=vector_store,
                storage_context=storage_context
            )

            # Create query engine
            query_engine = index.as_query_engine(
                similarity_top_k=top_k,
                response_mode="compact"
            )

            # DEBUG: Query execution details
            print("🔍 DEBUG - Query Execution:")
            print(f" - Query: '{query}'")
            print(f" - Top K: {top_k}")
            print(f" - Current Settings.embed_model: {type(Settings.embed_model).__name__}")

            # Test query embedding before execution
            try:
                query_embedding = await Settings.embed_model.aget_text_embedding(query)
                print(f" - Query embedding dimensions: {len(query_embedding)}")
            except Exception as e:
                print(f" - ERROR getting query embedding: {e}")

            # Execute query
            start_time = datetime.now()
            print(f" - Starting query execution at {start_time}")
            try:
                # aquery keeps the embedding/LLM round-trips off the event loop.
                response = await query_engine.aquery(query)
                print(" - Query executed successfully")
            except Exception as e:
                print(f" - ERROR during query execution: {e}")
                print(f" - Error type: {type(e).__name__}")
                raise
            end_time = datetime.now()

            # Extract source information
            source_nodes = getattr(response, "source_nodes", None) or []
            source_info = [
                {
                    "filename": node.metadata.get('filename', 'Unknown'),
                    "score": node.score,
                    "text_snippet": self._snippet(node.text),
                }
                for node in source_nodes
            ]

            return {
                "success": True,
                "response": str(response),
                "sources": source_info,
                "query_time": (end_time - start_time).total_seconds(),
                "debug": {
                    "query": query,
                    "index_id": index_id,
                    "top_k": top_k,
                    "source_count": len(source_info)
                }
            }
        except Exception as e:
            print(f"Error querying index: {e}")
            return {
                "success": False,
                "message": f"Error querying index: {str(e)}"
            }

    async def delete_index(self, index_id: str) -> bool:
        """Delete the on-disk directory of an index.

        Returns:
            ``True`` if the directory existed and was removed; ``False`` if it
            did not exist, the id was rejected as unsafe, or removal failed.
        """
        try:
            index_dir = self._index_dir(index_id)
            if index_dir is None:
                print(f"Error deleting index: invalid index id {index_id!r}")
                return False
            if index_dir.exists():
                shutil.rmtree(index_dir)
                return True
            return False
        except Exception as e:
            print(f"Error deleting index: {e}")
            return False

    async def delete_index_complete(self, index_id: str) -> Dict[str, Any]:
        """Complete index deletion including ChromaDB cleanup.

        Runs three steps: remove the index directory, delete the ChromaDB
        collection, then clean leftover rows from the ChromaDB SQLite file.
        A failure in one step does not stop the others; each outcome is
        reported under ``details``.

        Returns:
            ``{"success": True, "message", "details"}`` when the steps ran, or
            ``{"success": False, "message"}`` if an unexpected error occurred
            (for example, the ChromaDB client could not be obtained).
        """
        try:
            # Delete vector index files
            file_success = await self.delete_index(index_id)

            # Delete ChromaDB collection
            chroma_client = self.get_chroma_client()
            collection_name = f"index_{index_id}"
            collection_deleted = False
            try:
                chroma_client.delete_collection(collection_name)
                collection_deleted = True
                print(f"Successfully deleted ChromaDB collection: {collection_name}")
            except Exception as e:
                print(f"Warning: Could not delete ChromaDB collection {collection_name}: {e}")

            # Clean up orphaned metadata in shared ChromaDB database
            metadata_cleaned = self._cleanup_chromadb_metadata(index_id)

            return {
                "success": True,
                "message": "Index completely deleted",
                "details": {
                    "files_deleted": file_success,
                    "collection_deleted": collection_deleted,
                    "metadata_cleaned": metadata_cleaned
                }
            }
        except Exception as e:
            return {
                "success": False,
                "message": f"Error during complete index deletion: {str(e)}"
            }

    def _cleanup_chromadb_metadata(self, index_id: str) -> bool:
        """Clean up orphaned rows in the ChromaDB SQLite file for one index.

        Returns:
            ``True`` if the rows were removed or no matching collection row
            exists; ``False`` if the database file is missing or any SQLite
            error occurred.
        """
        db_path = self.indices_dir / self.CHROMA_DIR_NAME / "chroma.sqlite3"
        collection_name = f"index_{index_id}"
        try:
            # sqlite3.connect() would silently create an empty database here.
            if not db_path.is_file():
                print(
                    f"Warning: Could not clean up ChromaDB metadata for "
                    f"{collection_name}: database file not found"
                )
                return False

            # closing() guarantees the connection is released; using the
            # connection itself as a context manager only scopes a transaction.
            with closing(sqlite3.connect(str(db_path))) as conn:
                # Get the collection_id first
                row = conn.execute(
                    "SELECT id FROM collections WHERE name = ?",
                    (collection_name,),
                ).fetchone()
                if row is None:
                    print(f"No collection found with name {collection_name}")
                    return True  # Not an error if collection doesn't exist
                collection_id = row[0]

                # NOTE(review): table/column names are kept exactly as in the
                # previous implementation; confirm they match the schema of
                # the installed ChromaDB version.
                with conn:  # commit on success, roll back on error
                    # Delete embedding metadata for this specific collection
                    metadata_count = conn.execute(
                        """
                        DELETE FROM embedding_metadata
                        WHERE id IN (
                            SELECT em.id FROM embedding_metadata em
                            JOIN embeddings e ON em.id = e.id
                            WHERE e.collection_id = ?
                        )
                        """,
                        (collection_id,),
                    ).rowcount
                    # Delete embeddings for this collection
                    embedding_count = conn.execute(
                        "DELETE FROM embeddings WHERE collection_id = ?",
                        (collection_id,),
                    ).rowcount
                    # Delete the collection record
                    conn.execute(
                        "DELETE FROM collections WHERE id = ?",
                        (collection_id,),
                    )

            print(f"Cleaned up ChromaDB metadata for {collection_name}: {metadata_count} metadata entries, {embedding_count} embeddings")
            return True
        except Exception as e:
            print(f"Warning: Could not clean up ChromaDB metadata for {collection_name}: {e}")
            return False

    def index_exists(self, index_id: str) -> bool:
        """Check if an index directory exists for a valid ``index_id``."""
        index_dir = self._index_dir(index_id)
        return index_dir is not None and index_dir.exists()
# Global RAG service instance, constructed when this module is imported.
# NOTE: RAGService.__init__ reads `settings`, assigns the global LlamaIndex
# `Settings.llm` / `Settings.embed_model`, and creates the indices directory,
# so importing this module has those side effects.
rag_service = RAGService()