contract-query/backend/app/services/llama_processor.py
2025-08-14 15:03:33 -05:00

881 lines
No EOL
35 KiB
Python

import os
import uuid
import asyncio
from typing import List, Dict, Any, Optional, Union
from pathlib import Path
import aiofiles
from fastapi import UploadFile, HTTPException
from motor.motor_asyncio import AsyncIOMotorDatabase
from bson import ObjectId
from datetime import datetime
from llama_index.core import (
VectorStoreIndex,
StorageContext,
Settings,
Document as LlamaDocument
)
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.core.embeddings import BaseEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_cloud_services import LlamaParse
import chromadb
from chromadb.config import Settings as ChromaSettings
from chromadb.utils import embedding_functions
from ..config.settings import settings
from ..models.document import DocumentInDB, DocumentCreate
from ..core.chroma_client import chroma_singleton
from ..models.index import IndexInDB
from ..models.user import UserInDB
from ..utils.file_utils import validate_file, get_file_info
from ..services.contract_summary_service import contract_summary_service
class LlamaProcessor:
def __init__(self):
    """Configure storage locations and upload limits, then set up LlamaIndex.

    ``upload_dir`` and ``indices_dir`` are read from the application
    settings. No ChromaDB client is created here; ``get_chroma_client``
    obtains one from the shared singleton when it is needed.
    """
    megabyte = 1024 * 1024
    supported_suffixes = (
        '.pdf', '.docx', '.doc', '.txt', '.csv',
        '.json', '.html', '.md', '.rtf',
    )

    self.upload_dir = Path(settings.upload_dir)
    self.indices_dir = Path(settings.indices_dir)
    # Upload validation lower-cases a file's suffix before checking this set.
    self.allowed_extensions = set(supported_suffixes)
    self.max_file_size = 50 * megabyte  # 50MB

    # Installs the LLM, embedding model and text splitter on LlamaIndex Settings.
    self._setup_llama_index()
def get_chroma_client(self):
    """Return the ChromaDB client for this processor's index directory.

    The client is requested from ``chroma_singleton`` for the ``chroma_db``
    folder located under ``self.indices_dir``.
    """
    storage_location = self.indices_dir.joinpath("chroma_db")
    return chroma_singleton.get_client(str(storage_location))
def _setup_llama_index(self):
    """Install the LLM, embedding model and text splitter on LlamaIndex ``Settings``.

    All three components authenticate with ``settings.openai_api_key``.
    """
    api_key = settings.openai_api_key
    embedding_model_name = "text-embedding-3-small"

    # Completion model.
    Settings.llm = OpenAI(model="gpt-4o", api_key=api_key, temperature=0.1)

    # Embedding model.
    Settings.embed_model = OpenAIEmbedding(model=embedding_model_name, api_key=api_key)

    # The semantic splitter is given its own embedding client rather than
    # the instance stored on Settings.embed_model.
    splitter_embedder = OpenAIEmbedding(model=embedding_model_name, api_key=api_key)
    Settings.text_splitter = SemanticSplitterNodeParser.from_defaults(
        embed_model=splitter_embedder,
        buffer_size=2,
        breakpoint_percentile_threshold=70,
    )
