diff --git a/src/auth.py b/src/auth.py index 2538284..2ee2e39 100644 --- a/src/auth.py +++ b/src/auth.py @@ -194,35 +194,38 @@ class MicrosoftSSO: self.enabled = False logger.error(f"Failed to initialize Microsoft SSO: {e}") - def get_auth_url(self, state: Optional[str] = None) -> Optional[str]: + def get_auth_url(self, state: Optional[str] = None) -> Optional[Dict]: """ - Get Microsoft login URL. + Get Microsoft login URL with PKCE support. Args: state: State parameter for CSRF protection Returns: - Authorization URL or None if SSO not enabled + Auth flow dict containing 'auth_uri' and PKCE data, or None if SSO not enabled """ if not self.enabled: return None try: - return self.app.get_authorization_request_url( + # Use initiate_auth_code_flow for PKCE support (required for public clients) + flow = self.app.initiate_auth_code_flow( scopes=["User.Read"], - state=state, - redirect_uri=self.redirect_uri + redirect_uri=self.redirect_uri, + state=state ) + return flow except Exception as e: logger.error(f"Error generating auth URL: {e}") return None - def acquire_token(self, auth_code: str) -> Optional[Dict]: + def acquire_token(self, auth_response: Dict, auth_flow: Dict) -> Optional[Dict]: """ - Exchange authorization code for access token. + Exchange authorization code for access token using PKCE flow. Args: - auth_code: Authorization code from Microsoft + auth_response: The full callback response (request.args) + auth_flow: The auth flow dict from get_auth_url Returns: Token result dictionary or None if failed @@ -231,10 +234,9 @@ class MicrosoftSSO: return None try: - result = self.app.acquire_token_by_authorization_code( - auth_code, - scopes=["User.Read"], - redirect_uri=self.redirect_uri + result = self.app.acquire_token_by_auth_code_flow( + auth_flow, + auth_response ) return result except Exception as e: diff --git a/web_app.py b/web_app.py index 6082d4c..3331aed 100644 --- a/web_app.py +++ b/web_app.py @@ -246,7 +246,7 @@ def logout(): @app.route('/login/microsoft') def login_microsoft(): - """Redirect to Microsoft SSO.""" + """Redirect to Microsoft SSO with PKCE.""" sso = get_sso_instance() if not sso.enabled: @@ -254,31 +254,34 @@ def login_microsoft(): # Generate state for CSRF protection state = secrets.token_urlsafe(16) - session['oauth_state'] = state - auth_url = sso.get_auth_url(state=state) - if auth_url: - return redirect(auth_url) + # Get auth flow (includes PKCE code_verifier) + auth_flow = sso.get_auth_url(state=state) + if auth_flow and 'auth_uri' in auth_flow: + # Store the entire flow in session (needed for PKCE verification) + session['oauth_flow'] = auth_flow + return redirect(auth_flow['auth_uri']) else: return render_template('login.html', error='Failed to generate SSO URL', sso_enabled=is_sso_enabled()) @app.route('/auth/callback') def auth_callback(): - """Handle Microsoft SSO callback.""" + """Handle Microsoft SSO callback with PKCE.""" sso = get_sso_instance() - # Verify state - if request.args.get('state') != session.get('oauth_state'): - return render_template('login.html', error='Invalid state parameter', sso_enabled=is_sso_enabled()) + # Get stored auth flow (contains PKCE code_verifier) + auth_flow = session.get('oauth_flow') + if not auth_flow: + return render_template('login.html', error='Session expired, please try again', sso_enabled=is_sso_enabled()) - code = request.args.get('code') - if not code: - error_desc = request.args.get('error_description', 'No authorization code') + # Check for error in response + if request.args.get('error'): + error_desc = request.args.get('error_description', 'Unknown error') return render_template('login.html', error=f'SSO failed: {error_desc}', sso_enabled=is_sso_enabled()) - # Exchange code for token - result = sso.acquire_token(code) + # Exchange code for token using PKCE flow + result = sso.acquire_token(dict(request.args), auth_flow) if result and 'access_token' in result: # Get user info from Microsoft Graph @@ -302,9 +305,14 @@ def auth_callback(): session['username'] = user['username'] session['session_id'] = session_id + # Clear oauth flow from session + session.pop('oauth_flow', None) + return redirect(f'{URL_PREFIX}/') - return render_template('login.html', error='SSO authentication failed', sso_enabled=is_sso_enabled()) + # Log error details if available + error_msg = result.get('error_description', 'SSO authentication failed') if result else 'SSO authentication failed' + return render_template('login.html', error=error_msg, sso_enabled=is_sso_enabled()) @app.route('/')