loreal-video-optimizer/backend/app.py
2026-01-08 18:18:48 +05:30

675 lines
22 KiB
Python

"""
Flask backend for video optimization tool
"""
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from werkzeug.utils import secure_filename
from dotenv import load_dotenv
import os
import uuid
import json
import time
from datetime import datetime
from video_processor import VideoProcessor
from conversion_logger import ConversionLogger
from platform_specs import (
PLATFORM_SPECS,
FILENAME_PATTERNS,
ASPECT_RATIO_PATTERNS,
detect_platform_from_filename,
detect_aspect_ratio_from_filename,
get_all_platforms,
get_platform_formats,
get_platform_info
)
# Load environment variables from .env file
load_dotenv()
app = Flask(__name__)
# ==============================================================================
# ENVIRONMENT-AWARE CONFIGURATION
# ==============================================================================
# Read environment setting (development or production)
FLASK_ENV = os.getenv('FLASK_ENV', 'development')
IS_PRODUCTION = FLASK_ENV == 'production'
# Configure CORS based on environment
if IS_PRODUCTION:
# Production: Restrict CORS to frontend URL only
FRONTEND_URL = os.getenv('FRONTEND_URL', 'https://ai-sandbox.oliver.solutions/video-optimizer')
CORS(app, origins=[FRONTEND_URL])
else:
# Development: Allow all origins for easier testing
CORS(app)
# Store factory defaults (original specs from platform_specs.py)
import copy
FACTORY_DEFAULTS = copy.deepcopy(PLATFORM_SPECS)
FACTORY_FILENAME_PATTERNS = copy.deepcopy(FILENAME_PATTERNS)
FACTORY_ASPECT_RATIO_PATTERNS = copy.deepcopy(ASPECT_RATIO_PATTERNS)
# ==============================================================================
# FILE AND FOLDER CONFIGURATION
# ==============================================================================
UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), 'uploads')
OUTPUT_FOLDER = os.path.join(os.path.dirname(__file__), 'outputs')
LOGS_FOLDER = os.path.join(os.path.dirname(__file__), 'logs')
CONVERSION_LOGS_FOLDER = os.path.join(LOGS_FOLDER, 'conversions')
ALLOWED_EXTENSIONS = {'mp4', 'mov', 'avi', 'mkv', 'webm', 'flv', 'wmv', 'm4v'}
# Read max file size from environment (in MB), convert to bytes
MAX_FILE_SIZE_MB = int(os.getenv('MAX_FILE_SIZE_MB', '500'))
MAX_FILE_SIZE = MAX_FILE_SIZE_MB * 1024 * 1024
# Read file retention settings
FILE_RETENTION_HOURS = int(os.getenv('FILE_RETENTION_HOURS', '24'))
# Create folders if they don't exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
os.makedirs(LOGS_FOLDER, exist_ok=True)
os.makedirs(CONVERSION_LOGS_FOLDER, exist_ok=True)
# ==============================================================================
# FLASK APPLICATION CONFIGURATION
# ==============================================================================
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['OUTPUT_FOLDER'] = OUTPUT_FOLDER
app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production')
# Read backend port from environment
BACKEND_PORT = int(os.getenv('BACKEND_PORT', '5000'))
# ==============================================================================
# INITIALIZE CONVERSION LOGGER
# ==============================================================================
conversion_logger = ConversionLogger(CONVERSION_LOGS_FOLDER)
def allowed_file(filename):
"""Check if file extension is allowed"""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route('/api/health', methods=['GET'])
def health_check():
"""Health check endpoint"""
ffmpeg_installed = VideoProcessor.check_ffmpeg_installed()
return jsonify({
'status': 'ok',
'ffmpeg_installed': ffmpeg_installed,
'timestamp': datetime.now().isoformat()
})
@app.route('/api/config', methods=['GET'])
def get_config():
"""Get Azure AD configuration for Microsoft SSO"""
return jsonify({
'AZURE_CLIENT_ID': os.getenv('AZURE_CLIENT_ID'),
'AZURE_TENANT_ID': os.getenv('AZURE_TENANT_ID'),
'REDIRECT_URI': os.getenv('REDIRECT_URI', 'http://localhost:3000')
})
@app.route('/api/platforms', methods=['GET'])
def get_platforms():
"""Get all available platforms and their specifications"""
platforms_list = []
for platform_key in get_all_platforms():
platform_info = get_platform_info(platform_key)
platforms_list.append({
'key': platform_key,
'name': platform_info['name'],
'codec': platform_info['codec'],
'formats': platform_info['formats']
})
return jsonify({
'platforms': platforms_list
})
@app.route('/api/detect', methods=['POST'])
def detect_from_filename():
"""Detect platform and aspect ratio from filename"""
data = request.get_json()
filename = data.get('filename', '')
platform = detect_platform_from_filename(filename)
aspect_ratio = detect_aspect_ratio_from_filename(filename)
return jsonify({
'platform': platform,
'aspect_ratio': aspect_ratio,
'detected': platform is not None or aspect_ratio is not None
})
@app.route('/api/upload', methods=['POST'])
def upload_file():
"""Handle file upload and return video info"""
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No file selected'}), 400
if not allowed_file(file.filename):
return jsonify({'error': 'File type not allowed'}), 400
try:
# Generate unique filename
original_filename = secure_filename(file.filename)
file_id = str(uuid.uuid4())
file_extension = original_filename.rsplit('.', 1)[1].lower()
unique_filename = f"{file_id}.{file_extension}"
file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
# Save file
file.save(file_path)
# Probe video to get info
processor = VideoProcessor(file_path)
video_info = processor.get_video_info()
# Detect platform and aspect ratio from filename
platform = detect_platform_from_filename(original_filename)
aspect_ratio = detect_aspect_ratio_from_filename(original_filename)
return jsonify({
'success': True,
'file_id': file_id,
'filename': original_filename,
'video_info': video_info,
'detected_platform': platform,
'detected_aspect_ratio': aspect_ratio
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/convert', methods=['POST'])
def convert_video():
"""Convert video based on platform and aspect ratio"""
data = request.get_json()
file_id = data.get('file_id')
platform = data.get('platform')
aspect_ratio = data.get('aspect_ratio')
custom_bitrate = data.get('custom_bitrate')
user_email = data.get('user_email', 'anonymous@local') # Get user email from request
if not all([file_id, platform, aspect_ratio]):
return jsonify({'error': 'Missing required parameters'}), 400
# Find input file
input_files = [f for f in os.listdir(app.config['UPLOAD_FOLDER'])
if f.startswith(file_id)]
if not input_files:
return jsonify({'error': 'Input file not found'}), 404
input_path = os.path.join(app.config['UPLOAD_FOLDER'], input_files[0])
# Start timing the conversion
start_time = time.time()
try:
# Get platform info to determine output container
platform_info = get_platform_info(platform)
if not platform_info:
return jsonify({'error': 'Invalid platform'}), 400
output_extension = platform_info['container']
output_filename = f"{file_id}_optimized.{output_extension}"
output_path = os.path.join(app.config['OUTPUT_FOLDER'], output_filename)
# Get input file size before conversion
input_size = os.path.getsize(input_path)
# Process video
processor = VideoProcessor(input_path)
result = processor.convert_video(
platform=platform,
aspect_ratio=aspect_ratio,
output_path=output_path,
custom_bitrate=custom_bitrate
)
# Calculate conversion duration
conversion_duration = time.time() - start_time
# Calculate size reduction
output_size = result['output_size']
size_reduction = ((input_size - output_size) / input_size) * 100
# Log successful conversion
conversion_logger.log_conversion(
user_email=user_email,
platform=platform,
aspect_ratio=aspect_ratio,
input_file_size=input_size,
output_file_size=output_size,
conversion_duration=conversion_duration,
status='success',
file_id=file_id,
error_message=None
)
return jsonify({
'success': True,
'output_file_id': file_id,
'output_filename': output_filename,
'input_size': input_size,
'output_size': output_size,
'size_reduction_percent': round(size_reduction, 2),
'conversion_details': result
})
except Exception as e:
# Calculate conversion duration (even for failures)
conversion_duration = time.time() - start_time
# Get file sizes (use 0 for output if conversion failed)
try:
input_size = os.path.getsize(input_path)
except:
input_size = 0
# Log failed conversion
conversion_logger.log_conversion(
user_email=user_email,
platform=platform,
aspect_ratio=aspect_ratio,
input_file_size=input_size,
output_file_size=0,
conversion_duration=conversion_duration,
status='failure',
file_id=file_id,
error_message=str(e)
)
return jsonify({'error': str(e)}), 500
@app.route('/api/download/<file_type>/<file_id>', methods=['GET'])
def download_file(file_type, file_id):
"""Download original or converted file"""
try:
if file_type == 'original':
folder = app.config['UPLOAD_FOLDER']
files = [f for f in os.listdir(folder) if f.startswith(file_id) and not 'optimized' in f]
elif file_type == 'optimized':
folder = app.config['OUTPUT_FOLDER']
files = [f for f in os.listdir(folder) if f.startswith(file_id)]
else:
return jsonify({'error': 'Invalid file type'}), 400
if not files:
return jsonify({'error': 'File not found'}), 404
file_path = os.path.join(folder, files[0])
return send_file(file_path, as_attachment=True)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/stream/<file_type>/<file_id>', methods=['GET'])
def stream_file(file_type, file_id):
"""Stream video for playback"""
try:
if file_type == 'original':
folder = app.config['UPLOAD_FOLDER']
files = [f for f in os.listdir(folder) if f.startswith(file_id) and not 'optimized' in f]
elif file_type == 'optimized':
folder = app.config['OUTPUT_FOLDER']
files = [f for f in os.listdir(folder) if f.startswith(file_id)]
else:
return jsonify({'error': 'Invalid file type'}), 400
if not files:
return jsonify({'error': 'File not found'}), 404
file_path = os.path.join(folder, files[0])
return send_file(file_path, mimetype='video/mp4')
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/cleanup/<file_id>', methods=['DELETE'])
def cleanup_files(file_id):
"""Delete uploaded and converted files"""
try:
deleted = []
# Clean upload folder
for filename in os.listdir(app.config['UPLOAD_FOLDER']):
if filename.startswith(file_id):
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
os.remove(file_path)
deleted.append(filename)
# Clean output folder
for filename in os.listdir(app.config['OUTPUT_FOLDER']):
if filename.startswith(file_id):
file_path = os.path.join(app.config['OUTPUT_FOLDER'], filename)
os.remove(file_path)
deleted.append(filename)
return jsonify({
'success': True,
'deleted_files': deleted
})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ============================================================================
# ADMIN ENDPOINTS
# ============================================================================
SPECS_FILE = os.path.join(os.path.dirname(__file__), 'platform_specs.json')
NAMING_FILE = os.path.join(os.path.dirname(__file__), 'naming_conventions.json')
def save_specs_to_file(specs):
"""Save platform specifications to JSON file"""
try:
with open(SPECS_FILE, 'w') as f:
json.dump(specs, f, indent=2)
return True
except Exception as e:
print(f"Error saving specs: {e}")
return False
def load_specs_from_file():
"""Load platform specifications from JSON file"""
try:
if os.path.exists(SPECS_FILE):
with open(SPECS_FILE, 'r') as f:
return json.load(f)
return None
except Exception as e:
print(f"Error loading specs: {e}")
return None
def save_naming_conventions(platform_patterns, aspect_ratio_patterns):
"""Save naming conventions to JSON file"""
try:
data = {
'platform_patterns': platform_patterns,
'aspect_ratio_patterns': aspect_ratio_patterns
}
with open(NAMING_FILE, 'w') as f:
json.dump(data, f, indent=2)
return True
except Exception as e:
print(f"Error saving naming conventions: {e}")
return False
def load_naming_conventions():
"""Load naming conventions from JSON file"""
try:
if os.path.exists(NAMING_FILE):
with open(NAMING_FILE, 'r') as f:
return json.load(f)
return None
except Exception as e:
print(f"Error loading naming conventions: {e}")
return None
@app.route('/api/admin/platforms', methods=['POST'])
def admin_add_platform():
"""Add a new platform configuration"""
try:
data = request.get_json()
platform_key = data.get('key')
if not platform_key or platform_key in PLATFORM_SPECS:
return jsonify({'error': 'Invalid or duplicate platform key'}), 400
# Add to PLATFORM_SPECS
PLATFORM_SPECS[platform_key] = {
'name': data.get('name'),
'codec': data.get('codec'),
'container': data.get('container', 'mp4'),
'formats': data.get('formats', [])
}
# Save to file
save_specs_to_file(PLATFORM_SPECS)
return jsonify({
'success': True,
'message': f'Platform {platform_key} added successfully'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/platforms/<platform_key>', methods=['PUT'])
def admin_update_platform(platform_key):
"""Update an existing platform configuration"""
try:
data = request.get_json()
if platform_key not in PLATFORM_SPECS:
return jsonify({'error': 'Platform not found'}), 404
# Update platform
PLATFORM_SPECS[platform_key] = {
'name': data.get('name'),
'codec': data.get('codec'),
'container': data.get('container', 'mp4'),
'formats': data.get('formats', [])
}
# Save to file
save_specs_to_file(PLATFORM_SPECS)
return jsonify({
'success': True,
'message': f'Platform {platform_key} updated successfully'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/platforms/<platform_key>', methods=['DELETE'])
def admin_delete_platform(platform_key):
"""Delete a platform configuration"""
try:
if platform_key not in PLATFORM_SPECS:
return jsonify({'error': 'Platform not found'}), 404
# Remove from PLATFORM_SPECS
del PLATFORM_SPECS[platform_key]
# Save to file
save_specs_to_file(PLATFORM_SPECS)
return jsonify({
'success': True,
'message': f'Platform {platform_key} deleted successfully'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/export', methods=['GET'])
def admin_export_specs():
"""Export all platform specifications"""
try:
return jsonify({
'success': True,
'specs': PLATFORM_SPECS,
'exported_at': datetime.now().isoformat()
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/import', methods=['POST'])
def admin_import_specs():
"""Import platform specifications from JSON"""
try:
data = request.get_json()
specs = data.get('specs')
if not specs or not isinstance(specs, dict):
return jsonify({'error': 'Invalid specifications format'}), 400
# Replace all specs
PLATFORM_SPECS.clear()
PLATFORM_SPECS.update(specs)
# Save to file
save_specs_to_file(PLATFORM_SPECS)
return jsonify({
'success': True,
'message': 'Specifications imported successfully',
'platforms_count': len(PLATFORM_SPECS)
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/reset-factory', methods=['POST'])
def admin_reset_factory():
"""Reset platform specifications to factory defaults"""
try:
# Restore from factory defaults
PLATFORM_SPECS.clear()
PLATFORM_SPECS.update(copy.deepcopy(FACTORY_DEFAULTS))
FILENAME_PATTERNS.clear()
FILENAME_PATTERNS.update(copy.deepcopy(FACTORY_FILENAME_PATTERNS))
ASPECT_RATIO_PATTERNS.clear()
ASPECT_RATIO_PATTERNS.update(copy.deepcopy(FACTORY_ASPECT_RATIO_PATTERNS))
# Delete the saved JSON files if they exist
if os.path.exists(SPECS_FILE):
os.remove(SPECS_FILE)
if os.path.exists(NAMING_FILE):
os.remove(NAMING_FILE)
return jsonify({
'success': True,
'message': 'Platform specifications and naming conventions reset to factory defaults',
'platforms_count': len(PLATFORM_SPECS)
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/naming-conventions', methods=['GET'])
def admin_get_naming_conventions():
"""Get current naming conventions"""
try:
return jsonify({
'success': True,
'platform_patterns': FILENAME_PATTERNS,
'aspect_ratio_patterns': ASPECT_RATIO_PATTERNS
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/naming-conventions', methods=['POST'])
def admin_save_naming_conventions():
"""Save naming conventions"""
try:
data = request.get_json()
platform_patterns = data.get('platform_patterns', {})
aspect_ratio_patterns = data.get('aspect_ratio_patterns', {})
# Update in-memory patterns
FILENAME_PATTERNS.clear()
FILENAME_PATTERNS.update(platform_patterns)
ASPECT_RATIO_PATTERNS.clear()
ASPECT_RATIO_PATTERNS.update(aspect_ratio_patterns)
# Save to file
save_naming_conventions(platform_patterns, aspect_ratio_patterns)
return jsonify({
'success': True,
'message': 'Naming conventions saved successfully'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# Load specs from file if exists
saved_specs = load_specs_from_file()
if saved_specs:
PLATFORM_SPECS.clear()
PLATFORM_SPECS.update(saved_specs)
print(f"Loaded {len(saved_specs)} platform configurations from file")
# Load naming conventions from file if exists
saved_naming = load_naming_conventions()
if saved_naming:
FILENAME_PATTERNS.clear()
FILENAME_PATTERNS.update(saved_naming.get('platform_patterns', {}))
ASPECT_RATIO_PATTERNS.clear()
ASPECT_RATIO_PATTERNS.update(saved_naming.get('aspect_ratio_patterns', {}))
print(f"Loaded naming conventions from file")
# Check FFmpeg installation
if not VideoProcessor.check_ffmpeg_installed():
print("WARNING: FFmpeg not found. Please install FFmpeg to use video conversion features.")
print("Install with: brew install ffmpeg (macOS) or apt-get install ffmpeg (Linux)")
print("=" * 80)
print("VIDEO OPTIMIZER - STARTING SERVER")
print("=" * 80)
print(f"Environment: {FLASK_ENV}")
print(f"Debug mode: {not IS_PRODUCTION}")
print(f"Backend port: {BACKEND_PORT}")
print(f"Upload folder: {UPLOAD_FOLDER}")
print(f"Output folder: {OUTPUT_FOLDER}")
print(f"Logs folder: {LOGS_FOLDER}")
print(f"Max file size: {MAX_FILE_SIZE_MB}MB")
print(f"File retention: {FILE_RETENTION_HOURS} hours")
print(f"Platforms configured: {len(PLATFORM_SPECS)}")
if IS_PRODUCTION:
print(f"CORS restricted to: {FRONTEND_URL}")
else:
print("CORS: All origins allowed (development mode)")
print("=" * 80)
# Run Flask app with environment-aware settings
app.run(
debug=(not IS_PRODUCTION),
host='0.0.0.0',
port=BACKEND_PORT
)