async def process_single_file(
    self,
    file: UploadFile,
    index_id: str,
    user: UserInDB,
    db: AsyncIOMotorDatabase,
    custom_name: Optional[str] = None
) -> DocumentInDB:
    """Validate, store and register one upload, then start background processing.

    Args:
        file: The uploaded file.
        index_id: Index the document belongs to; also the name of the
            sub-directory of ``self.upload_dir`` the file is saved in.
        user: Uploading user; ``user.id`` is recorded as ``uploaded_by``.
        db: Database whose ``documents`` collection receives the record.
        custom_name: Display name stored as ``original_filename``; the
            upload's own filename is used when omitted.

    Returns:
        The inserted document record, with processing, embedding and summary
        statuses all ``"pending"``. Parsing and embedding continue in a
        background task after this method returns.

    Raises:
        HTTPException: Propagated from validation or from saving the file.
    """
    await self._validate_file(file)

    # Store under a random name so two uploads with the same filename never collide.
    file_extension = Path(file.filename).suffix.lower()
    unique_filename = f"{uuid.uuid4()}{file_extension}"

    # Each index gets its own upload directory.
    index_upload_dir = self.upload_dir / index_id
    index_upload_dir.mkdir(parents=True, exist_ok=True)

    file_path = index_upload_dir / unique_filename
    await self._save_file(file, file_path)

    document_data = DocumentCreate(
        filename=unique_filename,
        original_filename=custom_name or file.filename,
        file_size=file.size,
        content_type=file.content_type,
        index_id=index_id,
        uploaded_by=user.id
    )

    # One clock reading so created_at and updated_at are identical on insert.
    now = datetime.utcnow()
    document_dict = document_data.dict()
    document_dict.update({
        "file_path": str(file_path),
        "processing_status": "pending",
        "embedding_status": "pending",
        "summary_status": "pending",
        "metadata": {},
        "created_at": now,
        "updated_at": now,
    })

    result = await db.documents.insert_one(document_dict)
    document_dict["_id"] = result.inserted_id
    document = DocumentInDB(**document_dict)

    # The event loop only keeps weak references to tasks, so an unreferenced
    # task may be garbage-collected before it finishes. Hold a strong
    # reference until the task is done.
    task = asyncio.create_task(self._process_document_async(document, db))
    background_tasks = self.__dict__.setdefault("_background_tasks", set())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    return document
async def process_multiple_files(
    self,
    files: List[UploadFile],
    index_id: str,
    user: UserInDB,
    db: AsyncIOMotorDatabase,
    base_name: str
) -> List[DocumentInDB]:
    """Process several uploads one after another under a shared base name.

    Each file is stored with the display name
    ``"<base_name>_<NNN><extension>"``, where ``NNN`` is the 1-based,
    zero-padded position of the file in ``files`` and ``extension`` is the
    lower-cased suffix of the uploaded filename.

    Args:
        files: Uploads to process, in the order they should be numbered.
        index_id: Index the documents belong to.
        user: Uploading user.
        db: Database handle passed through to ``process_single_file``.
        base_name: Prefix for the generated display names.

    Returns:
        The created document records, in the same order as ``files``.

    Raises:
        HTTPException: Propagated from ``process_single_file``. Documents
            created before the failing file are not rolled back.
    """
    documents = []
    for position, file in enumerate(files, 1):
        # A missing filename must reach the validation in process_single_file
        # (which answers with a 400) instead of crashing here in Path(None).
        file_extension = Path(file.filename or "").suffix.lower()
        custom_name = f"{base_name}_{position:03d}{file_extension}"
        document = await self.process_single_file(
            file, index_id, user, db, custom_name
        )
        documents.append(document)
    return documents
