semblance-dev/backend/app/services/customer_data_service.py
2025-12-19 19:26:16 +00:00

195 lines
No EOL
7.5 KiB
Python
Executable file

"""
Customer Data Service for parsing uploaded files using LlamaParse.
Handles file upload, parsing to markdown, and cleanup operations.
"""
import inspect
import logging
import os
import shutil
import tempfile
import uuid
from typing import List, Optional

from werkzeug.datastructures import FileStorage

try:
    from llama_cloud_services import LlamaParse
except ImportError:
    # Optional dependency: CustomerDataService.__init__ reports its absence.
    LlamaParse = None
class CustomerDataServiceError(Exception):
    """Error raised by CustomerDataService when setup, upload, or parsing fails."""
class CustomerDataService:
    """Service for handling customer data upload and parsing.

    Every upload session gets its own directory, ``<base_dir>/<session_id>``,
    where ``session_id`` is a UUID4 string. Raw uploads and the markdown
    produced by LlamaParse are stored there until :meth:`cleanup_session`
    removes the directory.
    """

    # Environment variable read when no API key is passed to the constructor.
    API_KEY_ENV_VAR = "LLAMA_CLOUD_API_KEY"

    _logger = logging.getLogger(__name__)

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the service with a LlamaParse API key.

        Args:
            api_key: LlamaCloud API key. When omitted or empty, the key is read
                from the ``LLAMA_CLOUD_API_KEY`` environment variable.

        Raises:
            CustomerDataServiceError: If llama-cloud-services is not installed
                or no API key is available.
        """
        if LlamaParse is None:
            raise CustomerDataServiceError("llama-cloud-services package not installed")
        # Never hard-code the key in source. A key that was previously committed
        # as this parameter's default must be considered leaked and rotated.
        self.api_key = api_key or os.environ.get(self.API_KEY_ENV_VAR, "")
        if not self.api_key:
            raise CustomerDataServiceError(
                f"LlamaParse API key not configured: pass api_key or set {self.API_KEY_ENV_VAR}"
            )
        # Resolve to an absolute path to avoid working-directory issues.
        self.base_dir = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..", "persona_data")
        )
        os.makedirs(self.base_dir, exist_ok=True)
        # Initialize LlamaParse in premium mode.
        self.parser = LlamaParse(
            api_key=self.api_key,
            result_type="markdown",
            premium_mode=True,  # Enable premium mode for best accuracy
            parsing_instruction="Extract all customer data including demographics, behaviors, preferences, purchase history, and any other relevant customer information. Preserve data structure and relationships where possible.",
            num_workers=4,
            verbose=True,
            language="en",
        )

    def generate_session_id(self) -> str:
        """Generate a unique session ID for this upload session."""
        return str(uuid.uuid4())

    async def upload_and_parse_files(self, files: "List[FileStorage]") -> str:
        """
        Upload files and parse them using LlamaParse.

        Args:
            files: Uploaded file objects exposing ``filename`` and ``save(path)``.
                ``save`` may be synchronous (werkzeug) or a coroutine (Quart).
                Entries with an empty or blank filename are skipped.

        Returns:
            session_id: Unique identifier for this upload session.

        Raises:
            CustomerDataServiceError: If no files are given, or if saving or
                parsing fails. On any failure, including cancellation, the
                session directory is removed.
        """
        if not files:
            raise CustomerDataServiceError("No files provided")

        session_id = self.generate_session_id()
        session_dir = os.path.join(self.base_dir, session_id)
        succeeded = False
        try:
            os.makedirs(session_dir, exist_ok=True)
            uploaded_files = await self._save_uploads(session_id, session_dir, files)
            if not uploaded_files:
                raise CustomerDataServiceError("No valid files uploaded")
            parsed_documents = await self._parse_files(uploaded_files)
            self._write_markdown(session_id, session_dir, parsed_documents)
            succeeded = True
            return session_id
        except Exception as exc:
            raise CustomerDataServiceError(f"Failed to process files: {exc}") from exc
        finally:
            # Runs for cancellation too (asyncio.CancelledError is not an
            # Exception), so a partial session directory never lingers.
            if not succeeded:
                shutil.rmtree(session_dir, ignore_errors=True)

    async def _save_uploads(self, session_id: str, session_dir: str, files) -> List[str]:
        """Save each usable upload into *session_dir* and return the saved paths."""
        saved_paths: List[str] = []
        for index, upload in enumerate(files):
            safe_name = self._safe_filename(upload.filename)
            if not safe_name:
                continue  # e.g. an empty file field in the form
            # The per-upload index stops two uploads with the same name from
            # overwriting each other. It also keeps raw uploads from matching the
            # "<session_id>_parsed_<n>.md" names used for parser output.
            file_path = os.path.join(session_dir, f"{session_id}_{index}_{safe_name}")
            try:
                result = upload.save(file_path)
                # Quart's FileStorage.save is a coroutine; werkzeug's is synchronous.
                if inspect.isawaitable(result):
                    await result
            except Exception as exc:
                raise CustomerDataServiceError(
                    f"Failed to save file {upload.filename}: {str(exc)}"
                ) from exc
            if not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
                raise CustomerDataServiceError(f"Failed to save file: {upload.filename}")
            self._logger.info(
                "Saved upload %s (%d bytes)", file_path, os.path.getsize(file_path)
            )
            saved_paths.append(file_path)
        return saved_paths

    @staticmethod
    def _safe_filename(filename: Optional[str]) -> str:
        """Return the final path component of *filename*, or "" if it is unusable."""
        if not filename or not filename.strip():
            return ""
        # Clients may send full paths with either separator (e.g. "C:\\fakepath\\x").
        # Keeping only the last component stops the name from addressing any
        # location outside the session directory.
        name = filename.replace("\\", "/").rsplit("/", 1)[-1].replace("\x00", "")
        return name if name.strip() else ""

    async def _parse_files(self, file_paths: List[str]) -> list:
        """Run LlamaParse over *file_paths* and return the parsed documents."""
        self._logger.info("Starting LlamaParse for %d files...", len(file_paths))
        try:
            # aload_data is the coroutine counterpart of load_data. Calling the
            # synchronous load_data from this coroutine would block the event
            # loop, and the library may refuse to run a nested event loop.
            documents = await self.parser.aload_data(file_paths)
        except Exception as exc:
            self._logger.exception("LlamaParse failed")
            for path in file_paths:
                exists = os.path.exists(path)
                size = os.path.getsize(path) if exists else 0
                self._logger.error("File status: %s - exists: %s, size: %d", path, exists, size)
            raise CustomerDataServiceError(f"LlamaParse failed: {str(exc)}") from exc
        self._logger.info(
            "LlamaParse completed successfully. Generated %d documents.", len(documents)
        )
        return documents

    @staticmethod
    def _write_markdown(session_id: str, session_dir: str, documents) -> None:
        """Write each document's text to "<session_id>_parsed_<n>.md", numbering from 1."""
        for number, document in enumerate(documents, start=1):
            markdown_path = os.path.join(session_dir, f"{session_id}_parsed_{number}.md")
            with open(markdown_path, "w", encoding="utf-8") as handle:
                handle.write(document.text)

    @staticmethod
    def _canonical_session_id(session_id) -> Optional[str]:
        """Return *session_id* in canonical UUID form, or None if it is not a UUID.

        Session ids passed to the read and cleanup methods are not guaranteed to
        come from generate_session_id. They are validated before being joined
        onto ``base_dir``; otherwise an id such as "../.." could make
        cleanup_session delete directories outside it.
        """
        if not session_id:
            return None
        try:
            return str(uuid.UUID(str(session_id)))
        except ValueError:
            return None

    @staticmethod
    def _parsed_file_number(filename: str, prefix: str) -> Optional[int]:
        """Return n for a file named "<prefix><n>.md" (n in ASCII digits), else None."""
        suffix = ".md"
        if not (filename.startswith(prefix) and filename.endswith(suffix)):
            return None
        digits = filename[len(prefix):-len(suffix)]
        if digits.isascii() and digits.isdecimal():
            return int(digits)
        return None

    def get_parsed_markdown_content(self, session_id: str) -> Optional[str]:
        """
        Get all parsed markdown content for a session.

        Only parser output files ("<session_id>_parsed_<n>.md") are read, in
        ascending numeric order of n, so the result is deterministic. Raw
        uploads saved by upload_and_parse_files carry an upload index after
        the session id and are never picked up.

        Args:
            session_id: The session identifier returned by upload_and_parse_files.

        Returns:
            Combined markdown content from all parsed files, each under a
            "## <filename>" heading and separated by "---" rules. Returns None
            if the id is not a UUID, the session does not exist, or no parsed
            file has readable non-empty content.
        """
        canonical_id = self._canonical_session_id(session_id)
        if canonical_id is None:
            return None
        session_dir = os.path.join(self.base_dir, canonical_id)
        try:
            filenames = os.listdir(session_dir)
        except (FileNotFoundError, NotADirectoryError):
            return None

        prefix = f"{canonical_id}_parsed_"
        numbered = []
        for filename in filenames:
            number = self._parsed_file_number(filename, prefix)
            if number is not None:
                numbered.append((number, filename))

        sections = []
        for _, filename in sorted(numbered):
            file_path = os.path.join(session_dir, filename)
            try:
                with open(file_path, "r", encoding="utf-8") as handle:
                    content = handle.read().strip()
            except (OSError, ValueError):
                # Best effort: skip unreadable or undecodable files, but leave a trace.
                self._logger.warning(
                    "Skipping unreadable parsed file %s", file_path, exc_info=True
                )
                continue
            if content:
                sections.append(f"## {filename}\n\n{content}")
        return "\n\n---\n\n".join(sections) if sections else None

    def cleanup_session(self, session_id: str) -> bool:
        """
        Clean up all files for a session.

        Args:
            session_id: The session identifier returned by upload_and_parse_files.

        Returns:
            True if the session directory was removed or did not exist.
            False if the id is not a valid UUID or removal failed.
        """
        canonical_id = self._canonical_session_id(session_id)
        if canonical_id is None:
            return False
        session_dir = os.path.join(self.base_dir, canonical_id)
        if not os.path.exists(session_dir):
            return True  # Nothing to clean up
        try:
            shutil.rmtree(session_dir)
        except OSError:
            self._logger.warning("Failed to remove session dir %s", session_dir, exc_info=True)
            return False
        return True
# Global service instance
customer_data_service = CustomerDataService()