# contract-query/backend/app/services/document_processor.py
# 2025-08-14 15:03:33 -05:00
#
# 209 lines
# No EOL
# 6.9 KiB
# Python

import logging
import os
import shutil
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional

import aiofiles
from fastapi import HTTPException, UploadFile
from motor.motor_asyncio import AsyncIOMotorDatabase

from ..config.settings import settings
from ..models.document import DocumentCreate, DocumentInDB
from ..models.user import UserInDB
from ..utils.file_utils import get_file_info, validate_file
class DocumentProcessor:
    """Store uploaded documents on disk and track them in MongoDB.

    Files are written to ``<settings.upload_dir>/<index_id>/<uuid><ext>`` and a
    record describing each one is kept in the ``documents`` collection of the
    database handle passed to each method.
    """

    # Module logger; used instead of print() so failures reach the app's log handlers.
    _logger = logging.getLogger(__name__)

    # Read size used when streaming an upload to disk, so a large upload is
    # never held in memory all at once.
    _CHUNK_SIZE = 1024 * 1024  # 1 MiB

    def __init__(self):
        self.upload_dir = Path(settings.upload_dir)
        # Lower-case suffixes (leading dot included) accepted by _validate_file.
        self.allowed_extensions = {
            '.pdf', '.docx', '.doc', '.txt', '.csv', '.json', '.html', '.md', '.rtf'
        }
        self.max_file_size = 50 * 1024 * 1024  # 50MB

    async def process_upload(
        self,
        file: UploadFile,
        index_id: str,
        user: UserInDB,
        db: AsyncIOMotorDatabase
    ) -> DocumentInDB:
        """Validate ``file``, write it to disk, and insert its document record.

        Args:
            file: The uploaded file; its extension must be in ``allowed_extensions``.
            index_id: Index the document belongs to; also names the per-index
                upload subdirectory.
            user: The uploader; ``user.id`` is stored as ``uploaded_by``.
            db: Database whose ``documents`` collection receives the record.

        Returns:
            The stored record, with ``processing_status`` and ``embedding_status``
            both set to ``"pending"``.

        Raises:
            HTTPException: 400 for a missing filename, unsupported extension,
                oversized file, or an ``index_id`` that would escape the upload
                directory; 500 if the file cannot be written.
        """
        await self._validate_file(file)

        # Only the (whitelisted) extension of the client's filename is reused;
        # the on-disk name is a random UUID so client names never clash.
        file_extension = Path(file.filename).suffix.lower()
        unique_filename = f"{uuid.uuid4()}{file_extension}"

        index_upload_dir = self._index_upload_dir(index_id)
        index_upload_dir.mkdir(parents=True, exist_ok=True)

        file_path = index_upload_dir / unique_filename
        bytes_written = await self._save_file(file, file_path)

        try:
            document_data = DocumentCreate(
                filename=unique_filename,
                original_filename=file.filename,
                # Bytes actually written; UploadFile.size may be None.
                file_size=bytes_written,
                content_type=file.content_type,
                index_id=index_id,
                uploaded_by=user.id
            )
            document_dict = document_data.dict()
            document_dict["file_path"] = str(file_path)
            document_dict["processing_status"] = "pending"
            document_dict["embedding_status"] = "pending"
            document_dict["metadata"] = {}
            document_dict["chunk_count"] = 0

            result = await db.documents.insert_one(document_dict)
        except Exception:
            # Without a record nothing references the file; don't leave it orphaned.
            self._remove_file(file_path)
            raise

        document_dict["_id"] = result.inserted_id
        return DocumentInDB(**document_dict)

    def _index_upload_dir(self, index_id: str) -> Path:
        """Return ``upload_dir / index_id`` after checking it stays inside ``upload_dir``.

        Raises:
            HTTPException: 400 if ``index_id`` resolves to the upload directory
                itself or to a location outside it (e.g. ``..`` segments or an
                absolute path).
        """
        base = self.upload_dir.resolve()
        target = (base / index_id).resolve()
        # ``base`` is never among its own parents, so this also rejects target == base.
        if base not in target.parents:
            raise HTTPException(status_code=400, detail="Invalid index id")
        return self.upload_dir / index_id

    def _file_too_large(self) -> HTTPException:
        """Build the 400 error reported when an upload exceeds ``max_file_size``."""
        return HTTPException(
            status_code=400,
            detail=f"File too large. Maximum size: {self.max_file_size / (1024*1024):.1f}MB"
        )

    async def _validate_file(self, file: UploadFile):
        """Reject uploads with no filename, a disallowed extension, or a known oversize.

        Raises:
            HTTPException: 400 describing the first failed check.
        """
        if not file.filename:
            raise HTTPException(status_code=400, detail="No file provided")

        file_extension = Path(file.filename).suffix.lower()
        if file_extension not in self.allowed_extensions:
            raise HTTPException(
                status_code=400,
                # Sorted so the message is stable between runs.
                detail=f"File type {file_extension} not supported. Allowed types: {', '.join(sorted(self.allowed_extensions))}"
            )

        # UploadFile.size is Optional; when it is unknown the limit is still
        # enforced while streaming in _save_file.
        if file.size is not None and file.size > self.max_file_size:
            raise self._file_too_large()

    async def _save_file(self, file: UploadFile, file_path: Path) -> int:
        """Stream ``file`` to ``file_path`` in chunks and return the bytes written.

        Any partially written file is removed before an error propagates.

        Raises:
            HTTPException: 400 if the stream exceeds ``max_file_size``;
                500 for any other error while writing.
        """
        bytes_written = 0
        try:
            async with aiofiles.open(file_path, 'wb') as f:
                while True:
                    chunk = await file.read(self._CHUNK_SIZE)
                    if not chunk:
                        break
                    bytes_written += len(chunk)
                    if bytes_written > self.max_file_size:
                        raise self._file_too_large()
                    await f.write(chunk)
        except HTTPException:
            self._remove_file(file_path)
            raise
        except Exception as e:
            self._remove_file(file_path)
            raise HTTPException(
                status_code=500,
                detail=f"Error saving file: {str(e)}"
            ) from e
        return bytes_written

    def _remove_file(self, path: Path) -> None:
        """Best-effort delete of ``path``: a missing file is ignored, other OS errors are logged."""
        try:
            path.unlink()
        except FileNotFoundError:
            pass
        except OSError:
            self._logger.exception("Error deleting file %s", path)

    async def delete_document(
        self,
        document_id: str,
        db: AsyncIOMotorDatabase
    ) -> bool:
        """Delete a document's embeddings, stored file, and database record.

        Embedding and file removal are best-effort: failures are logged and the
        record is still deleted.

        Returns:
            ``True`` if the record was deleted; ``False`` if ``document_id`` is
            malformed or no such document exists.
        """
        from bson import ObjectId
        from bson.errors import InvalidId

        try:
            object_id = ObjectId(document_id)
        except (InvalidId, TypeError):
            # A malformed id cannot match any document; report "not found" rather than a 500.
            return False

        document = await db.documents.find_one({"_id": object_id})
        if not document:
            return False

        index_id = document["index_id"]

        try:
            from .llama_processor import llama_processor
            await llama_processor.delete_document_embeddings(document_id, index_id)
        except Exception:
            self._logger.exception("Error deleting embeddings for document %s", document_id)

        stored_path = document.get("file_path")
        if stored_path:
            self._remove_file(Path(stored_path))

        # Note: Cache clearing removed - caching is disabled for data freshness
        result = await db.documents.delete_one({"_id": object_id})
        return result.deleted_count > 0

    async def get_documents_by_index(
        self,
        index_id: str,
        db: AsyncIOMotorDatabase
    ) -> List[DocumentInDB]:
        """Return every document record whose ``index_id`` matches."""
        cursor = db.documents.find({"index_id": index_id})
        return [DocumentInDB(**doc) async for doc in cursor]

    async def _set_status(
        self,
        document_id: str,
        status_field: str,
        status: str,
        extra_fields: Dict[str, Any],
        db: Optional[AsyncIOMotorDatabase]
    ) -> None:
        """``$set`` ``status_field``, ``updated_at`` and ``extra_fields`` on one document.

        Raises:
            ValueError: if ``db`` is ``None``.
        """
        from bson import ObjectId
        from datetime import datetime, timezone

        if db is None:
            raise ValueError("A database handle is required to update document status")

        # Aware UTC timestamp; PyMongo stores it as the same UTC instant the
        # deprecated naive datetime.utcnow() produced.
        update_data: Dict[str, Any] = {
            status_field: status,
            "updated_at": datetime.now(timezone.utc),
        }
        update_data.update(extra_fields)
        await db.documents.update_one(
            {"_id": ObjectId(document_id)},
            {"$set": update_data}
        )

    async def update_processing_status(
        self,
        document_id: str,
        status: str,
        metadata: Optional[Dict[str, Any]] = None,
        db: AsyncIOMotorDatabase = None
    ):
        """Set ``processing_status`` and, when given, replace ``metadata``.

        ``parsed_text`` and ``chunk_count`` found in ``metadata`` are also copied
        to top-level fields of the document.

        Raises:
            ValueError: if ``db`` is not supplied.
        """
        extra: Dict[str, Any] = {}
        if metadata:
            extra["metadata"] = metadata
            extra.update(
                {key: metadata[key] for key in ("parsed_text", "chunk_count") if key in metadata}
            )
        await self._set_status(document_id, "processing_status", status, extra, db)

    async def update_embedding_status(
        self,
        document_id: str,
        status: str,
        metadata: Optional[Dict[str, Any]] = None,
        db: AsyncIOMotorDatabase = None
    ):
        """Set ``embedding_status``; copy ``vector_ids``/``chunk_count`` from ``metadata`` if present.

        Unlike ``update_processing_status``, the ``metadata`` field itself is not replaced.

        Raises:
            ValueError: if ``db`` is not supplied.
        """
        source = metadata or {}
        extra = {key: source[key] for key in ("vector_ids", "chunk_count") if key in source}
        await self._set_status(document_id, "embedding_status", status, extra, db)
# Single module-level DocumentProcessor, built once when this module is
# imported (its constructor reads settings.upload_dir).
document_processor: DocumentProcessor = DocumentProcessor()