async def _process_document_async(self, document: DocumentInDB, db: AsyncIOMotorDatabase):
    """Run the parse, chunk, embed and summarise pipeline for one stored document.

    Status arguments passed to ``_update_document_status`` are, in order,
    processing status, embedding status and summary status. Any failure
    before the summary step marks all three as ``"failed"`` and records the
    message under ``metadata.error``. A failure in the summary step only
    marks the summary as failed.

    This coroutine is started as a background task, so it never raises:
    every error is printed and, where possible, written to the database.

    Args:
        document: The record created for the upload.
        db: Database whose ``documents`` collection holds the record.
    """
    print(f"Starting processing for document {document.id}: {document.original_filename}")
    try:
        print(f"Setting document {document.id} to processing status")
        await self._update_document_status(
            document.id, "processing", "pending", "pending", db
        )
        # Small delay kept from the original flow ("ensure status update is committed").
        await asyncio.sleep(0.1)

        print(f"Parsing document text for {document.id}")
        parsed_text = await self._parse_document_text(document.file_path)
        print(f"Parsed text length: {len(parsed_text)} characters")

        # Parsing finished; embedding is now in progress.
        await self._update_document_status(
            document.id, "completed", "processing", "pending", db
        )

        print(f"Creating text chunks for {document.id}")
        chunks = await self._create_text_chunks(parsed_text, document.index_id)
        print(f"Created {len(chunks)} chunks")

        print(f"Creating embeddings for {document.id}")
        vector_ids = await self._create_embeddings(
            chunks, document.index_id, str(document.id)
        )
        print(f"Created {len(vector_ids)} embeddings")

        print(f"Updating document {document.id} with parsed data")
        await self._update_document_with_parsed_data(
            document.id, parsed_text, chunks, vector_ids, db
        )

        # Core processing is done; only the summary is still outstanding.
        print(f"Completing processing for document {document.id}")
        await self._update_document_status(
            document.id, "completed", "completed", "pending", db
        )

        print(f"Extracting contract summary for {document.id}")
        try:
            await self._extract_contract_summary(
                document.id, parsed_text, document.original_filename, db
            )
        except Exception as summary_error:
            print(f"Warning: Contract summary extraction failed for {document.id}: {summary_error}")
            # Mark the summary as failed without failing the whole document.
            await self._update_document_status(
                document.id, "completed", "completed", "failed", db
            )

        print(f"Successfully processed document {document.id}")
    except Exception as e:
        print(f"Error processing document {document.id}: {str(e)}")
        import traceback
        traceback.print_exc()
        # Recording the failure is best effort: if the database is the thing
        # that broke, these writes fail too, and an exception escaping a
        # background task would only surface as "Task exception was never
        # retrieved".
        try:
            await self._update_document_status(
                document.id, "failed", "failed", "failed", db
            )
            await db.documents.update_one(
                {"_id": ObjectId(document.id)},
                {"$set": {"metadata.error": str(e)}}
            )
        except Exception as record_error:
            print(f"Error recording failure for document {document.id}: {record_error}")
async def _parse_document_text(self, file_path: str) -> str:
    """Extract a document's text as markdown using LlamaParse in premium mode.

    The LlamaParse client is synchronous, so it runs in the event loop's
    default executor to keep the loop responsive.

    Args:
        file_path: Location of the stored upload on disk.

    Returns:
        The text of every parsed part joined by blank lines, with leading
        and trailing whitespace removed.

    Raises:
        HTTPException: 500 if no LlamaParse API key is configured, if
            parsing fails, or if no text was extracted.
    """
    file_path = Path(file_path)
    print(f"Parsing file: {file_path}")
    print(f"File exists: {file_path.exists()}")
    print(f"File size: {file_path.stat().st_size if file_path.exists() else 'N/A'}")

    if not settings.llamaparse_api_key:
        raise HTTPException(
            status_code=500,
            detail="LlamaParse API key is required for document processing"
        )

    try:
        print("Using LlamaParse with premium mode (async)")

        def _run_llamaparse():
            parser = LlamaParse(
                api_key=settings.llamaparse_api_key,
                premium_mode=True,
                result_type="markdown",
                verbose=True
            )
            return parser.load_data(str(file_path))

        # get_running_loop() is the documented way to reach the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        documents = await loop.run_in_executor(None, _run_llamaparse)
        print(f"LlamaParse loaded {len(documents)} documents from file")

        text_result = "\n\n".join(doc.text for doc in documents).strip()
        print(f"Final parsed text length: {len(text_result)} characters")
        if not text_result:
            raise Exception("No text extracted from document via LlamaParse")
        return text_result
    except Exception as e:
        print(f"Error in _parse_document_text with LlamaParse: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error parsing document with LlamaParse: {str(e)}"
        ) from e
