Skip to content

utils

extract_field_from_www_auth

extract_field_from_www_auth(
    response: Response, field_name: str
) -> str | None

Extract field from WWW-Authenticate header.

Returns:

Type Description
str | None

Field value if found in WWW-Authenticate header, None otherwise

Source code in src/mcp/client/auth/utils.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def extract_field_from_www_auth(response: Response, field_name: str) -> str | None:
    """Extract field from WWW-Authenticate header.

    Returns:
        Field value if found in WWW-Authenticate header, None otherwise
    """
    www_auth_header = response.headers.get("WWW-Authenticate")
    if not www_auth_header:
        return None

    # Pattern matches: field_name="value" or field_name=value (unquoted)
    pattern = rf'{field_name}=(?:"([^"]+)"|([^\s,]+))'
    match = re.search(pattern, www_auth_header)

    if match:
        # Return quoted value if present, otherwise unquoted value
        return match.group(1) or match.group(2)

    return None

extract_scope_from_www_auth

extract_scope_from_www_auth(
    response: Response,
) -> str | None

Extract scope parameter from WWW-Authenticate header as per RFC 6750.

Returns:

Type Description
str | None

Scope string if found in WWW-Authenticate header, None otherwise

Source code in src/mcp/client/auth/utils.py
40
41
42
43
44
45
46
def extract_scope_from_www_auth(response: Response) -> str | None:
    """Extract scope parameter from WWW-Authenticate header as per RFC 6750.

    Returns:
        Scope string if found in WWW-Authenticate header, None otherwise
    """
    return extract_field_from_www_auth(response, "scope")

extract_resource_metadata_from_www_auth

extract_resource_metadata_from_www_auth(
    response: Response,
) -> str | None

Extract protected resource metadata URL from WWW-Authenticate header as per RFC 9728.

Returns:

Type Description
str | None

Resource metadata URL if found in WWW-Authenticate header, None otherwise

Source code in src/mcp/client/auth/utils.py
49
50
51
52
53
54
55
56
57
58
def extract_resource_metadata_from_www_auth(response: Response) -> str | None:
    """Extract protected resource metadata URL from WWW-Authenticate header as per RFC 9728.

    Returns:
        Resource metadata URL if found in WWW-Authenticate header, None otherwise
    """
    if not response or response.status_code != 401:
        return None  # pragma: no cover

    return extract_field_from_www_auth(response, "resource_metadata")

build_protected_resource_metadata_discovery_urls

build_protected_resource_metadata_discovery_urls(
    www_auth_url: str | None, server_url: str
) -> list[str]

Build ordered list of URLs to try for protected resource metadata discovery.

Per SEP-985, the client MUST: 1. Try resource_metadata from WWW-Authenticate header (if present) 2. Fall back to path-based well-known URI: /.well-known/oauth-protected-resource/{path} 3. Fall back to root-based well-known URI: /.well-known/oauth-protected-resource

Parameters:

Name Type Description Default
www_auth_url str | None

Optional resource_metadata URL extracted from the WWW-Authenticate header

required
server_url str

Server URL

required

Returns:

Type Description
list[str]

Ordered list of URLs to try for discovery

Source code in src/mcp/client/auth/utils.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def build_protected_resource_metadata_discovery_urls(www_auth_url: str | None, server_url: str) -> list[str]:
    """Build ordered list of URLs to try for protected resource metadata discovery.

    Per SEP-985, the client MUST:
    1. Try resource_metadata from WWW-Authenticate header (if present)
    2. Fall back to path-based well-known URI: /.well-known/oauth-protected-resource/{path}
    3. Fall back to root-based well-known URI: /.well-known/oauth-protected-resource

    Args:
        www_auth_url: Optional resource_metadata URL extracted from the WWW-Authenticate header
        server_url: Server URL

    Returns:
        Ordered list of URLs to try for discovery
    """
    urls: list[str] = []

    # Priority 1: WWW-Authenticate header with resource_metadata parameter
    if www_auth_url:
        urls.append(www_auth_url)

    # Priority 2-3: Well-known URIs (RFC 9728)
    parsed = urlparse(server_url)
    base_url = f"{parsed.scheme}://{parsed.netloc}"

    # Priority 2: Path-based well-known URI (if server has a path component)
    if parsed.path and parsed.path != "/":
        path_based_url = urljoin(base_url, f"/.well-known/oauth-protected-resource{parsed.path}")
        urls.append(path_based_url)

    # Priority 3: Root-based well-known URI
    root_based_url = urljoin(base_url, "/.well-known/oauth-protected-resource")
    urls.append(root_based_url)

    return urls

