195 lines
No EOL
7.5 KiB
Python
Executable file
195 lines
No EOL
7.5 KiB
Python
Executable file
"""
|
|
Customer Data Service for parsing uploaded files using LlamaParse.
|
|
Handles file upload, parsing to markdown, and cleanup operations.
|
|
"""
|
|
|
|
import os
|
|
import uuid
|
|
import shutil
|
|
import tempfile
|
|
from typing import List, Optional
|
|
from werkzeug.datastructures import FileStorage
|
|
|
|
try:
|
|
from llama_cloud_services import LlamaParse
|
|
except ImportError:
|
|
LlamaParse = None
|
|
|
|
|
|
class CustomerDataServiceError(Exception):
|
|
"""Exception raised for errors in customer data processing."""
|
|
pass
|
|
|
|
|
|
class CustomerDataService:
|
|
"""Service for handling customer data upload and parsing."""
|
|
|
|
def __init__(self, api_key: str = "llx-HhMSCmLjYAuK7FcxJ0yBxAP4t4JY0tKx7XLyZGHJJWiUFZuX"):
|
|
"""Initialize the service with LlamaParse API key."""
|
|
if not LlamaParse:
|
|
raise CustomerDataServiceError("llama-cloud-services package not installed")
|
|
|
|
self.api_key = api_key
|
|
# Resolve to absolute path to avoid working directory issues
|
|
self.base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "persona_data"))
|
|
|
|
# Ensure base directory exists
|
|
os.makedirs(self.base_dir, exist_ok=True)
|
|
|
|
# Initialize LlamaParse in premium mode
|
|
self.parser = LlamaParse(
|
|
api_key=self.api_key,
|
|
result_type="markdown",
|
|
premium_mode=True, # Enable premium mode for best accuracy
|
|
parsing_instruction="Extract all customer data including demographics, behaviors, preferences, purchase history, and any other relevant customer information. Preserve data structure and relationships where possible.",
|
|
num_workers=4,
|
|
verbose=True,
|
|
language="en"
|
|
)
|
|
|
|
def generate_session_id(self) -> str:
|
|
"""Generate a unique session ID for this upload session."""
|
|
return str(uuid.uuid4())
|
|
|
|
async def upload_and_parse_files(self, files: List[FileStorage]) -> str:
|
|
"""
|
|
Upload files and parse them using LlamaParse.
|
|
|
|
Args:
|
|
files: List of uploaded files
|
|
|
|
Returns:
|
|
session_id: Unique identifier for this upload session
|
|
|
|
Raises:
|
|
CustomerDataServiceError: If upload or parsing fails
|
|
"""
|
|
if not files or len(files) == 0:
|
|
raise CustomerDataServiceError("No files provided")
|
|
|
|
session_id = self.generate_session_id()
|
|
session_dir = os.path.join(self.base_dir, session_id)
|
|
|
|
try:
|
|
# Create session directory
|
|
os.makedirs(session_dir, exist_ok=True)
|
|
|
|
# Save uploaded files
|
|
uploaded_files = []
|
|
for file in files:
|
|
if file.filename and file.filename.strip():
|
|
# Secure filename
|
|
filename = f"{session_id}_{file.filename}"
|
|
file_path = os.path.join(session_dir, filename)
|
|
|
|
try:
|
|
# Save file and verify it exists (Quart async version)
|
|
await file.save(file_path)
|
|
|
|
if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
|
|
uploaded_files.append(file_path)
|
|
print(f"✅ Successfully saved file: {file_path} ({os.path.getsize(file_path)} bytes)")
|
|
else:
|
|
raise CustomerDataServiceError(f"Failed to save file: {file.filename}")
|
|
except CustomerDataServiceError:
|
|
raise # Re-raise our own errors
|
|
except Exception as e:
|
|
raise CustomerDataServiceError(f"Failed to save file {file.filename}: {str(e)}")
|
|
|
|
if not uploaded_files:
|
|
raise CustomerDataServiceError("No valid files uploaded")
|
|
|
|
# Parse files using LlamaParse
|
|
print(f"🔄 Starting LlamaParse for {len(uploaded_files)} files...")
|
|
for file_path in uploaded_files:
|
|
print(f"📄 File to parse: {file_path} (exists: {os.path.exists(file_path)})")
|
|
|
|
try:
|
|
parsed_documents = self.parser.load_data(uploaded_files)
|
|
print(f"✅ LlamaParse completed successfully. Generated {len(parsed_documents)} documents.")
|
|
except Exception as parse_error:
|
|
print(f"❌ LlamaParse failed: {str(parse_error)}")
|
|
# Check which files still exist before the error
|
|
for file_path in uploaded_files:
|
|
exists = os.path.exists(file_path)
|
|
size = os.path.getsize(file_path) if exists else 0
|
|
print(f"📄 File status: {file_path} - exists: {exists}, size: {size}")
|
|
raise CustomerDataServiceError(f"LlamaParse failed: {str(parse_error)}")
|
|
|
|
# Save parsed markdown files
|
|
for i, document in enumerate(parsed_documents):
|
|
markdown_filename = f"{session_id}_parsed_{i+1}.md"
|
|
markdown_path = os.path.join(session_dir, markdown_filename)
|
|
with open(markdown_path, 'w', encoding='utf-8') as f:
|
|
f.write(document.text)
|
|
|
|
return session_id
|
|
|
|
except Exception as e:
|
|
# Clean up on error
|
|
if os.path.exists(session_dir):
|
|
shutil.rmtree(session_dir, ignore_errors=True)
|
|
raise CustomerDataServiceError(f"Failed to process files: {str(e)}")
|
|
|
|
def get_parsed_markdown_content(self, session_id: str) -> Optional[str]:
|
|
"""
|
|
Get all parsed markdown content for a session.
|
|
|
|
Args:
|
|
session_id: The session identifier
|
|
|
|
Returns:
|
|
Combined markdown content from all parsed files, or None if not found
|
|
"""
|
|
if not session_id:
|
|
return None
|
|
|
|
session_dir = os.path.join(self.base_dir, session_id)
|
|
if not os.path.exists(session_dir):
|
|
return None
|
|
|
|
combined_content = []
|
|
|
|
# Find all markdown files for this session
|
|
for filename in os.listdir(session_dir):
|
|
if filename.endswith('.md') and 'parsed' in filename:
|
|
file_path = os.path.join(session_dir, filename)
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
content = f.read().strip()
|
|
if content:
|
|
combined_content.append(f"## {filename}\n\n{content}")
|
|
except Exception as e:
|
|
continue # Skip files that can't be read
|
|
|
|
if combined_content:
|
|
return "\n\n---\n\n".join(combined_content)
|
|
|
|
return None
|
|
|
|
def cleanup_session(self, session_id: str) -> bool:
|
|
"""
|
|
Clean up all files for a session.
|
|
|
|
Args:
|
|
session_id: The session identifier
|
|
|
|
Returns:
|
|
True if cleanup was successful, False otherwise
|
|
"""
|
|
if not session_id:
|
|
return False
|
|
|
|
session_dir = os.path.join(self.base_dir, session_id)
|
|
if os.path.exists(session_dir):
|
|
try:
|
|
shutil.rmtree(session_dir)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
return True # Nothing to clean up
|
|
|
|
|
|
# Global service instance
|
|
customer_data_service = CustomerDataService() |