async def _create_text_chunks(self, text: str, index_id: str) -> List[str]:
    """Split text into semantic chunks with LlamaIndex's ``SemanticSplitterNodeParser``.

    The splitter is synchronous and uses an OpenAI embedding model, so it
    runs in the event loop's default executor, in the same way the parsing
    and embedding steps do, instead of blocking the loop.

    Args:
        text: Full parsed text of the document.
        index_id: Not used by the splitter; kept for interface compatibility.

    Returns:
        The text of each chunk, in document order.

    Raises:
        HTTPException: 500 if splitting fails or yields no chunks.
    """
    try:
        print(f"Creating semantic chunks for text of length {len(text)}")
        semantic_splitter = SemanticSplitterNodeParser.from_defaults(
            embed_model=OpenAIEmbedding(
                model="text-embedding-3-small",
                api_key=settings.openai_api_key
            ),
            buffer_size=2,
            breakpoint_percentile_threshold=70
        )
        llama_doc = LlamaDocument(text=text)

        def _split():
            return semantic_splitter.get_nodes_from_documents([llama_doc])

        nodes = await asyncio.get_running_loop().run_in_executor(None, _split)

        chunks = [node.text for node in nodes]
        print(f"Created {len(chunks)} semantic chunks")
        if not chunks:
            raise Exception("No semantic chunks created from text")
        return chunks
    except Exception as e:
        print(f"Error in _create_text_chunks: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error creating semantic text chunks: {str(e)}"
        ) from e
async def _create_embeddings(
    self,
    chunks: List[str],
    index_id: str,
    document_id: str
) -> List[str]:
    """Embed text chunks and add them to the index's ChromaDB collection.

    Each chunk is wrapped in a LlamaIndex document carrying ``document_id``,
    ``chunk_index``, ``index_id`` and ``chunk_id`` metadata, then inserted
    into a ``VectorStoreIndex`` backed by the ``index_<index_id>``
    collection. The collection is created with cosine distance when it does
    not exist yet.

    Args:
        chunks: Chunk texts, in document order.
        index_id: Index whose collection receives the vectors.
        document_id: ID of the source document, as a string.

    Returns:
        The generated chunk IDs (``"<document_id>_<position>"``), in chunk order.

    Raises:
        HTTPException: 500 if any step fails.
    """
    try:
        collection_name = f"index_{index_id}"
        print(f"🔍 DEBUG - Creating embeddings using LlamaIndex for collection: {collection_name}")

        # NOTE(review): these IDs are generated here and attached only as
        # `chunk_id` metadata; nothing in this method hands them to the
        # vector store as record IDs. Confirm callers do not treat them as
        # Chroma record IDs.
        vector_ids = [f"{document_id}_{i}" for i in range(len(chunks))]
        documents = [
            LlamaDocument(
                text=chunk,
                metadata={
                    "document_id": document_id,
                    "chunk_index": i,
                    "index_id": index_id,
                    "chunk_id": chunk_id
                }
            )
            for i, (chunk, chunk_id) in enumerate(zip(chunks, vector_ids))
        ]
        print(f" - Created {len(documents)} LlamaIndex documents")
        print(f" - Document IDs: {vector_ids}")
        print(f" - Document lengths: {[len(doc.text) for doc in documents]}")

        client = self.get_chroma_client()
        try:
            chroma_collection = client.get_collection(name=collection_name)
            current_count = chroma_collection.count()
            print(f" - Found existing collection with {current_count} vectors")
        except Exception:
            # Any lookup failure is treated as "collection missing".
            # No embedding function is passed: LlamaIndex computes the vectors.
            print(f" - Creating new ChromaDB collection")
            chroma_collection = client.create_collection(
                name=collection_name,
                metadata={"hnsw:space": "cosine"}
            )

        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)

        try:
            index = VectorStoreIndex.from_vector_store(
                vector_store=vector_store,
                storage_context=storage_context
            )
            print(f" - Loaded existing LlamaIndex VectorStoreIndex")
        except Exception:
            # Fall back to an empty index bound to the same storage.
            index = VectorStoreIndex.from_documents(
                [],
                storage_context=storage_context
            )
            print(f" - Created new LlamaIndex VectorStoreIndex")

        print(f" - Inserting {len(documents)} documents into LlamaIndex")

        def _insert_documents():
            for doc in documents:
                print(f" - Inserting document chunk with metadata: {doc.metadata}")
                index.insert(doc)
            return True

        # index.insert is synchronous; run it off the event loop.
        # get_running_loop() is the documented way to reach the loop from
        # inside a coroutine.
        await asyncio.get_running_loop().run_in_executor(None, _insert_documents)

        final_count = chroma_collection.count()
        print(f" - Collection count after adding: {final_count}")
        print(f" - Successfully added {len(vector_ids)} vectors for document {document_id}")
        return vector_ids
    except Exception as e:
        print(f"Error in _create_embeddings: {str(e)}")
        import traceback
        traceback.print_exc()
        raise HTTPException(
            status_code=500,
            detail=f"Error creating embeddings: {str(e)}"
        ) from e
