contract-query/backend/app/api/v1/documents.py
2025-08-14 15:03:33 -05:00

323 lines
No EOL
12 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from fastapi.responses import FileResponse
from motor.motor_asyncio import AsyncIOMotorDatabase
from typing import List
from bson import ObjectId
from datetime import datetime
import os
import asyncio
from urllib.parse import unquote
from ...config.database import get_database
from ...core.auth import get_current_active_user, require_index_access
from ...models.user import UserInDB
from ...models.document import Document, DocumentInDB
from ...models.contract_summary import ContractSummaryResponse
from ...services.document_processor import document_processor
from ...services.rag_service import rag_service
from ...services.llama_processor import llama_processor
router = APIRouter()
@router.post("/upload", response_model=dict)
async def upload_document(
    file: UploadFile = File(...),
    index_id: str = Form(...),
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Upload a document into an index.

    Non-admin users may only upload into indexes listed in their
    ``index_access``; anyone else receives a 403. The file is handed to
    ``llama_processor.process_single_file`` and the resulting document's
    id and filename are echoed back.
    """
    # Admins bypass the per-index permission list entirely.
    is_admin = current_user.role.value == "admin"
    if not (is_admin or index_id in current_user.index_access):
        raise HTTPException(status_code=403, detail="Access denied to this index")

    # LlamaProcessor handles the upload (including contract summary processing).
    stored = await llama_processor.process_single_file(
        file, index_id, current_user, db
    )

    response = {"message": "Document uploaded successfully"}
    response["document_id"] = str(stored.id)
    response["filename"] = stored.filename
    response["processing_status"] = "pending"
    return response
# Background processing is now handled by LlamaProcessor automatically
@router.get("/index/{index_id}", response_model=List[dict])
async def get_documents_by_index(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """List every document stored in one index.

    The ``index_id`` path value is URL-decoded before it is used for both
    the permission check and the lookup. Non-admin users without the
    decoded id in their ``index_access`` receive a 403.
    """
    target_index = unquote(index_id)

    # Admins bypass the per-index permission list entirely.
    is_admin = current_user.role.value == "admin"
    if not (is_admin or target_index in current_user.index_access):
        raise HTTPException(status_code=403, detail="Access denied to this index")

    found = await document_processor.get_documents_by_index(target_index, db)

    listing = []
    for item in found:
        listing.append({
            "id": str(item.id),
            "filename": item.filename,
            "original_filename": item.original_filename,
            "file_size": item.file_size,
            "content_type": item.content_type,
            "processing_status": item.processing_status,
            "embedding_status": item.embedding_status,
            # Records without a summary_status attribute are reported as pending.
            "summary_status": getattr(item, 'summary_status', 'pending'),
            "created_at": item.created_at,
            "updated_at": item.updated_at,
            "metadata": item.metadata,
            "chunk_count": item.chunk_count
        })
    return listing
@router.get("/{document_id}/download")
async def download_document(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Download the stored file behind a document record.

    Args:
        document_id: String form of the document's ``_id``.
        current_user: User resolved by ``get_current_active_user``.
        db: Database handle resolved by ``get_database``.

    Returns:
        A ``FileResponse`` for ``doc.file_path``, named after the
        document's original filename and served with its stored
        content type.

    Raises:
        HTTPException: 400 if ``document_id`` is not a valid ObjectId,
            404 if the record or the file on disk is missing, 403 if a
            non-admin user lacks access to the document's index.
    """
    # ObjectId() raises on malformed input, which would otherwise surface
    # to the client as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")

    document = await db.documents.find_one({"_id": ObjectId(document_id)})
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    doc = DocumentInDB(**document)

    # Check if user has access to this document's index
    if current_user.role.value != "admin" and doc.index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this document")

    # isfile() rather than exists(): a directory at this path cannot be
    # served as a file and should be reported as missing.
    if not os.path.isfile(doc.file_path):
        raise HTTPException(status_code=404, detail="File not found on disk")

    return FileResponse(
        path=doc.file_path,
        filename=doc.original_filename,
        media_type=doc.content_type
    )
@router.get("/{document_id}/text")
async def get_document_text(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Return the parsed text content of a document.

    Args:
        document_id: String form of the document's ``_id``.
        current_user: User resolved by ``get_current_active_user``.
        db: Database handle resolved by ``get_database``.

    Returns:
        A dict with ``success``, ``document_id``, ``filename``, ``text``,
        ``text_length``, ``processing_status``, ``created_at`` and
        ``updated_at``.

    Raises:
        HTTPException: 400 if ``document_id`` is not a valid ObjectId or
            the document's processing status is not "completed", 404 if
            the record or its parsed text is missing, 403 if a non-admin
            user lacks access to the document's index.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    logger = logging.getLogger(__name__)
    logger.debug("Getting document text for ID: %s", document_id)

    # ObjectId() raises on malformed input, which would otherwise surface
    # to the client as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        logger.debug("Invalid document ID: %s", document_id)
        raise HTTPException(status_code=400, detail="Invalid document ID")

    document = await db.documents.find_one({"_id": ObjectId(document_id)})
    if not document:
        logger.debug("Document not found: %s", document_id)
        raise HTTPException(status_code=404, detail="Document not found")

    logger.debug("Document found: %s", document.get('original_filename', 'Unknown'))
    doc = DocumentInDB(**document)

    # Check if user has access to this document's index
    if current_user.role.value != "admin" and doc.index_id not in current_user.index_access:
        logger.debug("Access denied to index: %s", doc.index_id)
        raise HTTPException(status_code=403, detail="Access denied to this document")

    # Check if document has been processed
    logger.debug("Processing status: %s", doc.processing_status)
    if doc.processing_status != "completed":
        raise HTTPException(status_code=400, detail="Document not yet processed")

    # Check if parsed text exists; getattr tolerates records without the field.
    parsed_text = getattr(doc, 'parsed_text', None)
    logger.debug("Parsed text length: %d", len(parsed_text) if parsed_text else 0)
    if not parsed_text:
        raise HTTPException(status_code=404, detail="Document text not available")

    logger.debug("Returning document text successfully")
    return {
        "success": True,
        "document_id": str(doc.id),
        "filename": doc.original_filename,
        "text": parsed_text,
        "text_length": len(parsed_text),
        "processing_status": doc.processing_status,
        "created_at": doc.created_at,
        "updated_at": doc.updated_at
    }
@router.get("/{document_id}", response_model=dict)
async def get_document(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Return the metadata of a single document.

    Args:
        document_id: String form of the document's ``_id``.
        current_user: User resolved by ``get_current_active_user``.
        db: Database handle resolved by ``get_database``.

    Returns:
        A dict with the document's id, filenames, size, content type,
        index id, processing status, timestamps and metadata.

    Raises:
        HTTPException: 400 if ``document_id`` is not a valid ObjectId,
            404 if no such document exists, 403 if a non-admin user
            lacks access to the document's index.
    """
    # ObjectId() raises on malformed input, which would otherwise surface
    # to the client as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")

    document = await db.documents.find_one({"_id": ObjectId(document_id)})
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    doc = DocumentInDB(**document)

    # Check if user has access to this document's index
    if current_user.role.value != "admin" and doc.index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this document")

    return {
        "id": str(doc.id),
        "filename": doc.filename,
        "original_filename": doc.original_filename,
        "file_size": doc.file_size,
        "content_type": doc.content_type,
        "index_id": doc.index_id,
        "processing_status": doc.processing_status,
        "created_at": doc.created_at,
        "updated_at": doc.updated_at,
        "metadata": doc.metadata
    }
@router.delete("/{document_id}")
async def delete_document(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Delete a document (administrators only).

    Args:
        document_id: String form of the document's ``_id``.
        current_user: User resolved by ``get_current_active_user``.
        db: Database handle resolved by ``get_database``.

    Returns:
        ``{"message": "Document deleted successfully"}`` on success.

    Raises:
        HTTPException: 403 if the caller is not an admin, 400 if
            ``document_id`` is not a valid ObjectId, 404 if no such
            document exists, 500 if
            ``document_processor.delete_document`` reports failure.
    """
    # Authorize before touching the database so non-admin callers cannot
    # use the 404/403 difference to probe which document IDs exist.
    if current_user.role.value != "admin":
        raise HTTPException(status_code=403, detail="Only administrators can delete documents")

    # ObjectId() raises on malformed input, which would otherwise surface
    # to the client as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")

    document = await db.documents.find_one({"_id": ObjectId(document_id)})
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    # Delete the document
    success = await document_processor.delete_document(document_id, db)
    if not success:
        raise HTTPException(status_code=500, detail="Failed to delete document")

    return {"message": "Document deleted successfully"}
@router.get("/{document_id}/summary", response_model=ContractSummaryResponse)
async def get_document_summary(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Return the structured contract summary of a document, or its status.

    Args:
        document_id: String form of the document's ``_id``.
        current_user: User resolved by ``get_current_active_user``.
        db: Database handle resolved by ``get_database``.

    Returns:
        A ``ContractSummaryResponse``. ``success`` is True only when the
        summary status is "completed" and a stored summary is present;
        otherwise ``success`` is False and ``error`` explains whether the
        summary is processing, failed, or pending.

    Raises:
        HTTPException: 400 if ``document_id`` is not a valid ObjectId or
            the document's processing status is not "completed", 404 if
            no such document exists, 403 if a non-admin user lacks access
            to the document's index.
    """
    # ObjectId() raises on malformed input, which would otherwise surface
    # to the client as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")

    document = await db.documents.find_one({"_id": ObjectId(document_id)})
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    doc = DocumentInDB(**document)

    # Check if user has access to this document's index
    if current_user.role.value != "admin" and doc.index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this document")

    # Check if document has been processed
    if doc.processing_status != "completed":
        raise HTTPException(status_code=400, detail="Document not yet processed")

    # Records without a summary_status attribute are treated as pending.
    summary_status = getattr(doc, 'summary_status', 'pending')
    stored_summary = getattr(doc, 'contract_summary', None)

    # Completed with a stored summary: return the structured result.
    if summary_status == "completed" and stored_summary:
        from ...models.contract_summary import ContractSummary

        # Fall back to the document's creation time only when the model has
        # no summary_created_at attribute at all; an explicit None stays None.
        summary_created = getattr(doc, 'summary_created_at', doc.created_at)
        return ContractSummaryResponse(
            success=True,
            summary=ContractSummary(**stored_summary),
            status=summary_status,
            filename=doc.original_filename,
            created_at=summary_created.isoformat() if summary_created else None,
            updated_at=doc.updated_at.isoformat() if doc.updated_at else None
        )

    # Still processing: report the status.
    if summary_status == "processing":
        return ContractSummaryResponse(
            success=False,
            status=summary_status,
            filename=doc.original_filename,
            error="Contract summary is currently being processed. Please check back in a few moments."
        )

    # Failed: include the recorded error, if the metadata carries one.
    if summary_status == "failed":
        error_msg = "Contract summary extraction failed."
        metadata = getattr(doc, 'metadata', None)
        if metadata and 'summary_error' in metadata:
            error_msg += f" Error: {metadata['summary_error']}"
        return ContractSummaryResponse(
            success=False,
            status=summary_status,
            filename=doc.original_filename,
            error=error_msg
        )

    # Anything else (including "completed" without a stored summary) is
    # reported as pending.
    return ContractSummaryResponse(
        success=False,
        status="pending",
        filename=doc.original_filename,
        error="Contract summary processing has not started yet. Please try again later."
    )
# Strong references to in-flight summary tasks. The event loop only keeps
# weak references to tasks, so a task with no other reference may be
# garbage-collected before it finishes.
_summary_reprocess_tasks: set = set()


@router.post("/{document_id}/summary/reprocess")
async def reprocess_document_summary(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database)
):
    """Reset a document's contract summary and start extracting it again.

    Args:
        document_id: String form of the document's ``_id``.
        current_user: User resolved by ``get_current_active_user``.
        db: Database handle resolved by ``get_database``.

    Returns:
        A dict with ``message``, ``document_id``, ``filename`` and
        ``status``. The stored ``summary_status`` is reset to "pending"
        while the response reports "processing", because the extraction
        task has been scheduled by the time the response is returned.

    Raises:
        HTTPException: 400 if ``document_id`` is not a valid ObjectId, or
            if the document is not processed or has no parsed text; 404
            if no such document exists; 403 if a non-admin user lacks
            access to the document's index; 500 if resetting the summary
            or scheduling the task fails.
    """
    # ObjectId() raises on malformed input, which would otherwise surface
    # to the client as an unhandled 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")

    document = await db.documents.find_one({"_id": ObjectId(document_id)})
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    doc = DocumentInDB(**document)

    # Check if user has access to this document's index
    if current_user.role.value != "admin" and doc.index_id not in current_user.index_access:
        raise HTTPException(status_code=403, detail="Access denied to this document")

    # getattr tolerates records without a parsed_text attribute, matching
    # how the text endpoint reads the same field.
    parsed_text = getattr(doc, 'parsed_text', None)
    if doc.processing_status != "completed" or not parsed_text:
        raise HTTPException(status_code=400, detail="Document not yet processed or text not available")

    try:
        # Reset summary status
        await db.documents.update_one(
            {"_id": ObjectId(document_id)},
            {"$set": {
                "summary_status": "pending",
                "contract_summary": None,
                "summary_created_at": None,
                "updated_at": datetime.utcnow()
            }}
        )

        # Trigger summary extraction asynchronously, holding a reference
        # until the task completes so it cannot be collected mid-run.
        task = asyncio.create_task(llama_processor._extract_contract_summary(
            document_id, parsed_text, doc.original_filename, db
        ))
        _summary_reprocess_tasks.add(task)
        task.add_done_callback(_summary_reprocess_tasks.discard)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error starting summary reprocessing: {str(e)}"
        ) from e

    return {
        "message": "Contract summary reprocessing started",
        "document_id": document_id,
        "filename": doc.original_filename,
        "status": "processing"
    }