"""
Customer Data Service for parsing uploaded files using LlamaParse.

Handles file upload, parsing to markdown, and cleanup operations.
"""

import inspect
import os
import re
import shutil
import tempfile
import uuid
from typing import List, Optional

from werkzeug.datastructures import FileStorage

try:
    from llama_cloud_services import LlamaParse
except ImportError:
    LlamaParse = None

# Environment variable consulted when no API key is passed explicitly.
_API_KEY_ENV_VAR = "LLAMA_CLOUD_API_KEY"


class CustomerDataServiceError(Exception):
    """Exception raised for errors in customer data processing."""


class CustomerDataService:
    """Service for handling customer data upload and parsing.

    Each upload gets its own directory ``<base_dir>/<session_id>/`` holding
    the uploaded files and the markdown files produced from them.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the service and its LlamaParse client.

        Args:
            api_key: LlamaParse API key. When omitted, the key is read from
                the ``LLAMA_CLOUD_API_KEY`` environment variable. Credentials
                must never be committed to source control.

        Raises:
            CustomerDataServiceError: If the llama-cloud-services package is
                not installed, or no API key is available.
        """
        if not LlamaParse:
            raise CustomerDataServiceError("llama-cloud-services package not installed")

        resolved_key = api_key or os.environ.get(_API_KEY_ENV_VAR)
        if not resolved_key:
            raise CustomerDataServiceError(
                f"LlamaParse API key not provided; pass api_key or set {_API_KEY_ENV_VAR}"
            )
        self.api_key = resolved_key

        # Resolve to absolute path to avoid working directory issues
        self.base_dir = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..", "persona_data")
        )

        # Ensure base directory exists
        os.makedirs(self.base_dir, exist_ok=True)

        # Initialize LlamaParse in premium mode
        self.parser = LlamaParse(
            api_key=self.api_key,
            result_type="markdown",
            premium_mode=True,  # Enable premium mode for best accuracy
            parsing_instruction="Extract all customer data including demographics, behaviors, preferences, purchase history, and any other relevant customer information. Preserve data structure and relationships where possible.",
            num_workers=4,
            verbose=True,
            language="en",
        )

    def generate_session_id(self) -> str:
        """Generate a unique session ID for this upload session."""
        return str(uuid.uuid4())

    @staticmethod
    def _safe_filename(raw_name: Optional[str]) -> Optional[str]:
        """Return the final path component of *raw_name*, or None if unusable.

        The client controls the uploaded filename, so directory parts (with
        either slash style) are dropped to keep the file inside the session
        directory.
        """
        if not raw_name:
            return None
        leaf = os.path.basename(raw_name.replace("\\", "/")).replace("\x00", "").strip()
        if leaf in ("", ".", ".."):
            return None
        return leaf

    @staticmethod
    def _natural_sort_key(name: str) -> list:
        """Sort key that orders embedded numbers numerically (2 before 10)."""
        # re.split with a capturing group alternates text, digits, text, ...
        # so odd positions are always the digit runs.
        pieces = re.split(r"(\d+)", name)
        return [int(piece) if index % 2 else piece for index, piece in enumerate(pieces)]

    def _session_dir(self, session_id: Optional[str]) -> Optional[str]:
        """Return the directory for *session_id*, or None if the id is invalid.

        Only canonical UUID strings (the form ``generate_session_id``
        produces) are accepted, so a caller-supplied id such as ``..`` can
        never resolve to a path outside ``base_dir``.
        """
        if not session_id or not isinstance(session_id, str):
            return None
        try:
            canonical = str(uuid.UUID(session_id))
        except ValueError:
            return None
        if canonical != session_id:
            return None
        return os.path.join(self.base_dir, session_id)

    async def _save_uploads(
        self, files: List[FileStorage], session_id: str, session_dir: str
    ) -> List[str]:
        """Save each named upload into *session_dir* and return the saved paths."""
        saved_paths = []
        for file in files:
            safe_name = self._safe_filename(file.filename)
            if safe_name is None:
                continue  # Unnamed or unusable filename: skip this upload

            file_path = os.path.join(session_dir, f"{session_id}_{safe_name}")
            try:
                # werkzeug's FileStorage.save is synchronous while Quart's is
                # a coroutine; await only when there is something to await.
                outcome = file.save(file_path)
                if inspect.isawaitable(outcome):
                    await outcome

                if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
                    saved_paths.append(file_path)
                    print(f"✅ Successfully saved file: {file_path} ({os.path.getsize(file_path)} bytes)")
                else:
                    raise CustomerDataServiceError(f"Failed to save file: {file.filename}")
            except CustomerDataServiceError:
                raise  # Re-raise our own errors
            except Exception as e:
                raise CustomerDataServiceError(
                    f"Failed to save file {file.filename}: {str(e)}"
                ) from e
        return saved_paths

    async def _parse_uploads(self, uploaded_files: List[str]) -> list:
        """Run LlamaParse over *uploaded_files* and return the parsed documents."""
        print(f"🔄 Starting LlamaParse for {len(uploaded_files)} files...")
        for file_path in uploaded_files:
            print(f"📄 File to parse: {file_path} (exists: {os.path.exists(file_path)})")

        try:
            # Use the async entry point: the synchronous load_data would
            # block the event loop this coroutine is running on.
            parsed_documents = await self.parser.aload_data(uploaded_files)
            print(f"✅ LlamaParse completed successfully. Generated {len(parsed_documents)} documents.")
        except Exception as parse_error:
            print(f"❌ LlamaParse failed: {str(parse_error)}")
            # Check which files still exist before the error
            for file_path in uploaded_files:
                exists = os.path.exists(file_path)
                size = os.path.getsize(file_path) if exists else 0
                print(f"📄 File status: {file_path} - exists: {exists}, size: {size}")
            raise CustomerDataServiceError(
                f"LlamaParse failed: {str(parse_error)}"
            ) from parse_error
        return parsed_documents

    async def upload_and_parse_files(self, files: List[FileStorage]) -> str:
        """
        Upload files and parse them using LlamaParse.

        Args:
            files: List of uploaded files

        Returns:
            session_id: Unique identifier for this upload session

        Raises:
            CustomerDataServiceError: If upload or parsing fails
        """
        if not files:
            raise CustomerDataServiceError("No files provided")

        session_id = self.generate_session_id()
        session_dir = os.path.join(self.base_dir, session_id)

        try:
            # Create session directory
            os.makedirs(session_dir, exist_ok=True)

            # Save uploaded files
            uploaded_files = await self._save_uploads(files, session_id, session_dir)
            if not uploaded_files:
                raise CustomerDataServiceError("No valid files uploaded")

            # Parse files using LlamaParse
            parsed_documents = await self._parse_uploads(uploaded_files)

            # Save parsed markdown files
            for number, document in enumerate(parsed_documents, start=1):
                markdown_path = os.path.join(session_dir, f"{session_id}_parsed_{number}.md")
                with open(markdown_path, 'w', encoding='utf-8') as f:
                    f.write(document.text)

            return session_id

        except Exception as e:
            # Clean up on error
            shutil.rmtree(session_dir, ignore_errors=True)
            raise CustomerDataServiceError(f"Failed to process files: {str(e)}") from e
        except BaseException:
            # Task cancellation and similar are not Exception subclasses;
            # still remove the partial session, then propagate unchanged.
            shutil.rmtree(session_dir, ignore_errors=True)
            raise

    def get_parsed_markdown_content(self, session_id: str) -> Optional[str]:
        """
        Get all parsed markdown content for a session.

        Sections are returned in natural filename order, so the result is
        stable across calls.

        Args:
            session_id: The session identifier

        Returns:
            Combined markdown content from all parsed files, or None if the
            session id is invalid, the session does not exist, or it has no
            readable non-empty parsed files
        """
        session_dir = self._session_dir(session_id)
        if session_dir is None or not os.path.isdir(session_dir):
            return None

        combined_content = []

        # Find all markdown files for this session. os.listdir order is
        # arbitrary, so sort to keep the combined output deterministic.
        for filename in sorted(os.listdir(session_dir), key=self._natural_sort_key):
            if not (filename.endswith('.md') and 'parsed' in filename):
                continue
            file_path = os.path.join(session_dir, filename)
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read().strip()
            except (OSError, UnicodeDecodeError):
                continue  # Skip files that can't be read
            if content:
                combined_content.append(f"## {filename}\n\n{content}")

        if combined_content:
            return "\n\n---\n\n".join(combined_content)

        return None

    def cleanup_session(self, session_id: str) -> bool:
        """
        Clean up all files for a session.

        Args:
            session_id: The session identifier

        Returns:
            True if cleanup was successful (or there was nothing to clean
            up), False if the session id is empty or invalid, or removal
            failed
        """
        session_dir = self._session_dir(session_id)
        if session_dir is None:
            return False

        if not os.path.exists(session_dir):
            return True  # Nothing to clean up

        try:
            shutil.rmtree(session_dir)
        except OSError:
            return False
        return True


# Global service instance
customer_data_service = CustomerDataService()