get_client_metadata_scopes

get_client_metadata_scopes(
    www_authenticate_scope: str | None,
    protected_resource_metadata: (
        ProtectedResourceMetadata | None
    ),
    authorization_server_metadata: (
        OAuthMetadata | None
    ) = None,
    client_grant_types: list[str] | None = None,
) -> str | None

Select effective scopes and augment for refresh token support.

Source code in src/mcp/client/auth/utils.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def get_client_metadata_scopes(
    www_authenticate_scope: str | None,
    protected_resource_metadata: ProtectedResourceMetadata | None,
    authorization_server_metadata: OAuthMetadata | None = None,
    client_grant_types: list[str] | None = None,
) -> str | None:
    """Select effective scopes and augment for refresh token support."""
    selected_scope: str | None = None

    # MCP spec scope selection priority:
    #   1. WWW-Authenticate header scope
    #   2. PRM scopes_supported
    #   3. AS scopes_supported (SDK fallback)
    #   4. Omit scope parameter
    if www_authenticate_scope is not None:
        selected_scope = www_authenticate_scope
    elif protected_resource_metadata is not None and protected_resource_metadata.scopes_supported is not None:
        selected_scope = " ".join(protected_resource_metadata.scopes_supported)
    elif authorization_server_metadata is not None and authorization_server_metadata.scopes_supported is not None:
        selected_scope = " ".join(authorization_server_metadata.scopes_supported)

    # SEP-2207: append offline_access when the AS supports it and the client can use refresh tokens
    if (
        selected_scope is not None
        and authorization_server_metadata is not None
        and authorization_server_metadata.scopes_supported is not None
        and "offline_access" in authorization_server_metadata.scopes_supported
        and client_grant_types is not None
        and "refresh_token" in client_grant_types
        and "offline_access" not in selected_scope.split()
    ):
        selected_scope = f"{selected_scope} offline_access"

    return selected_scope

union_scopes

union_scopes(
    previous_scope: str | None, new_scope: str | None
) -> str | None

Merge two space-delimited scope strings, preserving order and dropping duplicates.

SEP-2350: on step-up re-authorization the client requests the union of previously requested scopes and the newly challenged scopes, so escalating one operation does not drop the permissions granted for another. Previously requested scopes come first; new scopes are appended in order.

Source code in src/mcp/client/auth/utils.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def union_scopes(previous_scope: str | None, new_scope: str | None) -> str | None:
    """Merge two space-delimited scope strings, preserving order and dropping duplicates.

    SEP-2350: on step-up re-authorization the client requests the union of previously requested
    scopes and the newly challenged scopes, so escalating one operation does not drop the
    permissions granted for another. Previously requested scopes come first; new scopes are
    appended in order.
    """
    if not previous_scope:
        return new_scope
    if not new_scope:
        return previous_scope

    merged = previous_scope.split()
    seen = set(merged)
    for scope in new_scope.split():
        if scope not in seen:
            merged.append(scope)
            seen.add(scope)
    return " ".join(merged)

build_oauth_authorization_server_metadata_discovery_urls

build_oauth_authorization_server_metadata_discovery_urls(
    auth_server_url: str | None, server_url: str
) -> list[str]

Generate an ordered list of URLs for authorization server metadata discovery.

Parameters:

Name Type Description Default
auth_server_url str | None

OAuth Authorization Server Metadata URL if found, otherwise None

required
server_url str

URL for the MCP server, used as a fallback if auth_server_url is None

