"""Document API routes: upload, listing, download, text, and contract summary."""

import asyncio
import logging
import os
from datetime import datetime
from typing import List
from urllib.parse import unquote

from bson import ObjectId
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from fastapi.responses import FileResponse
from motor.motor_asyncio import AsyncIOMotorDatabase

from ...config.database import get_database
from ...core.auth import get_current_active_user, require_index_access
from ...models.user import UserInDB
from ...models.document import Document, DocumentInDB
from ...models.contract_summary import ContractSummary, ContractSummaryResponse
from ...services.document_processor import document_processor
from ...services.rag_service import rag_service
from ...services.llama_processor import llama_processor

logger = logging.getLogger(__name__)

router = APIRouter()

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes.
_background_tasks: set = set()


def _is_admin(user: UserInDB) -> bool:
    """Return True when the user's role value is "admin"."""
    return user.role.value == "admin"


def _ensure_index_access(user: UserInDB, index_id: str, detail: str) -> None:
    """Raise a 403 unless the user is an admin or lists index_id in index_access."""
    if not _is_admin(user) and index_id not in user.index_access:
        raise HTTPException(status_code=403, detail=detail)


def _parse_object_id(document_id: str) -> ObjectId:
    """Convert a path parameter to an ObjectId, raising a 400 when it is malformed."""
    # ObjectId() raises on malformed input, which would otherwise surface as a 500.
    if not ObjectId.is_valid(document_id):
        raise HTTPException(status_code=400, detail="Invalid document ID")
    return ObjectId(document_id)


async def _load_document(document_id: str, db: AsyncIOMotorDatabase) -> DocumentInDB:
    """Fetch a document by id from db.documents, raising a 404 when it is absent."""
    record = await db.documents.find_one({"_id": _parse_object_id(document_id)})
    if not record:
        raise HTTPException(status_code=404, detail="Document not found")
    return DocumentInDB(**record)


async def _load_accessible_document(
    document_id: str,
    current_user: UserInDB,
    db: AsyncIOMotorDatabase,
) -> DocumentInDB:
    """Fetch a document and verify the user may access the index it belongs to."""
    doc = await _load_document(document_id, db)
    _ensure_index_access(current_user, doc.index_id, "Access denied to this document")
    return doc


