# Source listing metadata (from the file viewer): 881 lines, no EOL, 35 KiB, Python
import os
import uuid
import asyncio
import traceback
from datetime import datetime
from typing import List, Dict, Any, Optional, Union
from pathlib import Path

import aiofiles
from fastapi import UploadFile, HTTPException
from motor.motor_asyncio import AsyncIOMotorDatabase
from bson import ObjectId
from llama_index.core import (
    VectorStoreIndex,
    StorageContext,
    Settings,
    Document as LlamaDocument
)
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.core.embeddings import BaseEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_cloud_services import LlamaParse
import chromadb
from chromadb.config import Settings as ChromaSettings
from chromadb.utils import embedding_functions

from ..config.settings import settings
from ..models.document import DocumentInDB, DocumentCreate
from ..core.chroma_client import chroma_singleton
from ..models.index import IndexInDB
from ..models.user import UserInDB
from ..utils.file_utils import validate_file, get_file_info
from ..services.contract_summary_service import contract_summary_service


class LlamaProcessor:
    """Upload, parse, chunk, embed and query documents with LlamaIndex and ChromaDB.

    Uploaded files are written below ``settings.upload_dir/<index_id>``, parsed
    with LlamaParse, split with a semantic splitter, embedded into the ChromaDB
    collection ``index_<index_id>`` and tracked in the ``documents`` MongoDB
    collection through the three status fields ``processing_status``,
    ``embedding_status`` and ``summary_status``.
    """

    # Bytes read from an upload per iteration while streaming it to disk.
    _SAVE_CHUNK_SIZE = 1024 * 1024

    def __init__(self):
        self.upload_dir = Path(settings.upload_dir)
        self.indices_dir = Path(settings.indices_dir)
        self.allowed_extensions = {
            '.pdf', '.docx', '.doc', '.txt', '.csv', '.json', '.html', '.md', '.rtf'
        }
        self.max_file_size = 50 * 1024 * 1024  # 50MB

        # Strong references to in-flight background processing tasks. The event
        # loop only keeps weak references to tasks, so a task nobody holds on
        # to may be garbage-collected before it finishes.
        self._background_tasks = set()

        # Initialize LlamaIndex components (mutates the global ``Settings``)
        self._setup_llama_index()

        # ChromaDB client managed by singleton

    def get_chroma_client(self):
        """Get or create ChromaDB client using shared singleton"""
        chroma_db_path = str(self.indices_dir / "chroma_db")
        return chroma_singleton.get_client(chroma_db_path)

    @staticmethod
    def _make_embed_model():
        """Build the OpenAI embedding model used for splitting, indexing and querying."""
        return OpenAIEmbedding(
            model="text-embedding-3-small",
            api_key=settings.openai_api_key
        )

    @classmethod
    def _make_semantic_splitter(cls):
        """Build a semantic splitter backed by a fresh embedding model."""
        return SemanticSplitterNodeParser.from_defaults(
            embed_model=cls._make_embed_model(),
            buffer_size=2,
            breakpoint_percentile_threshold=70
        )

    def _setup_llama_index(self):
        """Configure the global LlamaIndex ``Settings`` (LLM, embeddings, splitter)."""
        # Configure OpenAI
        Settings.llm = OpenAI(
            model="gpt-4o",
            api_key=settings.openai_api_key,
            temperature=0.1
        )

        # Configure embeddings
        Settings.embed_model = self._make_embed_model()

        # Configure semantic text splitter
        Settings.text_splitter = self._make_semantic_splitter()

    async def process_single_file(
        self,
        file: UploadFile,
        index_id: str,
        user: UserInDB,
        db: AsyncIOMotorDatabase,
        custom_name: Optional[str] = None
    ) -> DocumentInDB:
        """Validate and store one upload, record it, and start background processing.

        Args:
            file: The uploaded file.
            index_id: Index the document belongs to; also the upload sub-directory.
            user: Uploading user; ``user.id`` is stored as ``uploaded_by``.
            db: Database whose ``documents`` collection receives the record.
            custom_name: Display name stored as ``original_filename``; defaults
                to the upload's own filename.

        Returns:
            The inserted document with all statuses set to ``"pending"``.
            Parsing and embedding continue in a background task.

        Raises:
            HTTPException: 400 for an invalid upload, 500 if saving fails.
        """
        # Validate file
        await self._validate_file(file)

        # Generate unique filename
        file_extension = Path(file.filename).suffix.lower()
        unique_filename = f"{uuid.uuid4()}{file_extension}"

        # Create index-specific upload directory
        # NOTE(review): index_id is used as a path component; this assumes the
        # caller has already validated it - confirm against the route handler.
        index_upload_dir = self.upload_dir / index_id
        index_upload_dir.mkdir(parents=True, exist_ok=True)

        # Save file
        file_path = index_upload_dir / unique_filename
        bytes_written = await self._save_file(file, file_path)

        # Create document record; fall back to the byte count actually written
        # when the upload did not report a size.
        document_name = custom_name or file.filename
        document_data = DocumentCreate(
            filename=unique_filename,
            original_filename=document_name,
            file_size=file.size if file.size is not None else bytes_written,
            content_type=file.content_type,
            index_id=index_id,
            uploaded_by=user.id
        )

        now = datetime.utcnow()  # one timestamp so created_at == updated_at
        document_dict = document_data.dict()
        document_dict["file_path"] = str(file_path)
        document_dict["processing_status"] = "pending"
        document_dict["embedding_status"] = "pending"
        document_dict["summary_status"] = "pending"
        document_dict["metadata"] = {}
        document_dict["created_at"] = now
        document_dict["updated_at"] = now

        # Save to database
        result = await db.documents.insert_one(document_dict)
        document_dict["_id"] = result.inserted_id

        document = DocumentInDB(**document_dict)

        # Process document asynchronously; keep a reference until it finishes.
        task = asyncio.create_task(self._process_document_async(document, db))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

        return document

    async def process_multiple_files(
        self,
        files: List[UploadFile],
        index_id: str,
        user: UserInDB,
        db: AsyncIOMotorDatabase,
        base_name: str
    ) -> List[DocumentInDB]:
        """Process several uploads, naming them ``<base_name>_001<ext>``, ``_002`` ...

        Files are handled one after another through ``process_single_file``;
        the first failing file aborts the batch with its ``HTTPException``.
        """
        documents = []

        for i, file in enumerate(files, 1):
            # Generate custom name with serial number. A missing filename must
            # reach _validate_file (400) instead of crashing in Path(None).
            file_extension = Path(file.filename or "").suffix.lower()
            custom_name = f"{base_name}_{i:03d}{file_extension}"

            document = await self.process_single_file(
                file, index_id, user, db, custom_name
            )
            documents.append(document)

        return documents

    async def _process_document_async(self, document: DocumentInDB, db: AsyncIOMotorDatabase):
        """Run the parse -> chunk -> embed -> summarize pipeline for one document.

        Runs as a background task, so nothing is raised to a caller: failures
        are printed and recorded on the document (all statuses ``"failed"``,
        message in ``metadata.error``). A failing summary does not fail the
        document.
        """
        print(f"Starting processing for document {document.id}: {document.original_filename}")

        try:
            # Update status to processing
            print(f"Setting document {document.id} to processing status")
            await self._update_document_status(
                document.id, "processing", "pending", "pending", db
            )

            # Small delay to ensure status update is committed
            await asyncio.sleep(0.1)

            # Parse document text
            print(f"Parsing document text for {document.id}")
            parsed_text = await self._parse_document_text(document.file_path)
            print(f"Parsed text length: {len(parsed_text)} characters")

            # Update parsing status
            await self._update_document_status(
                document.id, "completed", "processing", "pending", db
            )

            # Create text chunks
            print(f"Creating text chunks for {document.id}")
            chunks = await self._create_text_chunks(parsed_text, document.index_id)
            print(f"Created {len(chunks)} chunks")

            # Create embeddings and store in vector database
            print(f"Creating embeddings for {document.id}")
            vector_ids = await self._create_embeddings(
                chunks, document.index_id, str(document.id)
            )
            print(f"Created {len(vector_ids)} embeddings")

            # Update document with parsed data
            print(f"Updating document {document.id} with parsed data")
            await self._update_document_with_parsed_data(
                document.id, parsed_text, chunks, vector_ids, db
            )

            # Update status to completed (core processing done)
            print(f"Completing processing for document {document.id}")
            await self._update_document_status(
                document.id, "completed", "completed", "pending", db
            )

            # Extract contract summary (non-blocking for the document's status)
            print(f"Extracting contract summary for {document.id}")
            try:
                await self._extract_contract_summary(
                    document.id, parsed_text, document.original_filename, db
                )
            except Exception as summary_error:
                print(f"Warning: Contract summary extraction failed for {document.id}: {summary_error}")
                # Mark summary as failed but don't fail the entire document
                await self._update_document_status(
                    document.id, "completed", "completed", "failed", db
                )

            print(f"Successfully processed document {document.id}")

        except Exception as e:
            print(f"Error processing document {document.id}: {str(e)}")
            traceback.print_exc()

            # Nobody awaits this task, so a failure while recording the error
            # must be reported here rather than escape as an unretrieved
            # task exception.
            try:
                await self._update_document_status(
                    document.id, "failed", "failed", "failed", db
                )
                # Store error in metadata
                await db.documents.update_one(
                    {"_id": ObjectId(document.id)},
                    {"$set": {"metadata.error": str(e)}}
                )
            except Exception as status_error:
                print(f"Failed to record error state for document {document.id}: {status_error}")

    async def _parse_document_text(self, file_path: str) -> str:
        """Parse a stored file to markdown text with LlamaParse (premium mode).

        The synchronous LlamaParse call runs in the default executor so the
        event loop stays responsive.

        Raises:
            HTTPException: 500 if the API key is missing, parsing fails, or no
                text was extracted.
        """
        file_path = Path(file_path)

        file_exists = file_path.exists()
        print(f"Parsing file: {file_path}")
        print(f"File exists: {file_exists}")
        print(f"File size: {file_path.stat().st_size if file_exists else 'N/A'}")

        # Check if LlamaParse API key is available
        if not settings.llamaparse_api_key:
            raise HTTPException(
                status_code=500,
                detail="LlamaParse API key is required for document processing"
            )

        def _run_llamaparse():
            parser = LlamaParse(
                api_key=settings.llamaparse_api_key,
                premium_mode=True,
                result_type="markdown",
                verbose=True
            )
            return parser.load_data(str(file_path))

        try:
            print("Using LlamaParse with premium mode (async)")

            # Execute the synchronous LlamaParse call in a thread pool
            loop = asyncio.get_running_loop()
            documents = await loop.run_in_executor(None, _run_llamaparse)

            print(f"LlamaParse loaded {len(documents)} documents from file")

            # Combine all document text
            text_result = "\n\n".join(doc.text for doc in documents).strip()
            print(f"Final parsed text length: {len(text_result)} characters")

            if not text_result:
                raise ValueError("No text extracted from document via LlamaParse")

            return text_result

        except Exception as e:
            print(f"Error in _parse_document_text with LlamaParse: {str(e)}")
            raise HTTPException(
                status_code=500,
                detail=f"Error parsing document with LlamaParse: {str(e)}"
            ) from e

    async def _create_text_chunks(self, text: str, index_id: str) -> List[str]:
        """Split ``text`` into semantic chunks with the LlamaIndex semantic splitter.

        ``index_id`` is accepted for interface compatibility and not used.
        The splitter call is synchronous, so it runs in the default executor
        instead of blocking the event loop.

        Raises:
            HTTPException: 500 if splitting fails or produces no chunks.
        """
        try:
            print(f"Creating semantic chunks for text of length {len(text)}")

            # Create semantic splitter with OpenAI embeddings
            semantic_splitter = self._make_semantic_splitter()

            # Create a document and split it semantically
            llama_doc = LlamaDocument(text=text)
            loop = asyncio.get_running_loop()
            nodes = await loop.run_in_executor(
                None, semantic_splitter.get_nodes_from_documents, [llama_doc]
            )

            # Extract text from nodes
            chunks = [node.text for node in nodes]

            print(f"Created {len(chunks)} semantic chunks")

            if not chunks:
                raise ValueError("No semantic chunks created from text")

            return chunks

        except Exception as e:
            print(f"Error in _create_text_chunks: {str(e)}")
            raise HTTPException(
                status_code=500,
                detail=f"Error creating semantic text chunks: {str(e)}"
            ) from e

    async def _create_embeddings(
        self,
        chunks: List[str],
        index_id: str,
        document_id: str
    ) -> List[str]:
        """Embed ``chunks`` and store them in the index's ChromaDB collection.

        Each chunk becomes a LlamaIndex document carrying ``document_id``,
        ``chunk_index``, ``index_id`` and ``chunk_id`` metadata.

        Returns:
            The chunk ids ``"<document_id>_<i>"``, one per chunk. These are the
            ``chunk_id`` metadata values; this method does not set them as the
            vector store's own record ids.

        Raises:
            HTTPException: 500 if anything fails.
        """
        try:
            collection_name = f"index_{index_id}"
            print(f"🔍 DEBUG - Creating embeddings using LlamaIndex for collection: {collection_name}")

            # Create LlamaIndex documents from chunks
            documents = []
            vector_ids = []

            for i, chunk in enumerate(chunks):
                chunk_id = f"{document_id}_{i}"
                vector_ids.append(chunk_id)

                # Create LlamaIndex document with metadata
                documents.append(LlamaDocument(
                    text=chunk,
                    metadata={
                        "document_id": document_id,
                        "chunk_index": i,
                        "index_id": index_id,
                        "chunk_id": chunk_id
                    }
                ))

            print(f" - Created {len(documents)} LlamaIndex documents")
            print(f" - Document IDs: {vector_ids}")
            print(f" - Document lengths: {[len(doc.text) for doc in documents]}")

            # Get or create ChromaDB collection using LlamaIndex
            client = self.get_chroma_client()
            try:
                chroma_collection = client.get_collection(name=collection_name)
                current_count = chroma_collection.count()
                print(f" - Found existing collection with {current_count} vectors")
            except Exception:
                # Create collection - let LlamaIndex handle the embedding function
                print(f" - Creating new ChromaDB collection")
                chroma_collection = client.create_collection(
                    name=collection_name,
                    metadata={"hnsw:space": "cosine"}
                )

            # Create LlamaIndex vector store and index
            vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
            storage_context = StorageContext.from_defaults(vector_store=vector_store)

            # Check if index exists
            try:
                # Try to load existing index
                index = VectorStoreIndex.from_vector_store(
                    vector_store=vector_store,
                    storage_context=storage_context
                )
                print(f" - Loaded existing LlamaIndex VectorStoreIndex")
            except Exception:
                # Create new index
                index = VectorStoreIndex.from_documents(
                    [],  # Start empty
                    storage_context=storage_context
                )
                print(f" - Created new LlamaIndex VectorStoreIndex")

            # Insert documents into the index (in a worker thread)
            print(f" - Inserting {len(documents)} documents into LlamaIndex")

            def _insert_documents():
                for doc in documents:
                    print(f" - Inserting document chunk with metadata: {doc.metadata}")
                    index.insert(doc)
                return True

            # Run embedding creation in thread pool to avoid blocking
            await asyncio.get_running_loop().run_in_executor(None, _insert_documents)

            # Verify the final count
            final_count = chroma_collection.count()
            print(f" - Collection count after adding: {final_count}")
            print(f" - Successfully added {len(vector_ids)} vectors for document {document_id}")

            return vector_ids

        except Exception as e:
            print(f"Error in _create_embeddings: {str(e)}")
            traceback.print_exc()
            raise HTTPException(
                status_code=500,
                detail=f"Error creating embeddings: {str(e)}"
            ) from e

    async def _update_document_status(
        self,
        document_id: str,
        processing_status: str,
        embedding_status: str,
        summary_status: str,
        db: AsyncIOMotorDatabase
    ):
        """Set all three status fields (and ``updated_at``) on a document."""
        await db.documents.update_one(
            {"_id": ObjectId(document_id)},
            {"$set": {
                "processing_status": processing_status,
                "embedding_status": embedding_status,
                "summary_status": summary_status,
                "updated_at": datetime.utcnow()
            }}
        )

    async def _update_document_with_parsed_data(
        self,
        document_id: str,
        parsed_text: str,
        chunks: List[str],
        vector_ids: List[str],
        db: AsyncIOMotorDatabase
    ):
        """Store parsed text, chunks, chunk count and vector ids on a document."""
        await db.documents.update_one(
            {"_id": ObjectId(document_id)},
            {"$set": {
                "parsed_text": parsed_text,
                "text_chunks": chunks,
                "chunk_count": len(chunks),
                "vector_ids": vector_ids,
                "updated_at": datetime.utcnow()
            }}
        )

    async def _extract_contract_summary(
        self,
        document_id: str,
        parsed_text: str,
        filename: str,
        db: AsyncIOMotorDatabase
    ):
        """Extract, validate and store the contract summary for a document.

        Sets ``summary_status`` to ``"processing"``, then to ``"completed"``
        (with ``contract_summary``) or ``"failed"`` (with
        ``metadata.summary_error``). Errors from the summary service are
        recorded rather than raised; only a failing database write in the
        error handler can propagate.
        """
        doc_filter = {"_id": ObjectId(document_id)}
        try:
            # Update summary status to processing
            await db.documents.update_one(
                doc_filter,
                {"$set": {
                    "summary_status": "processing",
                    "updated_at": datetime.utcnow()
                }}
            )

            # Extract summary using the contract summary service
            result = await contract_summary_service.extract_contract_summary(
                parsed_text, filename
            )

            if result["success"]:
                # Validate the summary
                validated_summary = contract_summary_service.validate_contract_summary(
                    result["summary"]
                )

                # Store in database
                await db.documents.update_one(
                    doc_filter,
                    {"$set": {
                        "contract_summary": validated_summary.dict(),
                        "summary_status": "completed",
                        "summary_created_at": datetime.utcnow(),
                        "updated_at": datetime.utcnow()
                    }}
                )

                print(f"Successfully extracted contract summary for {document_id}")

            else:
                # Store error
                await db.documents.update_one(
                    doc_filter,
                    {"$set": {
                        "summary_status": "failed",
                        "metadata.summary_error": result.get("error", "Unknown error"),
                        "updated_at": datetime.utcnow()
                    }}
                )

                print(f"Failed to extract contract summary for {document_id}: {result.get('error')}")

        except Exception as e:
            print(f"Error extracting contract summary for {document_id}: {str(e)}")
            traceback.print_exc()

            await db.documents.update_one(
                doc_filter,
                {"$set": {
                    "summary_status": "failed",
                    "metadata.summary_error": str(e),
                    "updated_at": datetime.utcnow()
                }}
            )

    async def _validate_file(self, file: UploadFile):
        """Reject uploads with no name, an unsupported extension, or excess size.

        An upload whose ``size`` is ``None`` passes the size check here; the
        limit is then enforced while the file is streamed to disk in
        ``_save_file``.

        Raises:
            HTTPException: 400 describing the first failed check.
        """
        if not file.filename:
            raise HTTPException(status_code=400, detail="No file provided")

        # Check file extension
        file_extension = Path(file.filename).suffix.lower()
        if file_extension not in self.allowed_extensions:
            raise HTTPException(
                status_code=400,
                detail=f"File type {file_extension} not supported. Allowed types: {', '.join(sorted(self.allowed_extensions))}"
            )

        # Check file size (comparing None with an int would raise TypeError)
        if file.size is not None and file.size > self.max_file_size:
            raise HTTPException(
                status_code=400,
                detail=f"File too large. Maximum size: {self.max_file_size / (1024*1024):.1f}MB"
            )

    @staticmethod
    def _discard_partial_file(file_path: Path):
        """Best-effort removal of a partially written upload."""
        try:
            file_path.unlink()
        except OSError:
            # Deliberately ignored: the file may never have been created, and
            # cleanup must not mask the error that triggered it.
            pass

    async def _save_file(self, file: UploadFile, file_path: Path):
        """Stream an upload to ``file_path`` and return the number of bytes written.

        The upload is copied in ``_SAVE_CHUNK_SIZE`` pieces so a large file is
        never held in memory at once. The size limit is enforced while
        copying, which also covers uploads that did not report a size. A
        partially written file is removed on any failure.

        Raises:
            HTTPException: 400 if the upload exceeds ``max_file_size``,
                500 for any other error.
        """
        bytes_written = 0
        try:
            async with aiofiles.open(file_path, 'wb') as f:
                while True:
                    chunk = await file.read(self._SAVE_CHUNK_SIZE)
                    if not chunk:
                        break
                    bytes_written += len(chunk)
                    if bytes_written > self.max_file_size:
                        raise HTTPException(
                            status_code=400,
                            detail=f"File too large. Maximum size: {self.max_file_size / (1024*1024):.1f}MB"
                        )
                    await f.write(chunk)
        except HTTPException:
            self._discard_partial_file(file_path)
            raise
        except Exception as e:
            self._discard_partial_file(file_path)
            raise HTTPException(
                status_code=500,
                detail=f"Error saving file: {str(e)}"
            ) from e
        return bytes_written

    async def query_documents(
        self,
        query: str,
        index_id: str,
        top_k: int = 10
    ) -> List[Dict[str, Any]]:
        """Retrieve the ``top_k`` most similar chunks for ``query`` from an index.

        Returns:
            One dict per hit with ``content``, ``metadata``, ``score``
            (similarity, 0.5 when the node has none), ``distance``
            (``1 - score``), ``document_id`` and ``chunk_index``.

        Raises:
            HTTPException: 404 if the collection is missing, empty, or nothing
                matches; 500 for any other failure.
        """
        try:
            collection_name = f"index_{index_id}"

            # Ensure consistent embedding model for LlamaIndex
            Settings.embed_model = self._make_embed_model()

            # DEBUG: LlamaIndex query setup
            print(f"🔍 DEBUG - LlamaIndex Query Execution:")
            print(f" - Query: '{query}'")
            print(f" - Collection name: {collection_name}")
            print(f" - Top K: {top_k}")
            print(f" - Settings.embed_model: {type(Settings.embed_model).__name__}")

            # Check if collection exists (without specifying embedding function)
            try:
                client = self.get_chroma_client()
                chroma_collection = client.get_collection(name=collection_name)
                collection_count = chroma_collection.count()
                print(f" - Found collection with {collection_count} documents")
            except Exception as collection_error:
                print(f" - Collection {collection_name} not found: {collection_error}")
                raise HTTPException(
                    status_code=404,
                    detail=f"Vector collection for index '{index_id}' does not exist. Documents may still be processing or failed to process. Please check the admin panel for processing status."
                ) from collection_error

            # Checked outside the try above: inside it, this 404 would be
            # caught by ``except Exception`` and replaced by the
            # "does not exist" message.
            if collection_count == 0:
                raise HTTPException(
                    status_code=404,
                    detail=f"Index '{index_id}' exists but contains no processed documents. Please upload and process documents first."
                )

            # Create LlamaIndex VectorStore and Index
            try:
                print(f" - Creating LlamaIndex VectorStore...")
                vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
                storage_context = StorageContext.from_defaults(vector_store=vector_store)

                # Load the index
                index = VectorStoreIndex.from_vector_store(
                    vector_store=vector_store,
                    storage_context=storage_context
                )

                # Create query engine
                query_engine = index.as_query_engine(
                    similarity_top_k=top_k,
                    response_mode="no_text"  # Only return source nodes, no generated text
                )

                print(f" - Executing LlamaIndex query...")
                # The query call is synchronous; run it in a worker thread so
                # the event loop is not blocked.
                response = await asyncio.get_running_loop().run_in_executor(
                    None, query_engine.query, query
                )
                print(f" - Query successful")

                # Extract and format results from source nodes
                formatted_results = []
                source_nodes = getattr(response, 'source_nodes', None)
                if source_nodes:
                    for i, node in enumerate(source_nodes):
                        # Get similarity score (higher is better)
                        similarity_score = getattr(node, 'score', None)
                        if similarity_score is None:
                            similarity_score = 0.5

                        formatted_results.append({
                            "content": node.text,
                            "metadata": node.metadata,
                            "score": similarity_score,
                            # Convert similarity (higher is better) to a
                            # distance (lower is better).
                            "distance": 1.0 - similarity_score,
                            "document_id": node.metadata.get("document_id", "unknown"),
                            "chunk_index": node.metadata.get("chunk_index", i)
                        })
                    print(f" - Retrieved {len(formatted_results)} relevant chunks")
                    print(f" - Similarity scores: {[r['score'] for r in formatted_results]}")
                    print(f" - Distance values: {[r['distance'] for r in formatted_results]}")
                else:
                    print(f" - No source nodes found in response")

            except Exception as query_error:
                print(f" - ERROR during LlamaIndex query: {query_error}")
                print(f" - Error type: {type(query_error).__name__}")
                raise

            if not formatted_results:
                raise HTTPException(
                    status_code=404,
                    detail=f"No relevant documents found for query '{query}' in index '{index_id}'. Try rephrasing your question or ensure documents are properly processed."
                )

            return formatted_results

        except HTTPException:
            # Re-raise HTTPExceptions as-is
            raise
        except Exception as e:
            print(f"Unexpected error in query_documents: {e}")
            raise HTTPException(
                status_code=500,
                detail=f"Error querying documents: {str(e)}"
            ) from e

    def generate_response(
        self,
        query: str,
        context_chunks: List[str],
        index_id: str
    ) -> str:
        """Answer ``query`` with the LLM, grounded in ``context_chunks``.

        ``index_id`` is accepted for interface compatibility and not used.
        This is a synchronous call to the LLM.

        Raises:
            HTTPException: 500 if the completion fails.
        """
        try:
            # Prepare context
            context = "\n\n".join(context_chunks)

            # Create prompt
            prompt = f"""Based on the following context, answer the user's question. If the answer is not in the context, say "I don't have enough information to answer that question."

Context:
{context}

Question: {query}

Answer:"""

            # Generate response using OpenAI
            llm = OpenAI(
                model="gpt-4o",
                api_key=settings.openai_api_key,
                temperature=0.1
            )

            # Use sync completion
            response = llm.complete(prompt)

            return response.text

        except Exception as e:
            raise HTTPException(
                status_code=500,
                detail=f"Error generating response: {str(e)}"
            ) from e

    @staticmethod
    def _delete_vectors(collection, ids, count_before, strategy):
        """Delete ``ids`` and report whether the collection count changed."""
        collection.delete(ids=ids)
        count_after = collection.count()
        print(f" - {strategy} - Successfully deleted {count_before - count_after} vectors")
        return count_before != count_after

    @staticmethod
    def _ids_matching(collection, predicate):
        """Return ids of all records whose (non-empty) metadata satisfies ``predicate``."""
        # Only metadata is needed; skip loading the stored document texts.
        snapshot = collection.get(include=["metadatas"])
        ids = snapshot["ids"] or []
        metadatas = snapshot["metadatas"] or []
        return [vid for vid, metadata in zip(ids, metadatas) if metadata and predicate(metadata)]

    async def delete_document_embeddings(
        self,
        document_id: str,
        index_id: str
    ):
        """Delete a document's vectors from its index collection (best effort).

        Three strategies are tried in order, stopping at the first one that
        shrinks the collection: filter on the ``document_id`` metadata, match
        the ``chunk_id`` prefix ``"<document_id>_"``, then match any metadata
        value equal to ``document_id``. Errors are printed, never raised.
        """
        try:
            collection_name = f"index_{index_id}"
            print(f"🗑️ DEBUG - Deleting embeddings for document {document_id} from collection {collection_name}")

            collection = self.get_chroma_client().get_collection(name=collection_name)

            # Get collection count before deletion
            count_before = collection.count()
            print(f" - Collection count before deletion: {count_before}")

            # Strategy 1: Try to delete by document_id metadata
            try:
                results = collection.get(where={"document_id": document_id})
                found_ids = results["ids"] or []
                print(f" - Strategy 1 - Found {len(found_ids)} vectors by document_id")

                if found_ids and self._delete_vectors(collection, found_ids, count_before, "Strategy 1"):
                    return  # Success, exit early
            except Exception as e:
                print(f" - Strategy 1 failed: {e}")

            # Strategy 2: Try to delete by chunk_id pattern (document_id_*)
            try:
                chunk_prefix = f"{document_id}_"
                matching_ids = self._ids_matching(
                    collection,
                    lambda metadata: "chunk_id" in metadata
                    and metadata["chunk_id"].startswith(chunk_prefix)
                )
                print(f" - Strategy 2 - Found {len(matching_ids)} vectors by chunk_id pattern")

                if matching_ids and self._delete_vectors(collection, matching_ids, count_before, "Strategy 2"):
                    return  # Success, exit early
            except Exception as e:
                print(f" - Strategy 2 failed: {e}")

            # Strategy 3: Try to delete by any metadata containing document_id
            try:
                matching_ids = self._ids_matching(
                    collection,
                    lambda metadata: any(str(value) == document_id for value in metadata.values())
                )
                print(f" - Strategy 3 - Found {len(matching_ids)} vectors by metadata scan")

                if matching_ids and self._delete_vectors(collection, matching_ids, count_before, "Strategy 3"):
                    return  # Success, exit early
            except Exception as e:
                print(f" - Strategy 3 failed: {e}")

            # If we get here, no vectors were deleted
            print(f" - WARNING: No vectors were deleted for document {document_id}")

            # Debug: Show some sample metadata to understand the structure
            try:
                sample_results = collection.get(limit=3)
                print(f" - DEBUG: Sample metadata structures:")
                for i, metadata in enumerate((sample_results.get("metadatas") or [])[:3]):
                    print(f" - Sample {i}: {metadata}")
            except Exception as e:
                print(f" - DEBUG: Could not get sample metadata: {e}")

        except Exception as e:
            print(f"Error deleting embeddings for document {document_id}: {e}")
            traceback.print_exc()

    def check_collection_exists(self, index_id: str) -> bool:
        """Return True if the ChromaDB collection ``index_<index_id>`` can be fetched."""
        try:
            self.get_chroma_client().get_collection(name=f"index_{index_id}")
        except Exception:
            return False
        return True

    def get_collection_info(self, index_id: str) -> Dict[str, Any]:
        """Describe the index's ChromaDB collection.

        Returns:
            ``exists``, ``name`` and ``count``, plus ``metadata`` when the
            collection exists or ``error`` when it could not be fetched.
        """
        collection_name = f"index_{index_id}"
        try:
            collection = self.get_chroma_client().get_collection(name=collection_name)

            return {
                "exists": True,
                "name": collection_name,
                "count": collection.count(),
                "metadata": collection.metadata
            }
        except Exception as e:
            return {
                "exists": False,
                "name": collection_name,
                "count": 0,
                "error": str(e)
            }

    async def generate_document_summary(self, text: str, filename: str) -> str:
        """Generate a 150-300 word AI summary of ``text``.

        Never raises: an over-long document or an LLM failure is reported as
        a string starting with ``"Error"``, and a summary shorter than 50
        characters is replaced by a fixed fallback message.
        """
        try:
            # Check text length and report an error if too long
            if len(text) > settings.max_summary_chars:
                error_msg = f"Document too large for summary: {len(text)} characters exceeds maximum of {settings.max_summary_chars} characters"
                print(f"Error: {error_msg}")
                return f"Error: {error_msg}"

            # Create summarization prompt
            prompt = f"""Please provide a concise summary of the following document "{filename}".
Focus on the main points, key information, and important details.
Keep the summary between 150-300 words.

Document content:
{text}

Summary:"""

            # Generate summary using OpenAI via Settings.llm
            response = await Settings.llm.acomplete(prompt)
            summary = str(response).strip()

            # Fallback if summary is too short
            if len(summary) < 50:
                summary = "Unable to generate detailed summary. This document contains text that may require manual review."

            return summary

        except Exception as e:
            return f"Error generating summary: {str(e)}"


# Global processor instance shared by importers of this module. Creating it
# runs LlamaProcessor.__init__, which configures the global LlamaIndex
# Settings at import time.
llama_processor = LlamaProcessor()