Organized the application into separate frontend and backend directories for cleaner deployment and better separation of concerns. Frontend Directory (frontend/): - index.html: Single-page web interface (renamed from web_ui.html) - README.md: Frontend deployment guide - Total size: ~113 KB (self-contained) - Smart base path detection (works at / or /ai_qc/) - No configuration changes required Backend Directory (backend/): - All Python files (api_server.py, llm_config.py, etc.) - visual_qc_apps/: 33 QC check modules - profiles/: 6 QC profile configurations - brand_guidelines/: Reference asset storage - config/: Environment configurations - scripts/: Deployment automation - uploads/, output/: Data directories - requirements.txt, ai_qc.service, apache_config.conf - Complete documentation New Documentation: - FOLDER_STRUCTURE.md: Comprehensive guide to new structure - frontend/README.md: Frontend deployment instructions - backend/BACKEND_README.md: Backend deployment guide Deployment Mapping: - frontend/ → /var/www/html/ai_qc/ (web root) - backend/ → /opt/ai_qc/ (application directory) Benefits: - Clear separation of concerns - Backend code not in web-accessible directory - Independent frontend/backend updates - Matches server's existing patterns (/opt/veo3, /opt/voice2text) - Industry-standard architecture - Easy to deploy and maintain Original files preserved in root directory for reference. Ready for production deployment following MIGRATION_GUIDE.md. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
246 lines
No EOL
8.3 KiB
Python
246 lines
No EOL
8.3 KiB
Python
"""
|
|
Authentication Middleware for Flask application.
|
|
Python equivalent of AuthMiddleware.php from MSAL specification.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
from functools import wraps
|
|
from typing import Dict, Any, Optional, Tuple
|
|
from flask import request, jsonify, make_response, g
|
|
from jwt_validator import JWTValidator
|
|
|
|
|
|
class AuthMiddleware:
    """Authentication middleware for Flask with Azure AD JWT validation and httpOnly cookie management.

    The middleware reads a JWT from an httpOnly cookie, validates it through
    ``JWTValidator`` and exposes helpers to protect routes, set/clear the
    cookie and report the current authentication status.

    Can be bound to an application either at construction time
    (``AuthMiddleware(app)``) or later through :meth:`init_app`.
    """

    # Upper bound on the lifetime of the authentication cookie (24 hours).
    _MAX_COOKIE_AGE_SECONDS = 24 * 60 * 60

    # A token with less than this many seconds left is reported as needing refresh.
    _REFRESH_THRESHOLD_SECONDS = 5 * 60

    def __init__(self, app=None, tenant_id: str = None, client_id: str = None):
        """Create the middleware and optionally bind it to a Flask app.

        Args:
            app: Flask application to bind to; when ``None`` call
                :meth:`init_app` later.
            tenant_id: Azure AD tenant id. Falls back to the
                ``AZURE_TENANT_ID`` environment variable, then to a built-in default.
            client_id: Azure AD client (application) id. Falls back to the
                ``AZURE_CLIENT_ID`` environment variable, then to a built-in default.
        """
        self.tenant_id = tenant_id or os.getenv('AZURE_TENANT_ID', 'e519c2e6-bc6d-4fdf-8d9c-923c2f002385')
        self.client_id = client_id or os.getenv('AZURE_CLIENT_ID', '9079054c-9620-4757-a256-23413042f1ef')
        self.jwt_validator = JWTValidator(self.tenant_id, self.client_id)
        self.cookie_name = 'ai_qc_auth_token'

        # Always define the attribute so it can be read before init_app() runs.
        self.app = None

        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """Initialize the middleware with Flask app.

        Stores the app on the middleware, exposes the middleware as
        ``app.auth_middleware`` and fills in cookie-related config defaults
        (values already present in ``app.config`` are left untouched).
        """
        self.app = app
        app.auth_middleware = self

        # Set secure cookie defaults based on environment.
        # NOTE: these SESSION_COOKIE_* keys configure Flask's own session
        # cookie; the flags of the auth cookie are passed explicitly in
        # _set_secure_cookie() / clear_auth_token().
        app.config.setdefault('SESSION_COOKIE_SECURE', os.getenv('FLASK_ENV') == 'production')
        app.config.setdefault('SESSION_COOKIE_HTTPONLY', True)
        app.config.setdefault('SESSION_COOKIE_SAMESITE', 'Lax')

    def require_auth(self, f):
        """Decorator to require authentication for protected routes.

        Similar to AuthMiddleware->requireAuth() in PHP version.

        When the request is not authenticated the wrapped view is not called
        and a JSON error body with HTTP status 401 is returned instead. On
        success the user info and token payload are stored on ``g.user`` and
        ``g.token_payload`` before the view runs.
        """
        @wraps(f)
        def decorated_function(*args, **kwargs):
            auth_result = self.is_authenticated()
            if not auth_result['authenticated']:
                return jsonify({
                    'error': 'Authentication required',
                    'message': auth_result['error'],
                    'authenticated': False
                }), 401

            # Store user info in Flask's g object for use in route handlers
            g.user = auth_result['user']
            g.token_payload = auth_result['payload']

            return f(*args, **kwargs)

        return decorated_function

    @staticmethod
    def _auth_failure(error: str) -> Dict[str, Any]:
        """Build the result dict describing an unauthenticated request."""
        return {
            'authenticated': False,
            'error': error,
            'user': None,
            'payload': None
        }

    @staticmethod
    def _current_timestamp() -> float:
        """Return the current time as POSIX seconds since the epoch.

        ``datetime.utcnow().timestamp()`` must not be used here: ``utcnow()``
        yields a naive datetime and ``timestamp()`` interprets naive values
        as *local* time, shifting the result by the host's UTC offset.
        """
        import time  # local import: the module does not import `time` at file level

        return time.time()

    def is_authenticated(self) -> Dict[str, Any]:
        """Check if current request is authenticated.

        Returns:
            Dict with the keys ``authenticated`` (bool), ``error`` (message or
            ``None``), ``user`` (user info or ``None``) and ``payload``
            (token payload or ``None``).

        Never raises: any exception raised while reading or validating the
        token is reported as an authentication failure.
        """
        try:
            # Try to get token from httpOnly cookie
            token = request.cookies.get(self.cookie_name)
            if not token:
                return self._auth_failure('No authentication token found')

            # Validate token
            payload = self.jwt_validator.validate_token(token)

            # Check if token is expired
            if self.jwt_validator.is_token_expired(payload):
                return self._auth_failure('Authentication token has expired')

            # Extract user information
            user_info = self.jwt_validator.get_user_info(payload)

            return {
                'authenticated': True,
                'error': None,
                'user': user_info,
                'payload': payload
            }

        except Exception as e:
            # Boundary handler: a broken token must yield "not authenticated",
            # never an unhandled error in the request.
            return self._auth_failure(f'Token validation failed: {str(e)}')

    def set_auth_token(self, token: str):
        """Validate and store authentication token in httpOnly cookie.

        Args:
            token: Raw JWT received from the client.

        Returns:
            Flask response. On success it carries the auth cookie and a JSON
            body with the user info; on any failure it is a JSON error body
            with HTTP status 401 and no cookie.
        """
        try:
            # Validate token before storing
            payload = self.jwt_validator.validate_token(token)

            # Create response with httpOnly cookie
            response = make_response(jsonify({
                'success': True,
                'message': 'Authentication successful',
                'authenticated': True,
                'user': self.jwt_validator.get_user_info(payload)
            }))

            # Set httpOnly cookie with security flags
            self._set_secure_cookie(response, token, payload)

            return response

        except Exception as e:
            return make_response(jsonify({
                'success': False,
                'error': f'Token validation failed: {str(e)}',
                'authenticated': False
            }), 401)

    def clear_auth_token(self):
        """Clear authentication cookie and return response."""
        response = make_response(jsonify({
            'success': True,
            'message': 'Logged out successfully',
            'authenticated': False
        }))

        # Clear the authentication cookie: empty value, already-expired date,
        # and the same path/flags that were used when the cookie was set.
        response.set_cookie(
            self.cookie_name,
            '',
            expires=0,
            path='/',
            domain='',
            secure=self._is_secure_context(),
            httponly=True,
            samesite='Lax'
        )

        return response

    def _set_secure_cookie(self, response, token: str, payload: Dict[str, Any]):
        """Set httpOnly cookie with proper security flags.

        The cookie lives for 24 hours or until the token's ``exp`` claim,
        whichever is sooner; it never gets a negative lifetime.

        Args:
            response: Flask response object the cookie is attached to.
            token: Raw JWT stored as the cookie value.
            payload: Decoded token payload; only ``exp`` is read.
        """
        max_age = self._MAX_COOKIE_AGE_SECONDS

        token_exp = payload.get('exp')
        if token_exp:
            token_remaining = int(token_exp - self._current_timestamp())
            # Clamp at 0 so an already-expired token yields an immediately
            # expiring cookie instead of a negative Max-Age.
            max_age = max(0, min(max_age, token_remaining))

        # Set secure cookie
        response.set_cookie(
            self.cookie_name,
            token,
            max_age=max_age,
            path='/',
            domain='',
            secure=self._is_secure_context(),
            httponly=True,
            samesite='Lax'
        )

    def _is_secure_context(self) -> bool:
        """Determine if we're in a secure context (HTTPS).

        Returns ``True`` when the request itself is secure, when a proxy
        header reports HTTPS, or when ``FLASK_ENV`` is ``production``.
        """
        # NOTE(review): the X-Forwarded-* headers are taken at face value;
        # confirm they are only ever set by a trusted reverse proxy.
        return (
            request.is_secure
            or request.headers.get('X-Forwarded-Proto') == 'https'
            or request.headers.get('X-Forwarded-SSL') == 'on'
            or os.getenv('FLASK_ENV') == 'production'
        )

    def get_auth_status(self) -> Dict[str, Any]:
        """Get current authentication status for API endpoint.

        Returns:
            Dict with ``authenticated`` and ``user``; an ``error`` key is
            added only when the request is not authenticated.
        """
        auth_result = self.is_authenticated()

        response_data = {
            'authenticated': auth_result['authenticated'],
            'user': auth_result['user']
        }

        if not auth_result['authenticated']:
            response_data['error'] = auth_result['error']

        return response_data

    def validate_and_refresh_token(self) -> Dict[str, Any]:
        """Validate current token and check if refresh is needed.

        This method can be called periodically to ensure token validity.

        Returns:
            The :meth:`is_authenticated` result unchanged when the request is
            not authenticated. Otherwise a dict with ``authenticated``,
            ``user``, ``payload`` and ``refresh_needed``; when the token
            expires within 5 minutes ``refresh_needed`` is ``True`` and
            ``expires_in`` (whole seconds left) is included.
        """
        auth_result = self.is_authenticated()
        if not auth_result['authenticated']:
            return auth_result

        payload = auth_result['payload']
        status = {
            'authenticated': True,
            'user': auth_result['user'],
            'payload': payload,
            'refresh_needed': False
        }

        # Check if token is close to expiration (within 5 minutes)
        exp = payload.get('exp')
        if exp:
            time_to_expire = exp - self._current_timestamp()
            if time_to_expire < self._REFRESH_THRESHOLD_SECONDS:
                status['refresh_needed'] = True
                status['expires_in'] = int(time_to_expire)

        return status