async def _update_document_status(
    self,
    document_id: str,
    processing_status: str,
    embedding_status: str,
    summary_status: str,
    db: AsyncIOMotorDatabase
):
    """Overwrite a document's three status fields and refresh ``updated_at``.

    Args:
        document_id: ID of the document to update.
        processing_status: New value for ``processing_status``.
        embedding_status: New value for ``embedding_status``.
        summary_status: New value for ``summary_status``.
        db: Database whose ``documents`` collection holds the record.
    """
    selector = {"_id": ObjectId(document_id)}
    new_values = dict(
        processing_status=processing_status,
        embedding_status=embedding_status,
        summary_status=summary_status,
        updated_at=datetime.utcnow(),
    )
    await db.documents.update_one(selector, {"$set": new_values})
async def _update_document_with_parsed_data(
    self,
    document_id: str,
    parsed_text: str,
    chunks: List[str],
    vector_ids: List[str],
    db: AsyncIOMotorDatabase
):
    """Store the parsing results on the document record.

    Writes the full text, the chunk list with its length, and the vector
    IDs, and refreshes ``updated_at``.

    Args:
        document_id: ID of the document to update.
        parsed_text: Full extracted text.
        chunks: Chunk texts, in order.
        vector_ids: IDs returned by the embedding step.
        db: Database whose ``documents`` collection holds the record.
    """
    selector = {"_id": ObjectId(document_id)}
    parsed_fields = {
        "parsed_text": parsed_text,
        "text_chunks": chunks,
        "chunk_count": len(chunks),
        "vector_ids": vector_ids,
    }
    parsed_fields["updated_at"] = datetime.utcnow()
    await db.documents.update_one(selector, {"$set": parsed_fields})
async def _extract_contract_summary(
    self,
    document_id: str,
    parsed_text: str,
    filename: str,
    db: AsyncIOMotorDatabase
):
    """Produce a contract summary for a document and store the outcome.

    Sets ``summary_status`` to ``"processing"``, asks
    ``contract_summary_service`` for a summary, and then stores either the
    validated summary with status ``"completed"`` or the error message with
    status ``"failed"``. Exceptions raised while extracting are recorded the
    same way rather than re-raised.

    Args:
        document_id: ID of the document being summarised.
        parsed_text: Full extracted text of the document.
        filename: Name passed to the summary service alongside the text.
        db: Database whose ``documents`` collection holds the record.
    """
    async def _store(fields):
        # Every write made by this method also refreshes updated_at.
        await db.documents.update_one(
            {"_id": ObjectId(document_id)},
            {"$set": {**fields, "updated_at": datetime.utcnow()}},
        )

    try:
        await _store({"summary_status": "processing"})

        outcome = await contract_summary_service.extract_contract_summary(
            parsed_text, filename
        )

        if not outcome["success"]:
            await _store({
                "summary_status": "failed",
                "metadata.summary_error": outcome.get("error", "Unknown error"),
            })
            print(f"Failed to extract contract summary for {document_id}: {outcome.get('error')}")
            return

        checked_summary = contract_summary_service.validate_contract_summary(
            outcome["summary"]
        )
        await _store({
            "contract_summary": checked_summary.dict(),
            "summary_status": "completed",
            "summary_created_at": datetime.utcnow(),
        })
        print(f"Successfully extracted contract summary for {document_id}")
    except Exception as e:
        print(f"Error extracting contract summary for {document_id}: {str(e)}")
        import traceback
        traceback.print_exc()
        await _store({
            "summary_status": "failed",
            "metadata.summary_error": str(e),
        })
