#!/usr/bin/env python3 """ Oliver Metadata Tool - Web Interface Universal metadata creation and management tool for files. Flask-based web app for local or server deployment. Supports multiple metadata sources: Excel, AI, manual entry, and file import. """ from flask import Flask, render_template, request, jsonify, send_file, session, redirect, url_for from werkzeug.utils import secure_filename # noqa: F401 - kept as fallback from werkzeug.middleware.proxy_fix import ProxyFix from pathlib import Path import os import tempfile import threading import webbrowser from time import sleep import shutil import unicodedata import secrets import zipfile from datetime import datetime from src.file_detector import FileDetector, FileType from src.excel_metadata_lookup import ExcelMetadataLookup from src.config import Config from src.metadata_analyzer import MetadataAnalyzer from src.metadata_importer import MetadataImporter from src.template_manager import TemplateManager from src.auth import login_required, authenticate_user, create_user_session, destroy_user_session, get_current_user, is_sso_enabled, get_sso_instance, cleanup_sessions from src.database import Database def safe_filename(filename): """Sanitize filename while preserving Unicode characters (Chinese, Japanese, Korean).""" # Normalize unicode filename = unicodedata.normalize('NFC', filename) # Remove path separators and null bytes filename = filename.replace('/', '_').replace('\\', '_').replace('\x00', '') # Remove leading/trailing dots and spaces filename = filename.strip('. ') # If empty, use default if not filename: filename = 'unnamed_file' return filename from src.extractors.pdf_extractor import PDFExtractor from src.extractors.image_extractor import ImageExtractor from src.extractors.office_extractor import OfficeExtractor from src.extractors.video_extractor import VideoExtractor from src.updaters.pdf_updater import PDFUpdater from src.updaters.image_updater import ImageUpdater from src.updaters.office_updater import OfficeUpdater from src.updaters.video_updater import VideoUpdater app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 # 500MB max file size # Reverse proxy configuration # ProxyFix handles X-Forwarded-* headers from Apache/nginx reverse proxy app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) # URL prefix for reverse proxy redirects URL_PREFIX = os.getenv('URL_PREFIX', '/solventum-image-metadata') # Docker mode detection DOCKER_MODE = os.getenv('DOCKER_MODE', 'false').lower() == 'true' # Upload folder configuration if DOCKER_MODE: # Use persistent directory in Docker UPLOAD_FOLDER = Path('/app/uploads') UPLOAD_FOLDER.mkdir(parents=True, exist_ok=True) app.config['UPLOAD_FOLDER'] = str(UPLOAD_FOLDER) else: # Use temp directory for local development app.config['UPLOAD_FOLDER'] = tempfile.mkdtemp() # Session configuration app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', secrets.token_hex(32)) # Cookie settings for reverse proxy app.config['SESSION_COOKIE_PATH'] = URL_PREFIX app.config['SESSION_COOKIE_HTTPONLY'] = True app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # Set Secure flag for HTTPS - detect if behind HTTPS proxy # For production sites behind HTTPS proxy, always use Secure flag app.config['SESSION_COOKIE_SECURE'] = os.getenv('SESSION_COOKIE_SECURE', 'true').lower() == 'true' # Session lifetime (7 days) app.config['PERMANENT_SESSION_LIFETIME'] = int(os.getenv('SESSION_LIFETIME_DAYS', '7')) * 86400 # Excel file path for metadata lookup EXCEL_PATH = Path(__file__).parent / "Celum ID to Adobe Asset Path Mapping Spreadsheet (1).xlsx" # Initialize metadata lookup from Excel metadata_lookup = None # Initialize AI analyzer (lazy initialization) ai_analyzer = None # Initialize extractors and updaters extractors = { FileType.PDF: PDFExtractor(), FileType.IMAGE: ImageExtractor(), FileType.OFFICE_DOC: OfficeExtractor(), FileType.OFFICE_SHEET: OfficeExtractor(), FileType.OFFICE_PRESENTATION: OfficeExtractor(), FileType.VIDEO: VideoExtractor() } updaters = { FileType.PDF: PDFUpdater(), FileType.IMAGE: ImageUpdater(), FileType.OFFICE_DOC: OfficeUpdater(), FileType.OFFICE_SHEET: OfficeUpdater(), FileType.OFFICE_PRESENTATION: OfficeUpdater(), FileType.VIDEO: VideoUpdater() } # Store file processing sessions sessions = {} # Store imported metadata from external files imported_metadata = {} def cleanup_session_files(session_id: str): """ Clean up files associated with a session. Removes uploaded files from disk to free up space. """ if session_id not in sessions: return session_data = sessions[session_id] files = session_data.get('files', []) for file_info in files: filepath = file_info.get('filepath') if filepath and os.path.exists(filepath): try: os.remove(filepath) app.logger.info(f"Cleaned up file: {filepath}") except Exception as e: app.logger.warning(f"Failed to cleanup file {filepath}: {e}") # Remove session from memory sessions.pop(session_id, None) def cleanup_old_files(max_age_hours: int = 24): """ Clean up files older than max_age_hours from upload folder. Runs automatically to prevent disk space issues. """ try: upload_folder = Path(app.config['UPLOAD_FOLDER']) now = datetime.now().timestamp() max_age_seconds = max_age_hours * 3600 cleaned = 0 for filepath in upload_folder.glob('*'): if filepath.is_file(): file_age = now - filepath.stat().st_mtime if file_age > max_age_seconds: try: filepath.unlink() cleaned += 1 app.logger.info(f"Cleaned up old file: {filepath.name}") except Exception as e: app.logger.warning(f"Failed to cleanup {filepath.name}: {e}") if cleaned > 0: app.logger.info(f"Cleaned up {cleaned} old file(s)") except Exception as e: app.logger.error(f"Error in cleanup_old_files: {e}") def get_metadata_lookup(): """Get or create metadata lookup instance.""" global metadata_lookup if metadata_lookup is None: metadata_lookup = ExcelMetadataLookup(str(EXCEL_PATH)) return metadata_lookup def get_ai_analyzer(): """Get or create AI analyzer instance.""" global ai_analyzer if ai_analyzer is None: if Config.OPENAI_API_KEY: try: ai_analyzer = MetadataAnalyzer() logger = __import__('logging').getLogger(__name__) logger.info("AI analyzer initialized successfully") except Exception as e: logger = __import__('logging').getLogger(__name__) logger.error(f"Failed to initialize AI analyzer: {e}") return None else: return None return ai_analyzer def render_login(error=None, info=None): """Helper to render login page with Azure SSO parameters.""" sso = get_sso_instance() return render_template('login.html', error=error, info=info, sso_enabled=is_sso_enabled(), azure_client_id=sso.client_id if sso.enabled else '', azure_tenant_id=sso.tenant_id if sso.enabled else '', azure_redirect_uri=sso.redirect_uri if sso.enabled else '') @app.route('/login', methods=['GET', 'POST']) def login(): """Login page and handler.""" if request.method == 'POST': username = request.form.get('username', '').strip() password = request.form.get('password', '') if not username or not password: return render_login(error='Please enter both username and password') # Authenticate user result = authenticate_user(username, password) if result['success']: user = result['user'] # Create session session_id = create_user_session( user=user, ip_address=request.remote_addr, user_agent=request.headers.get('User-Agent') ) if session_id: # Set Flask session session.permanent = True session['user_id'] = user['id'] session['username'] = user['username'] session['session_id'] = session_id # Redirect to home page with reverse proxy prefix return redirect(f'{URL_PREFIX}/') else: return render_login(error='Failed to create session') else: return render_login(error=result.get('error')) # GET request - show login form return render_login() @app.route('/logout') def logout(): """Logout user and cleanup session files.""" user_id = session.get('user_id') session_id = session.get('session_id') # Clean up all file processing sessions for this user # (In-memory sessions don't have user_id, so we clean all) sessions_to_cleanup = list(sessions.keys()) for sid in sessions_to_cleanup: cleanup_session_files(sid) if session_id: destroy_user_session(session_id, user_id) session.clear() return redirect(f'{URL_PREFIX}/login') @app.route('/login/microsoft') def login_microsoft(): """Redirect to Microsoft SSO with PKCE.""" sso = get_sso_instance() if not sso.enabled: return render_login(error='Microsoft SSO not configured') # Generate state for CSRF protection state = secrets.token_urlsafe(16) # Get auth flow (includes PKCE code_verifier) auth_flow = sso.get_auth_url(state=state) if auth_flow and 'auth_uri' in auth_flow: # Store the entire flow in session (needed for PKCE verification) session['oauth_flow'] = auth_flow return redirect(auth_flow['auth_uri']) else: return render_login(error='Failed to generate SSO URL') @app.route('/auth/callback') def auth_callback(): """Handle Microsoft SSO callback with PKCE.""" import logging logger = logging.getLogger(__name__) logger.info(f"Auth callback received. Args: {dict(request.args)}") sso = get_sso_instance() # Get stored auth flow (contains PKCE code_verifier) auth_flow = session.get('oauth_flow') logger.info(f"Auth flow from session: {'Found' if auth_flow else 'NOT FOUND'}") if not auth_flow: logger.error("No oauth_flow in session - session may have been lost") return render_login(error='Session expired, please try again') # Check for error in response if request.args.get('error'): error_desc = request.args.get('error_description', 'Unknown error') logger.error(f"SSO error from Azure: {error_desc}") return render_login(error=f'SSO failed: {error_desc}') # Exchange code for token using PKCE flow logger.info("Exchanging code for token...") result = sso.acquire_token(dict(request.args), auth_flow) logger.info(f"Token result: {'success' if result and 'access_token' in result else 'FAILED'}") if result and 'error' in result: logger.error(f"Token error: {result.get('error_description', result.get('error'))}") if result and 'access_token' in result: # Get user info from Microsoft Graph user_info = sso.get_user_info(result['access_token']) if user_info: # Create or update user user = sso.create_or_update_user(user_info) if user: # Create session session_id = create_user_session( user=user, ip_address=request.remote_addr, user_agent=request.headers.get('User-Agent') ) if session_id: # Set Flask session session.permanent = True session['user_id'] = user['id'] session['username'] = user['username'] session['session_id'] = session_id # Clear oauth flow from session session.pop('oauth_flow', None) return redirect(f'{URL_PREFIX}/') # Log error details if available error_msg = result.get('error_description', 'SSO authentication failed') if result else 'SSO authentication failed' return render_login(error=error_msg) @app.route('/') def index(): """Main page - also handles OAuth callback for SPA.""" import logging logger = logging.getLogger(__name__) sso = get_sso_instance() # Check if this is an OAuth callback (code in query params) - SPA flow if request.args.get('code'): logger.info("OAuth callback detected on root path - rendering SPA token exchange page") # Check for error in response if request.args.get('error'): error_desc = request.args.get('error_description', 'Unknown error') logger.error(f"SSO error: {error_desc}") return render_login(error=f'SSO failed: {error_desc}') # Render page with JavaScript to exchange token (SPA flow) return render_template('oauth_callback.html', client_id=sso.client_id, tenant_id=sso.tenant_id, redirect_uri=sso.redirect_uri, url_prefix=URL_PREFIX) # Normal page load - require login if 'user_id' not in session: return redirect(f'{URL_PREFIX}/login') # Check if session is still valid session_id = session.get('session_id') if session_id: from src.database import Database db = Database() db_session = db.get_session(session_id) if not db_session: session.clear() return redirect(f'{URL_PREFIX}/login') user = get_current_user() return render_template('index.html', username=user['username'] if user else None, docker_mode=DOCKER_MODE) @app.route('/auth/token', methods=['POST']) def auth_token(): """Receive access token from SPA JavaScript and create session.""" import logging logger = logging.getLogger(__name__) data = request.get_json() access_token = data.get('access_token') if not access_token: return jsonify({'error': 'No access token provided'}), 400 sso = get_sso_instance() # Get user info from Microsoft Graph user_info = sso.get_user_info(access_token) logger.info(f"User info from Graph: {user_info}") if user_info: # Create or update user user = sso.create_or_update_user(user_info) if user: # Create session session_id = create_user_session( user=user, ip_address=request.remote_addr, user_agent=request.headers.get('User-Agent') ) if session_id: # Set Flask session session.permanent = True session['user_id'] = user['id'] session['username'] = user['username'] session['session_id'] = session_id logger.info(f"User {user['username']} logged in via SPA SSO") return jsonify({'success': True, 'redirect': f'{URL_PREFIX}/'}) return jsonify({'error': 'Failed to authenticate user'}), 401 @app.route('/upload', methods=['POST']) @login_required def upload_file(): """Handle multiple file uploads and metadata lookup from Excel.""" if 'files' not in request.files: return jsonify({'error': 'No files provided'}), 400 files = request.files.getlist('files') if not files or files[0].filename == '': return jsonify({'error': 'No files selected'}), 400 # Get metadata source choice (excel, manual, ai, import) metadata_source = request.form.get('metadata_source', 'excel') import_session_id = request.form.get('import_session_id', '') # For import source results = [] session_id = str(len(sessions) + 1) sessions[session_id] = { 'files': [], 'metadata_source': metadata_source, 'import_session_id': import_session_id } # Get metadata lookup (only if using Excel source) excel_session_id = request.form.get('excel_session_id') lookup = None if metadata_source == 'excel': if excel_session_id and excel_session_id in imported_metadata: # Use uploaded Excel file lookup = imported_metadata[excel_session_id] else: # Try default Excel file if available try: lookup = get_metadata_lookup() except: return jsonify({'error': 'Please upload an Excel file first using the Upload Excel File button'}), 400 # Get imported metadata (only if using import source) import_map = None if metadata_source == 'import' and import_session_id and import_session_id in imported_metadata: import_map = imported_metadata[import_session_id] importer = MetadataImporter() elif metadata_source == 'import': # Import source selected but no import session available return jsonify({'error': 'Please import a metadata file first using the Import button'}), 400 for file in files: try: # Save uploaded file filename = safe_filename(file.filename) filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) file.save(filepath) # Detect file type file_type = FileDetector.detect_file_type(filepath) if file_type == FileType.UNSUPPORTED: results.append({ 'filename': filename, 'error': 'Unsupported file type' }) continue # Get extractor for this file type extractor = extractors.get(file_type) if not extractor: results.append({ 'filename': filename, 'error': 'No extractor available' }) continue # Read current metadata from file old_metadata = extractor.read_metadata(filepath) # Generate metadata based on chosen source excel_found = False new_metadata = {'title': '', 'subject': '', 'keywords': ''} if metadata_source == 'excel' and lookup: # Lookup metadata from Excel by filename excel_data = lookup.lookup_by_filename(filename) if excel_data: new_metadata = { 'title': excel_data.get('title', ''), 'subject': excel_data.get('description', ''), 'keywords': '' } excel_found = True else: # No Excel data found - use filename as fallback new_metadata = { 'title': Path(filename).stem, 'subject': f'No metadata found in Excel for {filename}', 'keywords': '' } elif metadata_source == 'manual': # Return empty metadata for user to fill manually new_metadata = { 'title': Path(filename).stem, # Suggest filename 'subject': '', 'keywords': '' } elif metadata_source == 'ai': # AI generation using MetadataAnalyzer analyzer = get_ai_analyzer() if analyzer: try: # Extract content from file content = extractor.extract_content(str(filepath)) if not content or len(content.strip()) < 10: # Not enough content for AI analysis new_metadata = { 'title': Path(filename).stem, 'subject': 'Insufficient content for AI analysis', 'keywords': '', '_ai_error': 'Not enough text content extracted' } else: # Generate metadata with AI new_metadata = analyzer.analyze_content(content, filename, file_type) # Log token usage if available if '_tokens_used' in new_metadata: import logging logging.getLogger(__name__).info( f"AI tokens used for {filename}: {new_metadata['_tokens_used']}" ) except Exception as e: import logging logging.getLogger(__name__).error(f"AI generation failed for {filename}: {e}") new_metadata = { 'title': Path(filename).stem, 'subject': f'AI generation error: {str(e)}', 'keywords': '', '_ai_error': str(e) } else: # AI not configured new_metadata = { 'title': Path(filename).stem, 'subject': 'AI generation not available (OpenAI API key not configured)', 'keywords': '', '_ai_error': 'OpenAI API key not configured' } elif metadata_source == 'import': # Import from external file (CSV, Excel, JSON) if import_map and importer: # Look up metadata for this file imported = importer.get_metadata_for_file(import_map, filename) if imported: new_metadata = imported excel_found = True # Mark as found in import else: # No metadata found in import file new_metadata = { 'title': Path(filename).stem, 'subject': f'No metadata found in imported file for {filename}', 'keywords': '' } else: # Import source not available new_metadata = { 'title': Path(filename).stem, 'subject': 'Import metadata not loaded', 'keywords': '' } file_info = { 'success': True, 'filename': filename, 'file_type': file_type.value, 'current_metadata': old_metadata, 'suggested_metadata': new_metadata, 'filepath': filepath, 'metadata_source': metadata_source, 'excel_found': excel_found } results.append(file_info) sessions[session_id]['files'].append(file_info) except Exception as e: results.append({ 'filename': file.filename, 'error': str(e) }) return jsonify({ 'success': True, 'session_id': session_id, 'files': results }) @app.route('/update', methods=['POST']) @login_required def update_metadata(): """Update file metadata using suggested metadata from session.""" data = request.json filepath = data.get('filepath') session_id = data.get('session_id') file_index = data.get('file_index') output_dir = data.get('output_dir', '') # User-selected output directory if not filepath or not os.path.exists(filepath): return jsonify({'error': 'File not found'}), 404 # Validate session if not session_id or session_id not in sessions: return jsonify({'error': 'Invalid or expired session'}), 400 # Validate file index if file_index is None or file_index >= len(sessions[session_id]['files']): return jsonify({'error': 'Invalid file index'}), 400 try: # Get file info from session file_info = sessions[session_id]['files'][file_index] # Get suggested metadata from session new_metadata = file_info.get('suggested_metadata', {}) if not new_metadata or not new_metadata.get('title'): return jsonify({'error': 'No metadata available for this file'}), 400 # Detect file type file_type = FileDetector.detect_file_type(filepath) if file_type == FileType.UNSUPPORTED: return jsonify({'error': 'Unsupported file type'}), 400 # Get updater updater = updaters.get(file_type) if not updater: return jsonify({'error': 'No updater available for this file type'}), 400 filename = Path(filepath).name # In Docker mode, always update in-place (user will download via browser) # In local mode, allow copying to output directory if not DOCKER_MODE and output_dir and os.path.isdir(output_dir): output_path = os.path.join(output_dir, filename) shutil.copy2(filepath, output_path) target_file = output_path else: # Update in-place for Docker or when no output_dir specified target_file = filepath # Update the file metadata success = updater.update_metadata(target_file, new_metadata, backup=False) if not success: return jsonify({'error': 'Failed to update metadata'}), 500 # Verify update verified = updater.verify_metadata(target_file, new_metadata) return jsonify({ 'success': True, 'message': 'Metadata updated successfully', 'verified': verified, 'output_path': target_file, 'metadata': new_metadata }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/update-manual', methods=['POST']) @login_required def update_manual_metadata(): """Update file with manually entered metadata.""" data = request.json session_id = data.get('session_id') file_index = data.get('file_index') # Validate and sanitize metadata custom_metadata = { 'title': data.get('title', '').strip()[:200], 'subject': data.get('subject', '').strip()[:300], 'keywords': data.get('keywords', '').strip()[:500], 'author': data.get('author', '').strip()[:100], 'copyright': data.get('copyright', '').strip()[:150], 'comments': data.get('comments', '').strip()[:500] } # Add custom fields if provided custom_fields = data.get('custom_fields', {}) if custom_fields and isinstance(custom_fields, dict): for field_name, field_value in custom_fields.items(): # Sanitize custom field names and values safe_name = str(field_name).strip()[:50] safe_value = str(field_value).strip()[:200] if safe_name and safe_value: custom_metadata[safe_name] = safe_value # Validate session if not session_id or session_id not in sessions: return jsonify({'error': 'Invalid or expired session'}), 400 # Validate file index if file_index is None or file_index >= len(sessions[session_id]['files']): return jsonify({'error': 'Invalid file index'}), 400 try: # Get file info from session file_info = sessions[session_id]['files'][file_index] filepath = file_info.get('filepath') if not filepath or not os.path.exists(filepath): return jsonify({'error': 'File not found'}), 404 # Detect file type file_type = FileDetector.detect_file_type(filepath) if file_type == FileType.UNSUPPORTED: return jsonify({'error': 'Unsupported file type'}), 400 # Get updater for this file type updater = updaters.get(file_type) if not updater: return jsonify({'error': 'No updater available for this file type'}), 400 # Update metadata success = updater.update_metadata(filepath, custom_metadata, backup=True) if not success: return jsonify({'error': 'Failed to update metadata'}), 500 # Update session with new metadata sessions[session_id]['files'][file_index]['suggested_metadata'] = custom_metadata # Verify update verified = updater.verify_metadata(filepath, custom_metadata) return jsonify({ 'status': 'success', 'message': 'Metadata updated successfully', 'verified': verified, 'metadata': custom_metadata }) except Exception as e: return jsonify({'error': f'Error updating metadata: {str(e)}'}), 500 @app.route('/download/') @login_required def download_file(filename): """Download processed file.""" filepath = os.path.join(app.config['UPLOAD_FOLDER'], safe_filename(filename)) if os.path.exists(filepath): return send_file(filepath, as_attachment=True) return jsonify({'error': 'File not found'}), 404 @app.route('/download-selected', methods=['POST']) @login_required def download_selected_files(): """Download selected files from session as ZIP archive.""" try: data = request.json app.logger.info(f"download-selected called with data: {data}") session_id = data.get('session_id') file_indices = data.get('file_indices', []) app.logger.info(f"session_id: {session_id}, file_indices: {file_indices}") app.logger.info(f"Available sessions: {list(sessions.keys())}") if session_id not in sessions: app.logger.error(f"Session not found: {session_id}") return jsonify({'error': 'Session not found'}), 404 if not file_indices: app.logger.error("No files selected") return jsonify({'error': 'No files selected'}), 400 session_data = sessions[session_id] all_files = session_data.get('files', []) app.logger.info(f"Found {len(all_files)} files in session") if not all_files: app.logger.error("No files in session") return jsonify({'error': 'No files in session'}), 404 # Create a temporary ZIP file timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') zip_filename = f'oliver_metadata_files_{timestamp}.zip' zip_path = os.path.join(app.config['UPLOAD_FOLDER'], zip_filename) app.logger.info(f"Creating ZIP at: {zip_path}") with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for index in file_indices: if 0 <= index < len(all_files): file_info = all_files[index] filepath = file_info['filepath'] filename = file_info['filename'] app.logger.info(f"Adding file {index}: {filename} from {filepath}") if os.path.exists(filepath): # Add file to ZIP with its original name zipf.write(filepath, filename) app.logger.info(f"Added {filename} to ZIP") else: app.logger.warning(f"File not found: {filepath}") app.logger.info(f"ZIP created successfully, sending file") # Send the ZIP file and delete it after sending return send_file( zip_path, as_attachment=True, download_name=zip_filename, mimetype='application/zip' ) except Exception as e: app.logger.error(f"Error in download_selected_files: {str(e)}", exc_info=True) if 'zip_path' in locals() and os.path.exists(zip_path): os.remove(zip_path) return jsonify({'error': f'Error creating ZIP archive: {str(e)}'}), 500 @app.route('/cleanup-session/', methods=['POST']) @login_required def cleanup_session(session_id): """Clean up session files manually.""" try: cleanup_session_files(session_id) return jsonify({'success': True, 'message': 'Session cleaned up successfully'}) except Exception as e: app.logger.error(f"Error cleaning up session: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/upload-excel', methods=['POST']) @login_required def upload_excel(): """Upload Excel file for Excel Lookup metadata source.""" if 'excel_file' not in request.files: return jsonify({'error': 'No file provided'}), 400 file = request.files['excel_file'] if file.filename == '': return jsonify({'error': 'No file selected'}), 400 try: import pandas as pd # Save temp file excel_filename = safe_filename(file.filename) temp_path = Path(app.config['UPLOAD_FOLDER']) / excel_filename file.save(str(temp_path)) # Preview Excel structure instead of loading directly excel_file = pd.ExcelFile(str(temp_path)) sheet_names = excel_file.sheet_names # Get columns and sample data from first sheet preview_data = {} for sheet_name in sheet_names[:5]: # Limit to first 5 sheets df = pd.read_excel(excel_file, sheet_name=sheet_name, nrows=5) preview_data[sheet_name] = { 'columns': df.columns.tolist(), 'sample_data': df.head(3).fillna('').to_dict('records') } # Store file path temporarily for later configuration excel_session_id = f"excel_{secrets.token_urlsafe(8)}" if 'excel_files' not in imported_metadata: imported_metadata['excel_files'] = {} imported_metadata['excel_files'][excel_session_id] = { 'path': str(temp_path), 'filename': excel_filename, 'sheet_names': sheet_names } return jsonify({ 'success': True, 'excel_session_id': excel_session_id, 'filename': excel_filename, 'sheets': sheet_names, 'preview': preview_data, 'message': f'Excel file uploaded. Please configure column mapping.' }) except Exception as e: import logging logging.getLogger(__name__).error(f"Excel upload failed: {e}") return jsonify({'error': f'Excel upload failed: {str(e)}'}), 500 @app.route('/preview-excel-sheet', methods=['POST']) @login_required def preview_excel_sheet(): """Preview a specific sheet from uploaded Excel file.""" try: import pandas as pd data = request.json excel_session_id = data.get('excel_session_id') sheet_name = data.get('sheet_name') if not excel_session_id or excel_session_id not in imported_metadata.get('excel_files', {}): return jsonify({'error': 'Invalid session ID'}), 400 excel_info = imported_metadata['excel_files'][excel_session_id] excel_path = excel_info['path'] # Read the specific sheet df = pd.read_excel(excel_path, sheet_name=sheet_name, nrows=10) return jsonify({ 'success': True, 'columns': df.columns.tolist(), 'sample_data': df.head(5).fillna('').to_dict('records') }) except Exception as e: import logging logging.getLogger(__name__).error(f"Sheet preview failed: {e}") return jsonify({'error': f'Sheet preview failed: {str(e)}'}), 500 @app.route('/configure-excel-mapping', methods=['POST']) @login_required def configure_excel_mapping(): """Configure Excel column mapping and load metadata.""" try: import pandas as pd data = request.json excel_session_id = data.get('excel_session_id') sheet_name = data.get('sheet_name') column_mapping = data.get('column_mapping', {}) # {filename: 'col', title: 'col', ...} if not excel_session_id or excel_session_id not in imported_metadata.get('excel_files', {}): return jsonify({'error': 'Invalid session ID'}), 400 excel_info = imported_metadata['excel_files'][excel_session_id] excel_path = excel_info['path'] # Read the configured sheet df = pd.read_excel(excel_path, sheet_name=sheet_name) # Build metadata map using configured columns metadata_map = {} filename_col = column_mapping.get('filename') title_col = column_mapping.get('title') description_col = column_mapping.get('description') keywords_col = column_mapping.get('keywords') if not filename_col: return jsonify({'error': 'Filename column is required'}), 400 for _, row in df.iterrows(): filename = row.get(filename_col) if pd.notna(filename) and str(filename).strip(): # Get filename without extension for indexing (case-insensitive) filename_stem = Path(str(filename).strip()).stem.lower() metadata = { 'title': str(row.get(title_col, '')).strip() if title_col and pd.notna(row.get(title_col)) else '', 'description': str(row.get(description_col, '')).strip() if description_col and pd.notna(row.get(description_col)) else '', 'keywords': str(row.get(keywords_col, '')).strip() if keywords_col and pd.notna(row.get(keywords_col)) else '', 'original_filename': str(filename).strip() } metadata_map[filename_stem] = metadata # Create a simple lookup object class ConfiguredExcelLookup: def __init__(self, metadata_map): self.metadata_map = metadata_map self.filename_to_metadata = metadata_map def lookup_by_filename(self, filename: str): filename_stem = Path(filename).stem.lower() return self.metadata_map.get(filename_stem) lookup = ConfiguredExcelLookup(metadata_map) # Store configured lookup imported_metadata[excel_session_id] = lookup # Get stats stats = { 'total_records': len(metadata_map), 'with_title': sum(1 for v in metadata_map.values() if v.get('title')), 'with_description': sum(1 for v in metadata_map.values() if v.get('description')), 'with_keywords': sum(1 for v in metadata_map.values() if v.get('keywords')) } return jsonify({ 'success': True, 'excel_session_id': excel_session_id, 'stats': stats, 'message': f'Configured mapping for {stats["total_records"]} records from sheet "{sheet_name}"' }) except Exception as e: import logging logging.getLogger(__name__).error(f"Excel configuration failed: {e}") return jsonify({'error': f'Excel configuration failed: {str(e)}'}), 500 @app.route('/import-metadata', methods=['POST']) @login_required def import_metadata(): """Upload import file and preview structure for mapping.""" if 'import_file' not in request.files: return jsonify({'error': 'No file provided'}), 400 file = request.files['import_file'] if file.filename == '': return jsonify({'error': 'No file selected'}), 400 try: import pandas as pd # Save temp file import_filename = safe_filename(file.filename) temp_path = Path(app.config['UPLOAD_FOLDER']) / import_filename file.save(str(temp_path)) file_ext = temp_path.suffix.lower() # Read file and get structure if file_ext == '.csv': df = pd.read_csv(str(temp_path), nrows=5, encoding='utf-8') elif file_ext in ['.xlsx', '.xls']: df = pd.read_excel(str(temp_path), nrows=5) elif file_ext == '.json': import json with open(str(temp_path), 'r', encoding='utf-8') as f: data = json.load(f) # Convert to DataFrame if isinstance(data, list): df = pd.DataFrame(data[:5]) elif isinstance(data, dict): df = pd.DataFrame([data]) else: return jsonify({'error': 'Invalid JSON format'}), 400 else: return jsonify({'error': f'Unsupported file format: {file_ext}'}), 400 columns = df.columns.tolist() sample_data = df.fillna('').to_dict('records') # Store file path for later configuration import_session_id = f"import_{secrets.token_urlsafe(8)}" if 'import_files' not in imported_metadata: imported_metadata['import_files'] = {} imported_metadata['import_files'][import_session_id] = { 'path': str(temp_path), 'filename': import_filename, 'file_type': file_ext } return jsonify({ 'success': True, 'import_session_id': import_session_id, 'filename': import_filename, 'columns': columns, 'sample_data': sample_data, 'message': f'Import file uploaded. Please configure column mapping.' }) except Exception as e: import logging logging.getLogger(__name__).error(f"Import upload failed: {e}") return jsonify({'error': f'Import upload failed: {str(e)}'}), 500 @app.route('/configure-import-mapping', methods=['POST']) @login_required def configure_import_mapping(): """Configure import column mapping and load metadata.""" try: import pandas as pd import json data = request.json import_session_id = data.get('import_session_id') column_mapping = data.get('column_mapping', {}) if not import_session_id or import_session_id not in imported_metadata.get('import_files', {}): return jsonify({'error': 'Invalid session ID'}), 400 import_info = imported_metadata['import_files'][import_session_id] import_path = import_info['path'] file_ext = import_info['file_type'] # Read the full file if file_ext == '.csv': df = pd.read_csv(import_path, encoding='utf-8') elif file_ext in ['.xlsx', '.xls']: df = pd.read_excel(import_path) elif file_ext == '.json': with open(import_path, 'r', encoding='utf-8') as f: json_data = json.load(f) if isinstance(json_data, list): df = pd.DataFrame(json_data) else: df = pd.DataFrame([json_data]) # Build metadata map using configured columns metadata_map = {} filename_col = column_mapping.get('filename') title_col = column_mapping.get('title') subject_col = column_mapping.get('subject') keywords_col = column_mapping.get('keywords') if not filename_col: return jsonify({'error': 'Filename column is required'}), 400 for _, row in df.iterrows(): filename = row.get(filename_col) if pd.notna(filename) and str(filename).strip(): filename_stem = Path(str(filename).strip()).stem.lower() metadata = { 'title': str(row.get(title_col, '')).strip() if title_col and pd.notna(row.get(title_col)) else '', 'subject': str(row.get(subject_col, '')).strip() if subject_col and pd.notna(row.get(subject_col)) else '', 'keywords': str(row.get(keywords_col, '')).strip() if keywords_col and pd.notna(row.get(keywords_col)) else '', 'original_filename': str(filename).strip() } metadata_map[filename_stem] = metadata # Store configured metadata map imported_metadata[import_session_id] = metadata_map # Clean up temp file Path(import_path).unlink(missing_ok=True) # Get stats stats = { 'total_records': len(metadata_map), 'with_title': sum(1 for v in metadata_map.values() if v.get('title')), 'with_subject': sum(1 for v in metadata_map.values() if v.get('subject')), 'with_keywords': sum(1 for v in metadata_map.values() if v.get('keywords')) } return jsonify({ 'success': True, 'import_session_id': import_session_id, 'stats': stats, 'message': f'Configured mapping for {stats["total_records"]} records' }) except Exception as e: import logging logging.getLogger(__name__).error(f"Import configuration failed: {e}") return jsonify({'error': f'Import configuration failed: {str(e)}'}), 500 @app.route('/preview-import', methods=['POST']) @login_required def preview_import(): """Preview file structure and suggest field mappings.""" if 'import_file' not in request.files: return jsonify({'error': 'No file provided'}), 400 file = request.files['import_file'] if file.filename == '': return jsonify({'error': 'No file selected'}), 400 try: # Save temp file import_filename = safe_filename(file.filename) temp_path = Path(app.config['UPLOAD_FOLDER']) / import_filename file.save(str(temp_path)) # Preview file structure importer = MetadataImporter() columns, sample_rows, suggestions = importer.preview_file_structure(str(temp_path)) # Clean up temp file temp_path.unlink() # Format suggestions for frontend formatted_suggestions = {} for source_field, suggestion_data in suggestions.items(): formatted_suggestions[source_field] = { 'best_match': suggestion_data['best_match'], 'confidence': round(suggestion_data['confidence'], 2), 'alternatives': [ {'field': alt['field'], 'confidence': round(alt['confidence'], 2)} for alt in suggestion_data.get('alternatives', []) ] } return jsonify({ 'success': True, 'columns': columns, 'sample_rows': sample_rows[:5], # Limit to 5 rows 'suggestions': formatted_suggestions, 'filename': import_filename }) except Exception as e: import logging logging.getLogger(__name__).error(f"Preview failed: {e}") # Try to clean up temp file try: if temp_path.exists(): temp_path.unlink() except: pass return jsonify({'error': f'Preview failed: {str(e)}'}), 500 @app.route('/stats') @login_required def get_stats(): """Get Excel metadata statistics.""" try: lookup = get_metadata_lookup() stats = lookup.get_stats() return jsonify({ 'success': True, 'stats': stats }) except Exception as e: return jsonify({'error': str(e)}), 500 # Template Management Endpoints template_manager = TemplateManager() @app.route('/templates/list', methods=['GET']) @login_required def list_templates(): """List all available templates.""" try: templates = template_manager.list_templates() return jsonify({ 'success': True, 'templates': templates }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/templates/save', methods=['POST']) @login_required def save_template(): """Save a new template.""" try: data = request.json name = data.get('name', '').strip() if not name: return jsonify({'error': 'Template name is required'}), 400 template = template_manager.create_template( name=name, title_template=data.get('title', ''), subject_template=data.get('subject', ''), keywords_template=data.get('keywords', ''), description=data.get('description', '') ) success = template_manager.save_template(template) if success: return jsonify({ 'success': True, 'message': f'Template "{name}" saved successfully', 'template': template }) else: return jsonify({'error': 'Failed to save template'}), 500 except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/templates/load/', methods=['GET']) @login_required def load_template(name): """Load a template by name.""" try: template = template_manager.load_template(name) if template: return jsonify({ 'success': True, 'template': template }) else: return jsonify({'error': f'Template "{name}" not found'}), 404 except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/templates/delete/', methods=['DELETE']) @login_required def delete_template(name): """Delete a template.""" try: success = template_manager.delete_template(name) if success: return jsonify({ 'success': True, 'message': f'Template "{name}" deleted successfully' }) else: return jsonify({'error': f'Template "{name}" not found'}), 404 except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/templates/apply', methods=['POST']) @login_required def apply_template(): """Apply a template to generate metadata for files.""" try: data = request.json template_name = data.get('template_name', '').strip() file_indices = data.get('file_indices', []) session_id = data.get('session_id') custom_vars = data.get('custom_vars', {}) if not template_name: return jsonify({'error': 'Template name is required'}), 400 if not session_id or session_id not in sessions: return jsonify({'error': 'Invalid or expired session'}), 400 # Load template template = template_manager.load_template(template_name) if not template: return jsonify({'error': f'Template "{template_name}" not found'}), 404 # Apply template to each file results = [] for file_index in file_indices: if file_index >= len(sessions[session_id]['files']): continue file_info = sessions[session_id]['files'][file_index] filename = file_info.get('filename', 'unknown') # Generate metadata from template metadata = template_manager.apply_template( template=template, filename=filename, user='web_user', custom_vars=custom_vars ) # Update file metadata in session sessions[session_id]['files'][file_index]['suggested_metadata'] = metadata results.append({ 'file_index': file_index, 'filename': filename, 'metadata': metadata }) return jsonify({ 'success': True, 'message': f'Template applied to {len(results)} file(s)', 'results': results }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/templates/preview', methods=['POST']) @login_required def preview_template(): """Preview template output with sample data.""" try: data = request.json template = { 'name': 'preview', 'title': data.get('title', ''), 'subject': data.get('subject', ''), 'keywords': data.get('keywords', '') } sample_filename = data.get('sample_filename', 'example.pdf') custom_vars = data.get('custom_vars', {}) preview = template_manager.preview_template( template=template, sample_filename=sample_filename, user='web_user', custom_vars=custom_vars ) # Also get available variables available_vars = template_manager.get_available_variables() return jsonify({ 'success': True, 'preview': preview, 'available_variables': available_vars }) except Exception as e: return jsonify({'error': str(e)}), 500 def open_browser(): """Open browser after short delay.""" sleep(1.5) webbrowser.open('http://localhost:5001') if __name__ == '__main__': print("="*60) print(f"{Config.APP_NAME} v{Config.APP_VERSION} - Web Interface") print("="*60) # Check dependencies print("\n🔍 Checking dependencies...") # Check Excel file if not EXCEL_PATH.exists(): print(f"⚠️ Warning: Excel file not found at {EXCEL_PATH}") print(" Excel metadata lookup will not be available") print(" Please ensure the Excel file is in the project root") else: print(f"✓ Excel file found: {EXCEL_PATH.name}") # Check OpenAI API key (optional) if Config.OPENAI_API_KEY: print("✓ OpenAI API key configured (AI metadata generation available)") else: print("ℹ️ OpenAI API key not configured (AI generation disabled)") # Check ExifTool (optional) if Config.check_exiftool(): print("✓ ExifTool available for enhanced metadata operations") else: print("ℹ️ ExifTool not installed (using Python libraries)") print("\nMetadata sources available:") print(" • Excel lookup (Celum ID mapping)") if Config.OPENAI_API_KEY: print(" • AI generation (OpenAI)") print(" • Manual entry") print(" • File import (CSV/Excel/JSON)") print("\nStarting server...") # Docker mode configuration if DOCKER_MODE: print("Running in Docker mode") print("Server will be accessible at http://0.0.0.0:5001") host = '0.0.0.0' else: print("Opening browser at http://localhost:5001") host = '127.0.0.1' # Open browser in background (only in local mode) threading.Thread(target=open_browser, daemon=True).start() print("\nPress Ctrl+C to stop the server") print("="*60) # Clean up old files on startup if DOCKER_MODE: print("\n🧹 Cleaning up old files...") cleanup_old_files(max_age_hours=24) # Run Flask app app.run(debug=False, port=5001, host=host)