required
Source code in src/mcp/client/auth/utils.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def build_oauth_authorization_server_metadata_discovery_urls(auth_server_url: str | None, server_url: str) -> list[str]:
    """Generate an ordered list of URLs for authorization server metadata discovery.

    Args:
        auth_server_url: OAuth Authorization Server Metadata URL if found, otherwise None
        server_url: URL for the MCP server, used as a fallback if auth_server_url is None
    """

    if not auth_server_url:
        # Legacy path using the 2025-03-26 spec:
        # link: https://modelcontextprotocol.io/specification/2025-03-26/basic/authorization
        parsed = urlparse(server_url)
        return [f"{parsed.scheme}://{parsed.netloc}/.well-known/oauth-authorization-server"]

    urls: list[str] = []
    parsed = urlparse(auth_server_url)
    base_url = f"{parsed.scheme}://{parsed.netloc}"

    # RFC 8414: Path-aware OAuth discovery
    if parsed.path and parsed.path != "/":
        oauth_path = f"/.well-known/oauth-authorization-server{parsed.path.rstrip('/')}"
        urls.append(urljoin(base_url, oauth_path))

        # RFC 8414 section 5: Path-aware OIDC discovery
        # See https://www.rfc-editor.org/rfc/rfc8414.html#section-5
        oidc_path = f"/.well-known/openid-configuration{parsed.path.rstrip('/')}"
        urls.append(urljoin(base_url, oidc_path))

        # https://openid.net/specs/openid-connect-discovery-1_0.html
        oidc_path = f"{parsed.path.rstrip('/')}/.well-known/openid-configuration"
        urls.append(urljoin(base_url, oidc_path))
        return urls

    # OAuth root
    urls.append(urljoin(base_url, "/.well-known/oauth-authorization-server"))

    # OIDC 1.0 fallback (appends to full URL per OIDC spec)
    # https://openid.net/specs/openid-connect-discovery-1_0.html
    urls.append(urljoin(base_url, "/.well-known/openid-configuration"))

    return urls

handle_protected_resource_response async

handle_protected_resource_response(
    response: Response,
) -> ProtectedResourceMetadata | None

Handle protected resource metadata discovery response.

Per SEP-985, supports fallback when discovery fails at one URL.

Returns:

Type Description
ProtectedResourceMetadata | None

ProtectedResourceMetadata if successfully discovered, None if we should try next URL

Source code in src/mcp/client/auth/utils.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
async def handle_protected_resource_response(
    response: Response,
) -> ProtectedResourceMetadata | None:
    """Handle protected resource metadata discovery response.

    Per SEP-985, supports fallback when discovery fails at one URL.

    Returns:
        ProtectedResourceMetadata if successfully discovered, None if we should try next URL
    """
    if response.status_code == 200:
        try:
            content = await response.aread()
            metadata = ProtectedResourceMetadata.model_validate_json(content)
            return metadata

        except ValidationError:  # pragma: no cover
            # Invalid metadata - try next URL
            return None
    else:
        # Not found - try next URL in fallback chain
        return None

validate_authorization_response_iss

validate_authorization_response_iss(
    iss: str | None, oauth_metadata: OAuthMetadata | None
) -> None

Validate the RFC 9207 iss authorization-response parameter.

Per RFC 9207 section 2.4, the client compares iss against the issuer of the authorization server the request was sent to, using simple string comparison (RFC 3986 section 6.2.1, i.e. without URL normalization), and rejects on mismatch. A response that omits iss is rejected only when the server advertised support via authorization_response_iss_parameter_supported.

Raises:

Type Description
OAuthFlowError

If iss is present and does not match, or is absent when the authorization server advertised support.

Source code in src/mcp/client/auth/utils.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def validate_authorization_response_iss(iss: str | None, oauth_metadata: OAuthMetadata | None) -> None:
    """Validate the RFC 9207 `iss` authorization-response parameter.

    Per RFC 9207 section 2.4, the client compares `iss` against the issuer of the
    authorization server the request was sent to, using simple string comparison
    (RFC 3986 section 6.2.1, i.e. without URL normalization), and rejects on mismatch.
    A response that omits `iss` is rejected only when the server advertised support via
    `authorization_response_iss_parameter_supported`.

    Raises:
        OAuthFlowError: If `iss` is present and does not match, or is absent when the
            authorization server advertised support.
    """
    expected = str(oauth_metadata.issuer) if oauth_metadata else None

    if iss is not None:
        if iss != expected:
            raise OAuthFlowError(f"Authorization response iss mismatch: {iss} != {expected}")
        return

    if oauth_metadata is not None and oauth_metadata.authorization_response_iss_parameter_supported:
        raise OAuthFlowError("Authorization response missing iss parameter advertised by the authorization server")

validate_metadata_issuer

validate_metadata_issuer(
    oauth_metadata: OAuthMetadata, expected_issuer: str
) -> None

Validate that authorization server metadata issuer matches the discovery issuer.

Per RFC 8414 section 3.3 / SEP-2468, the issuer in the metadata must match the issuer used to construct the well-known URL, compared as a simple string (RFC 3986 section 6.2.1).

Raises:

Type Description
OAuthFlowError

If the metadata issuer does not match expected_issuer.

