video-master-adapt/app.py
nickviljoen 891c36bbfb Add standalone desktop application with web interface
Major Features:
- 🖥️ Standalone desktop app (VideoMatcher.app) - double-click to run
- 🎨 Black & gold branded UI (Montserrat font, #FFC407 accent)
- 📁 Local file browser for master/adaptation folders
-  Fast mode processing (10-20x faster, disables AKAZE/AI Vision)
- 🤖 Smart AI Vision fallback (auto-retry when no matches found)
- 📊 Real-time progress bars (fingerprinting & matching)
- 💾 Local processing (no cloud, no authentication)
- 📤 CSV export with master filenames

Web Application (Enterprise):
- 🌐 Flask web app with Azure AD authentication
- 📦 Box.com integration for cloud storage
- 🐳 Docker support for deployment
- 🔐 JWT validation with httpOnly cookies
- 🎯 REST API endpoints

Enhancements:
- Fixed master filename lookup (was showing "Unknown")
- Automatic fingerprint recovery (detects missing files)
- Improved CSV format (master file next to adaptation)
- Port conflict handling (auto-finds available port)
- Environment variable fixes for standalone mode

Documentation:
- Updated README with standalone app section
- Added 10+ guide documents (UI improvements, fingerprint recovery, etc.)
- Build instructions with PyInstaller
- Comprehensive troubleshooting guide

Technical:
- PyInstaller build configuration (video_matcher.spec)
- Launcher with environment setup (launcher.py)
- Mock authentication for standalone mode
- Video matcher service layer
- Metadata parser and AKAZE video matching

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-31 09:49:04 +02:00

1079 lines
34 KiB
Python

"""
Flask application for Video Master Detection web application.
Provides authentication, Box integration, and video matching capabilities.
"""
import logging
import os
import time
from pathlib import Path
from flask import Flask, render_template, request, jsonify, g
from auth_middleware import AuthMiddleware
import config as app_config
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Check if running in standalone mode
STANDALONE_MODE = os.environ.get('STANDALONE_MODE') == '1'
DISABLE_AUTH = os.environ.get('DISABLE_AUTH') == '1'
def create_app():
"""Create and configure Flask application."""
app = Flask(__name__)
app.config.from_object(app_config.Config)
# Initialize authentication middleware (disabled in standalone mode)
if not DISABLE_AUTH and not STANDALONE_MODE:
auth = AuthMiddleware(app)
logger.info("Authentication enabled")
else:
# Create mock auth middleware for standalone mode
class MockAuth:
def require_auth(self, f):
return f
def set_auth_token(self, token):
return jsonify({'authenticated': True})
def clear_auth_token(self):
return jsonify({'authenticated': False})
def get_auth_status(self):
return {'authenticated': True, 'user': {'email': 'local@user'}}
auth = MockAuth()
logger.info("Authentication disabled (standalone mode)")
app.auth = auth
return app
# Initialize Flask app
app = create_app()
auth = app.auth
# Global lazy-loaded clients
_box_client = None
_matcher_service = None
# Global progress tracking for standalone mode
_matching_progress = {
'active': False,
'current': 0,
'total': 0,
'current_video': '',
'status': 'idle'
}
_fingerprinting_progress = {
'active': False,
'current': 0,
'total': 0,
'current_video': '',
'status': 'idle'
}
def get_box_client():
"""Get or initialize Box client (lazy loading)."""
global _box_client
if _box_client is None:
try:
from box_video_client import BoxVideoClient
_box_client = BoxVideoClient(
config_path=app.config['BOX_CONFIG_PATH'],
root_folder_id=app.config['BOX_ROOT_FOLDER_ID']
)
logger.info("Box client initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Box client: {e}")
raise
return _box_client
def get_matcher_service():
"""Get or initialize VideoMatcher service (lazy loading)."""
global _matcher_service
if _matcher_service is None:
try:
from video_matcher_service import VideoMatcherService
# In standalone mode, use faster settings (disable AKAZE and AI vision)
if STANDALONE_MODE:
logger.info("Initializing VideoMatcher in FAST mode (standalone)")
_matcher_service = VideoMatcherService(
data_dir=app.config['DATA_DIR'],
temp_dir=app.config['VIDEO_TEMP_DIR'],
enable_ai_vision=False, # Disable OpenAI API calls (slow)
use_akaze=False # Disable AKAZE feature detection (slow)
)
else:
logger.info("Initializing VideoMatcher in FULL mode (server)")
_matcher_service = VideoMatcherService(
data_dir=app.config['DATA_DIR'],
temp_dir=app.config['VIDEO_TEMP_DIR'],
enable_ai_vision=True, # Enable for server use
use_akaze=True # Enable for server use
)
logger.info("VideoMatcher service initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize VideoMatcher service: {e}")
raise
return _matcher_service
# ============================================================================
# ROUTES - Main Pages
# ============================================================================
@app.route('/')
def index():
"""Render home page. Standalone mode or authenticated mode."""
if STANDALONE_MODE:
return render_template('standalone.html')
else:
return render_template('index.html')
# ============================================================================
# ROUTES - Authentication
# ============================================================================
@app.route('/auth/login', methods=['POST'])
def login():
"""
Process Azure AD token and set httpOnly cookie.
Request body:
{
"token": "<Azure AD ID token>"
}
Returns:
JSON with authentication status and user info
"""
try:
data = request.get_json()
token = data.get('token')
if not token:
return jsonify({'error': 'Token required'}), 400
# Validate token and set httpOnly cookie
return auth.set_auth_token(token)
except Exception as e:
logger.error(f"Login error: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/auth/logout', methods=['POST'])
def logout():
"""
Clear authentication cookie.
Returns:
JSON with logout confirmation
"""
try:
return auth.clear_auth_token()
except Exception as e:
logger.error(f"Logout error: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/auth/status', methods=['GET'])
def auth_status():
"""
Check authentication status.
Returns:
JSON with authentication status and user info (if authenticated)
"""
try:
return jsonify(auth.get_auth_status())
except Exception as e:
logger.error(f"Auth status error: {e}")
return jsonify({'authenticated': False, 'error': str(e)}), 500
# ============================================================================
# ROUTES - Local File System (Standalone Mode)
# ============================================================================
@app.route('/local/browse', methods=['POST'])
def browse_local_folder():
"""
Browse local filesystem for folder selection.
Request body:
{
"path": "/path/to/folder" or null for roots/home
}
Returns:
JSON with list of folders and video files
"""
try:
import platform
from pathlib import Path
data = request.get_json()
folder_path = data.get('path')
# If no path provided, return home directory and common locations
if not folder_path:
home = str(Path.home())
system = platform.system()
roots = {
'home': home,
'desktop': str(Path.home() / 'Desktop') if (Path.home() / 'Desktop').exists() else None,
'documents': str(Path.home() / 'Documents') if (Path.home() / 'Documents').exists() else None,
'downloads': str(Path.home() / 'Downloads') if (Path.home() / 'Downloads').exists() else None,
}
# Add system-specific roots
if system == 'Darwin': # macOS
roots['volumes'] = '/Volumes' if Path('/Volumes').exists() else None
elif system == 'Windows':
import string
drives = [f"{d}:\\" for d in string.ascii_uppercase if Path(f"{d}:\\").exists()]
roots['drives'] = drives
# Clean out None values
roots = {k: v for k, v in roots.items() if v is not None}
return jsonify({
'current_path': None,
'roots': roots,
'folders': [],
'files': []
})
# Validate path exists
path = Path(folder_path)
if not path.exists():
return jsonify({'error': 'Path does not exist'}), 400
if not path.is_dir():
return jsonify({'error': 'Path is not a directory'}), 400
# List folders and video files
folders = []
files = []
video_extensions = {'.mp4', '.mov', '.avi', '.mkv', '.webm', '.flv', '.wmv', '.m4v'}
try:
for item in sorted(path.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())):
try:
if item.is_dir():
folders.append({
'name': item.name,
'path': str(item)
})
elif item.is_file() and item.suffix.lower() in video_extensions:
size_mb = item.stat().st_size / (1024 * 1024)
files.append({
'name': item.name,
'path': str(item),
'size_mb': round(size_mb, 2),
'extension': item.suffix.lower()
})
except (PermissionError, OSError):
continue # Skip items we can't access
except PermissionError:
return jsonify({'error': 'Permission denied'}), 403
return jsonify({
'current_path': str(path),
'parent_path': str(path.parent) if path.parent != path else None,
'folders': folders,
'files': files,
'video_count': len(files)
})
except Exception as e:
logger.error(f"Error browsing folder: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/local/scan-masters', methods=['POST'])
def scan_master_folder():
"""
Scan a folder for master videos and check if fingerprints exist.
Request body:
{
"folder_path": "/path/to/masters"
}
Returns:
JSON with list of scanned masters and fingerprinting status
"""
try:
from pathlib import Path
data = request.get_json()
folder_path = data.get('folder_path')
if not folder_path:
return jsonify({'error': 'Folder path required'}), 400
path = Path(folder_path)
if not path.exists() or not path.is_dir():
return jsonify({'error': 'Invalid folder path'}), 400
# Get matcher service
matcher = get_matcher_service()
# Scan for video files
video_extensions = {'.mp4', '.mov', '.avi', '.mkv', '.webm', '.flv', '.wmv', '.m4v'}
video_files = []
for ext in video_extensions:
video_files.extend(path.glob(f"*{ext}"))
logger.info(f"Found {len(video_files)} videos in {folder_path}")
# Get existing masters
existing_masters = {m['path']: m for m in matcher.list_masters()}
# Prepare results
scanned = [] # Videos that need fingerprinting
skipped = [] # Videos with valid fingerprints
for video_file in video_files:
video_path = str(video_file)
# Check if in database
if video_path in existing_masters:
master = existing_masters[video_path]
fingerprint_id = master.get('fingerprint_id')
# Check if fingerprint actually exists on disk
fingerprint_path = Path(matcher.data_dir) / 'fingerprints' / f"{fingerprint_id}.json"
if fingerprint_path.exists():
# Fingerprint exists, skip this video
skipped.append({
'path': video_path,
'filename': video_file.name,
'reason': 'Already fingerprinted',
'master_id': master['master_id']
})
logger.info(f"✓ Fingerprint exists for {video_file.name}")
else:
# In database but fingerprint missing, needs re-fingerprinting
scanned.append({
'path': video_path,
'filename': video_file.name,
'size_mb': round(video_file.stat().st_size / (1024 * 1024), 2),
'reason': 'Missing fingerprint (will re-process)'
})
logger.warning(f"⚠ Fingerprint missing for {video_file.name}, will re-create")
else:
# New video not in database
scanned.append({
'path': video_path,
'filename': video_file.name,
'size_mb': round(video_file.stat().st_size / (1024 * 1024), 2),
'reason': 'New video'
})
return jsonify({
'folder_path': folder_path,
'total_found': len(video_files),
'new_videos': len(scanned),
'already_added': len(skipped),
'scanned': scanned,
'skipped': skipped
})
except Exception as e:
logger.error(f"Error scanning master folder: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/local/add-masters', methods=['POST'])
def add_master_videos():
"""
Add or re-fingerprint master videos with progress tracking.
Request body:
{
"video_paths": ["/path/to/video1.mp4", "/path/to/video2.mp4"]
}
Returns:
JSON with fingerprinting results
"""
global _fingerprinting_progress
try:
data = request.get_json()
video_paths = data.get('video_paths', [])
if not video_paths:
return jsonify({'error': 'No video paths provided'}), 400
matcher = get_matcher_service()
results = []
errors = []
# Initialize progress tracking
_fingerprinting_progress = {
'active': True,
'current': 0,
'total': len(video_paths),
'current_video': '',
'status': 'processing'
}
for i, video_path in enumerate(video_paths, 1):
try:
# Update progress
video_name = Path(video_path).name
_fingerprinting_progress['current'] = i
_fingerprinting_progress['current_video'] = video_name
# Check if master already exists
existing_masters = {m['path']: m for m in matcher.list_masters()}
if video_path in existing_masters:
# Master exists but fingerprint missing - need to re-fingerprint
logger.info(f"Re-fingerprinting existing master ({i}/{len(video_paths)}): {video_name}")
master_id = matcher.add_master(video_path)
results.append({
'path': video_path,
'master_id': master_id,
'status': 're-fingerprinted'
})
logger.info(f"Re-fingerprinted master: {master_id}")
else:
# New master
logger.info(f"Fingerprinting new master ({i}/{len(video_paths)}): {video_name}")
master_id = matcher.add_master(video_path)
results.append({
'path': video_path,
'master_id': master_id,
'status': 'new'
})
logger.info(f"Added new master: {master_id}")
except Exception as e:
logger.error(f"Error processing master {video_path}: {e}")
errors.append({
'path': video_path,
'error': str(e)
})
# Clear progress
_fingerprinting_progress = {
'active': False,
'current': len(video_paths),
'total': len(video_paths),
'current_video': '',
'status': 'completed'
}
return jsonify({
'success': len(results),
'failed': len(errors),
'results': results,
'errors': errors
})
except Exception as e:
logger.error(f"Error adding masters: {e}")
_fingerprinting_progress['active'] = False
_fingerprinting_progress['status'] = 'error'
return jsonify({'error': str(e)}), 500
@app.route('/local/add-masters-progress', methods=['GET'])
def get_fingerprinting_progress():
"""
Get current fingerprinting progress.
Returns:
JSON with progress information
"""
return jsonify(_fingerprinting_progress)
@app.route('/local/scan-adaptations', methods=['POST'])
def scan_adaptation_folders():
"""
Scan folder(s) for adaptation videos.
Request body:
{
"folder_paths": ["/path/to/adaptations1", "/path/to/adaptations2"]
}
Returns:
JSON with list of found adaptation videos
"""
try:
from pathlib import Path
data = request.get_json()
folder_paths = data.get('folder_paths', [])
if not folder_paths:
return jsonify({'error': 'No folder paths provided'}), 400
video_extensions = {'.mp4', '.mov', '.avi', '.mkv', '.webm', '.flv', '.wmv', '.m4v'}
all_videos = []
for folder_path in folder_paths:
path = Path(folder_path)
if not path.exists() or not path.is_dir():
logger.warning(f"Skipping invalid path: {folder_path}")
continue
# Scan for videos
for ext in video_extensions:
for video_file in path.glob(f"*{ext}"):
all_videos.append({
'path': str(video_file),
'filename': video_file.name,
'folder': str(path),
'size_mb': round(video_file.stat().st_size / (1024 * 1024), 2)
})
logger.info(f"Found {len(all_videos)} adaptation videos across {len(folder_paths)} folders")
return jsonify({
'folder_paths': folder_paths,
'total_videos': len(all_videos),
'videos': all_videos
})
except Exception as e:
logger.error(f"Error scanning adaptation folders: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/local/match', methods=['POST'])
def match_local_videos():
"""
Match adaptation videos from local filesystem against masters.
Request body:
{
"adaptation_paths": ["/path/to/video1.mp4", "/path/to/video2.mp4"],
"threshold": 0.80, // optional
"frame_threshold": 0.80, // optional
"min_avg_similarity": 0.90 // optional
}
Returns:
JSON with matching results
"""
global _matching_progress
try:
data = request.get_json()
adaptation_paths = data.get('adaptation_paths', [])
threshold = float(data.get('threshold', 0.80))
frame_threshold = float(data.get('frame_threshold', 0.80))
min_avg_similarity = float(data.get('min_avg_similarity', 0.90))
if not adaptation_paths:
return jsonify({'error': 'No adaptation paths provided'}), 400
matcher = get_matcher_service()
logger.info(f"Starting local matching for {len(adaptation_paths)} adaptations")
# Initialize progress tracking
_matching_progress = {
'active': True,
'current': 0,
'total': len(adaptation_paths),
'current_video': '',
'status': 'processing'
}
# Process videos with progress updates
results = []
for i, adaptation_path in enumerate(adaptation_paths, 1):
try:
# Update progress
video_name = Path(adaptation_path).name
_matching_progress['current'] = i
_matching_progress['current_video'] = video_name
logger.info(f"Processing {i}/{len(adaptation_paths)}: {video_name}")
match_result = matcher.match_video(
video_path=adaptation_path,
threshold=threshold,
frame_threshold=frame_threshold,
min_avg_similarity=min_avg_similarity
)
results.append(match_result)
except Exception as e:
logger.error(f"Error matching {adaptation_path}: {e}")
results.append({
'adaptation_path': adaptation_path,
'error': str(e),
'matched': False
})
# Calculate summary statistics
matched_count = sum(1 for r in results if r.get('matched'))
unmatched_count = len(results) - matched_count
ai_fallback_count = sum(1 for r in results if r.get('match_method') == 'ai_vision_fallback')
logger.info(f"Matching complete: {matched_count} matched, {unmatched_count} unmatched, {ai_fallback_count} via AI fallback")
# Clear progress
_matching_progress = {
'active': False,
'current': len(adaptation_paths),
'total': len(adaptation_paths),
'current_video': '',
'status': 'completed'
}
return jsonify({
'total': len(results),
'matched': matched_count,
'unmatched': unmatched_count,
'ai_fallback_used': ai_fallback_count,
'results': results,
'completed_at': time.time()
})
except Exception as e:
logger.error(f"Error in local matching: {e}")
_matching_progress['active'] = False
_matching_progress['status'] = 'error'
return jsonify({'error': str(e)}), 500
@app.route('/local/match-progress', methods=['GET'])
def get_match_progress():
"""
Get current matching progress.
Returns:
JSON with progress information
"""
return jsonify(_matching_progress)
# ============================================================================
# ROUTES - Box.com Integration (Phase 2 - requires Box credentials)
# ============================================================================
@app.route('/box/folders', methods=['GET'])
@app.route('/box/folders/<folder_id>', methods=['GET'])
@auth.require_auth
def box_folders(folder_id=None):
"""
List folders in Box.
Args:
folder_id: Optional parent folder ID. If not provided, lists root folder.
Returns:
JSON with list of folders
"""
try:
box = get_box_client()
folders = box.list_folders(folder_id)
return jsonify({'folders': folders})
except Exception as e:
logger.error(f"Error listing folders: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/box/videos/<folder_id>', methods=['GET'])
@auth.require_auth
def box_videos(folder_id):
"""
List videos in a Box folder with safety metadata.
Args:
folder_id: Box folder ID
Returns:
JSON with list of video files and safety info
"""
try:
box = get_box_client()
videos = box.list_videos(folder_id, include_metadata=True)
return jsonify({'videos': videos, 'folder_id': folder_id})
except Exception as e:
logger.error(f"Error listing videos: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/box/check-files', methods=['POST'])
@auth.require_auth
def check_box_files():
"""
Check multiple Box files for safety before downloading.
Request body:
{
"video_ids": ["<video1_id>", "<video2_id>", ...]
}
Returns:
JSON with safety assessment, warnings, and errors
"""
try:
data = request.get_json()
video_ids = data.get('video_ids', [])
if not video_ids:
return jsonify({'error': 'No video IDs provided'}), 400
box = get_box_client()
check_result = box.check_files_before_download(video_ids)
return jsonify(check_result)
except Exception as e:
logger.error(f"Error checking files: {e}")
return jsonify({'error': str(e)}), 500
# ============================================================================
# ROUTES - Video Matching (Phase 3)
# ============================================================================
@app.route('/match', methods=['POST'])
@auth.require_auth
def match_videos():
"""
Start a video matching job with safety checks and automatic cleanup.
Request body:
{
"folder_id": "<Box folder ID>",
"video_ids": ["<video1_id>", "<video2_id>", ...],
"threshold": 0.80, // optional
"frame_threshold": 0.80, // optional
"min_avg_similarity": 0.90 // optional
}
Returns:
JSON with job_id and results
"""
job_id = None
try:
data = request.get_json()
folder_id = data.get('folder_id')
video_ids = data.get('video_ids', [])
threshold = float(data.get('threshold', 0.80))
frame_threshold = float(data.get('frame_threshold', 0.80))
min_avg_similarity = float(data.get('min_avg_similarity', 0.90))
# Validate input
if not video_ids:
return jsonify({'error': 'No videos selected'}), 400
if len(video_ids) > app.config['MAX_VIDEOS_PER_JOB']:
return jsonify({
'error': f'Maximum {app.config["MAX_VIDEOS_PER_JOB"]} videos per job'
}), 400
# Get services
box = get_box_client()
matcher = get_matcher_service()
# Check disk space BEFORE starting
disk_check = matcher.check_disk_space(app.config['MIN_DISK_SPACE_GB'])
if not disk_check['sufficient']:
return jsonify({
'error': 'Insufficient disk space',
'details': f"Only {disk_check['free_gb']}GB free (need {disk_check['required_gb']}GB)",
'disk_info': disk_check
}), 507 # Insufficient Storage
# Check files BEFORE downloading
logger.info(f"Checking {len(video_ids)} files before download")
file_check = box.check_files_before_download(video_ids)
if not file_check['safe']:
return jsonify({
'error': 'File validation failed',
'errors': file_check['errors'],
'warnings': file_check.get('warnings', [])
}), 400
# Log warnings if any
if file_check.get('warnings'):
logger.warning(f"File warnings: {file_check['warnings']}")
# Create job
job_id = matcher.create_job(
user_email=g.user['email'],
folder_id=folder_id,
video_ids=video_ids
)
logger.info(f"Job {job_id} created by {g.user['email']} with {len(video_ids)} videos "
f"({file_check['total_size_mb']}MB total)")
# Download videos from Box
video_paths = []
try:
for i, video_id in enumerate(video_ids):
logger.info(f"Downloading video {i+1}/{len(video_ids)} for job {job_id}")
path = box.download_video(
file_id=video_id,
job_id=job_id,
temp_dir=app.config['VIDEO_TEMP_DIR']
)
video_paths.append(path)
logger.info(f"All videos downloaded for job {job_id}, starting matching")
# Process videos (synchronous)
result = matcher.process_videos(
job_id=job_id,
video_paths=video_paths,
threshold=threshold,
frame_threshold=frame_threshold,
min_avg_similarity=min_avg_similarity
)
logger.info(f"Job {job_id} completed successfully")
return jsonify({
'job_id': job_id,
'status': 'completed',
'results': result['results'],
'completed_at': result.get('completed_at'),
'file_warnings': file_check.get('warnings')
})
finally:
# ALWAYS cleanup temp files, even if processing fails
if job_id:
logger.info(f"Cleaning up temp files for job {job_id}")
cleanup_stats = matcher.cleanup_job_files(job_id, force=True)
if cleanup_stats.get('success'):
logger.info(f"Cleanup successful: {cleanup_stats.get('size_freed_mb', 0)}MB freed")
except ValueError as e:
# Validation errors (file too large, blocked format, etc.)
logger.warning(f"Validation error: {e}")
return jsonify({'error': str(e), 'type': 'validation_error'}), 400
except Exception as e:
logger.error(f"Error processing videos: {e}", exc_info=True)
return jsonify({'error': str(e), 'type': 'processing_error'}), 500
@app.route('/jobs/<job_id>/status', methods=['GET'])
@auth.require_auth
def job_status(job_id):
"""
Get job status.
Args:
job_id: Job ID
Returns:
JSON with job status and progress
"""
try:
matcher = get_matcher_service()
job_data = matcher._load_job(job_id)
return jsonify({
'job_id': job_id,
'status': job_data['status'],
'progress': job_data.get('progress', {}),
'created_at': job_data.get('created_at'),
'completed_at': job_data.get('completed_at')
})
except FileNotFoundError:
return jsonify({'error': 'Job not found'}), 404
except Exception as e:
logger.error(f"Error getting job status: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/jobs/<job_id>/results', methods=['GET'])
@auth.require_auth
def job_results(job_id):
"""
Get job results.
Args:
job_id: Job ID
Returns:
JSON with full job results
"""
try:
matcher = get_matcher_service()
job_data = matcher._load_job(job_id)
if job_data['status'] != 'completed':
return jsonify({
'error': 'Job not completed',
'status': job_data['status']
}), 400
return jsonify({
'job_id': job_id,
'status': job_data['status'],
'results': job_data['results'],
'created_at': job_data.get('created_at'),
'completed_at': job_data.get('completed_at')
})
except FileNotFoundError:
return jsonify({'error': 'Job not found'}), 404
except Exception as e:
logger.error(f"Error getting job results: {e}")
return jsonify({'error': str(e)}), 500
# ============================================================================
# ROUTES - Utility
# ============================================================================
@app.route('/health')
def health():
"""
Health check endpoint with disk space monitoring.
Returns:
JSON with service status and resource info
"""
try:
# Check if critical components are available
checks = {
'status': 'healthy',
'service': 'video-matcher'
}
# Try to check Box connection if credentials are configured
if app.config.get('BOX_CONFIG_PATH') and os.path.exists(app.config['BOX_CONFIG_PATH']):
try:
get_box_client()
checks['box_connected'] = True
except Exception as e:
checks['box_connected'] = False
checks['box_error'] = str(e)
else:
checks['box_connected'] = False
checks['box_note'] = 'Box credentials not configured'
# Check disk space
try:
matcher = get_matcher_service()
disk_info = matcher.check_disk_space(app.config['MIN_DISK_SPACE_GB'])
checks['disk_space'] = {
'free_gb': disk_info['free_gb'],
'used_percent': disk_info['used_percent'],
'sufficient': disk_info['sufficient']
}
except Exception as e:
checks['disk_space'] = {'error': str(e)}
return jsonify(checks)
except Exception as e:
logger.error(f"Health check error: {e}")
return jsonify({'status': 'unhealthy', 'error': str(e)}), 500
@app.route('/admin/disk-space', methods=['GET'])
@auth.require_auth
def get_disk_space():
"""
Get detailed disk space information.
Returns:
JSON with disk space details and temp directory size
"""
try:
matcher = get_matcher_service()
disk_info = matcher.check_disk_space(app.config['MIN_DISK_SPACE_GB'])
temp_size = matcher.get_temp_dir_size()
return jsonify({
'disk_space': disk_info,
'temp_directory': temp_size
})
except Exception as e:
logger.error(f"Error getting disk space: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/admin/cleanup', methods=['POST'])
@auth.require_auth
def cleanup_old_files():
"""
Manually trigger cleanup of old temporary files.
Returns:
JSON with cleanup statistics
"""
try:
matcher = get_matcher_service()
cleanup_stats = matcher.cleanup_old_files()
return jsonify({
'success': True,
'cleanup_stats': cleanup_stats
})
except Exception as e:
logger.error(f"Error during cleanup: {e}")
return jsonify({'error': str(e)}), 500
# ============================================================================
# ERROR HANDLERS
# ============================================================================
@app.errorhandler(404)
def not_found(error):
"""Handle 404 errors."""
return render_template('404.html'), 404
@app.errorhandler(500)
def internal_error(error):
"""Handle 500 errors."""
logger.error(f"Internal error: {error}")
return render_template('500.html'), 500
# ============================================================================
# MAIN
# ============================================================================
if __name__ == '__main__':
# Run development server
app.run(
host=app.config['HOST'],
port=app.config['PORT'],
debug=(os.environ.get('FLASK_ENV') == 'development')
)