async def _validate_file(self, file: UploadFile):
"""Validate uploaded file"""
if not file.filename:
raise HTTPException(status_code=400, detail="No file provided")
# Check file extension
file_extension = Path(file.filename).suffix.lower()
if file_extension not in self.allowed_extensions:
raise HTTPException(
status_code=400,
detail=f"File type {file_extension} not supported. Allowed types: {', '.join(self.allowed_extensions)}"
)
# Check file size
if file.size > self.max_file_size:
raise HTTPException(
status_code=400,
detail=f"File too large. Maximum size: {self.max_file_size / (1024*1024):.1f}MB"
)
async def _save_file(self, file: UploadFile, file_path: Path):
    """Write an upload to disk in fixed-size chunks.

    Copying chunk by chunk keeps memory use bounded instead of holding the
    whole upload (up to ``self.max_file_size``) in memory at once.

    Args:
        file: The uploaded file, read from its current position.
        file_path: Destination path; its parent directory must already exist.

    Raises:
        HTTPException: 500 if reading or writing fails.
    """
    chunk_size = 1024 * 1024  # 1 MiB per read
    try:
        async with aiofiles.open(file_path, 'wb') as f:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                await f.write(chunk)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error saving file: {str(e)}"
        ) from e
async def query_documents(
    self,
    query: str,
    index_id: str,
    top_k: int = 10
) -> List[Dict[str, Any]]:
    """Retrieve the chunks most similar to a query from an index's collection.

    Args:
        query: Natural-language query text.
        index_id: Index to search; its collection is ``index_<index_id>``.
        top_k: Maximum number of chunks to retrieve.

    Returns:
        One dict per retrieved chunk with the keys ``content``,
        ``metadata``, ``score`` (0.5 when the node has no score),
        ``distance`` (``1.0 - score``), ``document_id`` and ``chunk_index``.

    Raises:
        HTTPException: 404 if the collection cannot be loaded, is empty, or
            yields no results; 500 for any other failure.
    """
    try:
        collection_name = f"index_{index_id}"

        # Query with the same embedding model the documents were indexed with.
        Settings.embed_model = OpenAIEmbedding(
            model="text-embedding-3-small",
            api_key=settings.openai_api_key
        )

        print(f"🔍 DEBUG - LlamaIndex Query Execution:")
        print(f" - Query: '{query}'")
        print(f" - Collection name: {collection_name}")
        print(f" - Top K: {top_k}")
        print(f" - Settings.embed_model: {type(Settings.embed_model).__name__}")

        try:
            client = self.get_chroma_client()
            chroma_collection = client.get_collection(name=collection_name)
            collection_count = chroma_collection.count()
            print(f" - Found collection with {collection_count} documents")
            if collection_count == 0:
                raise HTTPException(
                    status_code=404,
                    detail=f"Index '{index_id}' exists but contains no processed documents. Please upload and process documents first."
                )
        except HTTPException:
            # Keep the "empty index" 404 instead of letting the generic
            # handler below replace it with "does not exist".
            raise
        except Exception as collection_error:
            print(f" - Collection {collection_name} not found: {collection_error}")
            raise HTTPException(
                status_code=404,
                detail=f"Vector collection for index '{index_id}' does not exist. Documents may still be processing or failed to process. Please check the admin panel for processing status."
            ) from collection_error

        formatted_results = []
        try:
            print(f" - Creating LlamaIndex VectorStore...")
            vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
            storage_context = StorageContext.from_defaults(vector_store=vector_store)
            index = VectorStoreIndex.from_vector_store(
                vector_store=vector_store,
                storage_context=storage_context
            )
            # "no_text" skips answer synthesis; only source nodes are needed.
            query_engine = index.as_query_engine(
                similarity_top_k=top_k,
                response_mode="no_text"
            )

            print(f" - Executing LlamaIndex query...")
            # query() is synchronous (it embeds the query and searches the
            # store), so run it off the event loop.
            response = await asyncio.get_running_loop().run_in_executor(
                None, query_engine.query, query
            )
            print(f" - Query successful")

            source_nodes = getattr(response, 'source_nodes', None)
            if source_nodes:
                for i, node in enumerate(source_nodes):
                    # Higher score means more similar; fall back to a
                    # neutral 0.5 when no score is attached.
                    similarity_score = getattr(node, 'score', None)
                    if similarity_score is None:
                        similarity_score = 0.5
                    formatted_results.append({
                        "content": node.text,
                        "metadata": node.metadata,
                        "score": similarity_score,
                        # Distance is the complement of the score (lower is better).
                        "distance": 1.0 - similarity_score,
                        "document_id": node.metadata.get("document_id", "unknown"),
                        "chunk_index": node.metadata.get("chunk_index", i)
                    })
                print(f" - Retrieved {len(formatted_results)} relevant chunks")
                print(f" - Similarity scores: {[r['score'] for r in formatted_results]}")
                print(f" - Distance values: {[r['distance'] for r in formatted_results]}")
            else:
                print(f" - No source nodes found in response")
        except Exception as query_error:
            print(f" - ERROR during LlamaIndex query: {query_error}")
            print(f" - Error type: {type(query_error).__name__}")
            raise

        if not formatted_results:
            raise HTTPException(
                status_code=404,
                detail=f"No relevant documents found for query '{query}' in index '{index_id}'. Try rephrasing your question or ensure documents are properly processed."
            )
        return formatted_results
    except HTTPException:
        # Already carries the intended status code and message.
        raise
    except Exception as e:
        print(f"Unexpected error in query_documents: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Error querying documents: {str(e)}"
        ) from e
