Add PKCE support for Azure AD public client SSO

- Use initiate_auth_code_flow for PKCE (required by Azure AD for public clients)
- Store auth flow in session for token exchange
- Fix AADSTS9002325 error

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
SamoilenkoVadym 2026-02-06 23:21:20 +00:00
parent 614322e135
commit 14ea29c5cb
2 changed files with 38 additions and 28 deletions

View file

@ -194,35 +194,38 @@ class MicrosoftSSO:
self.enabled = False
logger.error(f"Failed to initialize Microsoft SSO: {e}")
def get_auth_url(self, state: Optional[str] = None) -> Optional[str]:
def get_auth_url(self, state: Optional[str] = None) -> Optional[Dict]:
"""
Get Microsoft login URL.
Get Microsoft login URL with PKCE support.
Args:
state: State parameter for CSRF protection
Returns:
Authorization URL or None if SSO not enabled
Auth flow dict containing 'auth_uri' and PKCE data, or None if SSO not enabled
"""
if not self.enabled:
return None
try:
return self.app.get_authorization_request_url(
# Use initiate_auth_code_flow for PKCE support (required for public clients)
flow = self.app.initiate_auth_code_flow(
scopes=["User.Read"],
state=state,
redirect_uri=self.redirect_uri
redirect_uri=self.redirect_uri,
state=state
)
return flow
except Exception as e:
logger.error(f"Error generating auth URL: {e}")
return None
def acquire_token(self, auth_code: str) -> Optional[Dict]:
def acquire_token(self, auth_response: Dict, auth_flow: Dict) -> Optional[Dict]:
"""
Exchange authorization code for access token.
Exchange authorization code for access token using PKCE flow.
Args:
auth_code: Authorization code from Microsoft
auth_response: The full callback response (request.args)
auth_flow: The auth flow dict from get_auth_url
Returns:
Token result dictionary or None if failed
@ -231,10 +234,9 @@ class MicrosoftSSO:
return None
try:
result = self.app.acquire_token_by_authorization_code(
auth_code,
scopes=["User.Read"],
redirect_uri=self.redirect_uri
result = self.app.acquire_token_by_auth_code_flow(
auth_flow,
auth_response
)
return result
except Exception as e:

View file

@ -246,7 +246,7 @@ def logout():
@app.route('/login/microsoft')
def login_microsoft():
"""Redirect to Microsoft SSO."""
"""Redirect to Microsoft SSO with PKCE."""
sso = get_sso_instance()
if not sso.enabled:
@ -254,31 +254,34 @@ def login_microsoft():
# Generate state for CSRF protection
state = secrets.token_urlsafe(16)
session['oauth_state'] = state
auth_url = sso.get_auth_url(state=state)
if auth_url:
return redirect(auth_url)
# Get auth flow (includes PKCE code_verifier)
auth_flow = sso.get_auth_url(state=state)
if auth_flow and 'auth_uri' in auth_flow:
# Store the entire flow in session (needed for PKCE verification)
session['oauth_flow'] = auth_flow
return redirect(auth_flow['auth_uri'])
else:
return render_template('login.html', error='Failed to generate SSO URL', sso_enabled=is_sso_enabled())
@app.route('/auth/callback')
def auth_callback():
"""Handle Microsoft SSO callback."""
"""Handle Microsoft SSO callback with PKCE."""
sso = get_sso_instance()
# Verify state
if request.args.get('state') != session.get('oauth_state'):
return render_template('login.html', error='Invalid state parameter', sso_enabled=is_sso_enabled())
# Get stored auth flow (contains PKCE code_verifier)
auth_flow = session.get('oauth_flow')
if not auth_flow:
return render_template('login.html', error='Session expired, please try again', sso_enabled=is_sso_enabled())
code = request.args.get('code')
if not code:
error_desc = request.args.get('error_description', 'No authorization code')
# Check for error in response
if request.args.get('error'):
error_desc = request.args.get('error_description', 'Unknown error')
return render_template('login.html', error=f'SSO failed: {error_desc}', sso_enabled=is_sso_enabled())
# Exchange code for token
result = sso.acquire_token(code)
# Exchange code for token using PKCE flow
result = sso.acquire_token(dict(request.args), auth_flow)
if result and 'access_token' in result:
# Get user info from Microsoft Graph
@ -302,9 +305,14 @@ def auth_callback():
session['username'] = user['username']
session['session_id'] = session_id
# Clear oauth flow from session
session.pop('oauth_flow', None)
return redirect(f'{URL_PREFIX}/')
return render_template('login.html', error='SSO authentication failed', sso_enabled=is_sso_enabled())
# Log error details if available
error_msg = result.get('error_description', 'SSO authentication failed') if result else 'SSO authentication failed'
return render_template('login.html', error=error_msg, sso_enabled=is_sso_enabled())
@app.route('/')