#!/usr/bin/env python3
"""
Oliver Metadata Tool - Web Interface

Universal metadata creation and management tool for files.
Flask-based web app for local or server deployment.
Supports multiple metadata sources: Excel, AI, manual entry, and file import.
"""

import logging
import os
import secrets
import shutil
import tempfile
import threading
import unicodedata
import webbrowser
from pathlib import Path
from time import sleep

from flask import (
    Flask,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    session,
    url_for,
)
from werkzeug.utils import secure_filename  # noqa: F401 - kept as fallback

# Project imports keep their original relative order in case any of them
# has import-time side effects.
from src.file_detector import FileDetector, FileType
from src.excel_metadata_lookup import ExcelMetadataLookup
from src.config import Config
from src.metadata_analyzer import MetadataAnalyzer
from src.metadata_importer import MetadataImporter
from src.template_manager import TemplateManager
from src.auth import (  # noqa: F401 - cleanup_sessions kept for external users
    login_required,
    authenticate_user,
    create_user_session,
    destroy_user_session,
    get_current_user,
    is_sso_enabled,
    get_sso_instance,
    cleanup_sessions,
)
from src.database import Database  # noqa: F401
from src.extractors.pdf_extractor import PDFExtractor
from src.extractors.image_extractor import ImageExtractor
from src.extractors.office_extractor import OfficeExtractor
from src.extractors.video_extractor import VideoExtractor
from src.updaters.pdf_updater import PDFUpdater
from src.updaters.image_updater import ImageUpdater
from src.updaters.office_updater import OfficeUpdater
from src.updaters.video_updater import VideoUpdater

logger = logging.getLogger(__name__)


def safe_filename(filename):
    """Sanitize a filename while preserving Unicode characters.

    Unlike ``werkzeug.utils.secure_filename`` this keeps non-ASCII
    characters (Chinese, Japanese, Korean, ...).

    Args:
        filename: The client-supplied file name (``None`` is treated as empty).

    Returns:
        The NFC-normalized name with path separators replaced by ``_``,
        NUL bytes removed, and leading/trailing dots and spaces stripped;
        ``'unnamed_file'`` if nothing is left.
    """
    # Normalize unicode
    filename = unicodedata.normalize('NFC', filename or '')
    # Remove path separators and null bytes
    filename = filename.replace('/', '_').replace('\\', '_').replace('\x00', '')
    # Remove leading/trailing dots and spaces (this also neutralizes '..')
    filename = filename.strip('. ')
    # If empty, use default
    if not filename:
        filename = 'unnamed_file'
    return filename


app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024  # 500MB max file size
app.config['UPLOAD_FOLDER'] = tempfile.mkdtemp()
# Without a SECRET_KEY environment variable a random key is generated, so
# login cookies do not survive a restart.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', secrets.token_hex(32))

# Excel file path for metadata lookup
EXCEL_PATH = Path(__file__).parent / "Celum ID to Adobe Asset Path Mapping Spreadsheet (1).xlsx"

# Initialize metadata lookup from Excel (lazy, see get_metadata_lookup)
metadata_lookup = None

# Initialize AI analyzer (lazy, see get_ai_analyzer)
ai_analyzer = None

# Initialize extractors and updaters
extractors = {
    FileType.PDF: PDFExtractor(),
    FileType.IMAGE: ImageExtractor(),
    FileType.OFFICE_DOC: OfficeExtractor(),
    FileType.OFFICE_SHEET: OfficeExtractor(),
    FileType.OFFICE_PRESENTATION: OfficeExtractor(),
    FileType.VIDEO: VideoExtractor(),
}

updaters = {
    FileType.PDF: PDFUpdater(),
    FileType.IMAGE: ImageUpdater(),
    FileType.OFFICE_DOC: OfficeUpdater(),
    FileType.OFFICE_SHEET: OfficeUpdater(),
    FileType.OFFICE_PRESENTATION: OfficeUpdater(),
    FileType.VIDEO: VideoUpdater(),
}

# Store file processing sessions (session id -> {'files': [...], ...})
sessions = {}

# Store imported metadata from external files (import session id -> map)
imported_metadata = {}


def get_metadata_lookup():
    """Get or create the shared Excel metadata lookup instance."""
    global metadata_lookup
    if metadata_lookup is None:
        metadata_lookup = ExcelMetadataLookup(str(EXCEL_PATH))
    return metadata_lookup


