221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
class OAuthClientProvider(httpx.Auth):
    """OAuth2 authentication for httpx.

    Handles OAuth flow with automatic client registration and token storage.

    All mutable flow state (tokens, client registration, discovered metadata) lives on
    ``self.context``; ``async_auth_flow`` is the httpx entry point that drives discovery,
    registration, authorization, token exchange and refresh.
    """

    # The flow parses the bodies of discovery/registration/token responses, so httpx
    # must make response bodies available to the auth generator.
    requires_response_body = True

    def __init__(
        self,
        server_url: str,
        client_metadata: OAuthClientMetadata,
        storage: TokenStorage,
        redirect_handler: Callable[[str], Awaitable[None]] | None = None,
        callback_handler: Callable[[], Awaitable[AuthorizationCodeResult]] | None = None,
        timeout: float = 300.0,
        client_metadata_url: str | None = None,
        validate_resource_url: Callable[[str, str | None], Awaitable[None]] | None = None,
    ):
        """Initialize OAuth2 authentication.

        Args:
            server_url: The MCP server URL.
            client_metadata: OAuth client metadata for registration.
            storage: Token storage implementation.
            redirect_handler: Handler for authorization redirects.
            callback_handler: Handler for authorization callbacks.
            timeout: Timeout for the OAuth flow.
            client_metadata_url: URL-based client ID. When provided and the server
                advertises client_id_metadata_document_supported=True, this URL will be
                used as the client_id instead of performing dynamic client registration.
                Must be a valid HTTPS URL with a non-root pathname.
            validate_resource_url: Optional callback to override resource URL validation.
                Called with (server_url, prm_resource) where prm_resource is the resource
                from Protected Resource Metadata (or None if not present). If not provided,
                default validation rejects mismatched resources per RFC 8707.

        Raises:
            ValueError: If client_metadata_url is provided but not a valid HTTPS URL
                with a non-root pathname.
        """
        # Validate client_metadata_url if provided
        if client_metadata_url is not None and not is_valid_client_metadata_url(client_metadata_url):
            raise ValueError(
                f"client_metadata_url must be a valid HTTPS URL with a non-root pathname, got: {client_metadata_url}"
            )

        self.context = OAuthContext(
            server_url=server_url,
            client_metadata=client_metadata,
            storage=storage,
            redirect_handler=redirect_handler,
            callback_handler=callback_handler,
            timeout=timeout,
            client_metadata_url=client_metadata_url,
        )
        self._validate_resource_url_callback = validate_resource_url
        # Stored tokens/client info are loaded lazily on the first auth flow.
        self._initialized = False

    async def _handle_protected_resource_response(self, response: httpx.Response) -> bool:
        """Handle protected resource metadata discovery response.

        Per SEP-985, supports fallback when discovery fails at one URL.

        Args:
            response: The response to a protected resource metadata request.

        Returns:
            True if metadata was successfully discovered, False if we should try next URL

        Raises:
            OAuthFlowError: If the response status is neither 200 nor 404.
        """
        if response.status_code == 200:
            try:
                content = await response.aread()
                metadata = ProtectedResourceMetadata.model_validate_json(content)
                self.context.protected_resource_metadata = metadata
                if metadata.authorization_servers:  # pragma: no branch
                    # Only the first advertised authorization server is used.
                    self.context.auth_server_url = str(metadata.authorization_servers[0])
                return True
            except ValidationError:  # pragma: no cover
                # Invalid metadata - try next URL
                logger.warning(f"Invalid protected resource metadata at {response.request.url}")
                return False
        elif response.status_code == 404:  # pragma: no cover
            # Not found - try next URL in fallback chain
            logger.debug(f"Protected resource metadata not found at {response.request.url}, trying next URL")
            return False
        else:
            # Other error - fail immediately
            raise OAuthFlowError(
                f"Protected Resource Metadata request failed: {response.status_code}"
            )  # pragma: no cover

    async def _perform_authorization(self) -> httpx.Request:
        """Perform the authorization flow.

        Runs the authorization-code grant (redirect + callback) and returns the
        token-exchange request for the caller to send.
        """
        auth_code, code_verifier = await self._perform_authorization_code_grant()
        return await self._exchange_token_authorization_code(auth_code, code_verifier)

    async def _perform_authorization_code_grant(self) -> tuple[str, str]:
        """Perform the authorization redirect and get auth code.

        Returns:
            A ``(authorization_code, pkce_code_verifier)`` pair for the token exchange.

        Raises:
            OAuthFlowError: If redirect URIs, the redirect/callback handlers or client
                info are missing, if the returned state does not match the one sent,
                or if the callback carries no authorization code.
        """
        if self.context.client_metadata.redirect_uris is None:
            raise OAuthFlowError("No redirect URIs provided for authorization code grant")  # pragma: no cover
        if not self.context.redirect_handler:
            raise OAuthFlowError("No redirect handler provided for authorization code grant")  # pragma: no cover
        if not self.context.callback_handler:
            raise OAuthFlowError("No callback handler provided for authorization code grant")  # pragma: no cover

        # Prefer the discovered endpoint; otherwise fall back to "/authorize" on the
        # authorization base URL derived from the server URL.
        if self.context.oauth_metadata and self.context.oauth_metadata.authorization_endpoint:
            auth_endpoint = str(self.context.oauth_metadata.authorization_endpoint)
        else:
            auth_base_url = self.context.get_authorization_base_url(self.context.server_url)
            auth_endpoint = urljoin(auth_base_url, "/authorize")

        if not self.context.client_info:
            raise OAuthFlowError("No client info available for authorization")  # pragma: no cover

        # Generate PKCE parameters
        pkce_params = PKCEParameters.generate()
        state = secrets.token_urlsafe(32)

        auth_params = {
            "response_type": "code",
            "client_id": self.context.client_info.client_id,
            "redirect_uri": str(self.context.client_metadata.redirect_uris[0]),
            "state": state,
            "code_challenge": pkce_params.code_challenge,
            "code_challenge_method": "S256",
        }

        # Only include resource param if conditions are met
        if self.context.should_include_resource_param(self.context.protocol_version):
            auth_params["resource"] = self.context.get_resource_url()  # RFC 8707

        if self.context.client_metadata.scope:  # pragma: no branch
            auth_params["scope"] = self.context.client_metadata.scope
            # OIDC requires prompt=consent when offline_access is requested
            # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess
            if "offline_access" in self.context.client_metadata.scope.split():
                auth_params["prompt"] = "consent"

        authorization_url = f"{auth_endpoint}?{urlencode(auth_params)}"
        await self.context.redirect_handler(authorization_url)

        # Wait for callback
        result = await self.context.callback_handler()

        # Constant-time comparison of the echoed state against the one we generated.
        if result.state is None or not secrets.compare_digest(result.state, state):
            raise OAuthFlowError(f"State parameter mismatch: {result.state} != {state}")

        # RFC 9207: validate the authorization-response issuer
        validate_authorization_response_iss(result.iss, self.context.oauth_metadata)

        if not result.code:
            raise OAuthFlowError("No authorization code received")

        # Return auth code and code verifier for token exchange
        return result.code, pkce_params.code_verifier

    def _get_token_endpoint(self) -> str:
        """Return the token endpoint URL.

        Uses the discovered ``token_endpoint`` when OAuth metadata provides one;
        otherwise falls back to "/token" on the authorization base URL derived
        from the server URL.
        """
        if self.context.oauth_metadata and self.context.oauth_metadata.token_endpoint:
            token_url = str(self.context.oauth_metadata.token_endpoint)
        else:
            auth_base_url = self.context.get_authorization_base_url(self.context.server_url)
            token_url = urljoin(auth_base_url, "/token")
        return token_url

    async def _exchange_token_authorization_code(
        self, auth_code: str, code_verifier: str, *, token_data: dict[str, Any] | None = None
    ) -> httpx.Request:
        """Build token exchange request for authorization_code flow.

        Args:
            auth_code: Authorization code received from the callback.
            code_verifier: PKCE code verifier matching the challenge that was sent.
            token_data: Optional extra form fields. The mapping is copied, never
                mutated; the standard grant fields set here override same-named keys.

        Returns:
            A POST request to the token endpoint carrying the form-encoded grant.

        Raises:
            OAuthFlowError: If redirect URIs or client info are missing.
        """
        if self.context.client_metadata.redirect_uris is None:
            raise OAuthFlowError("No redirect URIs provided for authorization code grant")  # pragma: no cover
        if not self.context.client_info:
            raise OAuthFlowError("Missing client info")  # pragma: no cover

        token_url = self._get_token_endpoint()

        # Work on a private copy so the caller's dict is left untouched.
        token_data = dict(token_data) if token_data else {}
        token_data.update(
            {
                "grant_type": "authorization_code",
                "code": auth_code,
                "redirect_uri": str(self.context.client_metadata.redirect_uris[0]),
                "client_id": self.context.client_info.client_id,
                "code_verifier": code_verifier,
            }
        )

        # Only include resource param if conditions are met
        if self.context.should_include_resource_param(self.context.protocol_version):
            token_data["resource"] = self.context.get_resource_url()  # RFC 8707

        # Prepare authentication based on preferred method
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        token_data, headers = self.context.prepare_token_auth(token_data, headers)

        return httpx.Request("POST", token_url, data=token_data, headers=headers)

    async def _handle_token_response(self, response: httpx.Response) -> None:
        """Handle token exchange response.

        On success the token is stored on the context, its expiry is recorded and it
        is persisted to storage.

        Raises:
            OAuthTokenError: If the response status is not 200 or 201.
        """
        if response.status_code not in {200, 201}:
            body = await response.aread()
            # Decode leniently: a malformed error body must not replace the
            # OAuthTokenError below with a UnicodeDecodeError.
            body_text = body.decode("utf-8", errors="replace")
            raise OAuthTokenError(f"Token exchange failed ({response.status_code}): {body_text}")

        # Parse and validate response with scope validation
        token_response = await handle_token_response_scopes(response)

        # RFC 6749 §5.1: an omitted scope means the granted scope equals the requested
        # scope. Record it explicitly so the persisted token is self-describing — the
        # SEP-2350 step-up union reads it after a restart, when client_metadata.scope
        # has reverted to its constructor value.
        if token_response.scope is None:
            token_response.scope = self.context.client_metadata.scope

        # Store tokens in context
        self.context.current_tokens = token_response
        self.context.update_token_expiry(token_response)
        await self.context.storage.set_tokens(token_response)

    async def _refresh_token(self) -> httpx.Request:
        """Build token refresh request.

        Returns:
            A POST request to the token endpoint carrying the refresh_token grant.

        Raises:
            OAuthTokenError: If there is no refresh token or no client info/client_id.
        """
        if not self.context.current_tokens or not self.context.current_tokens.refresh_token:
            raise OAuthTokenError("No refresh token available")  # pragma: no cover
        if not self.context.client_info or not self.context.client_info.client_id:
            raise OAuthTokenError("No client info available")  # pragma: no cover

        # Same endpoint resolution as the authorization_code exchange.
        token_url = self._get_token_endpoint()

        refresh_data: dict[str, str] = {
            "grant_type": "refresh_token",
            "refresh_token": self.context.current_tokens.refresh_token,
            "client_id": self.context.client_info.client_id,
        }

        # Only include resource param if conditions are met
        if self.context.should_include_resource_param(self.context.protocol_version):
            refresh_data["resource"] = self.context.get_resource_url()  # RFC 8707

        # Prepare authentication based on preferred method
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        refresh_data, headers = self.context.prepare_token_auth(refresh_data, headers)

        return httpx.Request("POST", token_url, data=refresh_data, headers=headers)

    async def _handle_refresh_response(self, response: httpx.Response) -> bool:
        """Handle token refresh response. Returns True if successful.

        On a non-200 status or an unparseable body the current tokens are cleared
        and False is returned.
        """
        if response.status_code != 200:
            logger.warning(f"Token refresh failed: {response.status_code}")
            self.context.clear_tokens()
            return False

        try:
            content = await response.aread()
            token_response = OAuthToken.model_validate_json(content)

            # RFC 6749 §6: a refresh response may omit scope (unchanged) and refresh_token
            # (the AS does not rotate). Carry both forward so the persisted token stays
            # self-describing for the SEP-2350 step-up union and the next expiry can
            # still refresh instead of forcing a full re-authorization.
            prior = self.context.current_tokens
            if token_response.scope is None and prior is not None:
                token_response.scope = prior.scope
            if token_response.refresh_token is None and prior is not None:
                token_response.refresh_token = prior.refresh_token

            self.context.current_tokens = token_response
            self.context.update_token_expiry(token_response)
            await self.context.storage.set_tokens(token_response)

            return True
        except ValidationError:  # pragma: no cover
            logger.exception("Invalid refresh response")
            self.context.clear_tokens()
            return False

    async def _initialize(self) -> None:
        """Load stored tokens and client info."""
        self.context.current_tokens = await self.context.storage.get_tokens()
        self.context.client_info = await self.context.storage.get_client_info()
        self._initialized = True

    def _add_auth_header(self, request: httpx.Request) -> None:
        """Add authorization header to request if we have valid tokens."""
        if self.context.current_tokens and self.context.current_tokens.access_token:  # pragma: no branch
            request.headers["Authorization"] = f"Bearer {self.context.current_tokens.access_token}"

    async def _handle_oauth_metadata_response(self, response: httpx.Response) -> None:
        """Parse an authorization server metadata response and store it on the context."""
        content = await response.aread()
        metadata = OAuthMetadata.model_validate_json(content)
        self.context.oauth_metadata = metadata

    async def _validate_resource_match(self, prm: ProtectedResourceMetadata) -> None:
        """Validate that PRM resource matches the server URL per RFC 8707.

        When a ``validate_resource_url`` callback was supplied it fully replaces the
        default check. Otherwise a PRM without a resource is accepted, and a PRM
        whose resource does not cover the server's resource URL is rejected.

        Raises:
            OAuthFlowError: If the default check finds a mismatch.
        """
        prm_resource = str(prm.resource) if prm.resource else None

        if self._validate_resource_url_callback is not None:
            await self._validate_resource_url_callback(self.context.server_url, prm_resource)
            return

        if not prm_resource:
            return  # pragma: no cover
        default_resource = resource_url_from_server_url(self.context.server_url)
        if not check_resource_allowed(requested_resource=default_resource, configured_resource=prm_resource):
            raise OAuthFlowError(f"Protected resource {prm_resource} does not match expected {default_resource}")

    async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx.Request, httpx.Response]:
        """HTTPX auth flow integration.

        Every ``yield`` hands a request to httpx and resumes with its response. The
        flow refreshes an invalid token when possible, sends the original request,
        and then reacts to the response:

        * 401 - run discovery, (re-)registration, authorization and token exchange,
          then retry the original request.
        * 403 - on ``insufficient_scope``, widen the requested scope and re-authorize;
          the original request is retried in either case.

        The whole flow runs while holding ``self.context.lock``.
        """
        async with self.context.lock:
            if not self._initialized:
                await self._initialize()

            # Capture protocol version from request headers
            self.context.protocol_version = request.headers.get(MCP_PROTOCOL_VERSION_HEADER)

            if not self.context.is_token_valid() and self.context.can_refresh_token():
                # Try to refresh token
                refresh_request = await self._refresh_token()
                refresh_response = yield refresh_request

                if not await self._handle_refresh_response(refresh_response):
                    # Refresh failed, need full re-authentication
                    self._initialized = False

            if self.context.is_token_valid():
                self._add_auth_header(request)

            response = yield request

            if response.status_code == 401:
                # Perform full OAuth flow
                try:
                    # OAuth flow must be inline due to generator constraints
                    www_auth_resource_metadata_url = extract_resource_metadata_from_www_auth(response)

                    # Step 1: Discover protected resource metadata (SEP-985 with fallback support)
                    prm_discovery_urls = build_protected_resource_metadata_discovery_urls(
                        www_auth_resource_metadata_url, self.context.server_url
                    )

                    for url in prm_discovery_urls:  # pragma: no branch
                        discovery_request = create_oauth_metadata_request(url)

                        discovery_response = yield discovery_request  # sending request

                        prm = await handle_protected_resource_response(discovery_response)
                        if prm:
                            # Validate PRM resource matches server URL (RFC 8707)
                            await self._validate_resource_match(prm)
                            self.context.protected_resource_metadata = prm

                            # todo: try all authorization_servers to find the OASM
                            assert (
                                len(prm.authorization_servers) > 0
                            )  # this is always true as authorization_servers has a min length of 1

                            self.context.auth_server_url = str(prm.authorization_servers[0])
                            break
                        else:
                            logger.debug(f"Protected resource metadata discovery failed: {url}")

                    # SEP-2352: stored credentials are bound to the issuer that registered them.
                    # If the authorization server changed, drop them (and the old tokens) so the
                    # flow re-registers instead of presenting another server's credentials.
                    if (
                        self.context.client_info is not None
                        and self.context.auth_server_url is not None
                        and not credentials_match_issuer(
                            self.context.client_info, self.context.auth_server_url, self.context.client_metadata_url
                        )
                    ):
                        logger.debug("Authorization server changed; discarding bound credentials and re-registering")
                        self.context.client_info = None
                        self.context.clear_tokens()
                        # Any cached AS metadata is for the old server; drop it so a failed
                        # rediscovery cannot leak the old registration/token endpoints into Step 4.
                        self.context.oauth_metadata = None

                    asm_discovery_urls = build_oauth_authorization_server_metadata_discovery_urls(
                        self.context.auth_server_url, self.context.server_url
                    )

                    # Step 2: Discover OAuth Authorization Server Metadata (OASM) (with fallback for legacy servers)
                    for url in asm_discovery_urls:  # pragma: no branch
                        oauth_metadata_request = create_oauth_metadata_request(url)
                        oauth_metadata_response = yield oauth_metadata_request

                        ok, asm = await handle_auth_metadata_response(oauth_metadata_response)
                        if not ok:
                            break
                        if asm:
                            # SEP-2468: metadata issuer must match the discovery issuer
                            if self.context.auth_server_url is not None:
                                validate_metadata_issuer(asm, self.context.auth_server_url)
                            self.context.oauth_metadata = asm
                            break
                        else:
                            logger.debug(f"OAuth metadata discovery failed: {url}")

                    # SEP-2352: on the legacy no-PRM path the issuer is only known after ASM
                    # discovery, so re-evaluate the binding here using the discovered metadata
                    # issuer (mirroring the bound_issuer fallback in Step 4).
                    if (
                        self.context.client_info is not None
                        and self.context.auth_server_url is None
                        and self.context.oauth_metadata is not None
                        and not credentials_match_issuer(
                            self.context.client_info,
                            str(self.context.oauth_metadata.issuer),
                            self.context.client_metadata_url,
                        )
                    ):
                        logger.debug("Authorization server changed; discarding bound credentials and re-registering")
                        self.context.client_info = None
                        self.context.clear_tokens()

                    # Step 3: Apply scope selection strategy
                    self.context.client_metadata.scope = get_client_metadata_scopes(
                        extract_scope_from_www_auth(response),
                        self.context.protected_resource_metadata,
                        self.context.oauth_metadata,
                        self.context.client_metadata.grant_types,
                    )

                    # Step 4: Register client or use URL-based client ID (CIMD)
                    if not self.context.client_info:
                        # SEP-2352: the issuer to bind these credentials to, when known.
                        discovered_issuer: str | None = None
                        if self.context.oauth_metadata is not None:
                            discovered_issuer = self.context.auth_server_url or str(self.context.oauth_metadata.issuer)

                        if should_use_client_metadata_url(
                            self.context.oauth_metadata, self.context.client_metadata_url
                        ):
                            # Use URL-based client ID (CIMD). CIMD records are portable across
                            # authorization servers, so the issuer stamp is informational.
                            logger.debug(f"Using URL-based client ID (CIMD): {self.context.client_metadata_url}")
                            client_information = create_client_info_from_metadata_url(
                                self.context.client_metadata_url,  # type: ignore[arg-type]
                                redirect_uris=self.context.client_metadata.redirect_uris,
                            )
                            client_information.issuer = discovered_issuer
                            self.context.client_info = client_information
                            await self.context.storage.set_client_info(client_information)
                        else:
                            # Fallback to Dynamic Client Registration
                            fallback_base = self.context.get_authorization_base_url(self.context.server_url)
                            registration_request = create_client_registration_request(
                                self.context.oauth_metadata, self.context.client_metadata, fallback_base
                            )
                            registration_response = yield registration_request
                            client_information = await handle_registration_response(registration_response)
                            # Only record the issuer when the registration above actually targeted
                            # the discovered AS — either via its published registration_endpoint,
                            # or because the resource-origin /register fallback is on the issuer's
                            # own host (legacy same-origin embedded AS). Otherwise the fallback hit
                            # a different server and recording a binding to the PRM-advertised AS
                            # would persist a binding that was never established.
                            if (
                                self.context.oauth_metadata is not None
                                and discovered_issuer is not None
                                and (
                                    self.context.oauth_metadata.registration_endpoint is not None
                                    or self.context.get_authorization_base_url(discovered_issuer) == fallback_base
                                )
                            ):
                                client_information.issuer = discovered_issuer
                            self.context.client_info = client_information
                            await self.context.storage.set_client_info(client_information)

                    # Step 5: Perform authorization and complete token exchange
                    token_response = yield await self._perform_authorization()
                    await self._handle_token_response(token_response)
                except Exception:
                    logger.exception("OAuth flow error")
                    raise

                # Retry with new tokens
                self._add_auth_header(request)
                yield request
            elif response.status_code == 403:
                # Step 1: Extract error field from WWW-Authenticate header
                error = extract_field_from_www_auth(response, "error")

                # Step 2: Check if we need to step-up authorization
                if error == "insufficient_scope":  # pragma: no branch
                    try:
                        # Step 2a: Union previously requested scopes with the newly challenged
                        # scopes (SEP-2350) so escalating one operation keeps the others' grants.
                        # Fold in the stored token's scope too: on a restart the token is reloaded
                        # but client_metadata.scope is not, so it would otherwise be the only basis.
                        challenged_scope = get_client_metadata_scopes(
                            extract_scope_from_www_auth(response),
                            self.context.protected_resource_metadata,
                            self.context.oauth_metadata,
                            self.context.client_metadata.grant_types,
                        )
                        granted_scope = self.context.current_tokens.scope if self.context.current_tokens else None
                        prior_scope = union_scopes(self.context.client_metadata.scope, granted_scope)
                        self.context.client_metadata.scope = union_scopes(prior_scope, challenged_scope)

                        # Step 2b: Perform (re-)authorization and token exchange
                        token_response = yield await self._perform_authorization()
                        await self._handle_token_response(token_response)
                    except Exception:  # pragma: no cover
                        logger.exception("OAuth flow error")
                        raise

                # Retry with new tokens
                self._add_auth_header(request)
                yield request
|