"""Document ingestion, embedding and retrieval built on LlamaIndex and ChromaDB."""

import os
import uuid
import asyncio
import traceback
from typing import List, Dict, Any, Optional, Union
from pathlib import Path
from datetime import datetime

import aiofiles
from fastapi import UploadFile, HTTPException
from motor.motor_asyncio import AsyncIOMotorDatabase
from bson import ObjectId
from llama_index.core import (
    VectorStoreIndex,
    StorageContext,
    Settings,
    Document as LlamaDocument
)
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.core.embeddings import BaseEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_cloud_services import LlamaParse
import chromadb
from chromadb.config import Settings as ChromaSettings
from chromadb.utils import embedding_functions

from ..config.settings import settings
from ..models.document import DocumentInDB, DocumentCreate
from ..core.chroma_client import chroma_singleton
from ..models.index import IndexInDB
from ..models.user import UserInDB
from ..utils.file_utils import validate_file, get_file_info
from ..services.contract_summary_service import contract_summary_service


class LlamaProcessor:
    """Upload, parse, chunk, embed, query and delete documents for an index.

    Uploaded files are written below ``settings.upload_dir/<index_id>``, parsed
    with LlamaParse, split with a semantic splitter, embedded with OpenAI
    embeddings and stored in a ChromaDB collection named ``index_<index_id>``.
    Document bookkeeping lives in the ``documents`` MongoDB collection.
    """

    # Number of bytes read per iteration while streaming an upload to disk.
    _UPLOAD_CHUNK_SIZE = 1024 * 1024

    def __init__(self):
        """Read storage paths from settings and configure LlamaIndex globals."""
        self.upload_dir = Path(settings.upload_dir)
        self.indices_dir = Path(settings.indices_dir)
        self.allowed_extensions = {
            '.pdf', '.docx', '.doc', '.txt', '.csv', '.json', '.html', '.md', '.rtf'
        }
        self.max_file_size = 50 * 1024 * 1024  # 50MB

        # The event loop only keeps weak references to tasks, so background
        # processing tasks are held here until they finish.
        self._background_tasks: set = set()

        # Initialize LlamaIndex components
        self._setup_llama_index()
        # ChromaDB client managed by singleton

    # ------------------------------------------------------------------
    # Component factories
    # ------------------------------------------------------------------

    @staticmethod
    def _make_embed_model() -> OpenAIEmbedding:
        """Build the OpenAI embedding model used for indexing and querying."""
        return OpenAIEmbedding(
            model="text-embedding-3-small",
            api_key=settings.openai_api_key
        )

    @staticmethod
    def _make_semantic_splitter() -> SemanticSplitterNodeParser:
        """Build the semantic splitter (with its own embedding model instance)."""
        return SemanticSplitterNodeParser.from_defaults(
            embed_model=LlamaProcessor._make_embed_model(),
            buffer_size=2,
            breakpoint_percentile_threshold=70
        )

    @staticmethod
    def _make_llm() -> OpenAI:
        """Build the OpenAI chat model used for generation."""
        return OpenAI(
            model="gpt-4o",
            api_key=settings.openai_api_key,
            temperature=0.1
        )

    def get_chroma_client(self):
        """Get or create ChromaDB client using shared singleton"""
        chroma_db_path = str(self.indices_dir / "chroma_db")
        return chroma_singleton.get_client(chroma_db_path)

    def _setup_llama_index(self):
        """Setup LlamaIndex components (LLM, embeddings, text splitter)."""
        Settings.llm = self._make_llm()
        Settings.embed_model = self._make_embed_model()
        Settings.text_splitter = self._make_semantic_splitter()

    # ------------------------------------------------------------------
    # Upload handling
    # ------------------------------------------------------------------

    async def process_single_file(
        self,
        file: UploadFile,
        index_id: str,
        user: UserInDB,
        db: AsyncIOMotorDatabase,
        custom_name: Optional[str] = None
    ) -> DocumentInDB:
        """Store one uploaded file, record it and start background processing.

        Args:
            file: The uploaded file.
            index_id: Index the document belongs to; also the upload sub-directory.
            user: Uploading user; ``user.id`` is stored as ``uploaded_by``.
            db: Database whose ``documents`` collection receives the record.
            custom_name: Display name stored as ``original_filename``; falls
                back to ``file.filename``.

        Returns:
            The inserted document record, with all statuses set to ``"pending"``.

        Raises:
            HTTPException: 400 if validation fails, 500 if the file cannot be saved.
        """
        # Validate file
        await self._validate_file(file)

        # Generate unique filename
        file_extension = Path(file.filename).suffix.lower()
        unique_filename = f"{uuid.uuid4()}{file_extension}"

        # Create index-specific upload directory
        index_upload_dir = self.upload_dir / index_id
        index_upload_dir.mkdir(parents=True, exist_ok=True)

        # Save file
        file_path = index_upload_dir / unique_filename
        await self._save_file(file, file_path)

        # UploadFile.size is optional; fall back to the size actually written.
        file_size = file.size if file.size is not None else file_path.stat().st_size

        # Create document record
        document_name = custom_name or file.filename
        document_data = DocumentCreate(
            filename=unique_filename,
            original_filename=document_name,
            file_size=file_size,
            content_type=file.content_type,
            index_id=index_id,
            uploaded_by=user.id
        )

        now = datetime.utcnow()
        document_dict = document_data.dict()
        document_dict["file_path"] = str(file_path)
        document_dict["processing_status"] = "pending"
        document_dict["embedding_status"] = "pending"
        document_dict["summary_status"] = "pending"
        document_dict["metadata"] = {}
        document_dict["created_at"] = now
        document_dict["updated_at"] = now

        # Save to database
        result = await db.documents.insert_one(document_dict)
        document_dict["_id"] = result.inserted_id

        document = DocumentInDB(**document_dict)

        # Process document asynchronously; keep a reference so the task is not
        # garbage-collected before it completes.
        task = asyncio.create_task(self._process_document_async(document, db))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

        return document

    async def process_multiple_files(
        self,
        files: List[UploadFile],
        index_id: str,
        user: UserInDB,
        db: AsyncIOMotorDatabase,
        base_name: str
    ) -> List[DocumentInDB]:
        """Process multiple uploaded files

        Each file is named ``<base_name>_<NNN><ext>`` where ``NNN`` is its
        1-based, zero-padded position in ``files``. Files are handled one
        after another via ``process_single_file``.
        """
        documents = []

        for i, file in enumerate(files, 1):
            # Generate custom name with serial number
            file_extension = Path(file.filename).suffix.lower()
            custom_name = f"{base_name}_{i:03d}{file_extension}"

            document = await self.process_single_file(
                file, index_id, user, db, custom_name
            )
            documents.append(document)

        return documents

    # ------------------------------------------------------------------
    # Background processing pipeline
    # ------------------------------------------------------------------

    async def _process_document_async(self, document: DocumentInDB, db: AsyncIOMotorDatabase):
        """Run parse -> chunk -> embed -> summarise for one document.

        Progress is written to the document's status fields. Any failure in
        the core pipeline marks all three statuses ``"failed"`` and stores the
        message under ``metadata.error``; a failure of the summary step alone
        only marks ``summary_status`` as failed. This coroutine never raises.
        """
        print(f"Starting processing for document {document.id}: {document.original_filename}")
        try:
            # Update status to processing
            print(f"Setting document {document.id} to processing status")
            await self._update_document_status(
                document.id, "processing", "pending", "pending", db
            )

            # Small delay to ensure status update is committed
            await asyncio.sleep(0.1)

            # Parse document text
            print(f"Parsing document text for {document.id}")
            parsed_text = await self._parse_document_text(document.file_path)
            print(f"Parsed text length: {len(parsed_text)} characters")

            # Update parsing status
            await self._update_document_status(
                document.id, "completed", "processing", "pending", db
            )

            # Create text chunks
            print(f"Creating text chunks for {document.id}")
            chunks = await self._create_text_chunks(parsed_text, document.index_id)
            print(f"Created {len(chunks)} chunks")

            # Create embeddings and store in vector database
            print(f"Creating embeddings for {document.id}")
            vector_ids = await self._create_embeddings(
                chunks, document.index_id, str(document.id)
            )
            print(f"Created {len(vector_ids)} embeddings")

            # Update document with parsed data
            print(f"Updating document {document.id} with parsed data")
            await self._update_document_with_parsed_data(
                document.id, parsed_text, chunks, vector_ids, db
            )

            # Update status to completed (core processing done)
            print(f"Completing processing for document {document.id}")
            await self._update_document_status(
                document.id, "completed", "completed", "pending", db
            )

            # Extract contract summary (non-blocking)
            print(f"Extracting contract summary for {document.id}")
            try:
                await self._extract_contract_summary(
                    document.id, parsed_text, document.original_filename, db
                )
            except Exception as summary_error:
                print(f"Warning: Contract summary extraction failed for {document.id}: {summary_error}")
                # Mark summary as failed but don't fail the entire document
                await self._update_document_status(
                    document.id, "completed", "completed", "failed", db
                )

            print(f"Successfully processed document {document.id}")

        except Exception as e:
            print(f"Error processing document {document.id}: {str(e)}")
            traceback.print_exc()
            # This runs inside a fire-and-forget task: if recording the failure
            # also fails, log it instead of letting the exception vanish.
            try:
                await self._update_document_status(
                    document.id, "failed", "failed", "failed", db
                )
                # Store error in metadata
                await db.documents.update_one(
                    {"_id": ObjectId(document.id)},
                    {"$set": {"metadata.error": str(e)}}
                )
            except Exception as status_error:
                print(f"Error recording failure for document {document.id}: {status_error}")
                traceback.print_exc()

    async def _parse_document_text(self, file_path: str) -> str:
        """Parse text from document using LlamaParse with premium mode (async)

        Returns:
            The text of all parsed parts joined by blank lines, stripped.

        Raises:
            HTTPException: 500 if the LlamaParse API key is missing, parsing
                fails, or no text is extracted.
        """
        file_path = Path(file_path)
        print(f"Parsing file: {file_path}")
        print(f"File exists: {file_path.exists()}")
        print(f"File size: {file_path.stat().st_size if file_path.exists() else 'N/A'}")

        # Check if LlamaParse API key is available
        if not settings.llamaparse_api_key:
            raise HTTPException(
                status_code=500,
                detail="LlamaParse API key is required for document processing"
            )

        def _run_llamaparse():
            parser = LlamaParse(
                api_key=settings.llamaparse_api_key,
                premium_mode=True,
                result_type="markdown",
                verbose=True
            )
            return parser.load_data(str(file_path))

        try:
            print("Using LlamaParse with premium mode (async)")

            # Execute the synchronous LlamaParse call in a thread pool to
            # avoid blocking the event loop
            loop = asyncio.get_running_loop()
            documents = await loop.run_in_executor(None, _run_llamaparse)

            print(f"LlamaParse loaded {len(documents)} documents from file")

            # Combine all document text
            text_result = "\n\n".join(doc.text for doc in documents).strip()
            print(f"Final parsed text length: {len(text_result)} characters")

            if not text_result:
                raise ValueError("No text extracted from document via LlamaParse")

            return text_result

        except Exception as e:
            print(f"Error in _parse_document_text with LlamaParse: {str(e)}")
            raise HTTPException(
                status_code=500,
                detail=f"Error parsing document with LlamaParse: {str(e)}"
            ) from e

    async def _create_text_chunks(self, text: str, index_id: str) -> List[str]:
        """Create text chunks using LlamaIndex SemanticSplitter

        Args:
            text: Full document text.
            index_id: Accepted for interface compatibility; not used here.

        Returns:
            The text of every node produced by the splitter (never empty).

        Raises:
            HTTPException: 500 if splitting fails or yields no chunks.
        """
        try:
            print(f"Creating semantic chunks for text of length {len(text)}")

            # Create semantic splitter with OpenAI embeddings
            semantic_splitter = self._make_semantic_splitter()

            # Create a document and split it semantically. The splitter is
            # synchronous, so it runs in the default executor to keep the
            # event loop responsive.
            llama_doc = LlamaDocument(text=text)
            loop = asyncio.get_running_loop()
            nodes = await loop.run_in_executor(
                None, semantic_splitter.get_nodes_from_documents, [llama_doc]
            )

            # Extract text from nodes
            chunks = [node.text for node in nodes]
            print(f"Created {len(chunks)} semantic chunks")

            if not chunks:
                raise ValueError("No semantic chunks created from text")

            return chunks

        except Exception as e:
            print(f"Error in _create_text_chunks: {str(e)}")
            raise HTTPException(
                status_code=500,
                detail=f"Error creating semantic text chunks: {str(e)}"
            ) from e

    async def _create_embeddings(
        self,
        chunks: List[str],
        index_id: str,
        document_id: str
    ) -> List[str]:
        """Create embeddings and store using LlamaIndex

        Each chunk becomes a LlamaIndex document tagged with ``document_id``,
        ``chunk_index``, ``index_id`` and ``chunk_id`` metadata and is inserted
        into the ``index_<index_id>`` Chroma collection (created with cosine
        space if it does not exist yet).

        Returns:
            The chunk ids ``"<document_id>_<i>"``, one per chunk.
            NOTE(review): these are the ``chunk_id`` metadata values; whether
            they equal the ids Chroma stores depends on LlamaIndex - confirm.

        Raises:
            HTTPException: 500 on any failure.
        """
        try:
            collection_name = f"index_{index_id}"

            print(f"🔍 DEBUG - Creating embeddings using LlamaIndex for collection: {collection_name}")

            # Create LlamaIndex documents from chunks
            documents = []
            vector_ids = []

            for i, chunk in enumerate(chunks):
                chunk_id = f"{document_id}_{i}"
                vector_ids.append(chunk_id)

                # Create LlamaIndex document with metadata
                documents.append(LlamaDocument(
                    text=chunk,
                    metadata={
                        "document_id": document_id,
                        "chunk_index": i,
                        "index_id": index_id,
                        "chunk_id": chunk_id
                    }
                ))

            print(f" - Created {len(documents)} LlamaIndex documents")
            print(f" - Document IDs: {vector_ids}")
            print(f" - Document lengths: {[len(doc.text) for doc in documents]}")

            # Get or create ChromaDB collection using LlamaIndex
            client = self.get_chroma_client()

            try:
                chroma_collection = client.get_collection(name=collection_name)
                current_count = chroma_collection.count()
                print(f" - Found existing collection with {current_count} vectors")
            except Exception:
                # Create collection - let LlamaIndex handle the embedding function
                print(f" - Creating new ChromaDB collection")
                chroma_collection = client.create_collection(
                    name=collection_name,
                    metadata={"hnsw:space": "cosine"}
                )

            # Create LlamaIndex vector store and index
            vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
            storage_context = StorageContext.from_defaults(vector_store=vector_store)

            # Check if index exists
            try:
                # Try to load existing index
                index = VectorStoreIndex.from_vector_store(
                    vector_store=vector_store,
                    storage_context=storage_context
                )
                print(f" - Loaded existing LlamaIndex VectorStoreIndex")
            except Exception:
                # Create new index
                index = VectorStoreIndex.from_documents(
                    [],  # Start empty
                    storage_context=storage_context
                )
                print(f" - Created new LlamaIndex VectorStoreIndex")

            # Insert documents into the index (async to avoid blocking)
            print(f" - Inserting {len(documents)} documents into LlamaIndex")

            def _insert_documents():
                for doc in documents:
                    print(f" - Inserting document chunk with metadata: {doc.metadata}")
                    index.insert(doc)
                return True

            # Run embedding creation in thread pool to avoid blocking
            await asyncio.get_running_loop().run_in_executor(None, _insert_documents)

            # Verify the final count
            final_count = chroma_collection.count()
            print(f" - Collection count after adding: {final_count}")
            print(f" - Successfully added {len(vector_ids)} vectors for document {document_id}")

            return vector_ids

        except Exception as e:
            print(f"Error in _create_embeddings: {str(e)}")
            traceback.print_exc()
            raise HTTPException(
                status_code=500,
                detail=f"Error creating embeddings: {str(e)}"
            ) from e

    # ------------------------------------------------------------------
    # Database helpers
    # ------------------------------------------------------------------

    async def _set_document_fields(
        self,
        document_id: str,
        fields: Dict[str, Any],
        db: AsyncIOMotorDatabase
    ):
        """``$set`` the given fields plus a fresh ``updated_at`` on one document."""
        update = dict(fields)
        update["updated_at"] = datetime.utcnow()
        await db.documents.update_one(
            {"_id": ObjectId(document_id)},
            {"$set": update}
        )

    async def _update_document_status(
        self,
        document_id: str,
        processing_status: str,
        embedding_status: str,
        summary_status: str,
        db: AsyncIOMotorDatabase
    ):
        """Update document processing status"""
        await self._set_document_fields(
            document_id,
            {
                "processing_status": processing_status,
                "embedding_status": embedding_status,
                "summary_status": summary_status,
            },
            db
        )

    async def _update_document_with_parsed_data(
        self,
        document_id: str,
        parsed_text: str,
        chunks: List[str],
        vector_ids: List[str],
        db: AsyncIOMotorDatabase
    ):
        """Update document with parsed data"""
        await self._set_document_fields(
            document_id,
            {
                "parsed_text": parsed_text,
                "text_chunks": chunks,
                "chunk_count": len(chunks),
                "vector_ids": vector_ids,
            },
            db
        )

    async def _extract_contract_summary(
        self,
        document_id: str,
        parsed_text: str,
        filename: str,
        db: AsyncIOMotorDatabase
    ):
        """Extract contract summary asynchronously

        Sets ``summary_status`` to ``"processing"``, calls the contract summary
        service and stores either the validated summary (``"completed"``) or
        the error under ``metadata.summary_error`` (``"failed"``). Exceptions
        from the service are caught and recorded the same way.
        """
        try:
            # Update summary status to processing
            await self._set_document_fields(
                document_id, {"summary_status": "processing"}, db
            )

            # Extract summary using the contract summary service
            result = await contract_summary_service.extract_contract_summary(
                parsed_text, filename
            )

            if result["success"]:
                # Validate the summary
                validated_summary = contract_summary_service.validate_contract_summary(
                    result["summary"]
                )

                # Store in database
                await self._set_document_fields(
                    document_id,
                    {
                        "contract_summary": validated_summary.dict(),
                        "summary_status": "completed",
                        "summary_created_at": datetime.utcnow(),
                    },
                    db
                )

                print(f"Successfully extracted contract summary for {document_id}")
            else:
                # Store error
                await self._set_document_fields(
                    document_id,
                    {
                        "summary_status": "failed",
                        "metadata.summary_error": result.get("error", "Unknown error"),
                    },
                    db
                )

                print(f"Failed to extract contract summary for {document_id}: {result.get('error')}")

        except Exception as e:
            print(f"Error extracting contract summary for {document_id}: {str(e)}")
            traceback.print_exc()

            await self._set_document_fields(
                document_id,
                {
                    "summary_status": "failed",
                    "metadata.summary_error": str(e),
                },
                db
            )

    # ------------------------------------------------------------------
    # File validation and storage
    # ------------------------------------------------------------------

    def _file_too_large_error(self) -> HTTPException:
        """Build the 400 error reported when an upload exceeds ``max_file_size``."""
        return HTTPException(
            status_code=400,
            detail=f"File too large. Maximum size: {self.max_file_size / (1024*1024):.1f}MB"
        )

    async def _validate_file(self, file: UploadFile):
        """Validate uploaded file

        Raises:
            HTTPException: 400 if the file has no name, an unsupported
                extension, or a declared size above ``max_file_size``.
        """
        if not file.filename:
            raise HTTPException(status_code=400, detail="No file provided")

        # Check file extension
        file_extension = Path(file.filename).suffix.lower()
        if file_extension not in self.allowed_extensions:
            # Sorted so the message is stable (set order is arbitrary).
            allowed = ', '.join(sorted(self.allowed_extensions))
            raise HTTPException(
                status_code=400,
                detail=f"File type {file_extension} not supported. Allowed types: {allowed}"
            )

        # Check file size. The size may be unknown (None); in that case the
        # limit is enforced while the file is written to disk.
        if file.size is not None and file.size > self.max_file_size:
            raise self._file_too_large_error()

    async def _save_file(self, file: UploadFile, file_path: Path):
        """Save uploaded file to disk

        The upload is streamed in chunks and the size limit is enforced on
        the bytes actually received. A partially written file is removed on
        failure.

        Raises:
            HTTPException: 400 if the content exceeds ``max_file_size``,
                500 for any other error while saving.
        """
        written = 0
        try:
            async with aiofiles.open(file_path, 'wb') as f:
                while True:
                    chunk = await file.read(self._UPLOAD_CHUNK_SIZE)
                    if not chunk:
                        break
                    written += len(chunk)
                    if written > self.max_file_size:
                        raise self._file_too_large_error()
                    await f.write(chunk)
        except HTTPException:
            self._remove_partial_file(file_path)
            raise
        except Exception as e:
            self._remove_partial_file(file_path)
            raise HTTPException(
                status_code=500,
                detail=f"Error saving file: {str(e)}"
            ) from e

    @staticmethod
    def _remove_partial_file(file_path: Path):
        """Best-effort removal of an incompletely written upload."""
        try:
            file_path.unlink()
        except OSError:
            pass

    # ------------------------------------------------------------------
    # Querying and generation
    # ------------------------------------------------------------------

    async def query_documents(
        self,
        query: str,
        index_id: str,
        top_k: int = 10
    ) -> List[Dict[str, Any]]:
        """Query documents using LlamaIndex VectorStoreIndex

        Returns:
            One dict per retrieved chunk with ``content``, ``metadata``,
            ``score`` (0.5 when the node has none), ``distance``
            (``1 - score``), ``document_id`` and ``chunk_index``.

        Raises:
            HTTPException: 404 if the collection is missing, is empty, or no
                chunk is retrieved; 500 for any other error.
        """
        try:
            collection_name = f"index_{index_id}"

            # Ensure consistent embedding model for LlamaIndex
            Settings.embed_model = self._make_embed_model()

            # DEBUG: LlamaIndex query setup
            print(f"🔍 DEBUG - LlamaIndex Query Execution:")
            print(f" - Query: '{query}'")
            print(f" - Collection name: {collection_name}")
            print(f" - Top K: {top_k}")
            print(f" - Settings.embed_model: {type(Settings.embed_model).__name__}")

            # Check if collection exists (without specifying embedding function)
            try:
                client = self.get_chroma_client()
                chroma_collection = client.get_collection(name=collection_name)
                collection_count = chroma_collection.count()
            except Exception as collection_error:
                print(f" - Collection {collection_name} not found: {collection_error}")
                raise HTTPException(
                    status_code=404,
                    detail=f"Vector collection for index '{index_id}' does not exist. Documents may still be processing or failed to process. Please check the admin panel for processing status."
                ) from collection_error

            print(f" - Found collection with {collection_count} documents")

            # Checked outside the lookup's try block so this specific error is
            # not replaced by the generic "does not exist" message.
            if collection_count == 0:
                raise HTTPException(
                    status_code=404,
                    detail=f"Index '{index_id}' exists but contains no processed documents. Please upload and process documents first."
                )

            # Create LlamaIndex VectorStore and Index
            try:
                print(f" - Creating LlamaIndex VectorStore...")
                vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
                storage_context = StorageContext.from_defaults(vector_store=vector_store)

                # Load the index
                index = VectorStoreIndex.from_vector_store(
                    vector_store=vector_store,
                    storage_context=storage_context
                )

                # Create query engine
                query_engine = index.as_query_engine(
                    similarity_top_k=top_k,
                    response_mode="no_text"  # Only return source nodes, no generated text
                )

                print(f" - Executing LlamaIndex query...")
                # The query call is synchronous (embedding request + vector
                # search); run it in the executor to keep the loop free.
                response = await asyncio.get_running_loop().run_in_executor(
                    None, query_engine.query, query
                )
                print(f" - Query successful")

                # Extract and format results from source nodes
                formatted_results = []

                source_nodes = getattr(response, 'source_nodes', None)
                if source_nodes:
                    for i, node in enumerate(source_nodes):
                        # Get similarity score (higher is better)
                        raw_score = getattr(node, 'score', None)
                        similarity_score = raw_score if raw_score is not None else 0.5

                        # Convert similarity score to distance (lower is better)
                        # LlamaIndex typically returns scores between 0 and 1, where 1 is most similar
                        distance = 1.0 - similarity_score

                        formatted_results.append({
                            "content": node.text,
                            "metadata": node.metadata,
                            "score": similarity_score,
                            "distance": distance,
                            "document_id": node.metadata.get("document_id", "unknown"),
                            "chunk_index": node.metadata.get("chunk_index", i)
                        })

                    print(f" - Retrieved {len(formatted_results)} relevant chunks")
                    print(f" - Similarity scores: {[r['score'] for r in formatted_results]}")
                    print(f" - Distance values: {[r['distance'] for r in formatted_results]}")
                else:
                    print(f" - No source nodes found in response")

            except Exception as query_error:
                print(f" - ERROR during LlamaIndex query: {query_error}")
                print(f" - Error type: {type(query_error).__name__}")
                raise

            if not formatted_results:
                raise HTTPException(
                    status_code=404,
                    detail=f"No relevant documents found for query '{query}' in index '{index_id}'. Try rephrasing your question or ensure documents are properly processed."
                )

            return formatted_results

        except HTTPException:
            # Re-raise HTTPExceptions as-is
            raise
        except Exception as e:
            print(f"Unexpected error in query_documents: {e}")
            raise HTTPException(
                status_code=500,
                detail=f"Error querying documents: {str(e)}"
            ) from e

    def generate_response(
        self,
        query: str,
        context_chunks: List[str],
        index_id: str
    ) -> str:
        """Generate response using OpenAI with retrieved context

        Args:
            query: The user's question.
            context_chunks: Retrieved chunk texts, joined by blank lines.
            index_id: Accepted for interface compatibility; not used here.

        Returns:
            The completion text.

        Raises:
            HTTPException: 500 if the completion fails.
        """
        try:
            # Prepare context
            context = "\n\n".join(context_chunks)

            # Create prompt
            prompt = f"""Based on the following context, answer the user's question. If the answer is not in the context, say "I don't have enough information to answer that question." Context: {context} Question: {query} Answer:"""

            # Generate response using OpenAI
            llm = self._make_llm()

            # Use sync completion
            response = llm.complete(prompt)
            return response.text

        except Exception as e:
            raise HTTPException(
                status_code=500,
                detail=f"Error generating response: {str(e)}"
            ) from e

    # ------------------------------------------------------------------
    # Vector store maintenance
    # ------------------------------------------------------------------

    @staticmethod
    def _find_vector_ids(collection, matches) -> List[str]:
        """Return ids of all vectors whose (non-empty) metadata satisfies ``matches``.

        Only metadata is requested from the collection; documents and
        embeddings are not needed for matching.
        """
        records = collection.get(include=["metadatas"])
        ids = records["ids"]
        metadatas = records["metadatas"]
        if not ids or not metadatas:
            return []
        return [
            vid for vid, metadata in zip(ids, metadatas)
            if metadata and matches(metadata)
        ]

    @staticmethod
    def _delete_vector_ids(collection, vector_ids, count_before: int, label: str) -> bool:
        """Delete ``vector_ids`` and report whether the collection count changed."""
        if not vector_ids:
            return False
        collection.delete(ids=vector_ids)
        count_after = collection.count()
        print(f" - {label} - Successfully deleted {count_before - count_after} vectors")
        return count_before != count_after

    async def delete_document_embeddings(
        self,
        document_id: str,
        index_id: str
    ):
        """Delete document embeddings from vector store

        Three lookups are tried in order, stopping at the first one that
        actually shrinks the collection: ``document_id`` metadata filter,
        ``chunk_id`` prefix ``"<document_id>_"``, then any metadata value equal
        to ``document_id``. Errors are logged, never raised.
        """
        try:
            collection_name = f"index_{index_id}"

            print(f"🗑️ DEBUG - Deleting embeddings for document {document_id} from collection {collection_name}")

            collection = self.get_chroma_client().get_collection(name=collection_name)

            # Get collection count before deletion
            count_before = collection.count()
            print(f" - Collection count before deletion: {count_before}")

            # Strategy 1: Try to delete by document_id metadata
            try:
                results = collection.get(where={"document_id": document_id})
                print(f" - Strategy 1 - Found {len(results['ids']) if results['ids'] else 0} vectors by document_id")

                if self._delete_vector_ids(collection, results["ids"], count_before, "Strategy 1"):
                    return  # Success, exit early
            except Exception as e:
                print(f" - Strategy 1 failed: {e}")

            # Strategy 2: Try to delete by chunk_id pattern (document_id_*)
            try:
                chunk_prefix = f"{document_id}_"
                matching_ids = self._find_vector_ids(
                    collection,
                    lambda metadata: "chunk_id" in metadata
                    and metadata["chunk_id"].startswith(chunk_prefix)
                )
                print(f" - Strategy 2 - Found {len(matching_ids)} vectors by chunk_id pattern")

                if self._delete_vector_ids(collection, matching_ids, count_before, "Strategy 2"):
                    return  # Success, exit early
            except Exception as e:
                print(f" - Strategy 2 failed: {e}")

            # Strategy 3: Try to delete by any metadata containing document_id
            try:
                matching_ids = self._find_vector_ids(
                    collection,
                    lambda metadata: any(
                        str(value) == document_id for value in metadata.values()
                    )
                )
                print(f" - Strategy 3 - Found {len(matching_ids)} vectors by metadata scan")

                if self._delete_vector_ids(collection, matching_ids, count_before, "Strategy 3"):
                    return  # Success, exit early
            except Exception as e:
                print(f" - Strategy 3 failed: {e}")

            # If we get here, no vectors were deleted
            print(f" - WARNING: No vectors were deleted for document {document_id}")

            # Debug: Show some sample metadata to understand the structure
            try:
                sample_results = collection.get(limit=3)
                print(f" - DEBUG: Sample metadata structures:")
                for i, metadata in enumerate((sample_results.get("metadatas") or [])[:3]):
                    print(f" - Sample {i}: {metadata}")
            except Exception as e:
                print(f" - DEBUG: Could not get sample metadata: {e}")

        except Exception as e:
            print(f"Error deleting embeddings for document {document_id}: {e}")
            traceback.print_exc()

    def check_collection_exists(self, index_id: str) -> bool:
        """Check if ChromaDB collection exists for an index"""
        try:
            self.get_chroma_client().get_collection(name=f"index_{index_id}")
        except Exception:
            return False
        return True

    def get_collection_info(self, index_id: str) -> Dict[str, Any]:
        """Get information about a ChromaDB collection

        Returns:
            ``exists``, ``name`` and ``count``, plus ``metadata`` when the
            collection is found or ``error`` when the lookup fails.
        """
        collection_name = f"index_{index_id}"
        try:
            collection = self.get_chroma_client().get_collection(name=collection_name)
            return {
                "exists": True,
                "name": collection_name,
                "count": collection.count(),
                "metadata": collection.metadata
            }
        except Exception as e:
            return {
                "exists": False,
                "name": collection_name,
                "count": 0,
                "error": str(e)
            }

    async def generate_document_summary(self, text: str, filename: str) -> str:
        """Generate AI summary of a document

        Returns:
            The summary text. Problems are reported in the returned string
            (prefixed with ``"Error"``) rather than raised; a summary shorter
            than 50 characters is replaced by a fixed fallback message.
        """
        try:
            # Check text length and report an error if too long
            if len(text) > settings.max_summary_chars:
                error_msg = f"Document too large for summary: {len(text)} characters exceeds maximum of {settings.max_summary_chars} characters"
                print(f"Error: {error_msg}")
                return f"Error: {error_msg}"

            # Create summarization prompt
            prompt = f"""Please provide a concise summary of the following document "{filename}". Focus on the main points, key information, and important details. Keep the summary between 150-300 words. Document content: {text} Summary:"""

            # Generate summary using OpenAI via Settings.llm
            response = await Settings.llm.acomplete(prompt)
            summary = str(response).strip()

            # Fallback if summary is too short
            if len(summary) < 50:
                summary = "Unable to generate detailed summary. This document contains text that may require manual review."

            return summary

        except Exception as e:
            return f"Error generating summary: {str(e)}"


# Global processor instance
llama_processor = LlamaProcessor()