def generate_response(
    self,
    query: str,
    context_chunks: List[str],
    index_id: str
) -> str:
    """Answer a question with ``gpt-4o`` using only the supplied context chunks.

    Args:
        query: The user's question.
        context_chunks: Retrieved chunk texts; joined with blank lines to
            form the prompt context.
        index_id: Not used when building the prompt; kept for interface
            compatibility.

    Returns:
        The text of the model's completion.

    Raises:
        HTTPException: 500 if building the prompt or calling the model fails.
    """
    try:
        context = "\n\n".join(context_chunks)
        prompt = f"""Based on the following context, answer the user's question. If the answer is not in the context, say "I don't have enough information to answer that question."
Context:
{context}
Question: {query}
Answer:"""
        answering_model = OpenAI(
            model="gpt-4o",
            api_key=settings.openai_api_key,
            temperature=0.1,
        )
        # Blocking (synchronous) completion call.
        return answering_model.complete(prompt).text
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error generating response: {str(e)}"
        )
async def delete_document_embeddings(
    self,
    document_id: str,
    index_id: str
):
    """Remove a document's vectors from the index's ChromaDB collection.

    Three lookups are tried in order, stopping after the first one that
    actually lowers the collection count:

    1. records whose ``document_id`` metadata equals ``document_id``;
    2. records whose ``chunk_id`` metadata starts with ``"<document_id>_"``;
    3. records with any metadata value whose string form equals ``document_id``.

    This is best effort: failures are printed and never raised. When nothing
    is deleted, a few sample metadata records are printed to help diagnose
    the stored structure.

    Args:
        document_id: ID of the document whose vectors should be removed.
        index_id: Index whose collection (``index_<index_id>``) is searched.
    """
    try:
        collection_name = f"index_{index_id}"
        print(f"🗑️ DEBUG - Deleting embeddings for document {document_id} from collection {collection_name}")
        collection = self.get_chroma_client().get_collection(name=collection_name)

        count_before = collection.count()
        print(f" - Collection count before deletion: {count_before}")

        def _find_by_document_id():
            results = collection.get(where={"document_id": document_id})
            return list(results["ids"] or [])

        def _scan(matches):
            # Only metadata is inspected, so do not load the stored documents.
            snapshot = collection.get(include=["metadatas"])
            ids, metadatas = snapshot["ids"], snapshot["metadatas"]
            if not ids or not metadatas:
                return []
            return [
                vid for vid, metadata in zip(ids, metadatas)
                if metadata and matches(metadata)
            ]

        chunk_prefix = f"{document_id}_"

        def _find_by_chunk_id():
            return _scan(
                lambda metadata: str(metadata.get("chunk_id", "")).startswith(chunk_prefix)
            )

        def _find_by_any_value():
            return _scan(
                lambda metadata: any(str(value) == document_id for value in metadata.values())
            )

        strategies = (
            (1, "by document_id", _find_by_document_id),
            (2, "by chunk_id pattern", _find_by_chunk_id),
            (3, "by metadata scan", _find_by_any_value),
        )
        for number, label, find_ids in strategies:
            try:
                matching_ids = find_ids()
                print(f" - Strategy {number} - Found {len(matching_ids)} vectors {label}")
                if matching_ids:
                    collection.delete(ids=matching_ids)
                    count_after = collection.count()
                    print(f" - Strategy {number} - Successfully deleted {count_before - count_after} vectors")
                    if count_before != count_after:
                        return  # Something was removed; stop here.
            except Exception as e:
                # A failing strategy must not prevent the next one from running.
                print(f" - Strategy {number} failed: {e}")

        print(f" - WARNING: No vectors were deleted for document {document_id}")
        try:
            sample_results = collection.get(limit=3)
            print(f" - DEBUG: Sample metadata structures:")
            for i, metadata in enumerate(sample_results.get("metadatas", [])[:3]):
                print(f" - Sample {i}: {metadata}")
        except Exception as e:
            print(f" - DEBUG: Could not get sample metadata: {e}")
    except Exception as e:
        print(f"Error deleting embeddings for document {document_id}: {e}")
        import traceback
        traceback.print_exc()