def get_ai_analyzer():
    """Get or create the shared AI analyzer instance.

    Returns:
        The analyzer, or ``None`` when no OpenAI API key is configured or
        the analyzer could not be constructed (the failure is logged).
    """
    global ai_analyzer
    if ai_analyzer is None:
        if not Config.OPENAI_API_KEY:
            return None
        try:
            ai_analyzer = MetadataAnalyzer()
            logger.info("AI analyzer initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize AI analyzer: {e}")
            return None
    return ai_analyzer


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _json_body():
    """Return the request's JSON object, or ``{}`` if it is absent or invalid."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _clean_text(value, limit):
    """Coerce *value* to a stripped string truncated to *limit* characters."""
    text = '' if value is None else str(value)
    return text.strip()[:limit]


def _is_safe_redirect_target(target):
    """Return True only for same-site, path-only redirect targets."""
    if not target or not target.startswith('/'):
        return False
    # '//host' and '/\host' are interpreted by browsers as scheme-relative
    # URLs and would send the user to another site.
    return not target.startswith(('//', '/\\'))


def _is_within_upload_folder(filepath):
    """Return True if *filepath* resolves to a location inside UPLOAD_FOLDER."""
    upload_root = os.path.realpath(app.config['UPLOAD_FOLDER'])
    candidate = os.path.realpath(filepath)
    try:
        return os.path.commonpath([upload_root, candidate]) == upload_root
    except ValueError:
        # Raised e.g. for paths on different drives.
        return False


def _is_valid_file_index(session_id, file_index):
    """Return True if *file_index* is an in-range, non-negative int for the session."""
    return (
        isinstance(file_index, int)
        and not isinstance(file_index, bool)
        and 0 <= file_index < len(sessions[session_id]['files'])
    )


def _start_user_session(user):
    """Create a server-side session for *user* and mirror it into the Flask session.

    Returns:
        True on success, False if ``create_user_session`` returned nothing.
    """
    session_id = create_user_session(
        user=user,
        ip_address=request.remote_addr,
        user_agent=request.headers.get('User-Agent'),
    )
    if not session_id:
        return False
    session['user_id'] = user['id']
    session['username'] = user['username']
    session['session_id'] = session_id
    return True


def _save_import_upload(file):
    """Save an uploaded import file into its own private temp directory.

    A dedicated directory keeps the original file name (and suffix) intact
    while guaranteeing the import can never overwrite or delete a file that
    was uploaded for processing under the same name.

    Returns:
        ``(import_filename, temp_path)``; remove ``temp_path.parent`` when done.
    """
    import_filename = safe_filename(file.filename)
    temp_dir = tempfile.mkdtemp(prefix='import_', dir=app.config['UPLOAD_FOLDER'])
    temp_path = Path(temp_dir) / import_filename
    file.save(str(temp_path))
    return import_filename, temp_path


def _discard_import_upload(temp_path):
    """Best-effort removal of a temp file created by ``_save_import_upload``."""
    if temp_path is not None:
        shutil.rmtree(str(temp_path.parent), ignore_errors=True)


def _fallback_metadata(filename, subject='', **extra):
    """Build placeholder metadata that suggests the file stem as the title."""
    return {'title': Path(filename).stem, 'subject': subject, 'keywords': '', **extra}


def _generate_ai_metadata(extractor, filepath, filename, file_type):
    """Generate metadata with the AI analyzer, falling back to placeholders on failure."""
    analyzer = get_ai_analyzer()
    if not analyzer:
        # AI not configured
        return _fallback_metadata(
            filename,
            'AI generation not available (OpenAI API key not configured)',
            _ai_error='OpenAI API key not configured',
        )
    try:
        # Extract content from file
        content = extractor.extract_content(str(filepath))
        if not content or len(content.strip()) < 10:
            # Not enough content for AI analysis
            return _fallback_metadata(
                filename,
                'Insufficient content for AI analysis',
                _ai_error='Not enough text content extracted',
            )
        # Generate metadata with AI
        new_metadata = analyzer.analyze_content(content, filename, file_type)
        # Log token usage if available
        if '_tokens_used' in new_metadata:
            logger.info(f"AI tokens used for {filename}: {new_metadata['_tokens_used']}")
        return new_metadata
    except Exception as e:
        logger.error(f"AI generation failed for {filename}: {e}")
        return _fallback_metadata(
            filename, f'AI generation error: {str(e)}', _ai_error=str(e)
        )


def _suggest_metadata(metadata_source, filename, filepath, file_type,
                      extractor, lookup, import_map, importer):
    """Produce suggested metadata for one file from the chosen source.

    Returns:
        ``(metadata, found)`` where *found* is True only when the Excel
        lookup or the imported map actually contained an entry for the file.
    """
    if metadata_source == 'excel' and lookup:
        # Lookup metadata from Excel by filename
        excel_data = lookup.lookup_by_filename(filename)
        if excel_data:
            return {
                'title': excel_data.get('title', ''),
                'subject': excel_data.get('description', ''),
                'keywords': '',
            }, True
        # No Excel data found - use filename as fallback
        return _fallback_metadata(
            filename, f'No metadata found in Excel for {filename}'
        ), False

    if metadata_source == 'manual':
        # Mostly empty metadata for the user to fill in; suggest the filename
        return _fallback_metadata(filename), False

    if metadata_source == 'ai':
        return _generate_ai_metadata(extractor, filepath, filename, file_type), False

    if metadata_source == 'import':
        # Import from external file (CSV, Excel, JSON)
        if import_map and importer:
            imported = importer.get_metadata_for_file(import_map, filename)
            if imported:
                return imported, True
            return _fallback_metadata(
                filename, f'No metadata found in imported file for {filename}'
            ), False
        # Import source not available
        return _fallback_metadata(filename, 'Import metadata not loaded'), False

    # Unknown source: nothing to suggest
    return {'title': '', 'subject': '', 'keywords': ''}, False


# ---------------------------------------------------------------------------
# Authentication routes
# ---------------------------------------------------------------------------

@app.route('/login', methods=['GET', 'POST'])
def login():
    """Login page and handler."""
    if request.method == 'POST':
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '')

        if not username or not password:
            return render_template('login.html',
                                   error='Please enter both username and password',
                                   sso_enabled=is_sso_enabled())

        # Authenticate user
        result = authenticate_user(username, password)
        if not result['success']:
            return render_template('login.html', error=result.get('error'),
                                   sso_enabled=is_sso_enabled())

        if not _start_user_session(result['user']):
            return render_template('login.html', error='Failed to create session',
                                   sso_enabled=is_sso_enabled())

        # Redirect to original destination or home. 'next' is attacker
        # controllable, so only same-site paths are honoured.
        next_url = request.args.get('next')
        if not _is_safe_redirect_target(next_url):
            next_url = url_for('index')
        return redirect(next_url)

    # GET request - show login form
    return render_template('login.html', sso_enabled=is_sso_enabled())


@app.route('/logout')
def logout():
    """Logout user."""
    user_id = session.get('user_id')
    session_id = session.get('session_id')
    if session_id:
        destroy_user_session(session_id, user_id)
    session.clear()
    return redirect(url_for('login'))


@app.route('/login/microsoft')
def login_microsoft():
    """Redirect to Microsoft SSO."""
    sso = get_sso_instance()
    if not sso.enabled:
        return render_template('login.html', error='Microsoft SSO not configured',
                               sso_enabled=False)

    # Generate state for CSRF protection
    state = secrets.token_urlsafe(16)
    session['oauth_state'] = state

    auth_url = sso.get_auth_url(state=state)
    if auth_url:
        return redirect(auth_url)
    return render_template('login.html', error='Failed to generate SSO URL',
                           sso_enabled=is_sso_enabled())


@app.route('/auth/callback')
def auth_callback():
    """Handle Microsoft SSO callback."""
    sso = get_sso_instance()

    # Verify state. Both values must be present: comparing two missing
    # values would otherwise succeed and defeat the CSRF check. The stored
    # state is consumed so it cannot be replayed.
    expected_state = session.pop('oauth_state', None)
    received_state = request.args.get('state')
    if (
        not expected_state
        or not received_state
        or not secrets.compare_digest(received_state.encode(), expected_state.encode())
    ):
        return render_template('login.html', error='Invalid state parameter',
                               sso_enabled=is_sso_enabled())

    code = request.args.get('code')
    if not code:
        error_desc = request.args.get('error_description', 'No authorization code')
        return render_template('login.html', error=f'SSO failed: {error_desc}',
                               sso_enabled=is_sso_enabled())

    # Exchange code for token
    result = sso.acquire_token(code)
    if result and 'access_token' in result:
        # Get user info from Microsoft Graph, then create or update the user
        user_info = sso.get_user_info(result['access_token'])
        user = sso.create_or_update_user(user_info) if user_info else None
        if user and _start_user_session(user):
            return redirect(url_for('index'))

    return render_template('login.html', error='SSO authentication failed',
                           sso_enabled=is_sso_enabled())


# ---------------------------------------------------------------------------
# Application routes
# ---------------------------------------------------------------------------

@app.route('/')
@login_required
def index():
    """Main page."""
    user = get_current_user()
    return render_template('index.html', username=user['username'] if user else None)


@app.route('/upload', methods=['POST'])
@login_required
def upload_file():
    """Handle multiple file uploads and suggest metadata from the chosen source.

    Form fields: ``files`` (one or more uploads), ``metadata_source``
    (``excel`` | ``manual`` | ``ai`` | ``import``, default ``excel``) and
    ``import_session_id`` (required for ``import``).
    """
    if 'files' not in request.files:
        return jsonify({'error': 'No files provided'}), 400

    files = request.files.getlist('files')
    if not files or files[0].filename == '':
        return jsonify({'error': 'No files selected'}), 400

    # Get metadata source choice (excel, manual, ai, import)
    metadata_source = request.form.get('metadata_source', 'excel')
    import_session_id = request.form.get('import_session_id', '')  # For import source

    # Get imported metadata (only if using import source)
    import_map = None
    importer = None
    if metadata_source == 'import':
        if not import_session_id or import_session_id not in imported_metadata:
            # Import source selected but no import session available
            return jsonify({'error': 'Please import a metadata file first using the Import button'}), 400
        import_map = imported_metadata[import_session_id]
        importer = MetadataImporter()

    # Get metadata lookup (only if using Excel source)
    lookup = None
    if metadata_source == 'excel':
        try:
            lookup = get_metadata_lookup()
        except Exception as e:
            logger.error(f"Excel metadata lookup unavailable: {e}")
            return jsonify({'error': str(e)}), 500

    # Random id: a counter-based id is guessable and collides when two
    # uploads are handled concurrently.
    session_id = secrets.token_hex(8)
    sessions[session_id] = {
        'files': [],
        'metadata_source': metadata_source,
        'import_session_id': import_session_id,
    }

    results = []
    for file in files:
        try:
            # Save uploaded file
            filename = safe_filename(file.filename)
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            file.save(filepath)

            # Detect file type
            file_type = FileDetector.detect_file_type(filepath)
            if file_type == FileType.UNSUPPORTED:
                results.append({'filename': filename, 'error': 'Unsupported file type'})
                continue

            # Get extractor for this file type
            extractor = extractors.get(file_type)
            if not extractor:
                results.append({'filename': filename, 'error': 'No extractor available'})
                continue

            # Read current metadata from file
            old_metadata = extractor.read_metadata(filepath)

            # Generate metadata based on chosen source
            new_metadata, excel_found = _suggest_metadata(
                metadata_source, filename, filepath, file_type,
                extractor, lookup, import_map, importer,
            )

            file_info = {
                'success': True,
                'filename': filename,
                'file_type': file_type.value,
                'current_metadata': old_metadata,
                'suggested_metadata': new_metadata,
                'filepath': filepath,
                'metadata_source': metadata_source,
                'excel_found': excel_found,
            }
            results.append(file_info)
            sessions[session_id]['files'].append(file_info)
        except Exception as e:
            # One bad file must not abort the whole batch.
            results.append({'filename': file.filename, 'error': str(e)})

    return jsonify({'success': True, 'session_id': session_id, 'files': results})


@app.route('/update', methods=['POST'])
@login_required
def update_metadata():
    """Update file metadata from Excel and save to chosen location.

    JSON body: ``filepath`` (as returned by ``/upload``) and optional
    ``output_dir``. When ``output_dir`` is an existing directory the file is
    copied there and the copy is updated; otherwise it is updated in place.
    """
    data = _json_body()
    filepath = data.get('filepath')
    output_dir = data.get('output_dir', '')  # User-selected output directory

    # The path comes from the client: accept only strings that resolve to an
    # existing file inside the upload folder.
    if (
        not isinstance(filepath, str)
        or not filepath
        or not _is_within_upload_folder(filepath)
        or not os.path.exists(filepath)
    ):
        return jsonify({'error': 'File not found'}), 404

    try:
        # Detect file type
        file_type = FileDetector.detect_file_type(filepath)
        if file_type == FileType.UNSUPPORTED:
            return jsonify({'error': 'Unsupported file type'}), 400

        # Get updater
        updater = updaters.get(file_type)
        if not updater:
            return jsonify({'error': 'No updater available for this file type'}), 400

        # Lookup metadata from Excel
        filename = Path(filepath).name
        excel_data = get_metadata_lookup().lookup_by_filename(filename)
        if not excel_data:
            return jsonify({'error': f'No metadata found in Excel for {filename}'}), 400

        new_metadata = {
            'title': excel_data.get('title', ''),
            'subject': excel_data.get('description', ''),  # External Description/Alt Text
            'keywords': '',
        }

        # Copy file to output directory if specified
        if isinstance(output_dir, str) and output_dir and os.path.isdir(output_dir):
            target_file = os.path.join(output_dir, filename)
            shutil.copy2(filepath, target_file)
        else:
            target_file = filepath

        # Update the file metadata WITHOUT changing filename
        if not updater.update_metadata(target_file, new_metadata, backup=False):
            return jsonify({'error': 'Failed to update metadata'}), 500

        # Verify update
        verified = updater.verify_metadata(target_file, new_metadata)
        return jsonify({
            'success': True,
            'message': 'Metadata updated successfully',
            'verified': verified,
            'output_path': target_file,
            'metadata': new_metadata,
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/update-manual', methods=['POST'])
@login_required
def update_manual_metadata():
    """Update file with manually entered metadata.

    JSON body: ``session_id``, ``file_index`` (int index into the session's
    files) and the ``title`` / ``subject`` / ``keywords`` values to write.
    """
    data = _json_body()
    session_id = data.get('session_id')
    file_index = data.get('file_index')

    # Validate and sanitize metadata
    custom_metadata = {
        'title': _clean_text(data.get('title'), 200),
        'subject': _clean_text(data.get('subject'), 300),
        'keywords': _clean_text(data.get('keywords'), 500),
    }

    # Validate session
    if not session_id or session_id not in sessions:
        return jsonify({'error': 'Invalid or expired session'}), 400

    # Validate file index (rejects None, non-integers and negative values,
    # which would otherwise index from the end of the list)
    if not _is_valid_file_index(session_id, file_index):
        return jsonify({'error': 'Invalid file index'}), 400

    try:
        # Get file info from session
        file_info = sessions[session_id]['files'][file_index]
        filepath = file_info.get('filepath')
        if not filepath or not os.path.exists(filepath):
            return jsonify({'error': 'File not found'}), 404

        # Detect file type
        file_type = FileDetector.detect_file_type(filepath)
        if file_type == FileType.UNSUPPORTED:
            return jsonify({'error': 'Unsupported file type'}), 400

        # Get updater for this file type
        updater = updaters.get(file_type)
        if not updater:
            return jsonify({'error': 'No updater available for this file type'}), 400

        # Update metadata
        if not updater.update_metadata(filepath, custom_metadata, backup=True):
            return jsonify({'error': 'Failed to update metadata'}), 500

        # Update session with new metadata
        file_info['suggested_metadata'] = custom_metadata

        # Verify update
        verified = updater.verify_metadata(filepath, custom_metadata)
        return jsonify({
            'status': 'success',
            'message': 'Metadata updated successfully',
            'verified': verified,
            'metadata': custom_metadata,
        })
    except Exception as e:
        return jsonify({'error': f'Error updating metadata: {str(e)}'}), 500


@app.route('/download/<filename>')
@login_required
def download_file(filename):
    """Download processed file."""
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], safe_filename(filename))
    if os.path.exists(filepath):
        return send_file(filepath, as_attachment=True)
    return jsonify({'error': 'File not found'}), 404


@app.route('/import-metadata', methods=['POST'])
@login_required
def import_metadata():
    """Import metadata from external file (CSV, Excel, JSON)."""
    if 'import_file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    file = request.files['import_file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400

    temp_path = None
    try:
        # Save temp file
        import_filename, temp_path = _save_import_upload(file)

        # Import based on file type
        importer = MetadataImporter()
        file_ext = temp_path.suffix.lower()
        if file_ext == '.csv':
            metadata_map = importer.import_from_csv(str(temp_path))
        elif file_ext in ['.xlsx', '.xls']:
            metadata_map = importer.import_from_excel(str(temp_path))
        elif file_ext == '.json':
            metadata_map = importer.import_from_json(str(temp_path))
        else:
            return jsonify({'error': f'Unsupported file format: {file_ext}. Supported: .csv, .xlsx, .xls, .json'}), 400

        # Validate import
        stats = importer.validate_import(metadata_map)

        # Store in global dict with unique (non-guessable) session ID
        import_session_id = f"import_{secrets.token_hex(8)}"
        imported_metadata[import_session_id] = metadata_map

        return jsonify({
            'success': True,
            'import_session_id': import_session_id,
            'stats': stats,
            'message': f'Imported {stats["total_records"]} metadata records from {import_filename}',
        })
    except Exception as e:
        logger.error(f"Import failed: {e}")
        return jsonify({'error': f'Import failed: {str(e)}'}), 500
    finally:
        # Clean up temp file on every path, including early returns
        _discard_import_upload(temp_path)


@app.route('/preview-import', methods=['POST'])
@login_required
def preview_import():
    """Preview file structure and suggest field mappings."""
    if 'import_file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    file = request.files['import_file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400

    temp_path = None
    try:
        # Save temp file
        import_filename, temp_path = _save_import_upload(file)

        # Preview file structure
        importer = MetadataImporter()
        columns, sample_rows, suggestions = importer.preview_file_structure(str(temp_path))

        # Format suggestions for frontend
        formatted_suggestions = {
            source_field: {
                'best_match': suggestion_data['best_match'],
                'confidence': round(suggestion_data['confidence'], 2),
                'alternatives': [
                    {'field': alt['field'], 'confidence': round(alt['confidence'], 2)}
                    for alt in suggestion_data.get('alternatives', [])
                ],
            }
            for source_field, suggestion_data in suggestions.items()
        }

        return jsonify({
            'success': True,
            'columns': columns,
            'sample_rows': sample_rows[:5],  # Limit to 5 rows
            'suggestions': formatted_suggestions,
            'filename': import_filename,
        })
    except Exception as e:
        logger.error(f"Preview failed: {e}")
        return jsonify({'error': f'Preview failed: {str(e)}'}), 500
    finally:
        # Clean up temp file whether or not the preview succeeded
        _discard_import_upload(temp_path)


@app.route('/stats')
@login_required
def get_stats():
    """Get Excel metadata statistics."""
    try:
        stats = get_metadata_lookup().get_stats()
        return jsonify({'success': True, 'stats': stats})
    except Exception as e:
        return jsonify({'error': str(e)}), 500


# ---------------------------------------------------------------------------
# Template Management Endpoints
# ---------------------------------------------------------------------------

template_manager = TemplateManager()


@app.route('/templates/list', methods=['GET'])
@login_required
def list_templates():
    """List all available templates."""
    try:
        templates = template_manager.list_templates()
        return jsonify({'success': True, 'templates': templates})
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/templates/save', methods=['POST'])
@login_required
def save_template():
    """Save a new template."""
    try:
        data = _json_body()
        name = _clean_text(data.get('name'), None)
        if not name:
            return jsonify({'error': 'Template name is required'}), 400

        template = template_manager.create_template(
            name=name,
            title_template=data.get('title', ''),
            subject_template=data.get('subject', ''),
            keywords_template=data.get('keywords', ''),
            description=data.get('description', ''),
        )

        if not template_manager.save_template(template):
            return jsonify({'error': 'Failed to save template'}), 500

        return jsonify({
            'success': True,
            'message': f'Template "{name}" saved successfully',
            'template': template,
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/templates/load/<name>', methods=['GET'])
@login_required
def load_template(name):
    """Load a template by name."""
    try:
        template = template_manager.load_template(name)
        if not template:
            return jsonify({'error': f'Template "{name}" not found'}), 404
        return jsonify({'success': True, 'template': template})
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/templates/delete/<name>', methods=['DELETE'])
@login_required
def delete_template(name):
    """Delete a template."""
    try:
        if not template_manager.delete_template(name):
            return jsonify({'error': f'Template "{name}" not found'}), 404
        return jsonify({
            'success': True,
            'message': f'Template "{name}" deleted successfully',
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/templates/apply', methods=['POST'])
@login_required
def apply_template():
    """Apply a template to generate metadata for files.

    JSON body: ``template_name``, ``session_id``, ``file_indices`` (list of
    int indices into the session's files) and optional ``custom_vars``.
    Indices that are not valid for the session are skipped.
    """
    try:
        data = _json_body()
        template_name = _clean_text(data.get('template_name'), None)
        file_indices = data.get('file_indices', [])
        session_id = data.get('session_id')
        custom_vars = data.get('custom_vars', {})

        if not template_name:
            return jsonify({'error': 'Template name is required'}), 400

        if not session_id or session_id not in sessions:
            return jsonify({'error': 'Invalid or expired session'}), 400

        if not isinstance(file_indices, list):
            return jsonify({'error': 'file_indices must be a list'}), 400

        # Load template
        template = template_manager.load_template(template_name)
        if not template:
            return jsonify({'error': f'Template "{template_name}" not found'}), 404

        # Apply template to each file
        results = []
        for file_index in file_indices:
            # Skip out-of-range, negative or non-integer indices
            if not _is_valid_file_index(session_id, file_index):
                continue

            file_info = sessions[session_id]['files'][file_index]
            filename = file_info.get('filename', 'unknown')

            # Generate metadata from template
            metadata = template_manager.apply_template(
                template=template,
                filename=filename,
                user='web_user',
                custom_vars=custom_vars,
            )

            # Update file metadata in session
            file_info['suggested_metadata'] = metadata

            results.append({
                'file_index': file_index,
                'filename': filename,
                'metadata': metadata,
            })

        return jsonify({
            'success': True,
            'message': f'Template applied to {len(results)} file(s)',
            'results': results,
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/templates/preview', methods=['POST'])
@login_required
def preview_template():
    """Preview template output with sample data."""
    try:
        data = _json_body()
        template = {
            'name': 'preview',
            'title': data.get('title', ''),
            'subject': data.get('subject', ''),
            'keywords': data.get('keywords', ''),
        }
        sample_filename = data.get('sample_filename', 'example.pdf')
        custom_vars = data.get('custom_vars', {})

        preview = template_manager.preview_template(
            template=template,
            sample_filename=sample_filename,
            user='web_user',
            custom_vars=custom_vars,
        )

        # Also get available variables
        available_vars = template_manager.get_available_variables()

        return jsonify({
            'success': True,
            'preview': preview,
            'available_variables': available_vars,
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


def open_browser():
    """Open browser after short delay."""
    sleep(1.5)
    webbrowser.open('http://localhost:5001')


if __name__ == '__main__':
    print("=" * 60)
    print(f"{Config.APP_NAME} v{Config.APP_VERSION} - Web Interface")
    print("=" * 60)

    # Check dependencies
    print("\n🔍 Checking dependencies...")

    # Check Excel file
    if not EXCEL_PATH.exists():
        print(f"⚠️ Warning: Excel file not found at {EXCEL_PATH}")
        print("   Excel metadata lookup will not be available")
        print("   Please ensure the Excel file is in the project root")
    else:
        print(f"✓ Excel file found: {EXCEL_PATH.name}")

    # Check OpenAI API key (optional)
    if Config.OPENAI_API_KEY:
        print("✓ OpenAI API key configured (AI metadata generation available)")
    else:
        print("ℹ️ OpenAI API key not configured (AI generation disabled)")

    # Check ExifTool (optional)
    if Config.check_exiftool():
        print("✓ ExifTool available for enhanced metadata operations")
    else:
        print("ℹ️ ExifTool not installed (using Python libraries)")

    print("\nMetadata sources available:")
    print("  • Excel lookup (Celum ID mapping)")
    if Config.OPENAI_API_KEY:
        print("  • AI generation (OpenAI)")
    print("  • Manual entry")
    print("  • File import (CSV/Excel/JSON)")

    print("\nStarting server...")
    print("Opening browser at http://localhost:5001")
    print("\nPress Ctrl+C to stop the server")
    print("=" * 60)

    # Open browser in background
    threading.Thread(target=open_browser, daemon=True).start()

    # Run Flask app
    app.run(debug=False, port=5001, host='127.0.0.1')