# File-viewer listing metadata: 209 lines, no EOL, 6.9 KiB, Python
import logging
import os
import shutil
import uuid
from pathlib import Path
from typing import List, Dict, Any, Optional

import aiofiles
from fastapi import UploadFile, HTTPException
from motor.motor_asyncio import AsyncIOMotorDatabase

from ..config.settings import settings
from ..models.document import DocumentCreate, DocumentInDB
from ..models.user import UserInDB
from ..utils.file_utils import validate_file, get_file_info


class DocumentProcessor:
    """Store uploaded documents on disk and track them in MongoDB.

    Files are written under ``settings.upload_dir/<index_id>/`` using a
    generated UUID filename, and a matching record is kept in the
    ``documents`` collection of the database handle passed to each method.
    """

    # Read/write granularity used when streaming an upload to disk, so a
    # large upload is never held in memory as a single bytes object.
    _CHUNK_SIZE = 1024 * 1024

    _logger = logging.getLogger(__name__)

    def __init__(self):
        self.upload_dir = Path(settings.upload_dir)
        self.allowed_extensions = {
            '.pdf', '.docx', '.doc', '.txt', '.csv', '.json', '.html', '.md', '.rtf'
        }
        self.max_file_size = 50 * 1024 * 1024  # 50MB

    async def process_upload(
        self,
        file: UploadFile,
        index_id: str,
        user: UserInDB,
        db: AsyncIOMotorDatabase
    ) -> DocumentInDB:
        """Validate an upload, save it to disk and insert its document record.

        Args:
            file: The uploaded file.
            index_id: Identifier of the index the document belongs to; also
                used as the name of the sub-directory the file is stored in.
            user: The uploading user; ``user.id`` is stored as ``uploaded_by``.
            db: Database handle whose ``documents`` collection receives the
                record.

        Returns:
            The stored document, including the ``_id`` assigned on insert.

        Raises:
            HTTPException: 400 when the file or ``index_id`` is rejected,
                500 when the file cannot be written.
        """
        await self._validate_file(file)

        # Generate unique filename
        file_extension = Path(file.filename).suffix.lower()
        unique_filename = f"{uuid.uuid4()}{file_extension}"

        # Create index-specific upload directory
        index_upload_dir = self._index_upload_dir(index_id)
        index_upload_dir.mkdir(parents=True, exist_ok=True)

        # Save file
        file_path = index_upload_dir / unique_filename
        bytes_written = await self._save_file(file, file_path)

        try:
            # Create document record. The size recorded is what was actually
            # written, because ``file.size`` may be unavailable.
            document_data = DocumentCreate(
                filename=unique_filename,
                original_filename=file.filename,
                file_size=bytes_written,
                content_type=file.content_type,
                index_id=index_id,
                uploaded_by=user.id
            )

            document_dict = document_data.dict()
            document_dict["file_path"] = str(file_path)
            document_dict["processing_status"] = "pending"
            document_dict["embedding_status"] = "pending"
            document_dict["metadata"] = {}
            document_dict["chunk_count"] = 0

            # Save to database
            result = await db.documents.insert_one(document_dict)
        except Exception:
            # Without a record nothing references the file, so do not leave
            # it orphaned on disk.
            self._remove_file(file_path)
            raise

        document_dict["_id"] = result.inserted_id
        return DocumentInDB(**document_dict)

    def _index_upload_dir(self, index_id: str) -> Path:
        """Return the upload directory for ``index_id``.

        ``index_id`` is joined onto ``upload_dir``, so it must be a single
        path component; anything that could escape ``upload_dir`` is rejected.

        Raises:
            HTTPException: 400 when ``index_id`` is empty, is ``.``/``..`` or
                contains a path separator or NUL byte.
        """
        is_unsafe = (
            not index_id
            or index_id in {".", ".."}
            or any(char in index_id for char in ("/", "\\", "\x00"))
        )
        if is_unsafe:
            raise HTTPException(status_code=400, detail="Invalid index id")
        return self.upload_dir / index_id

    def _too_large_error(self) -> HTTPException:
        """Build the 400 error raised when an upload exceeds ``max_file_size``."""
        return HTTPException(
            status_code=400,
            detail=f"File too large. Maximum size: {self.max_file_size / (1024*1024):.1f}MB"
        )

    def _remove_file(self, file_path: Path) -> None:
        """Best-effort removal of ``file_path``; a missing file is not an error."""
        try:
            file_path.unlink()
        except FileNotFoundError:
            pass
        except OSError:
            self._logger.exception("Error deleting file %s", file_path)

    async def _validate_file(self, file: UploadFile):
        """Validate the name, extension and declared size of an upload.

        Raises:
            HTTPException: 400 when the filename is missing, the extension is
                not in ``allowed_extensions`` or the declared size exceeds
                ``max_file_size``.
        """
        if not file.filename:
            raise HTTPException(status_code=400, detail="No file provided")

        # Check file extension
        file_extension = Path(file.filename).suffix.lower()
        if file_extension not in self.allowed_extensions:
            # Sorted so the message is stable; set iteration order is not.
            allowed = ', '.join(sorted(self.allowed_extensions))
            raise HTTPException(
                status_code=400,
                detail=f"File type {file_extension} not supported. Allowed types: {allowed}"
            )

        # Check file size. The declared size may be missing or None; in that
        # case the limit is still enforced while the file is written to disk.
        declared_size = getattr(file, "size", None)
        if declared_size is not None and declared_size > self.max_file_size:
            raise self._too_large_error()

    async def _save_file(self, file: UploadFile, file_path: Path) -> int:
        """Stream the upload to ``file_path`` and return the bytes written.

        The upload is copied in ``_CHUNK_SIZE`` pieces and the size limit is
        enforced on the bytes actually received. A partially written file is
        removed when saving fails.

        Raises:
            HTTPException: 400 when the content exceeds ``max_file_size``,
                500 for any other failure while writing.
        """
        bytes_written = 0
        try:
            async with aiofiles.open(file_path, 'wb') as f:
                while True:
                    chunk = await file.read(self._CHUNK_SIZE)
                    if not chunk:
                        break
                    bytes_written += len(chunk)
                    if bytes_written > self.max_file_size:
                        raise self._too_large_error()
                    await f.write(chunk)
        except HTTPException:
            # Our own size-limit error: clean up and let it through unchanged.
            self._remove_file(file_path)
            raise
        except Exception as e:
            self._remove_file(file_path)
            raise HTTPException(
                status_code=500,
                detail=f"Error saving file: {str(e)}"
            ) from e
        return bytes_written

    async def delete_document(
        self,
        document_id: str,
        db: AsyncIOMotorDatabase
    ) -> bool:
        """Delete a document record together with its embeddings and file.

        Embedding and file removal are best-effort: failures are logged and
        the record is still deleted.

        Args:
            document_id: String form of the document's ``_id``.
            db: Database handle holding the ``documents`` collection.

        Returns:
            True when a record was deleted; False when ``document_id`` is not
            a valid ObjectId or no such document exists.
        """
        from bson import ObjectId
        from bson.errors import InvalidId

        try:
            object_id = ObjectId(document_id)
        except (InvalidId, TypeError):
            # A malformed id cannot match any document.
            return False

        # Get document
        document = await db.documents.find_one({"_id": object_id})
        if not document:
            return False

        index_id = document.get("index_id")

        # Delete embeddings from vector store
        try:
            from .llama_processor import llama_processor
            await llama_processor.delete_document_embeddings(document_id, index_id)
        except Exception:
            self._logger.exception(
                "Error deleting embeddings for document %s", document_id
            )

        # Delete file
        stored_path = document.get("file_path")
        if stored_path:
            self._remove_file(Path(stored_path))

        # Note: Cache clearing removed - caching is disabled for data freshness

        # Delete document record
        result = await db.documents.delete_one({"_id": object_id})
        return result.deleted_count > 0

    async def get_documents_by_index(
        self,
        index_id: str,
        db: AsyncIOMotorDatabase
    ) -> List[DocumentInDB]:
        """Return every document whose ``index_id`` field equals ``index_id``."""
        cursor = db.documents.find({"index_id": index_id})
        return [DocumentInDB(**doc) async for doc in cursor]

    @staticmethod
    async def _set_document_fields(
        db: Optional[AsyncIOMotorDatabase],
        document_id: str,
        fields: Dict[str, Any]
    ) -> None:
        """Apply ``fields`` with ``$set`` to the document with ``document_id``.

        Raises:
            ValueError: When ``db`` is None.
        """
        from bson import ObjectId

        if db is None:
            raise ValueError("A database handle (db) is required")

        await db.documents.update_one(
            {"_id": ObjectId(document_id)},
            {"$set": fields}
        )

    async def update_processing_status(
        self,
        document_id: str,
        status: str,
        metadata: Optional[Dict[str, Any]] = None,
        db: Optional[AsyncIOMotorDatabase] = None
    ):
        """Update a document's processing status.

        Args:
            document_id: String form of the document's ``_id``.
            status: New value for ``processing_status``.
            metadata: Optional metadata stored as the document's ``metadata``.
                Its ``parsed_text`` and ``chunk_count`` entries, when present,
                are also copied to top-level fields of the same name.
            db: Database handle; required despite the ``None`` default.

        Raises:
            ValueError: When ``db`` is None.
        """
        from datetime import datetime, timezone

        update_data = {
            "processing_status": status,
            "updated_at": datetime.now(timezone.utc)
        }

        if metadata:
            update_data["metadata"] = metadata
            # Store parsed text if available
            if "parsed_text" in metadata:
                update_data["parsed_text"] = metadata["parsed_text"]
            if "chunk_count" in metadata:
                update_data["chunk_count"] = metadata["chunk_count"]

        await self._set_document_fields(db, document_id, update_data)

    async def update_embedding_status(
        self,
        document_id: str,
        status: str,
        metadata: Optional[Dict[str, Any]] = None,
        db: Optional[AsyncIOMotorDatabase] = None
    ):
        """Update a document's embedding status.

        Args:
            document_id: String form of the document's ``_id``.
            status: New value for ``embedding_status``.
            metadata: Optional mapping; only its ``vector_ids`` and
                ``chunk_count`` entries, when present, are written to the
                document. The mapping itself is not stored.
            db: Database handle; required despite the ``None`` default.

        Raises:
            ValueError: When ``db`` is None.
        """
        from datetime import datetime, timezone

        update_data = {
            "embedding_status": status,
            "updated_at": datetime.now(timezone.utc)
        }

        if metadata:
            # Store vector information if available
            if "vector_ids" in metadata:
                update_data["vector_ids"] = metadata["vector_ids"]
            if "chunk_count" in metadata:
                update_data["chunk_count"] = metadata["chunk_count"]

        await self._set_document_fields(db, document_id, update_data)


# Global processor instance, created at import time.
document_processor = DocumentProcessor()