Source code in src/mcp/client/auth/utils.py
260
261
262
263
264
265
266
267
268
269
270
271
272
def validate_metadata_issuer(oauth_metadata: OAuthMetadata, expected_issuer: str) -> None:
    """Validate that authorization server metadata `issuer` matches the discovery issuer.

    Per RFC 8414 section 3.3 / SEP-2468, the `issuer` in the metadata must match the issuer
    used to construct the well-known URL, compared as a simple string (RFC 3986 section 6.2.1).

    Raises:
        OAuthFlowError: If the metadata issuer does not match `expected_issuer`.
    """
    if str(oauth_metadata.issuer) != expected_issuer:
        raise OAuthFlowError(
            f"Authorization server metadata issuer mismatch: {oauth_metadata.issuer} != {expected_issuer}"
        )

create_client_registration_request

create_client_registration_request(
    auth_server_metadata: OAuthMetadata | None,
    client_metadata: OAuthClientMetadata,
    auth_base_url: str,
) -> Request

Build a client registration request.

Source code in src/mcp/client/auth/utils.py
279
280
281
282
283
284
285
286
287
288
289
290
291
def create_client_registration_request(
    auth_server_metadata: OAuthMetadata | None, client_metadata: OAuthClientMetadata, auth_base_url: str
) -> Request:
    """Build a client registration request."""

    if auth_server_metadata and auth_server_metadata.registration_endpoint:
        registration_url = str(auth_server_metadata.registration_endpoint)
    else:
        registration_url = urljoin(auth_base_url, "/register")

    registration_data = client_metadata.model_dump(by_alias=True, mode="json", exclude_none=True)

    return Request("POST", registration_url, json=registration_data, headers={"Content-Type": "application/json"})

handle_registration_response async

handle_registration_response(
    response: Response,
) -> OAuthClientInformationFull

Handle registration response.

Source code in src/mcp/client/auth/utils.py
294
295
296
297
298
299
300
301
302
303
304
305
async def handle_registration_response(response: Response) -> OAuthClientInformationFull:
    """Handle registration response."""
    if response.status_code not in (200, 201):
        await response.aread()
        raise OAuthRegistrationError(f"Registration failed: {response.status_code} {response.text}")

    try:
        content = await response.aread()
        client_info = OAuthClientInformationFull.model_validate_json(content)
        return client_info
    except ValidationError as e:  # pragma: no cover
        raise OAuthRegistrationError(f"Invalid registration response: {e}")

is_valid_client_metadata_url

is_valid_client_metadata_url(url: str | None) -> bool

Validate that a URL is suitable for use as a client_id (CIMD).

The URL must be HTTPS with a non-root pathname.

Parameters:

Name Type Description Default
url str | None

The URL to validate

required

Returns:

Type Description
bool

True if the URL is a valid HTTPS URL with a non-root pathname

Source code in src/mcp/client/auth/utils.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
def is_valid_client_metadata_url(url: str | None) -> bool:
    """Validate that a URL is suitable for use as a client_id (CIMD).

    The URL must be HTTPS with a non-root pathname.

    Args:
        url: The URL to validate

    Returns:
        True if the URL is a valid HTTPS URL with a non-root pathname
    """
    if not url:
        return False
    try:
        parsed = urlparse(url)
        return parsed.scheme == "https" and parsed.path not in ("", "/")
    except Exception:
        return False

credentials_match_issuer

credentials_match_issuer(
    client_info: OAuthClientInformationFull,
    issuer: str,
    client_metadata_url: str | None,
) -> bool

Whether stored client credentials may be reused against issuer (SEP-2352).

A URL-based client ID (CIMD) is portable across authorization servers — the same self-hosted document is resolved by whichever server is in use — so it always matches; CIMD is identified by the client ID being the configured client_metadata_url, not by URL shape (a registration server may also issue URL-shaped IDs that are bound to it). Credentials with a recorded issuer match only when it equals issuer (simple string comparison). Credentials with no recorded issuer (pre-registered, or stored before issuer binding existed) carry no binding to enforce and are left as-is.

Source code in src/mcp/client/auth/utils.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
def credentials_match_issuer(
    client_info: OAuthClientInformationFull, issuer: str, client_metadata_url: str | None
) -> bool:
    """Whether stored client credentials may be reused against `issuer` (SEP-2352).

    A URL-based client ID (CIMD) is portable across authorization servers — the same self-hosted
    document is resolved by whichever server is in use — so it always matches; CIMD is identified
    by the client ID being the configured `client_metadata_url`, not by URL shape (a registration
    server may also issue URL-shaped IDs that are bound to it). Credentials with a recorded issuer
    match only when it equals `issuer` (simple string comparison). Credentials with no recorded
    issuer (pre-registered, or stored before issuer binding existed) carry no binding to enforce
    and are left as-is.
    """
    if client_metadata_url is not None and client_info.client_id == client_metadata_url:
        return True
    if client_info.issuer is None:
        return True
    return client_info.issuer == issuer

