"""Upload, persistence and cleanup of index documents."""

import logging
import os
import shutil
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional

import aiofiles
from bson import ObjectId
from bson.errors import InvalidId
from fastapi import HTTPException, UploadFile
from motor.motor_asyncio import AsyncIOMotorDatabase

from ..config.settings import settings
from ..models.document import DocumentCreate, DocumentInDB
from ..models.user import UserInDB
from ..utils.file_utils import validate_file, get_file_info

logger = logging.getLogger(__name__)


class DocumentProcessor:
    """Store uploaded files under ``settings.upload_dir`` and track them in MongoDB.

    Files are written to ``<upload_dir>/<index_id>/<uuid><ext>`` and described
    by a record in the ``documents`` collection.
    """

    # Bytes read from the upload per iteration while streaming it to disk.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self):
        self.upload_dir = Path(settings.upload_dir)
        self.allowed_extensions = {
            '.pdf', '.docx', '.doc', '.txt', '.csv', '.json',
            '.html', '.md', '.rtf'
        }
        self.max_file_size = 50 * 1024 * 1024  # 50MB

    async def process_upload(
        self,
        file: UploadFile,
        index_id: str,
        user: UserInDB,
        db: AsyncIOMotorDatabase
    ) -> DocumentInDB:
        """Validate an upload, write it to disk and insert its document record.

        Args:
            file: The uploaded file.
            index_id: Index the document belongs to; also used as the name of
                the per-index upload sub-directory.
            user: Uploading user; ``user.id`` is stored as ``uploaded_by``.
            db: Motor database handle.

        Returns:
            The inserted record as a ``DocumentInDB`` with ``processing_status``
            and ``embedding_status`` set to ``"pending"``.

        Raises:
            HTTPException: 400 for a missing name, unsupported extension,
                oversized file or an ``index_id`` that escapes the upload
                directory; 500 if the file cannot be written.
        """
        await self._validate_file(file)

        # Only the (lower-cased) extension of the client-supplied name is kept;
        # the stored name is a random UUID to avoid collisions.
        file_extension = Path(file.filename).suffix.lower()
        unique_filename = f"{uuid.uuid4()}{file_extension}"

        index_upload_dir = self._index_upload_dir(index_id)
        index_upload_dir.mkdir(parents=True, exist_ok=True)

        file_path = index_upload_dir / unique_filename
        bytes_written = await self._save_file(file, file_path)

        try:
            document_data = DocumentCreate(
                filename=unique_filename,
                original_filename=file.filename,
                # Record what actually reached disk; UploadFile.size may be None.
                file_size=bytes_written,
                content_type=file.content_type,
                index_id=index_id,
                uploaded_by=user.id
            )

            document_dict = document_data.dict()
            document_dict["file_path"] = str(file_path)
            document_dict["processing_status"] = "pending"
            document_dict["embedding_status"] = "pending"
            document_dict["metadata"] = {}
            document_dict["chunk_count"] = 0

            result = await db.documents.insert_one(document_dict)
        except Exception:
            # Without a record nothing will ever clean this file up.
            self._remove_file_quietly(file_path)
            raise

        document_dict["_id"] = result.inserted_id
        return DocumentInDB(**document_dict)

    def _index_upload_dir(self, index_id: str) -> Path:
        """Return ``upload_dir / index_id``, rejecting ids that escape ``upload_dir``.

        Raises:
            HTTPException: 400 if the resolved directory is ``upload_dir``
                itself or lies outside it.
        """
        candidate = self.upload_dir / index_id
        base = self.upload_dir.resolve()
        resolved = candidate.resolve()
        # NOTE(review): index_id presumably originates from the request. Values
        # such as "..", "" or an absolute path would otherwise place files
        # outside (or directly in) the upload root.
        if resolved == base or base not in resolved.parents:
            raise HTTPException(status_code=400, detail="Invalid index id")
        # Return the unresolved path so stored file_path values keep their
        # original (possibly relative) form.
        return candidate

    def _file_too_large(self) -> HTTPException:
        """Build the 400 error used when an upload exceeds ``max_file_size``."""
        return HTTPException(
            status_code=400,
            detail=f"File too large. Maximum size: {self.max_file_size / (1024*1024):.1f}MB"
        )

    async def _validate_file(self, file: UploadFile):
        """Check the upload's name, extension and declared size.

        Raises:
            HTTPException: 400 when the name is missing, the extension is not
                in ``allowed_extensions``, or the declared size is too large.
        """
        if not file.filename:
            raise HTTPException(status_code=400, detail="No file provided")

        file_extension = Path(file.filename).suffix.lower()
        if file_extension not in self.allowed_extensions:
            # Sorted so the message is stable; set iteration order is not.
            allowed = ', '.join(sorted(self.allowed_extensions))
            raise HTTPException(
                status_code=400,
                detail=f"File type {file_extension} not supported. Allowed types: {allowed}"
            )

        # UploadFile.size can be None when the size is not known in advance;
        # the limit is then enforced on the real byte count in _save_file.
        if file.size is not None and file.size > self.max_file_size:
            raise self._file_too_large()

    async def _save_file(self, file: UploadFile, file_path: Path) -> int:
        """Stream an upload to ``file_path`` in chunks.

        Returns:
            The number of bytes written.

        Raises:
            HTTPException: 400 if more than ``max_file_size`` bytes arrive;
                500 on any other error. A partially written file is removed in
                both cases.
        """
        bytes_written = 0
        try:
            async with aiofiles.open(file_path, 'wb') as out:
                while True:
                    chunk = await file.read(self._CHUNK_SIZE)
                    if not chunk:
                        break
                    bytes_written += len(chunk)
                    if bytes_written > self.max_file_size:
                        raise self._file_too_large()
                    await out.write(chunk)
        except HTTPException:
            self._remove_file_quietly(file_path)
            raise
        except Exception as e:
            self._remove_file_quietly(file_path)
            raise HTTPException(
                status_code=500,
                detail=f"Error saving file: {str(e)}"
            ) from e
        return bytes_written

    @staticmethod
    def _remove_file_quietly(file_path: Path) -> None:
        """Best-effort unlink: ignore a missing file, log any other OS error."""
        try:
            file_path.unlink()
        except FileNotFoundError:
            pass
        except OSError:
            logger.exception("Error deleting file %s", file_path)

    async def delete_document(
        self,
        document_id: str,
        db: AsyncIOMotorDatabase
    ) -> bool:
        """Delete a document record together with its embeddings and stored file.

        Embedding and file removal are best-effort: failures are logged and do
        not stop the database record from being deleted.

        Args:
            document_id: Hex string of the document's ``_id``.
            db: Motor database handle.

        Returns:
            True if the record was deleted; False if ``document_id`` is not a
            valid ObjectId, no matching document exists, or nothing was deleted.
        """
        try:
            object_id = ObjectId(document_id)
        except (InvalidId, TypeError):
            # A malformed id cannot match any document; report "not found"
            # rather than letting bson's error surface as a 500.
            return False

        document = await db.documents.find_one({"_id": object_id})
        if not document:
            return False

        index_id = document["index_id"]

        try:
            # Kept as a function-level import like the original — presumably
            # to avoid a circular import; TODO confirm.
            from .llama_processor import llama_processor
            await llama_processor.delete_document_embeddings(document_id, index_id)
        except Exception:
            logger.exception("Error deleting embeddings for document %s", document_id)

        self._remove_file_quietly(Path(document["file_path"]))

        # Note: Cache clearing removed - caching is disabled for data freshness

        result = await db.documents.delete_one({"_id": object_id})
        return result.deleted_count > 0

    async def get_documents_by_index(
        self,
        index_id: str,
        db: AsyncIOMotorDatabase
    ) -> List[DocumentInDB]:
        """Return every document record whose ``index_id`` matches."""
        cursor = db.documents.find({"index_id": index_id})
        return [DocumentInDB(**doc) async for doc in cursor]

    @staticmethod
    def _pick(metadata: Dict[str, Any], keys: Iterable[str]) -> Dict[str, Any]:
        """Return the subset of ``metadata`` whose keys are in ``keys``."""
        return {key: metadata[key] for key in keys if key in metadata}

    async def _set_document_fields(
        self,
        document_id: str,
        fields: Dict[str, Any],
        db: AsyncIOMotorDatabase
    ) -> None:
        """``$set`` ``fields`` plus a fresh ``updated_at`` on one document."""
        # Aware UTC instead of the deprecated datetime.utcnow(); PyMongo stores
        # both as the same UTC BSON datetime.
        fields["updated_at"] = datetime.now(timezone.utc)
        await db.documents.update_one(
            {"_id": ObjectId(document_id)},
            {"$set": fields}
        )

    async def update_processing_status(
        self,
        document_id: str,
        status: str,
        metadata: Optional[Dict[str, Any]] = None,
        db: AsyncIOMotorDatabase = None
    ):
        """Set ``processing_status`` and ``updated_at`` on a document.

        When ``metadata`` is non-empty it is stored whole under ``metadata``,
        and its ``parsed_text`` / ``chunk_count`` entries (if present) are also
        copied to top-level fields.
        """
        update_data: Dict[str, Any] = {"processing_status": status}
        if metadata:
            update_data["metadata"] = metadata
            update_data.update(self._pick(metadata, ("parsed_text", "chunk_count")))
        await self._set_document_fields(document_id, update_data, db)

    async def update_embedding_status(
        self,
        document_id: str,
        status: str,
        metadata: Optional[Dict[str, Any]] = None,
        db: AsyncIOMotorDatabase = None
    ):
        """Set ``embedding_status`` and ``updated_at`` on a document.

        When ``metadata`` is non-empty, its ``vector_ids`` / ``chunk_count``
        entries (if present) are copied to top-level fields; the metadata dict
        itself is not stored.
        """
        update_data: Dict[str, Any] = {"embedding_status": status}
        if metadata:
            update_data.update(self._pick(metadata, ("vector_ids", "chunk_count")))
        await self._set_document_fields(document_id, update_data, db)


# Global processor instance
document_processor = DocumentProcessor()