# File listing metadata (from the source viewer): 323 lines, no trailing newline, 12 KiB, Python.
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
|
|
from fastapi.responses import FileResponse
|
|
from motor.motor_asyncio import AsyncIOMotorDatabase
|
|
from typing import List
|
|
from bson import ObjectId
|
|
from datetime import datetime
|
|
import os
|
|
import asyncio
|
|
from urllib.parse import unquote
|
|
|
|
from ...config.database import get_database
|
|
from ...core.auth import get_current_active_user, require_index_access
|
|
from ...models.user import UserInDB
|
|
from ...models.document import Document, DocumentInDB
|
|
from ...models.contract_summary import ContractSummaryResponse
|
|
from ...services.document_processor import document_processor
|
|
from ...services.rag_service import rag_service
|
|
from ...services.llama_processor import llama_processor
|
|
|
|
# Router that collects the document endpoints defined below (upload, per-index
# listing, download, parsed text, metadata, delete, contract summary).
router = APIRouter()
|
|
|
|
@router.post("/upload", response_model=dict)
async def upload_document(
    file: UploadFile = File(...),
    index_id: str = Form(...),
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Store an uploaded file in the given index via LlamaProcessor.

    Non-admin users must have ``index_id`` in their ``index_access`` list.
    ``llama_processor.process_single_file`` performs the upload handling
    (including contract summary processing), and this endpoint reports the
    new document with a ``"pending"`` processing status.

    Returns:
        A dict with a confirmation message, the new document id, its
        filename and ``processing_status="pending"``.

    Raises:
        HTTPException: 403 when the user may not access ``index_id``.
    """
    is_admin = current_user.role.value == "admin"
    if not (is_admin or index_id in current_user.index_access):
        raise HTTPException(status_code=403, detail="Access denied to this index")

    # Background processing is handled by LlamaProcessor itself, so the
    # response does not wait for it and always reports "pending".
    stored = await llama_processor.process_single_file(file, index_id, current_user, db)

    response = dict(
        message="Document uploaded successfully",
        document_id=str(stored.id),
        filename=stored.filename,
        processing_status="pending",
    )
    return response
|
|
|
|
@router.get("/index/{index_id}", response_model=List[dict])
async def get_documents_by_index(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """List every document stored in one index.

    ``index_id`` is run through ``unquote`` once more before use, so a
    percent-encoded id is matched (and access-checked) in decoded form.
    Non-admin users must have the decoded id in their ``index_access``.

    Returns:
        One dict per document with ids, file info, processing / embedding /
        summary status, timestamps, metadata and chunk count. A missing
        ``summary_status`` attribute is reported as ``'pending'``.

    Raises:
        HTTPException: 403 when the user may not access the index.
    """
    target_index = unquote(index_id)

    if current_user.role.value != "admin":
        if target_index not in current_user.index_access:
            raise HTTPException(status_code=403, detail="Access denied to this index")

    def _as_listing_entry(item):
        # Flatten one document model into the JSON shape returned by this endpoint.
        return {
            "id": str(item.id),
            "filename": item.filename,
            "original_filename": item.original_filename,
            "file_size": item.file_size,
            "content_type": item.content_type,
            "processing_status": item.processing_status,
            "embedding_status": item.embedding_status,
            "summary_status": getattr(item, 'summary_status', 'pending'),
            "created_at": item.created_at,
            "updated_at": item.updated_at,
            "metadata": item.metadata,
            "chunk_count": item.chunk_count,
        }

    found = await document_processor.get_documents_by_index(target_index, db)
    return list(map(_as_listing_entry, found))
|
|
|
|
@router.get("/{document_id}/download")
async def download_document(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Stream a stored document file back to the caller.

    Args:
        document_id: Hex string ``_id`` of the document.
        current_user: Authenticated user; non-admins need access to the
            document's index.
        db: Motor database handle.

    Returns:
        A ``FileResponse`` for ``doc.file_path``, served under the document's
        original filename and content type.

    Raises:
        HTTPException: 400 for a malformed id; 403 when the user may not
            access the document's index; 404 when the document record or
            the file on disk is missing.
    """
    # ObjectId() raises InvalidId on malformed input, which would otherwise
    # surface as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")

    document = await db.documents.find_one({"_id": ObjectId(document_id)})
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    doc = DocumentInDB(**document)

    # Check if user has access to this document's index
    if current_user.role.value != "admin" and doc.index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this document")

    # isfile (rather than exists) keeps a directory path away from FileResponse;
    # an empty/None path is reported like a missing file instead of raising.
    if not doc.file_path or not os.path.isfile(doc.file_path):
        raise HTTPException(status_code=404, detail="File not found on disk")

    return FileResponse(
        path=doc.file_path,
        filename=doc.original_filename,
        media_type=doc.content_type
    )
|
|
|
|
@router.get("/{document_id}/text")
async def get_document_text(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Return the parsed text content of a document.

    Args:
        document_id: Hex string ``_id`` of the document.
        current_user: Authenticated user; non-admins need access to the
            document's index.
        db: Motor database handle.

    Returns:
        A dict with ``success``, the document id, original filename, the
        parsed text and its length, processing status and timestamps.

    Raises:
        HTTPException: 400 for a malformed id or a document whose
            processing is not ``"completed"``; 403 when the user may not
            access the document's index; 404 when the document or its parsed
            text is missing.
    """
    # Function-scope import, matching the other local imports in this module.
    import logging

    # Diagnostics go through logging at DEBUG level instead of print() to stdout.
    logger = logging.getLogger(__name__)
    logger.debug("Getting document text for ID: %s", document_id)

    # ObjectId() raises InvalidId on malformed input, which would otherwise
    # surface as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")

    document = await db.documents.find_one({"_id": ObjectId(document_id)})
    if not document:
        logger.debug("Document not found: %s", document_id)
        raise HTTPException(status_code=404, detail="Document not found")

    logger.debug("Document found: %s", document.get('original_filename', 'Unknown'))
    doc = DocumentInDB(**document)

    # Check if user has access to this document's index
    if current_user.role.value != "admin" and doc.index_id not in current_user.index_access:
        logger.debug("Access denied to index: %s", doc.index_id)
        raise HTTPException(status_code=403, detail="Access denied to this document")

    # Check if document has been processed
    logger.debug("Processing status: %s", doc.processing_status)
    if doc.processing_status != "completed":
        raise HTTPException(status_code=400, detail="Document not yet processed")

    # Check if parsed text exists
    parsed_text = getattr(doc, 'parsed_text', None)
    logger.debug("Parsed text length: %d", len(parsed_text) if parsed_text else 0)
    if not parsed_text:
        raise HTTPException(status_code=404, detail="Document text not available")

    logger.debug("Returning document text for ID: %s", document_id)
    return {
        "success": True,
        "document_id": str(doc.id),
        "filename": doc.original_filename,
        "text": parsed_text,
        "text_length": len(parsed_text),
        "processing_status": doc.processing_status,
        "created_at": doc.created_at,
        "updated_at": doc.updated_at
    }
|
|
|
|
@router.get("/{document_id}", response_model=dict)
async def get_document(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Return metadata for a single document.

    Args:
        document_id: Hex string ``_id`` of the document.
        current_user: Authenticated user; non-admins need access to the
            document's index.
        db: Motor database handle.

    Returns:
        A dict with ids, file info, index id, processing status, timestamps
        and the document's metadata.

    Raises:
        HTTPException: 400 for a malformed id; 403 when the user may not
            access the document's index; 404 when the document is missing.
    """
    # ObjectId() raises InvalidId on malformed input, which would otherwise
    # surface as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")

    document = await db.documents.find_one({"_id": ObjectId(document_id)})
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    doc = DocumentInDB(**document)

    # Check if user has access to this document's index
    if current_user.role.value != "admin" and doc.index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this document")

    return {
        "id": str(doc.id),
        "filename": doc.filename,
        "original_filename": doc.original_filename,
        "file_size": doc.file_size,
        "content_type": doc.content_type,
        "index_id": doc.index_id,
        "processing_status": doc.processing_status,
        "created_at": doc.created_at,
        "updated_at": doc.updated_at,
        "metadata": doc.metadata
    }
|
|
|
|
@router.delete("/{document_id}")
async def delete_document(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Delete a document (administrators only).

    The role check runs before any database lookup. Non-admins therefore
    always get 403 and cannot use a 404-vs-403 difference to probe which
    document ids exist.

    Args:
        document_id: Hex string ``_id`` of the document.
        current_user: Authenticated user; must have the ``admin`` role.
        db: Motor database handle.

    Returns:
        ``{"message": "Document deleted successfully"}`` on success.

    Raises:
        HTTPException: 403 for non-admins; 400 for a malformed id; 404 when
            the document is missing; 500 when
            ``document_processor.delete_document`` reports failure.
    """
    # Only admins can delete documents
    if current_user.role.value != "admin":
        raise HTTPException(status_code=403, detail="Only administrators can delete documents")

    # ObjectId() raises InvalidId on malformed input, which would otherwise
    # surface as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")

    # Existence check so a missing document yields 404 rather than reaching
    # the delete call.
    if not await db.documents.find_one({"_id": ObjectId(document_id)}):
        raise HTTPException(status_code=404, detail="Document not found")

    success = await document_processor.delete_document(document_id, db)
    if not success:
        raise HTTPException(status_code=500, detail="Failed to delete document")
    return {"message": "Document deleted successfully"}
|
|
|
|
@router.get("/{document_id}/summary", response_model=ContractSummaryResponse)
async def get_document_summary(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Return the structured contract summary of a document, or its status.

    Behavior depends on ``summary_status`` (missing attribute → ``'pending'``):
      * ``"completed"`` with a stored ``contract_summary``: ``success=True``
        and the parsed ``ContractSummary``.
      * ``"processing"``: ``success=False`` with a please-wait message.
      * ``"failed"``: ``success=False`` with an error, plus
        ``metadata['summary_error']`` when present.
      * anything else: ``success=False`` with ``status="pending"``.

    Raises:
        HTTPException: 400 for a malformed id or a document whose processing
            is not ``"completed"``; 403 when the user may not access the
            document's index; 404 when the document is missing.
    """
    # ObjectId() raises InvalidId on malformed input, which would otherwise
    # surface as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")

    document = await db.documents.find_one({"_id": ObjectId(document_id)})
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    doc = DocumentInDB(**document)

    # Check if user has access to this document's index
    if current_user.role.value != "admin" and doc.index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this document")

    # Check if document has been processed
    if doc.processing_status != "completed":
        raise HTTPException(status_code=400, detail="Document not yet processed")

    summary_status = getattr(doc, 'summary_status', 'pending')
    summary_data = getattr(doc, 'contract_summary', None)

    # If summary is completed, return structured summary
    if summary_status == "completed" and summary_data:
        # Local import, kept as in the original function body.
        from ...models.contract_summary import ContractSummary

        contract_summary = ContractSummary(**summary_data)

        # Fall back to the document's creation time when summary_created_at is
        # absent OR explicitly None (the reprocess endpoint resets it to None).
        # The previous getattr(..., default) only covered the absent case.
        summary_created_at = getattr(doc, 'summary_created_at', None) or doc.created_at

        return ContractSummaryResponse(
            success=True,
            summary=contract_summary,
            status=summary_status,
            filename=doc.original_filename,
            created_at=summary_created_at.isoformat() if summary_created_at else None,
            updated_at=doc.updated_at.isoformat() if doc.updated_at else None
        )

    # If summary is processing, return status
    if summary_status == "processing":
        return ContractSummaryResponse(
            success=False,
            status=summary_status,
            filename=doc.original_filename,
            error="Contract summary is currently being processed. Please check back in a few moments."
        )

    # If summary failed, return error (with stored detail when available)
    if summary_status == "failed":
        error_msg = "Contract summary extraction failed."
        metadata = getattr(doc, 'metadata', None)
        if metadata and 'summary_error' in metadata:
            error_msg += f" Error: {metadata['summary_error']}"

        return ContractSummaryResponse(
            success=False,
            status=summary_status,
            filename=doc.original_filename,
            error=error_msg
        )

    # Any other status (including "completed" without stored data) is pending
    return ContractSummaryResponse(
        success=False,
        status="pending",
        filename=doc.original_filename,
        error="Contract summary processing has not started yet. Please try again later."
    )
|
|
|
|
# Strong references to fire-and-forget summary tasks. The event loop keeps only
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes. Each task removes itself from this set when done.
_summary_tasks = set()


@router.post("/{document_id}/summary/reprocess")
async def reprocess_document_summary(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Reset a document's contract summary and start extracting it again.

    Clears ``summary_status`` (to ``"pending"``), ``contract_summary`` and
    ``summary_created_at``, bumps ``updated_at``, then schedules
    ``llama_processor._extract_contract_summary`` as a background task
    without awaiting it.

    Args:
        document_id: Hex string ``_id`` of the document.
        current_user: Authenticated user; non-admins need access to the
            document's index.
        db: Motor database handle.

    Returns:
        A dict with a confirmation message, the document id, its original
        filename and ``status="processing"``.

    Raises:
        HTTPException: 400 for a malformed id or a document that is not
            processed / has no parsed text; 403 when the user may not access
            the document's index; 404 when the document is missing; 500 if
            the reset or scheduling step raises.
    """
    # ObjectId() raises InvalidId on malformed input, which would otherwise
    # surface as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")

    object_id = ObjectId(document_id)
    document = await db.documents.find_one({"_id": object_id})
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    doc = DocumentInDB(**document)

    # Check if user has access to this document's index
    if current_user.role.value != "admin" and doc.index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this document")

    # Extraction needs the parsed text, so require a completed parse.
    if doc.processing_status != "completed" or not doc.parsed_text:
        raise HTTPException(status_code=400, detail="Document not yet processed or text not available")

    try:
        # Reset summary status
        await db.documents.update_one(
            {"_id": object_id},
            {"$set": {
                "summary_status": "pending",
                "contract_summary": None,
                "summary_created_at": None,
                "updated_at": datetime.utcnow()
            }}
        )

        # NOTE(review): relies on LlamaProcessor's private
        # _extract_contract_summary; consider exposing a public method.
        task = asyncio.create_task(llama_processor._extract_contract_summary(
            document_id, doc.parsed_text, doc.original_filename, db
        ))
        _summary_tasks.add(task)
        task.add_done_callback(_summary_tasks.discard)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error starting summary reprocessing: {str(e)}") from e

    return {
        "message": "Contract summary reprocessing started",
        "document_id": document_id,
        "filename": doc.original_filename,
        "status": "processing"
    }