oliver-metadata-tool/web_app.py
SamoilenkoVadym 3deaa5ef40 Initial commit: Oliver Metadata Tool (FastAPI)
Complete Flask → FastAPI migration with:
- FastAPI app with session auth, Azure AD SSO, rate limiting
- SQLite-backed session store (survives restarts)
- Bulk AI metadata generation with SSE progress
- Admin panel (user management, audit log, AI usage)
- Subpath deployment support (ROOT_PATH config)
- Docker + deploy.sh for production deployment
- Test suite (auth, upload, templates, imports, admin, sessions)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-02-09 21:23:42 +00:00

1381 lines
49 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Oliver Metadata Tool - Web Interface
Universal metadata creation and management tool for files.
Flask-based web app for local or server deployment.
Supports multiple metadata sources: Excel, AI, manual entry, and file import.
"""
from flask import Flask, render_template, request, jsonify, send_file, session, redirect, url_for
from werkzeug.utils import secure_filename # noqa: F401 - kept as fallback
from pathlib import Path
import os
import tempfile
import threading
import webbrowser
from time import sleep
import shutil
import unicodedata
import secrets
import zipfile
from datetime import datetime
from src.file_detector import FileDetector, FileType
from src.excel_metadata_lookup import ExcelMetadataLookup
from src.config import Config
from src.metadata_analyzer import MetadataAnalyzer
from src.metadata_importer import MetadataImporter
from src.template_manager import TemplateManager
from src.auth import login_required, authenticate_user, create_user_session, destroy_user_session, get_current_user, is_sso_enabled, get_sso_instance, cleanup_sessions
from src.database import Database
def safe_filename(filename):
"""Sanitize filename while preserving Unicode characters (Chinese, Japanese, Korean)."""
# Normalize unicode
filename = unicodedata.normalize('NFC', filename)
# Remove path separators and null bytes
filename = filename.replace('/', '_').replace('\\', '_').replace('\x00', '')
# Remove leading/trailing dots and spaces
filename = filename.strip('. ')
# If empty, use default
if not filename:
filename = 'unnamed_file'
return filename
from src.extractors.pdf_extractor import PDFExtractor
from src.extractors.image_extractor import ImageExtractor
from src.extractors.office_extractor import OfficeExtractor
from src.extractors.video_extractor import VideoExtractor
from src.updaters.pdf_updater import PDFUpdater
from src.updaters.image_updater import ImageUpdater
from src.updaters.office_updater import OfficeUpdater
from src.updaters.video_updater import VideoUpdater
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 # 500MB max file size
# Docker mode detection
DOCKER_MODE = os.getenv('DOCKER_MODE', 'false').lower() == 'true'
# Upload folder configuration
if DOCKER_MODE:
# Use persistent directory in Docker
UPLOAD_FOLDER = Path('/app/uploads')
UPLOAD_FOLDER.mkdir(parents=True, exist_ok=True)
app.config['UPLOAD_FOLDER'] = str(UPLOAD_FOLDER)
else:
# Use temp directory for local development
app.config['UPLOAD_FOLDER'] = tempfile.mkdtemp()
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', secrets.token_hex(32))
# Excel file path for metadata lookup
EXCEL_PATH = Path(__file__).parent / "Celum ID to Adobe Asset Path Mapping Spreadsheet (1).xlsx"
# Initialize metadata lookup from Excel
metadata_lookup = None
# Initialize AI analyzer (lazy initialization)
ai_analyzer = None
# Initialize extractors and updaters
extractors = {
FileType.PDF: PDFExtractor(),
FileType.IMAGE: ImageExtractor(),
FileType.OFFICE_DOC: OfficeExtractor(),
FileType.OFFICE_SHEET: OfficeExtractor(),
FileType.OFFICE_PRESENTATION: OfficeExtractor(),
FileType.VIDEO: VideoExtractor()
}
updaters = {
FileType.PDF: PDFUpdater(),
FileType.IMAGE: ImageUpdater(),
FileType.OFFICE_DOC: OfficeUpdater(),
FileType.OFFICE_SHEET: OfficeUpdater(),
FileType.OFFICE_PRESENTATION: OfficeUpdater(),
FileType.VIDEO: VideoUpdater()
}
# Store file processing sessions
sessions = {}
# Store imported metadata from external files
imported_metadata = {}
def cleanup_session_files(session_id: str):
"""
Clean up files associated with a session.
Removes uploaded files from disk to free up space.
"""
if session_id not in sessions:
return
session_data = sessions[session_id]
files = session_data.get('files', [])
for file_info in files:
filepath = file_info.get('filepath')
if filepath and os.path.exists(filepath):
try:
os.remove(filepath)
app.logger.info(f"Cleaned up file: {filepath}")
except Exception as e:
app.logger.warning(f"Failed to cleanup file {filepath}: {e}")
# Remove session from memory
sessions.pop(session_id, None)
def cleanup_old_files(max_age_hours: int = 24):
"""
Clean up files older than max_age_hours from upload folder.
Runs automatically to prevent disk space issues.
"""
try:
upload_folder = Path(app.config['UPLOAD_FOLDER'])
now = datetime.now().timestamp()
max_age_seconds = max_age_hours * 3600
cleaned = 0
for filepath in upload_folder.glob('*'):
if filepath.is_file():
file_age = now - filepath.stat().st_mtime
if file_age > max_age_seconds:
try:
filepath.unlink()
cleaned += 1
app.logger.info(f"Cleaned up old file: {filepath.name}")
except Exception as e:
app.logger.warning(f"Failed to cleanup {filepath.name}: {e}")
if cleaned > 0:
app.logger.info(f"Cleaned up {cleaned} old file(s)")
except Exception as e:
app.logger.error(f"Error in cleanup_old_files: {e}")
def get_metadata_lookup():
"""Get or create metadata lookup instance."""
global metadata_lookup
if metadata_lookup is None:
metadata_lookup = ExcelMetadataLookup(str(EXCEL_PATH))
return metadata_lookup
def get_ai_analyzer():
"""Get or create AI analyzer instance."""
global ai_analyzer
if ai_analyzer is None:
if Config.OPENAI_API_KEY:
try:
ai_analyzer = MetadataAnalyzer()
logger = __import__('logging').getLogger(__name__)
logger.info("AI analyzer initialized successfully")
except Exception as e:
logger = __import__('logging').getLogger(__name__)
logger.error(f"Failed to initialize AI analyzer: {e}")
return None
else:
return None
return ai_analyzer
@app.route('/login', methods=['GET', 'POST'])
def login():
"""Login page and handler."""
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
if not username or not password:
return render_template('login.html', error='Please enter both username and password', sso_enabled=is_sso_enabled())
# Authenticate user
result = authenticate_user(username, password)
if result['success']:
user = result['user']
# Create session
session_id = create_user_session(
user=user,
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
if session_id:
# Set Flask session
session['user_id'] = user['id']
session['username'] = user['username']
session['session_id'] = session_id
# Redirect to original destination or home
next_url = request.args.get('next', url_for('index'))
return redirect(next_url)
else:
return render_template('login.html', error='Failed to create session', sso_enabled=is_sso_enabled())
else:
return render_template('login.html', error=result.get('error'), sso_enabled=is_sso_enabled())
# GET request - show login form
return render_template('login.html', sso_enabled=is_sso_enabled())
@app.route('/logout')
def logout():
"""Logout user and cleanup session files."""
user_id = session.get('user_id')
session_id = session.get('session_id')
# Clean up all file processing sessions for this user
# (In-memory sessions don't have user_id, so we clean all)
sessions_to_cleanup = list(sessions.keys())
for sid in sessions_to_cleanup:
cleanup_session_files(sid)
if session_id:
destroy_user_session(session_id, user_id)
session.clear()
return redirect(url_for('login'))
@app.route('/login/microsoft')
def login_microsoft():
"""Redirect to Microsoft SSO."""
sso = get_sso_instance()
if not sso.enabled:
return render_template('login.html', error='Microsoft SSO not configured', sso_enabled=False)
# Generate state for CSRF protection
state = secrets.token_urlsafe(16)
session['oauth_state'] = state
auth_url = sso.get_auth_url(state=state)
if auth_url:
return redirect(auth_url)
else:
return render_template('login.html', error='Failed to generate SSO URL', sso_enabled=is_sso_enabled())
@app.route('/auth/callback')
def auth_callback():
"""Handle Microsoft SSO callback."""
sso = get_sso_instance()
# Verify state
if request.args.get('state') != session.get('oauth_state'):
return render_template('login.html', error='Invalid state parameter', sso_enabled=is_sso_enabled())
code = request.args.get('code')
if not code:
error_desc = request.args.get('error_description', 'No authorization code')
return render_template('login.html', error=f'SSO failed: {error_desc}', sso_enabled=is_sso_enabled())
# Exchange code for token
result = sso.acquire_token(code)
if result and 'access_token' in result:
# Get user info from Microsoft Graph
user_info = sso.get_user_info(result['access_token'])
if user_info:
# Create or update user
user = sso.create_or_update_user(user_info)
if user:
# Create session
session_id = create_user_session(
user=user,
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
if session_id:
# Set Flask session
session['user_id'] = user['id']
session['username'] = user['username']
session['session_id'] = session_id
return redirect(url_for('index'))
return render_template('login.html', error='SSO authentication failed', sso_enabled=is_sso_enabled())
@app.route('/')
@login_required
def index():
"""Main page."""
user = get_current_user()
return render_template('index.html',
username=user['username'] if user else None,
docker_mode=DOCKER_MODE)
@app.route('/upload', methods=['POST'])
@login_required
def upload_file():
"""Handle multiple file uploads and metadata lookup from Excel."""
if 'files' not in request.files:
return jsonify({'error': 'No files provided'}), 400
files = request.files.getlist('files')
if not files or files[0].filename == '':
return jsonify({'error': 'No files selected'}), 400
# Get metadata source choice (excel, manual, ai, import)
metadata_source = request.form.get('metadata_source', 'excel')
import_session_id = request.form.get('import_session_id', '') # For import source
results = []
session_id = str(len(sessions) + 1)
sessions[session_id] = {
'files': [],
'metadata_source': metadata_source,
'import_session_id': import_session_id
}
# Get metadata lookup (only if using Excel source)
excel_session_id = request.form.get('excel_session_id')
lookup = None
if metadata_source == 'excel':
if excel_session_id and excel_session_id in imported_metadata:
# Use uploaded Excel file
lookup = imported_metadata[excel_session_id]
else:
# Try default Excel file if available
try:
lookup = get_metadata_lookup()
except:
return jsonify({'error': 'Please upload an Excel file first using the Upload Excel File button'}), 400
# Get imported metadata (only if using import source)
import_map = None
if metadata_source == 'import' and import_session_id and import_session_id in imported_metadata:
import_map = imported_metadata[import_session_id]
importer = MetadataImporter()
elif metadata_source == 'import':
# Import source selected but no import session available
return jsonify({'error': 'Please import a metadata file first using the Import button'}), 400
for file in files:
try:
# Save uploaded file
filename = safe_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
# Detect file type
file_type = FileDetector.detect_file_type(filepath)
if file_type == FileType.UNSUPPORTED:
results.append({
'filename': filename,
'error': 'Unsupported file type'
})
continue
# Get extractor for this file type
extractor = extractors.get(file_type)
if not extractor:
results.append({
'filename': filename,
'error': 'No extractor available'
})
continue
# Read current metadata from file
old_metadata = extractor.read_metadata(filepath)
# Generate metadata based on chosen source
excel_found = False
new_metadata = {'title': '', 'subject': '', 'keywords': ''}
if metadata_source == 'excel' and lookup:
# Lookup metadata from Excel by filename
excel_data = lookup.lookup_by_filename(filename)
if excel_data:
new_metadata = {
'title': excel_data.get('title', ''),
'subject': excel_data.get('description', ''),
'keywords': ''
}
excel_found = True
else:
# No Excel data found - use filename as fallback
new_metadata = {
'title': Path(filename).stem,
'subject': f'No metadata found in Excel for {filename}',
'keywords': ''
}
elif metadata_source == 'manual':
# Return empty metadata for user to fill manually
new_metadata = {
'title': Path(filename).stem, # Suggest filename
'subject': '',
'keywords': ''
}
elif metadata_source == 'ai':
# AI generation using MetadataAnalyzer
analyzer = get_ai_analyzer()
if analyzer:
try:
# Extract content from file
content = extractor.extract_content(str(filepath))
if not content or len(content.strip()) < 10:
# Not enough content for AI analysis
new_metadata = {
'title': Path(filename).stem,
'subject': 'Insufficient content for AI analysis',
'keywords': '',
'_ai_error': 'Not enough text content extracted'
}
else:
# Generate metadata with AI
new_metadata = analyzer.analyze_content(content, filename, file_type)
# Log token usage if available
if '_tokens_used' in new_metadata:
import logging
logging.getLogger(__name__).info(
f"AI tokens used for {filename}: {new_metadata['_tokens_used']}"
)
except Exception as e:
import logging
logging.getLogger(__name__).error(f"AI generation failed for {filename}: {e}")
new_metadata = {
'title': Path(filename).stem,
'subject': f'AI generation error: {str(e)}',
'keywords': '',
'_ai_error': str(e)
}
else:
# AI not configured
new_metadata = {
'title': Path(filename).stem,
'subject': 'AI generation not available (OpenAI API key not configured)',
'keywords': '',
'_ai_error': 'OpenAI API key not configured'
}
elif metadata_source == 'import':
# Import from external file (CSV, Excel, JSON)
if import_map and importer:
# Look up metadata for this file
imported = importer.get_metadata_for_file(import_map, filename)
if imported:
new_metadata = imported
excel_found = True # Mark as found in import
else:
# No metadata found in import file
new_metadata = {
'title': Path(filename).stem,
'subject': f'No metadata found in imported file for {filename}',
'keywords': ''
}
else:
# Import source not available
new_metadata = {
'title': Path(filename).stem,
'subject': 'Import metadata not loaded',
'keywords': ''
}
file_info = {
'success': True,
'filename': filename,
'file_type': file_type.value,
'current_metadata': old_metadata,
'suggested_metadata': new_metadata,
'filepath': filepath,
'metadata_source': metadata_source,
'excel_found': excel_found
}
results.append(file_info)
sessions[session_id]['files'].append(file_info)
except Exception as e:
results.append({
'filename': file.filename,
'error': str(e)
})
return jsonify({
'success': True,
'session_id': session_id,
'files': results
})
@app.route('/update', methods=['POST'])
@login_required
def update_metadata():
"""Update file metadata using suggested metadata from session."""
data = request.json
filepath = data.get('filepath')
session_id = data.get('session_id')
file_index = data.get('file_index')
output_dir = data.get('output_dir', '') # User-selected output directory
if not filepath or not os.path.exists(filepath):
return jsonify({'error': 'File not found'}), 404
# Validate session
if not session_id or session_id not in sessions:
return jsonify({'error': 'Invalid or expired session'}), 400
# Validate file index
if file_index is None or file_index >= len(sessions[session_id]['files']):
return jsonify({'error': 'Invalid file index'}), 400
try:
# Get file info from session
file_info = sessions[session_id]['files'][file_index]
# Get suggested metadata from session
new_metadata = file_info.get('suggested_metadata', {})
if not new_metadata or not new_metadata.get('title'):
return jsonify({'error': 'No metadata available for this file'}), 400
# Detect file type
file_type = FileDetector.detect_file_type(filepath)
if file_type == FileType.UNSUPPORTED:
return jsonify({'error': 'Unsupported file type'}), 400
# Get updater
updater = updaters.get(file_type)
if not updater:
return jsonify({'error': 'No updater available for this file type'}), 400
filename = Path(filepath).name
# In Docker mode, always update in-place (user will download via browser)
# In local mode, allow copying to output directory
if not DOCKER_MODE and output_dir and os.path.isdir(output_dir):
output_path = os.path.join(output_dir, filename)
shutil.copy2(filepath, output_path)
target_file = output_path
else:
# Update in-place for Docker or when no output_dir specified
target_file = filepath
# Update the file metadata
success = updater.update_metadata(target_file, new_metadata, backup=False)
if not success:
return jsonify({'error': 'Failed to update metadata'}), 500
# Verify update
verified = updater.verify_metadata(target_file, new_metadata)
return jsonify({
'success': True,
'message': 'Metadata updated successfully',
'verified': verified,
'output_path': target_file,
'metadata': new_metadata
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/update-manual', methods=['POST'])
@login_required
def update_manual_metadata():
"""Update file with manually entered metadata."""
data = request.json
session_id = data.get('session_id')
file_index = data.get('file_index')
# Validate and sanitize metadata
custom_metadata = {
'title': data.get('title', '').strip()[:200],
'subject': data.get('subject', '').strip()[:300],
'keywords': data.get('keywords', '').strip()[:500],
'author': data.get('author', '').strip()[:100],
'copyright': data.get('copyright', '').strip()[:150],
'comments': data.get('comments', '').strip()[:500]
}
# Add custom fields if provided
custom_fields = data.get('custom_fields', {})
if custom_fields and isinstance(custom_fields, dict):
for field_name, field_value in custom_fields.items():
# Sanitize custom field names and values
safe_name = str(field_name).strip()[:50]
safe_value = str(field_value).strip()[:200]
if safe_name and safe_value:
custom_metadata[safe_name] = safe_value
# Validate session
if not session_id or session_id not in sessions:
return jsonify({'error': 'Invalid or expired session'}), 400
# Validate file index
if file_index is None or file_index >= len(sessions[session_id]['files']):
return jsonify({'error': 'Invalid file index'}), 400
try:
# Get file info from session
file_info = sessions[session_id]['files'][file_index]
filepath = file_info.get('filepath')
if not filepath or not os.path.exists(filepath):
return jsonify({'error': 'File not found'}), 404
# Detect file type
file_type = FileDetector.detect_file_type(filepath)
if file_type == FileType.UNSUPPORTED:
return jsonify({'error': 'Unsupported file type'}), 400
# Get updater for this file type
updater = updaters.get(file_type)
if not updater:
return jsonify({'error': 'No updater available for this file type'}), 400
# Update metadata
success = updater.update_metadata(filepath, custom_metadata, backup=True)
if not success:
return jsonify({'error': 'Failed to update metadata'}), 500
# Update session with new metadata
sessions[session_id]['files'][file_index]['suggested_metadata'] = custom_metadata
# Verify update
verified = updater.verify_metadata(filepath, custom_metadata)
return jsonify({
'status': 'success',
'message': 'Metadata updated successfully',
'verified': verified,
'metadata': custom_metadata
})
except Exception as e:
return jsonify({'error': f'Error updating metadata: {str(e)}'}), 500
@app.route('/download/<filename>')
@login_required
def download_file(filename):
"""Download processed file."""
filepath = os.path.join(app.config['UPLOAD_FOLDER'], safe_filename(filename))
if os.path.exists(filepath):
return send_file(filepath, as_attachment=True)
return jsonify({'error': 'File not found'}), 404
@app.route('/download-selected', methods=['POST'])
@login_required
def download_selected_files():
"""Download selected files from session as ZIP archive."""
try:
data = request.json
app.logger.info(f"download-selected called with data: {data}")
session_id = data.get('session_id')
file_indices = data.get('file_indices', [])
app.logger.info(f"session_id: {session_id}, file_indices: {file_indices}")
app.logger.info(f"Available sessions: {list(sessions.keys())}")
if session_id not in sessions:
app.logger.error(f"Session not found: {session_id}")
return jsonify({'error': 'Session not found'}), 404
if not file_indices:
app.logger.error("No files selected")
return jsonify({'error': 'No files selected'}), 400
session_data = sessions[session_id]
all_files = session_data.get('files', [])
app.logger.info(f"Found {len(all_files)} files in session")
if not all_files:
app.logger.error("No files in session")
return jsonify({'error': 'No files in session'}), 404
# Create a temporary ZIP file
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
zip_filename = f'oliver_metadata_files_{timestamp}.zip'
zip_path = os.path.join(app.config['UPLOAD_FOLDER'], zip_filename)
app.logger.info(f"Creating ZIP at: {zip_path}")
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for index in file_indices:
if 0 <= index < len(all_files):
file_info = all_files[index]
filepath = file_info['filepath']
filename = file_info['filename']
app.logger.info(f"Adding file {index}: {filename} from {filepath}")
if os.path.exists(filepath):
# Add file to ZIP with its original name
zipf.write(filepath, filename)
app.logger.info(f"Added {filename} to ZIP")
else:
app.logger.warning(f"File not found: {filepath}")
app.logger.info(f"ZIP created successfully, sending file")
# Send the ZIP file and delete it after sending
return send_file(
zip_path,
as_attachment=True,
download_name=zip_filename,
mimetype='application/zip'
)
except Exception as e:
app.logger.error(f"Error in download_selected_files: {str(e)}", exc_info=True)
if 'zip_path' in locals() and os.path.exists(zip_path):
os.remove(zip_path)
return jsonify({'error': f'Error creating ZIP archive: {str(e)}'}), 500
@app.route('/cleanup-session/<session_id>', methods=['POST'])
@login_required
def cleanup_session(session_id):
"""Clean up session files manually."""
try:
cleanup_session_files(session_id)
return jsonify({'success': True, 'message': 'Session cleaned up successfully'})
except Exception as e:
app.logger.error(f"Error cleaning up session: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/upload-excel', methods=['POST'])
@login_required
def upload_excel():
"""Upload Excel file for Excel Lookup metadata source."""
if 'excel_file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
file = request.files['excel_file']
if file.filename == '':
return jsonify({'error': 'No file selected'}), 400
try:
import pandas as pd
# Save temp file
excel_filename = safe_filename(file.filename)
temp_path = Path(app.config['UPLOAD_FOLDER']) / excel_filename
file.save(str(temp_path))
# Preview Excel structure instead of loading directly
excel_file = pd.ExcelFile(str(temp_path))
sheet_names = excel_file.sheet_names
# Get columns and sample data from first sheet
preview_data = {}
for sheet_name in sheet_names[:5]: # Limit to first 5 sheets
df = pd.read_excel(excel_file, sheet_name=sheet_name, nrows=5)
preview_data[sheet_name] = {
'columns': df.columns.tolist(),
'sample_data': df.head(3).fillna('').to_dict('records')
}
# Store file path temporarily for later configuration
excel_session_id = f"excel_{secrets.token_urlsafe(8)}"
if 'excel_files' not in imported_metadata:
imported_metadata['excel_files'] = {}
imported_metadata['excel_files'][excel_session_id] = {
'path': str(temp_path),
'filename': excel_filename,
'sheet_names': sheet_names
}
return jsonify({
'success': True,
'excel_session_id': excel_session_id,
'filename': excel_filename,
'sheets': sheet_names,
'preview': preview_data,
'message': f'Excel file uploaded. Please configure column mapping.'
})
except Exception as e:
import logging
logging.getLogger(__name__).error(f"Excel upload failed: {e}")
return jsonify({'error': f'Excel upload failed: {str(e)}'}), 500
@app.route('/preview-excel-sheet', methods=['POST'])
@login_required
def preview_excel_sheet():
"""Preview a specific sheet from uploaded Excel file."""
try:
import pandas as pd
data = request.json
excel_session_id = data.get('excel_session_id')
sheet_name = data.get('sheet_name')
if not excel_session_id or excel_session_id not in imported_metadata.get('excel_files', {}):
return jsonify({'error': 'Invalid session ID'}), 400
excel_info = imported_metadata['excel_files'][excel_session_id]
excel_path = excel_info['path']
# Read the specific sheet
df = pd.read_excel(excel_path, sheet_name=sheet_name, nrows=10)
return jsonify({
'success': True,
'columns': df.columns.tolist(),
'sample_data': df.head(5).fillna('').to_dict('records')
})
except Exception as e:
import logging
logging.getLogger(__name__).error(f"Sheet preview failed: {e}")
return jsonify({'error': f'Sheet preview failed: {str(e)}'}), 500
@app.route('/configure-excel-mapping', methods=['POST'])
@login_required
def configure_excel_mapping():
"""Configure Excel column mapping and load metadata."""
try:
import pandas as pd
data = request.json
excel_session_id = data.get('excel_session_id')
sheet_name = data.get('sheet_name')
column_mapping = data.get('column_mapping', {}) # {filename: 'col', title: 'col', ...}
if not excel_session_id or excel_session_id not in imported_metadata.get('excel_files', {}):
return jsonify({'error': 'Invalid session ID'}), 400
excel_info = imported_metadata['excel_files'][excel_session_id]
excel_path = excel_info['path']
# Read the configured sheet
df = pd.read_excel(excel_path, sheet_name=sheet_name)
# Build metadata map using configured columns
metadata_map = {}
filename_col = column_mapping.get('filename')
title_col = column_mapping.get('title')
description_col = column_mapping.get('description')
keywords_col = column_mapping.get('keywords')
if not filename_col:
return jsonify({'error': 'Filename column is required'}), 400
for _, row in df.iterrows():
filename = row.get(filename_col)
if pd.notna(filename) and str(filename).strip():
# Get filename without extension for indexing (case-insensitive)
filename_stem = Path(str(filename).strip()).stem.lower()
metadata = {
'title': str(row.get(title_col, '')).strip() if title_col and pd.notna(row.get(title_col)) else '',
'description': str(row.get(description_col, '')).strip() if description_col and pd.notna(row.get(description_col)) else '',
'keywords': str(row.get(keywords_col, '')).strip() if keywords_col and pd.notna(row.get(keywords_col)) else '',
'original_filename': str(filename).strip()
}
metadata_map[filename_stem] = metadata
# Create a simple lookup object
class ConfiguredExcelLookup:
def __init__(self, metadata_map):
self.metadata_map = metadata_map
self.filename_to_metadata = metadata_map
def lookup_by_filename(self, filename: str):
filename_stem = Path(filename).stem.lower()
return self.metadata_map.get(filename_stem)
lookup = ConfiguredExcelLookup(metadata_map)
# Store configured lookup
imported_metadata[excel_session_id] = lookup
# Get stats
stats = {
'total_records': len(metadata_map),
'with_title': sum(1 for v in metadata_map.values() if v.get('title')),
'with_description': sum(1 for v in metadata_map.values() if v.get('description')),
'with_keywords': sum(1 for v in metadata_map.values() if v.get('keywords'))
}
return jsonify({
'success': True,
'excel_session_id': excel_session_id,
'stats': stats,
'message': f'Configured mapping for {stats["total_records"]} records from sheet "{sheet_name}"'
})
except Exception as e:
import logging
logging.getLogger(__name__).error(f"Excel configuration failed: {e}")
return jsonify({'error': f'Excel configuration failed: {str(e)}'}), 500
@app.route('/import-metadata', methods=['POST'])
@login_required
def import_metadata():
"""Upload import file and preview structure for mapping."""
if 'import_file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
file = request.files['import_file']
if file.filename == '':
return jsonify({'error': 'No file selected'}), 400
try:
import pandas as pd
# Save temp file
import_filename = safe_filename(file.filename)
temp_path = Path(app.config['UPLOAD_FOLDER']) / import_filename
file.save(str(temp_path))
file_ext = temp_path.suffix.lower()
# Read file and get structure
if file_ext == '.csv':
df = pd.read_csv(str(temp_path), nrows=5, encoding='utf-8')
elif file_ext in ['.xlsx', '.xls']:
df = pd.read_excel(str(temp_path), nrows=5)
elif file_ext == '.json':
import json
with open(str(temp_path), 'r', encoding='utf-8') as f:
data = json.load(f)
# Convert to DataFrame
if isinstance(data, list):
df = pd.DataFrame(data[:5])
elif isinstance(data, dict):
df = pd.DataFrame([data])
else:
return jsonify({'error': 'Invalid JSON format'}), 400
else:
return jsonify({'error': f'Unsupported file format: {file_ext}'}), 400
columns = df.columns.tolist()
sample_data = df.fillna('').to_dict('records')
# Store file path for later configuration
import_session_id = f"import_{secrets.token_urlsafe(8)}"
if 'import_files' not in imported_metadata:
imported_metadata['import_files'] = {}
imported_metadata['import_files'][import_session_id] = {
'path': str(temp_path),
'filename': import_filename,
'file_type': file_ext
}
return jsonify({
'success': True,
'import_session_id': import_session_id,
'filename': import_filename,
'columns': columns,
'sample_data': sample_data,
'message': f'Import file uploaded. Please configure column mapping.'
})
except Exception as e:
import logging
logging.getLogger(__name__).error(f"Import upload failed: {e}")
return jsonify({'error': f'Import upload failed: {str(e)}'}), 500
@app.route('/configure-import-mapping', methods=['POST'])
@login_required
def configure_import_mapping():
"""Configure import column mapping and load metadata."""
try:
import pandas as pd
import json
data = request.json
import_session_id = data.get('import_session_id')
column_mapping = data.get('column_mapping', {})
if not import_session_id or import_session_id not in imported_metadata.get('import_files', {}):
return jsonify({'error': 'Invalid session ID'}), 400
import_info = imported_metadata['import_files'][import_session_id]
import_path = import_info['path']
file_ext = import_info['file_type']
# Read the full file
if file_ext == '.csv':
df = pd.read_csv(import_path, encoding='utf-8')
elif file_ext in ['.xlsx', '.xls']:
df = pd.read_excel(import_path)
elif file_ext == '.json':
with open(import_path, 'r', encoding='utf-8') as f:
json_data = json.load(f)
if isinstance(json_data, list):
df = pd.DataFrame(json_data)
else:
df = pd.DataFrame([json_data])
# Build metadata map using configured columns
metadata_map = {}
filename_col = column_mapping.get('filename')
title_col = column_mapping.get('title')
subject_col = column_mapping.get('subject')
keywords_col = column_mapping.get('keywords')
if not filename_col:
return jsonify({'error': 'Filename column is required'}), 400
for _, row in df.iterrows():
filename = row.get(filename_col)
if pd.notna(filename) and str(filename).strip():
filename_stem = Path(str(filename).strip()).stem.lower()
metadata = {
'title': str(row.get(title_col, '')).strip() if title_col and pd.notna(row.get(title_col)) else '',
'subject': str(row.get(subject_col, '')).strip() if subject_col and pd.notna(row.get(subject_col)) else '',
'keywords': str(row.get(keywords_col, '')).strip() if keywords_col and pd.notna(row.get(keywords_col)) else '',
'original_filename': str(filename).strip()
}
metadata_map[filename_stem] = metadata
# Store configured metadata map
imported_metadata[import_session_id] = metadata_map
# Clean up temp file
Path(import_path).unlink(missing_ok=True)
# Get stats
stats = {
'total_records': len(metadata_map),
'with_title': sum(1 for v in metadata_map.values() if v.get('title')),
'with_subject': sum(1 for v in metadata_map.values() if v.get('subject')),
'with_keywords': sum(1 for v in metadata_map.values() if v.get('keywords'))
}
return jsonify({
'success': True,
'import_session_id': import_session_id,
'stats': stats,
'message': f'Configured mapping for {stats["total_records"]} records'
})
except Exception as e:
import logging
logging.getLogger(__name__).error(f"Import configuration failed: {e}")
return jsonify({'error': f'Import configuration failed: {str(e)}'}), 500
@app.route('/preview-import', methods=['POST'])
@login_required
def preview_import():
"""Preview file structure and suggest field mappings."""
if 'import_file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
file = request.files['import_file']
if file.filename == '':
return jsonify({'error': 'No file selected'}), 400
try:
# Save temp file
import_filename = safe_filename(file.filename)
temp_path = Path(app.config['UPLOAD_FOLDER']) / import_filename
file.save(str(temp_path))
# Preview file structure
importer = MetadataImporter()
columns, sample_rows, suggestions = importer.preview_file_structure(str(temp_path))
# Clean up temp file
temp_path.unlink()
# Format suggestions for frontend
formatted_suggestions = {}
for source_field, suggestion_data in suggestions.items():
formatted_suggestions[source_field] = {
'best_match': suggestion_data['best_match'],
'confidence': round(suggestion_data['confidence'], 2),
'alternatives': [
{'field': alt['field'], 'confidence': round(alt['confidence'], 2)}
for alt in suggestion_data.get('alternatives', [])
]
}
return jsonify({
'success': True,
'columns': columns,
'sample_rows': sample_rows[:5], # Limit to 5 rows
'suggestions': formatted_suggestions,
'filename': import_filename
})
except Exception as e:
import logging
logging.getLogger(__name__).error(f"Preview failed: {e}")
# Try to clean up temp file
try:
if temp_path.exists():
temp_path.unlink()
except:
pass
return jsonify({'error': f'Preview failed: {str(e)}'}), 500
@app.route('/stats')
@login_required
def get_stats():
"""Get Excel metadata statistics."""
try:
lookup = get_metadata_lookup()
stats = lookup.get_stats()
return jsonify({
'success': True,
'stats': stats
})
except Exception as e:
return jsonify({'error': str(e)}), 500
# Template Management Endpoints
template_manager = TemplateManager()
@app.route('/templates/list', methods=['GET'])
@login_required
def list_templates():
"""List all available templates."""
try:
templates = template_manager.list_templates()
return jsonify({
'success': True,
'templates': templates
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/templates/save', methods=['POST'])
@login_required
def save_template():
"""Save a new template."""
try:
data = request.json
name = data.get('name', '').strip()
if not name:
return jsonify({'error': 'Template name is required'}), 400
template = template_manager.create_template(
name=name,
title_template=data.get('title', ''),
subject_template=data.get('subject', ''),
keywords_template=data.get('keywords', ''),
description=data.get('description', '')
)
success = template_manager.save_template(template)
if success:
return jsonify({
'success': True,
'message': f'Template "{name}" saved successfully',
'template': template
})
else:
return jsonify({'error': 'Failed to save template'}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/templates/load/<name>', methods=['GET'])
@login_required
def load_template(name):
"""Load a template by name."""
try:
template = template_manager.load_template(name)
if template:
return jsonify({
'success': True,
'template': template
})
else:
return jsonify({'error': f'Template "{name}" not found'}), 404
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/templates/delete/<name>', methods=['DELETE'])
@login_required
def delete_template(name):
"""Delete a template."""
try:
success = template_manager.delete_template(name)
if success:
return jsonify({
'success': True,
'message': f'Template "{name}" deleted successfully'
})
else:
return jsonify({'error': f'Template "{name}" not found'}), 404
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/templates/apply', methods=['POST'])
@login_required
def apply_template():
"""Apply a template to generate metadata for files."""
try:
data = request.json
template_name = data.get('template_name', '').strip()
file_indices = data.get('file_indices', [])
session_id = data.get('session_id')
custom_vars = data.get('custom_vars', {})
if not template_name:
return jsonify({'error': 'Template name is required'}), 400
if not session_id or session_id not in sessions:
return jsonify({'error': 'Invalid or expired session'}), 400
# Load template
template = template_manager.load_template(template_name)
if not template:
return jsonify({'error': f'Template "{template_name}" not found'}), 404
# Apply template to each file
results = []
for file_index in file_indices:
if file_index >= len(sessions[session_id]['files']):
continue
file_info = sessions[session_id]['files'][file_index]
filename = file_info.get('filename', 'unknown')
# Generate metadata from template
metadata = template_manager.apply_template(
template=template,
filename=filename,
user='web_user',
custom_vars=custom_vars
)
# Update file metadata in session
sessions[session_id]['files'][file_index]['suggested_metadata'] = metadata
results.append({
'file_index': file_index,
'filename': filename,
'metadata': metadata
})
return jsonify({
'success': True,
'message': f'Template applied to {len(results)} file(s)',
'results': results
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/templates/preview', methods=['POST'])
@login_required
def preview_template():
"""Preview template output with sample data."""
try:
data = request.json
template = {
'name': 'preview',
'title': data.get('title', ''),
'subject': data.get('subject', ''),
'keywords': data.get('keywords', '')
}
sample_filename = data.get('sample_filename', 'example.pdf')
custom_vars = data.get('custom_vars', {})
preview = template_manager.preview_template(
template=template,
sample_filename=sample_filename,
user='web_user',
custom_vars=custom_vars
)
# Also get available variables
available_vars = template_manager.get_available_variables()
return jsonify({
'success': True,
'preview': preview,
'available_variables': available_vars
})
except Exception as e:
return jsonify({'error': str(e)}), 500
def open_browser():
"""Open browser after short delay."""
sleep(1.5)
webbrowser.open('http://localhost:5001')
if __name__ == '__main__':
print("="*60)
print(f"{Config.APP_NAME} v{Config.APP_VERSION} - Web Interface")
print("="*60)
# Check dependencies
print("\n🔍 Checking dependencies...")
# Check Excel file
if not EXCEL_PATH.exists():
print(f"⚠️ Warning: Excel file not found at {EXCEL_PATH}")
print(" Excel metadata lookup will not be available")
print(" Please ensure the Excel file is in the project root")
else:
print(f"✓ Excel file found: {EXCEL_PATH.name}")
# Check OpenAI API key (optional)
if Config.OPENAI_API_KEY:
print("✓ OpenAI API key configured (AI metadata generation available)")
else:
print(" OpenAI API key not configured (AI generation disabled)")
# Check ExifTool (optional)
if Config.check_exiftool():
print("✓ ExifTool available for enhanced metadata operations")
else:
print(" ExifTool not installed (using Python libraries)")
print("\nMetadata sources available:")
print(" • Excel lookup (Celum ID mapping)")
if Config.OPENAI_API_KEY:
print(" • AI generation (OpenAI)")
print(" • Manual entry")
print(" • File import (CSV/Excel/JSON)")
print("\nStarting server...")
# Docker mode configuration
if DOCKER_MODE:
print("Running in Docker mode")
print("Server will be accessible at http://0.0.0.0:5001")
host = '0.0.0.0'
else:
print("Opening browser at http://localhost:5001")
host = '127.0.0.1'
# Open browser in background (only in local mode)
threading.Thread(target=open_browser, daemon=True).start()
print("\nPress Ctrl+C to stop the server")
print("="*60)
# Clean up old files on startup
if DOCKER_MODE:
print("\n🧹 Cleaning up old files...")
cleanup_old_files(max_age_hours=24)
# Run Flask app
app.run(debug=False, port=5001, host=host)