Major Features: - 🖥️ Standalone desktop app (VideoMatcher.app) - double-click to run - 🎨 Black & gold branded UI (Montserrat font, #FFC407 accent) - 📁 Local file browser for master/adaptation folders - ⚡ Fast mode processing (10-20x faster, disables AKAZE/AI Vision) - 🤖 Smart AI Vision fallback (auto-retry when no matches found) - 📊 Real-time progress bars (fingerprinting & matching) - 💾 Local processing (no cloud, no authentication) - 📤 CSV export with master filenames Web Application (Enterprise): - 🌐 Flask web app with Azure AD authentication - 📦 Box.com integration for cloud storage - 🐳 Docker support for deployment - 🔐 JWT validation with httpOnly cookies - 🎯 REST API endpoints Enhancements: - Fixed master filename lookup (was showing "Unknown") - Automatic fingerprint recovery (detects missing files) - Improved CSV format (master file next to adaptation) - Port conflict handling (auto-finds available port) - Environment variable fixes for standalone mode Documentation: - Updated README with standalone app section - Added 10+ guide documents (UI improvements, fingerprint recovery, etc.) - Build instructions with PyInstaller - Comprehensive troubleshooting guide Technical: - PyInstaller build configuration (video_matcher.spec) - Launcher with environment setup (launcher.py) - Mock authentication for standalone mode - Video matcher service layer - Metadata parser and AKAZE video matching 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
246 lines
No EOL
8.3 KiB
Python
246 lines
No EOL
8.3 KiB
Python
"""
|
|
Authentication Middleware for Flask application.
|
|
Python equivalent of AuthMiddleware.php from MSAL specification.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
from functools import wraps
|
|
from typing import Dict, Any, Optional, Tuple
|
|
from flask import request, jsonify, make_response, g
|
|
from jwt_validator import JWTValidator
|
|
|
|
|
|
class AuthMiddleware:
|
|
"""Authentication middleware for Flask with Azure AD JWT validation and httpOnly cookie management."""
|
|
|
|
def __init__(self, app=None, tenant_id: str = None, client_id: str = None):
|
|
self.tenant_id = tenant_id or os.getenv('AZURE_TENANT_ID', 'e519c2e6-bc6d-4fdf-8d9c-923c2f002385')
|
|
self.client_id = client_id or os.getenv('AZURE_CLIENT_ID', '9079054c-9620-4757-a256-23413042f1ef')
|
|
self.jwt_validator = JWTValidator(self.tenant_id, self.client_id)
|
|
self.cookie_name = 'ai_qc_auth_token'
|
|
|
|
if app:
|
|
self.init_app(app)
|
|
|
|
def init_app(self, app):
|
|
"""Initialize the middleware with Flask app."""
|
|
self.app = app
|
|
app.auth_middleware = self
|
|
|
|
# Set secure cookie defaults based on environment
|
|
app.config.setdefault('SESSION_COOKIE_SECURE', os.getenv('FLASK_ENV') == 'production')
|
|
app.config.setdefault('SESSION_COOKIE_HTTPONLY', True)
|
|
app.config.setdefault('SESSION_COOKIE_SAMESITE', 'Lax')
|
|
|
|
def require_auth(self, f):
|
|
"""
|
|
Decorator to require authentication for protected routes.
|
|
Similar to AuthMiddleware->requireAuth() in PHP version.
|
|
"""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
auth_result = self.is_authenticated()
|
|
if not auth_result['authenticated']:
|
|
return jsonify({
|
|
'error': 'Authentication required',
|
|
'message': auth_result['error'],
|
|
'authenticated': False
|
|
}), 401
|
|
|
|
# Store user info in Flask's g object for use in route handlers
|
|
g.user = auth_result['user']
|
|
g.token_payload = auth_result['payload']
|
|
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
def is_authenticated(self) -> Dict[str, Any]:
|
|
"""
|
|
Check if current request is authenticated.
|
|
Returns dict with authentication status and user info.
|
|
"""
|
|
try:
|
|
# Try to get token from httpOnly cookie
|
|
token = request.cookies.get(self.cookie_name)
|
|
|
|
if not token:
|
|
return {
|
|
'authenticated': False,
|
|
'error': 'No authentication token found',
|
|
'user': None,
|
|
'payload': None
|
|
}
|
|
|
|
# Validate token
|
|
payload = self.jwt_validator.validate_token(token)
|
|
|
|
# Check if token is expired
|
|
if self.jwt_validator.is_token_expired(payload):
|
|
return {
|
|
'authenticated': False,
|
|
'error': 'Authentication token has expired',
|
|
'user': None,
|
|
'payload': None
|
|
}
|
|
|
|
# Extract user information
|
|
user_info = self.jwt_validator.get_user_info(payload)
|
|
|
|
return {
|
|
'authenticated': True,
|
|
'error': None,
|
|
'user': user_info,
|
|
'payload': payload
|
|
}
|
|
|
|
except Exception as e:
|
|
return {
|
|
'authenticated': False,
|
|
'error': f'Token validation failed: {str(e)}',
|
|
'user': None,
|
|
'payload': None
|
|
}
|
|
|
|
def set_auth_token(self, token: str):
|
|
"""
|
|
Validate and store authentication token in httpOnly cookie.
|
|
Returns Flask response with cookie set.
|
|
"""
|
|
try:
|
|
# Validate token before storing
|
|
payload = self.jwt_validator.validate_token(token)
|
|
|
|
# Create response with httpOnly cookie
|
|
response = make_response(jsonify({
|
|
'success': True,
|
|
'message': 'Authentication successful',
|
|
'authenticated': True,
|
|
'user': self.jwt_validator.get_user_info(payload)
|
|
}))
|
|
|
|
# Set httpOnly cookie with security flags
|
|
self._set_secure_cookie(response, token, payload)
|
|
|
|
return response
|
|
|
|
except Exception as e:
|
|
return make_response(jsonify({
|
|
'success': False,
|
|
'error': f'Token validation failed: {str(e)}',
|
|
'authenticated': False
|
|
}), 401)
|
|
|
|
def clear_auth_token(self):
|
|
"""Clear authentication cookie and return response."""
|
|
response = make_response(jsonify({
|
|
'success': True,
|
|
'message': 'Logged out successfully',
|
|
'authenticated': False
|
|
}))
|
|
|
|
# Clear the authentication cookie
|
|
response.set_cookie(
|
|
self.cookie_name,
|
|
'',
|
|
expires=0,
|
|
path='/',
|
|
domain='',
|
|
secure=self._is_secure_context(),
|
|
httponly=True,
|
|
samesite='Lax'
|
|
)
|
|
|
|
return response
|
|
|
|
def _set_secure_cookie(self, response, token: str, payload: Dict[str, Any]):
|
|
"""Set httpOnly cookie with proper security flags."""
|
|
# Calculate expiration time (24 hours or token expiration, whichever is sooner)
|
|
token_exp = payload.get('exp')
|
|
max_age = 24 * 60 * 60 # 24 hours in seconds
|
|
|
|
if token_exp:
|
|
current_time = datetime.utcnow().timestamp()
|
|
token_remaining = token_exp - current_time
|
|
max_age = min(max_age, int(token_remaining))
|
|
|
|
# Set secure cookie
|
|
response.set_cookie(
|
|
self.cookie_name,
|
|
token,
|
|
max_age=max_age,
|
|
path='/',
|
|
domain='',
|
|
secure=self._is_secure_context(),
|
|
httponly=True,
|
|
samesite='Lax'
|
|
)
|
|
|
|
def _is_secure_context(self) -> bool:
|
|
"""Determine if we're in a secure context (HTTPS)."""
|
|
# Check various indicators of HTTPS
|
|
if request.is_secure:
|
|
return True
|
|
|
|
# Check for common proxy headers
|
|
if request.headers.get('X-Forwarded-Proto') == 'https':
|
|
return True
|
|
|
|
if request.headers.get('X-Forwarded-SSL') == 'on':
|
|
return True
|
|
|
|
# Check Flask environment
|
|
if os.getenv('FLASK_ENV') == 'production':
|
|
return True
|
|
|
|
return False
|
|
|
|
def get_auth_status(self) -> Dict[str, Any]:
|
|
"""Get current authentication status for API endpoint."""
|
|
auth_result = self.is_authenticated()
|
|
|
|
response_data = {
|
|
'authenticated': auth_result['authenticated'],
|
|
'user': auth_result['user']
|
|
}
|
|
|
|
if not auth_result['authenticated']:
|
|
response_data['error'] = auth_result['error']
|
|
|
|
return response_data
|
|
|
|
def validate_and_refresh_token(self) -> Dict[str, Any]:
|
|
"""
|
|
Validate current token and check if refresh is needed.
|
|
This method can be called periodically to ensure token validity.
|
|
"""
|
|
auth_result = self.is_authenticated()
|
|
|
|
if not auth_result['authenticated']:
|
|
return auth_result
|
|
|
|
# Check if token is close to expiration (within 5 minutes)
|
|
payload = auth_result['payload']
|
|
exp = payload.get('exp')
|
|
|
|
if exp:
|
|
current_time = datetime.utcnow().timestamp()
|
|
time_to_expire = exp - current_time
|
|
|
|
if time_to_expire < 300: # 5 minutes
|
|
return {
|
|
'authenticated': True,
|
|
'user': auth_result['user'],
|
|
'payload': payload,
|
|
'refresh_needed': True,
|
|
'expires_in': int(time_to_expire)
|
|
}
|
|
|
|
return {
|
|
'authenticated': True,
|
|
'user': auth_result['user'],
|
|
'payload': payload,
|
|
'refresh_needed': False
|
|
} |