@router.post("/upload", response_model=dict)
async def upload_document(
    file: UploadFile = File(...),
    index_id: str = Form(...),
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Upload a document to an index.

    Raises:
        HTTPException: 403 when the user has no access to ``index_id``.
    """
    _ensure_index_access(current_user, index_id, "Access denied to this index")

    # Upload handling is delegated to LlamaProcessor (per the original note,
    # this includes contract summary processing and background processing).
    document = await llama_processor.process_single_file(file, index_id, current_user, db)

    return {
        "message": "Document uploaded successfully",
        "document_id": str(document.id),
        "filename": document.filename,
        "processing_status": "pending",
    }


@router.get("/index/{index_id}", response_model=List[dict])
async def get_documents_by_index(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Get all documents for a specific index.

    Raises:
        HTTPException: 403 when the user has no access to the index.
    """
    # The index id may arrive URL-encoded in the path.
    decoded_index_id = unquote(index_id)
    _ensure_index_access(current_user, decoded_index_id, "Access denied to this index")

    documents = await document_processor.get_documents_by_index(decoded_index_id, db)

    return [
        {
            "id": str(doc.id),
            "filename": doc.filename,
            "original_filename": doc.original_filename,
            "file_size": doc.file_size,
            "content_type": doc.content_type,
            "processing_status": doc.processing_status,
            "embedding_status": doc.embedding_status,
            "summary_status": getattr(doc, "summary_status", "pending"),
            "created_at": doc.created_at,
            "updated_at": doc.updated_at,
            "metadata": doc.metadata,
            "chunk_count": doc.chunk_count,
        }
        for doc in documents
    ]


@router.get("/{document_id}/download")
async def download_document(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Download a document's stored file.

    Raises:
        HTTPException: 400 for a malformed id, 404 when the document or its
            file is missing, 403 when access to the document's index is denied.
    """
    doc = await _load_accessible_document(document_id, current_user, db)

    # isfile (not exists): a directory at file_path cannot be served as a file.
    if not os.path.isfile(doc.file_path):
        raise HTTPException(status_code=404, detail="File not found on disk")

    return FileResponse(
        path=doc.file_path,
        filename=doc.original_filename,
        media_type=doc.content_type,
    )


@router.get("/{document_id}/text")
async def get_document_text(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Get the parsed text content of a document.

    Raises:
        HTTPException: 400 for a malformed id or an unprocessed document,
            404 when the document or its parsed text is missing, 403 when
            access to the document's index is denied.
    """
    logger.debug("Getting document text for ID: %s", document_id)
    doc = await _load_accessible_document(document_id, current_user, db)

    logger.debug(
        "Document %s processing status: %s", document_id, doc.processing_status
    )
    if doc.processing_status != "completed":
        raise HTTPException(status_code=400, detail="Document not yet processed")

    parsed_text = getattr(doc, "parsed_text", None)
    if not parsed_text:
        raise HTTPException(status_code=404, detail="Document text not available")

    logger.debug(
        "Returning %d characters of text for document %s", len(parsed_text), document_id
    )
    return {
        "success": True,
        "document_id": str(doc.id),
        "filename": doc.original_filename,
        "text": parsed_text,
        "text_length": len(parsed_text),
        "processing_status": doc.processing_status,
        "created_at": doc.created_at,
        "updated_at": doc.updated_at,
    }


@router.get("/{document_id}", response_model=dict)
async def get_document(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Get a specific document's metadata.

    Raises:
        HTTPException: 400 for a malformed id, 404 when the document is
            missing, 403 when access to the document's index is denied.
    """
    doc = await _load_accessible_document(document_id, current_user, db)

    return {
        "id": str(doc.id),
        "filename": doc.filename,
        "original_filename": doc.original_filename,
        "file_size": doc.file_size,
        "content_type": doc.content_type,
        "index_id": doc.index_id,
        "processing_status": doc.processing_status,
        "created_at": doc.created_at,
        "updated_at": doc.updated_at,
        "metadata": doc.metadata,
    }


@router.delete("/{document_id}")
async def delete_document(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Delete a document (administrators only).

    Raises:
        HTTPException: 403 for non-admin users, 400 for a malformed id,
            404 when the document is missing, 500 when deletion fails.
    """
    # Authorize before touching the database so non-admins cannot probe which
    # document ids exist.
    if not _is_admin(current_user):
        raise HTTPException(
            status_code=403, detail="Only administrators can delete documents"
        )

    # Only existence matters here; the record is deliberately not validated
    # through DocumentInDB so a malformed record can still be removed.
    record = await db.documents.find_one({"_id": _parse_object_id(document_id)})
    if not record:
        raise HTTPException(status_code=404, detail="Document not found")

    success = await document_processor.delete_document(document_id, db)
    if not success:
        raise HTTPException(status_code=500, detail="Failed to delete document")

    return {"message": "Document deleted successfully"}


@router.get("/{document_id}/summary", response_model=ContractSummaryResponse)
async def get_document_summary(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Get the structured contract summary of a document.

    Returns a ContractSummaryResponse whose ``status`` reflects the document's
    ``summary_status``; any status other than completed/processing/failed (or
    a completed status without a stored summary) is reported as "pending".

    Raises:
        HTTPException: 400 for a malformed id or an unprocessed document,
            404 when the document is missing, 403 when access is denied.
    """
    doc = await _load_accessible_document(document_id, current_user, db)

    if doc.processing_status != "completed":
        raise HTTPException(status_code=400, detail="Document not yet processed")

    summary_status = getattr(doc, "summary_status", "pending")

    if summary_status == "completed" and getattr(doc, "contract_summary", None):
        # Fall back to the document's creation time when the model has no
        # summary_created_at attribute.
        summary_created_at = getattr(doc, "summary_created_at", doc.created_at)
        return ContractSummaryResponse(
            success=True,
            summary=ContractSummary(**doc.contract_summary),
            status=summary_status,
            filename=doc.original_filename,
            created_at=summary_created_at.isoformat() if summary_created_at else None,
            updated_at=doc.updated_at.isoformat() if doc.updated_at else None,
        )

    if summary_status == "processing":
        return ContractSummaryResponse(
            success=False,
            status=summary_status,
            filename=doc.original_filename,
            error="Contract summary is currently being processed. Please check back in a few moments.",
        )

    if summary_status == "failed":
        error_msg = "Contract summary extraction failed."
        metadata = getattr(doc, "metadata", None) or {}
        if "summary_error" in metadata:
            error_msg += f" Error: {metadata['summary_error']}"
        return ContractSummaryResponse(
            success=False,
            status=summary_status,
            filename=doc.original_filename,
            error=error_msg,
        )

    return ContractSummaryResponse(
        success=False,
        status="pending",
        filename=doc.original_filename,
        error="Contract summary processing has not started yet. Please try again later.",
    )


@router.post("/{document_id}/summary/reprocess")
async def reprocess_document_summary(
    document_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Reset a document's contract summary and start extraction again.

    Raises:
        HTTPException: 400 for a malformed id, an unprocessed document, or
            missing parsed text; 404 when the document is missing; 403 when
            access is denied; 500 when the reset or task scheduling fails.
    """
    doc = await _load_accessible_document(document_id, current_user, db)

    if doc.processing_status != "completed" or not doc.parsed_text:
        raise HTTPException(
            status_code=400,
            detail="Document not yet processed or text not available",
        )

    try:
        # Clear the previous summary so readers see a fresh "pending" state.
        await db.documents.update_one(
            {"_id": doc.id if isinstance(doc.id, ObjectId) else ObjectId(document_id)},
            {
                "$set": {
                    "summary_status": "pending",
                    "contract_summary": None,
                    "summary_created_at": None,
                    "updated_at": datetime.utcnow(),
                }
            },
        )

        # Run extraction in the background; hold a reference until it is done
        # so the task cannot be garbage-collected mid-execution.
        task = asyncio.create_task(
            llama_processor._extract_contract_summary(
                document_id,
                doc.parsed_text,
                doc.original_filename,
                db,
            )
        )
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error starting summary reprocessing: {str(e)}",
        ) from e

    return {
        "message": "Contract summary reprocessing started",
        "document_id": document_id,
        "filename": doc.original_filename,
        "status": "processing",
    }