should_use_client_metadata_url

should_use_client_metadata_url(
    oauth_metadata: OAuthMetadata | None,
    client_metadata_url: str | None,
) -> bool

Determine if URL-based client ID (CIMD) should be used instead of DCR.

URL-based client IDs should be used when: 1. The server advertises client_id_metadata_document_supported=True 2. The client has a valid client_metadata_url configured

Parameters:

Name Type Description Default
oauth_metadata OAuthMetadata | None

OAuth authorization server metadata

required
client_metadata_url str | None

URL-based client ID (already validated)

required

Returns:

Type Description
bool

True if CIMD should be used, False if DCR should be used

Source code in src/mcp/client/auth/utils.py
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
def should_use_client_metadata_url(
    oauth_metadata: OAuthMetadata | None,
    client_metadata_url: str | None,
) -> bool:
    """Determine if URL-based client ID (CIMD) should be used instead of DCR.

    URL-based client IDs should be used when:
    1. The server advertises client_id_metadata_document_supported=True
    2. The client has a valid client_metadata_url configured

    Args:
        oauth_metadata: OAuth authorization server metadata
        client_metadata_url: URL-based client ID (already validated)

    Returns:
        True if CIMD should be used, False if DCR should be used
    """
    if not client_metadata_url:
        return False

    if not oauth_metadata:
        return False

    return oauth_metadata.client_id_metadata_document_supported is True

create_client_info_from_metadata_url

create_client_info_from_metadata_url(
    client_metadata_url: str,
    redirect_uris: list[AnyUrl] | None = None,
) -> OAuthClientInformationFull

Create client information using a URL-based client ID (CIMD).

When using URL-based client IDs, the URL itself becomes the client_id and no client_secret is used (token_endpoint_auth_method="none").

Parameters:

Name Type Description Default
client_metadata_url str

The URL to use as the client_id

required
redirect_uris list[AnyUrl] | None

The redirect URIs from the client metadata (passed through for compatibility with OAuthClientInformationFull which inherits from OAuthClientMetadata)

None

Returns:

Type Description
OAuthClientInformationFull

OAuthClientInformationFull with the URL as client_id

Source code in src/mcp/client/auth/utils.py
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
def create_client_info_from_metadata_url(
    client_metadata_url: str, redirect_uris: list[AnyUrl] | None = None
) -> OAuthClientInformationFull:
    """Create client information using a URL-based client ID (CIMD).

    When using URL-based client IDs, the URL itself becomes the client_id
    and no client_secret is used (token_endpoint_auth_method="none").

    Args:
        client_metadata_url: The URL to use as the client_id
        redirect_uris: The redirect URIs from the client metadata (passed through for
            compatibility with OAuthClientInformationFull which inherits from OAuthClientMetadata)

    Returns:
        OAuthClientInformationFull with the URL as client_id
    """
    return OAuthClientInformationFull(
        client_id=client_metadata_url,
        token_endpoint_auth_method="none",
        redirect_uris=redirect_uris,
    )

handle_token_response_scopes async

handle_token_response_scopes(
    response: Response,
) -> OAuthToken

Parse and validate a token response.

Parses token response JSON. Callers should check response.status_code before calling.

Parameters:

Name Type Description Default
response Response

HTTP response from token endpoint (status already checked by caller)

required

Returns:

Type Description
OAuthToken

Validated OAuthToken model

Raises:

Type Description
OAuthTokenError

If response JSON is invalid

Source code in src/mcp/client/auth/utils.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
async def handle_token_response_scopes(
    response: Response,
) -> OAuthToken:
    """Parse and validate a token response.

    Parses token response JSON. Callers should check response.status_code before calling.

    Args:
        response: HTTP response from token endpoint (status already checked by caller)

    Returns:
        Validated OAuthToken model

    Raises:
        OAuthTokenError: If response JSON is invalid
    """
    try:
        content = await response.aread()
        token_response = OAuthToken.model_validate_json(content)
        return token_response
    except ValidationError as e:  # pragma: no cover
        raise OAuthTokenError(f"Invalid token response: {e}")