def check_collection_exists(self, index_id: str) -> bool:
    """Report whether the ChromaDB collection ``index_<index_id>`` can be loaded.

    Any error raised while obtaining the client or the collection is
    treated as "does not exist".
    """
    try:
        self.get_chroma_client().get_collection(name=f"index_{index_id}")
    except Exception:
        return False
    else:
        return True
def get_collection_info(self, index_id: str) -> Dict[str, Any]:
    """Describe the ChromaDB collection that backs an index.

    Returns:
        When the collection loads: ``exists=True`` with its ``name``,
        vector ``count`` and ``metadata``. Otherwise: ``exists=False`` with
        the expected ``name``, ``count=0`` and the ``error`` message.
    """
    name = f"index_{index_id}"
    try:
        found = self.get_chroma_client().get_collection(name=name)
        size = found.count()
        details = found.metadata
    except Exception as failure:
        return {
            "exists": False,
            "name": name,
            "count": 0,
            "error": str(failure),
        }
    return {
        "exists": True,
        "name": name,
        "count": size,
        "metadata": details,
    }
async def generate_document_summary(self, text: str, filename: str) -> str:
    """Ask the configured LLM for a 150-300 word summary of a document.

    This method never raises. Problems are reported in the returned string:
    an ``"Error: ..."`` message when the text is longer than
    ``settings.max_summary_chars``, an ``"Error generating summary: ..."``
    message when the model call fails, and a fixed fallback sentence when
    the model's answer is shorter than 50 characters.

    Args:
        text: Full document text to summarise.
        filename: Document name quoted in the prompt.

    Returns:
        The summary, or one of the messages described above.
    """
    try:
        # Refuse oversized documents up front instead of sending them to the model.
        if len(text) > settings.max_summary_chars:
            error_msg = f"Document too large for summary: {len(text)} characters exceeds maximum of {settings.max_summary_chars} characters"
            print(f"Error: {error_msg}")
            return f"Error: {error_msg}"

        prompt = f"""Please provide a concise summary of the following document "{filename}".
Focus on the main points, key information, and important details.
Keep the summary between 150-300 words.
Document content:
{text}
Summary:"""
        completion = await Settings.llm.acomplete(prompt)
        summary = str(completion).strip()
        if len(summary) >= 50:
            return summary
        # Too short to be a useful summary.
        return "Unable to generate detailed summary. This document contains text that may require manual review."
    except Exception as e:
        return f"Error generating summary: {str(e)}"
# Global processor instance, created when this module is imported.
# Constructing it runs LlamaProcessor.__init__, which calls
# _setup_llama_index(), so the LlamaIndex Settings are configured as a side
# effect of the import.
llama_processor = LlamaProcessor()