diff --git a/msal/authority.py b/msal/authority.py index 96204966..5550be74 100644 --- a/msal/authority.py +++ b/msal/authority.py @@ -30,6 +30,20 @@ ] _CIAM_DOMAIN_SUFFIX = ".ciamlogin.com" +# Trusted issuer hosts for OIDC issuer validation. +# These are Microsoft's well-known identity provider hosts. +TRUSTED_ISSUER_HOSTS = frozenset([ + AZURE_PUBLIC, # Microsoft Azure Worldwide + "login.microsoft.com", # Microsoft Azure Worldwide + "login.windows.net", # Microsoft Azure Worldwide (validation scenarios) + "sts.windows.net", # Microsoft STS + "login.partner.microsoftonline.cn", # Microsoft Azure China + AZURE_CHINA, # Microsoft Azure China (legacy) + "login.microsoftonline.de", # Microsoft Azure Germany + "login-us.microsoftonline.com", # Microsoft Azure US Government (legacy) + AZURE_US_GOVERNMENT, # Microsoft Azure US Government + "login.usgovcloudapi.net", # Microsoft Azure US Government +]) class AuthorityBuilder(object): def __init__(self, instance, tenant): @@ -93,7 +107,8 @@ def __init__( .format(authority_url) ) + " Also please double check your tenant name or GUID is correct." raise ValueError(error_message) - openid_config.pop("issuer", None) # Not used in MSAL.py, so remove it therefore no need to validate it + if oidc_authority_url: + _validate_issuer(openid_config.get("issuer"), oidc_authority_url) logger.debug( 'openid_config("%s") = %s', tenant_discovery_endpoint, openid_config) self.authorization_endpoint = openid_config['authorization_endpoint'] @@ -224,3 +239,49 @@ def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs): raise RuntimeError( # A fallback here, in case resp.raise_for_status() is no-op "Unable to complete OIDC Discovery: %d, %s" % (resp.status_code, resp.text)) +def _validate_issuer(issuer, authority_url): + """Validate that the OIDC issuer matches the authority or is from a trusted source. + + Per OIDC Discovery spec, the issuer returned MUST match the authority. + We also allow issuers from well-known trusted Microsoft sources, including + regional variants (e.g., westus2.login.microsoft.com). + + :param issuer: The issuer claim from the OIDC discovery response. + :param authority_url: The OIDC authority URL provided by the caller. + :raises ValueError: If issuer is missing or not from authority/trusted sources. + """ + if not issuer: + raise ValueError( + "The OIDC discovery response from {} is missing the required 'issuer' claim." + .format(authority_url)) + + # Normalize URLs for comparison (remove trailing slashes) + normalized_issuer = issuer.rstrip("/") + normalized_authority = authority_url.rstrip("/") + + # Case 1: Exact match (most common case) + if normalized_issuer == normalized_authority: + return + + # Case 2: Check if issuer is from a trusted source + issuer_parsed = urlparse(issuer) + issuer_host = issuer_parsed.hostname.lower() if issuer_parsed.hostname else None + if issuer_host: + # Direct lookup - O(1) + if issuer_host in TRUSTED_ISSUER_HOSTS: + return + # Check for regional pattern: {region}.{trusted_base} + # e.g., westus2.login.microsoft.com -> extract "login.microsoft.com" + # Find the first dot and check if the remainder is a trusted host + dot_index = issuer_host.find(".") + if dot_index > 0: + potential_base = issuer_host[dot_index + 1:] # e.g., "login.microsoft.com" + region = issuer_host[:dot_index] # e.g., "westus2" + # O(1) lookup instead of O(n) iteration + if potential_base in TRUSTED_ISSUER_HOSTS and "." not in region: + return + + raise ValueError( + "The issuer '{}' from the OIDC discovery response does not match " + "the authority '{}' and is not from a trusted source." + .format(issuer, authority_url)) diff --git a/tests/test_application.py b/tests/test_application.py index 556750fa..acbefa73 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -22,6 +22,7 @@ _OIDC_DISCOVERY = "msal.authority.tenant_discovery" _OIDC_DISCOVERY_MOCK = Mock(return_value={ + "issuer": "https://contoso.com/placeholder", "authorization_endpoint": "https://contoso.com/placeholder", "token_endpoint": "https://contoso.com/placeholder", }) @@ -794,6 +795,7 @@ def test_should_fallback_when_pymsalruntime_failed_to_initialize_broker(self): @patch("sys.platform", new="darwin") # Pretend running on Mac. @patch("msal.authority.tenant_discovery", new=Mock(return_value={ + "issuer": "https://contoso.com/placeholder", "authorization_endpoint": "https://contoso.com/placeholder", "token_endpoint": "https://contoso.com/placeholder", })) @@ -846,7 +848,7 @@ def test_should_use_broker_when_disabling_instance_discovery(self): def test_should_fallback_to_non_broker_when_using_oidc_authority(self): app = msal.PublicClientApplication( "client_id", - oidc_authority="https://contoso.com/path", + oidc_authority="https://contoso.com/placeholder", enable_broker_on_mac=True, ) self.assertFalse(app._enable_broker) diff --git a/tests/test_authority.py b/tests/test_authority.py index 312037ff..a4da7ce8 100644 --- a/tests/test_authority.py +++ b/tests/test_authority.py @@ -1,4 +1,6 @@ import os + +from msal.authority import _validate_issuer try: from unittest.mock import patch except: @@ -101,11 +103,14 @@ def test_authority_with_path_should_be_used_as_is(self, oidc_discovery): @patch("msal.authority._instance_discovery") @patch("msal.authority.tenant_discovery", return_value={ + "issuer": "https://contoso.com/tenant", "authorization_endpoint": "https://contoso.com/authorize", "token_endpoint": "https://contoso.com/token", }) class OidcAuthorityTestCase(unittest.TestCase): authority = "https://contoso.com/tenant" + authorization_endpoint = "https://contoso.com/authorize" + token_endpoint = "https://contoso.com/token" def setUp(self): # setUp() gives subclass a dynamic setup based on their authority @@ -121,8 +126,8 @@ def test_authority_obj_should_do_oidc_discovery_and_skip_instance_discovery( a = Authority(None, c, oidc_authority_url=self.authority) instance_discovery.assert_not_called() oidc_discovery.assert_called_once_with(self.oidc_discovery_endpoint, c) - self.assertEqual(a.authorization_endpoint, 'https://contoso.com/authorize') - self.assertEqual(a.token_endpoint, 'https://contoso.com/token') + self.assertEqual(a.authorization_endpoint, self.authorization_endpoint) + self.assertEqual(a.token_endpoint, self.token_endpoint) def test_application_obj_should_do_oidc_discovery_and_skip_instance_discovery( self, oidc_discovery, instance_discovery): @@ -132,24 +137,57 @@ def test_application_obj_should_do_oidc_discovery_and_skip_instance_discovery( oidc_discovery.assert_called_once_with( self.oidc_discovery_endpoint, app.http_client) self.assertEqual( - app.authority.authorization_endpoint, 'https://contoso.com/authorize') - self.assertEqual(app.authority.token_endpoint, 'https://contoso.com/token') + app.authority.authorization_endpoint, self.authorization_endpoint) + self.assertEqual(app.authority.token_endpoint, self.token_endpoint) + +class DstsAuthorityTestCase(unittest.TestCase): + # Test cases for dSTS authority (no longer inherits from OidcAuthorityTestCase to avoid patch conflicts) + authority = 'https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common' + authorization_endpoint = "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/authorize" + token_endpoint = "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/token" -class DstsAuthorityTestCase(OidcAuthorityTestCase): - # Inherits OidcAuthority's test cases and run them with a dSTS authority - authority = ( # dSTS is single tenanted with a tenant placeholder - 'https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common') - authorization_endpoint = ( - "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/authorize") - token_endpoint = ( - "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/token") + def setUp(self): + self.oidc_discovery_endpoint = self.authority + "/.well-known/openid-configuration" + + @patch("msal.authority._instance_discovery") + @patch("msal.authority.tenant_discovery", return_value={ + "issuer": "https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common", + "authorization_endpoint": "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/authorize", + "token_endpoint": "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/token", + }) + def test_authority_obj_should_do_oidc_discovery_and_skip_instance_discovery( + self, oidc_discovery, instance_discovery): + c = MinimalHttpClient() + a = Authority(None, c, oidc_authority_url=self.authority) + instance_discovery.assert_not_called() + oidc_discovery.assert_called_once_with(self.oidc_discovery_endpoint, c) + self.assertEqual(a.authorization_endpoint, self.authorization_endpoint) + self.assertEqual(a.token_endpoint, self.token_endpoint) @patch("msal.authority._instance_discovery") @patch("msal.authority.tenant_discovery", return_value={ - "authorization_endpoint": authorization_endpoint, - "token_endpoint": token_endpoint, - }) # We need to create new patches (i.e. mocks) for non-inherited test cases + "issuer": "https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common", + "authorization_endpoint": "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/authorize", + "token_endpoint": "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/token", + }) + def test_application_obj_should_do_oidc_discovery_and_skip_instance_discovery( + self, oidc_discovery, instance_discovery): + app = msal.ClientApplication( + "id", authority=None, oidc_authority=self.authority) + instance_discovery.assert_not_called() + oidc_discovery.assert_called_once_with( + self.oidc_discovery_endpoint, app.http_client) + self.assertEqual( + app.authority.authorization_endpoint, self.authorization_endpoint) + self.assertEqual(app.authority.token_endpoint, self.token_endpoint) + + @patch("msal.authority._instance_discovery") + @patch("msal.authority.tenant_discovery", return_value={ + "issuer": "https://test-instance1-dsts.dsts.core.azure-test.net/dstsv2/common", + "authorization_endpoint": "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/authorize", + "token_endpoint": "https://some.url.dsts.core.azure-test.net/dstsv2/common/oauth2/token", + }) def test_application_obj_should_accept_dsts_url_as_an_authority( self, oidc_discovery, instance_discovery): app = msal.ClientApplication("id", authority=self.authority) @@ -278,3 +316,302 @@ def _test_turning_off_instance_discovery_should_skip_authority_validation_and_in app.get_accounts() # This could make an instance metadata call for authority aliases instance_metadata.assert_not_called() + +class TestOidcIssuerValidation(unittest.TestCase): + """Tests for OIDC issuer validation against authority and trusted sources.""" + + # ===== Exact Match Tests ===== + + def test_issuer_exact_match(self): + _validate_issuer("https://example.com/tenant", "https://example.com/tenant") + + def test_issuer_exact_match_with_issuer_trailing_slash(self): + _validate_issuer("https://example.com/tenant/", "https://example.com/tenant") + + def test_issuer_exact_match_with_authority_trailing_slash(self): + _validate_issuer("https://example.com/tenant", "https://example.com/tenant/") + + def test_issuer_exact_match_both_trailing_slashes(self): + _validate_issuer("https://example.com/tenant/", "https://example.com/tenant/") + + def test_issuer_exact_match_with_path_segments(self): + _validate_issuer( + "https://example.com/tenant/v2.0", + "https://example.com/tenant/v2.0") + + def test_issuer_exact_match_guid_tenant(self): + _validate_issuer( + "https://example.com/12345678-1234-1234-1234-123456789012", + "https://example.com/12345678-1234-1234-1234-123456789012") + + # ===== Trusted Hosts Tests ===== + + def test_issuer_from_all_trusted_hosts(self): + for host in TRUSTED_ISSUER_HOSTS: + with self.subTest(host=host): + _validate_issuer( + "https://{}/tenant-id".format(host), + "https://custom.example.com/tenant-id") + + def test_issuer_login_microsoftonline_com(self): + _validate_issuer( + "https://login.microsoftonline.com/contoso.com", + "https://custom-domain.com/contoso.com") + + def test_issuer_login_microsoft_com(self): + _validate_issuer( + "https://login.microsoft.com/tenant-guid", + "https://my-app.contoso.com/tenant-guid") + + def test_issuer_login_windows_net(self): + _validate_issuer( + "https://login.windows.net/tenant", + "https://example.com/tenant") + + def test_issuer_sts_windows_net(self): + _validate_issuer( + "https://sts.windows.net/tenant-id/", + "https://example.com/tenant-id") + + def test_issuer_login_chinacloudapi_cn(self): + _validate_issuer( + "https://login.chinacloudapi.cn/tenant", + "https://myapp.cn/tenant") + + def test_issuer_login_partner_microsoftonline_cn(self): + _validate_issuer( + "https://login.partner.microsoftonline.cn/tenant", + "https://myapp.cn/tenant") + + def test_issuer_login_microsoftonline_de(self): + _validate_issuer( + "https://login.microsoftonline.de/tenant", + "https://myapp.de/tenant") + + def test_issuer_login_microsoftonline_us(self): + _validate_issuer( + "https://login.microsoftonline.us/tenant", + "https://myapp.gov/tenant") + + def test_issuer_login_us_microsoftonline_com(self): + _validate_issuer( + "https://login-us.microsoftonline.com/tenant", + "https://myapp.gov/tenant") + + def test_issuer_login_usgovcloudapi_net(self): + _validate_issuer( + "https://login.usgovcloudapi.net/tenant", + "https://myapp.gov/tenant") + + # ===== Regional Host Tests ===== + + def test_issuer_regional_westus2_login_microsoft_com(self): + _validate_issuer( + "https://westus2.login.microsoft.com/tenant-id", + "https://custom.example.com/tenant-id") + + def test_issuer_regional_eastus_login_microsoft_com(self): + _validate_issuer( + "https://eastus.login.microsoft.com/tenant-id", + "https://custom.example.com/tenant-id") + + def test_issuer_regional_westeurope_login_microsoftonline_com(self): + _validate_issuer( + "https://westeurope.login.microsoftonline.com/tenant-id", + "https://custom.example.com/tenant-id") + + def test_issuer_regional_southeastasia_login_microsoftonline_com(self): + _validate_issuer( + "https://southeastasia.login.microsoftonline.com/tenant-id", + "https://custom.example.com/tenant-id") + + def test_issuer_regional_uksouth_login_microsoft_com(self): + _validate_issuer( + "https://uksouth.login.microsoft.com/tenant-id", + "https://custom.example.com/tenant-id") + + def test_issuer_regional_japaneast_login_microsoftonline_com(self): + _validate_issuer( + "https://japaneast.login.microsoftonline.com/tenant-id", + "https://custom.example.com/tenant-id") + + def test_issuer_regional_australiaeast_login_microsoft_com(self): + _validate_issuer( + "https://australiaeast.login.microsoft.com/tenant-id", + "https://custom.example.com/tenant-id") + + def test_issuer_regional_brazilsouth_login_microsoftonline_com(self): + _validate_issuer( + "https://brazilsouth.login.microsoftonline.com/tenant-id", + "https://custom.example.com/tenant-id") + + def test_issuer_regional_canadacentral_login_microsoft_com(self): + _validate_issuer( + "https://canadacentral.login.microsoft.com/tenant-id", + "https://custom.example.com/tenant-id") + + def test_issuer_regional_centralindia_login_microsoftonline_com(self): + _validate_issuer( + "https://centralindia.login.microsoftonline.com/tenant-id", + "https://custom.example.com/tenant-id") + + def test_all_regional_bases_support_region_prefix(self): + regions = ["westus2", "eastus", "westeurope", "northeurope", "uksouth"] + for base in TRUSTED_ISSUER_HOSTS: + for region in regions: + with self.subTest(region=region, base=base): + _validate_issuer( + "https://{}.{}/tenant".format(region, base), + "https://example.com/tenant") + + # ===== Failure Cases - Missing/Empty Issuer ===== + + def test_missing_issuer_none(self): + with self.assertRaises(ValueError) as cm: + _validate_issuer(None, "https://example.com/tenant") + self.assertIn("missing", str(cm.exception).lower()) + self.assertIn("issuer", str(cm.exception).lower()) + + def test_missing_issuer_empty_string(self): + with self.assertRaises(ValueError) as cm: + _validate_issuer("", "https://example.com/tenant") + self.assertIn("missing", str(cm.exception).lower()) + + # ===== Failure Cases - Untrusted Issuers ===== + + def test_untrusted_issuer_random_domain(self): + with self.assertRaises(ValueError) as cm: + _validate_issuer("https://evil.com/tenant", "https://example.com/tenant") + self.assertIn("not from a trusted source", str(cm.exception)) + + def test_untrusted_issuer_similar_looking_domain(self): + with self.assertRaises(ValueError): + _validate_issuer( + "https://login.microsoftonline.com.evil.com/tenant", + "https://example.com/tenant") + + def test_untrusted_issuer_subdomain_of_untrusted(self): + with self.assertRaises(ValueError): + _validate_issuer( + "https://login.microsoft.com.attacker.com/tenant", + "https://example.com/tenant") + + def test_untrusted_issuer_typosquat_microsoftonline(self): + with self.assertRaises(ValueError): + _validate_issuer( + "https://login.micros0ftonline.com/tenant", + "https://example.com/tenant") + + def test_untrusted_issuer_typosquat_microsoft(self): + with self.assertRaises(ValueError): + _validate_issuer( + "https://login.micr0soft.com/tenant", + "https://example.com/tenant") + + def test_untrusted_issuer_different_tenant(self): + with self.assertRaises(ValueError): + _validate_issuer( + "https://example.com/different-tenant", + "https://example.com/my-tenant") + + def test_untrusted_issuer_localhost(self): + with self.assertRaises(ValueError): + _validate_issuer( + "https://localhost/tenant", + "https://example.com/tenant") + + def test_untrusted_issuer_ip_address(self): + with self.assertRaises(ValueError): + _validate_issuer( + "https://192.168.1.1/tenant", + "https://example.com/tenant") + + def test_untrusted_issuer_private_domain(self): + with self.assertRaises(ValueError): + _validate_issuer( + "https://internal.corp.local/tenant", + "https://example.com/tenant") + + # ===== Failure Cases - Regional Pattern Abuse ===== + + def test_untrusted_regional_pattern_non_trusted_base(self): + with self.assertRaises(ValueError): + _validate_issuer( + "https://westus2.evil.com/tenant", + "https://example.com/tenant") + + def test_untrusted_double_region_prefix(self): + with self.assertRaises(ValueError): + _validate_issuer( + "https://westus2.eastus.login.microsoft.com/tenant", + "https://example.com/tenant") + + # ===== Edge Cases - URL Variations ===== + + def test_issuer_with_port_number_exact_match(self): + _validate_issuer( + "https://example.com:443/tenant", + "https://example.com:443/tenant") + + def test_issuer_with_port_trusted_host(self): + _validate_issuer( + "https://login.microsoftonline.com:443/tenant", + "https://example.com/tenant") + + def test_issuer_case_sensitivity_host_uppercase(self): + _validate_issuer( + "https://LOGIN.MICROSOFTONLINE.COM/tenant", + "https://example.com/tenant") + + def test_issuer_case_sensitivity_host_mixed(self): + _validate_issuer( + "https://Login.MicrosoftOnline.Com/tenant", + "https://example.com/tenant") + + def test_issuer_with_query_string_exact_match(self): + _validate_issuer( + "https://example.com/tenant?foo=bar", + "https://example.com/tenant?foo=bar") + + # ===== Edge Cases - B2C and CIAM Patterns ===== + + def test_issuer_b2c_pattern_trusted_host(self): + _validate_issuer( + "https://login.microsoftonline.com/te/contoso.onmicrosoft.com/b2c_1_signin/v2.0", + "https://contoso.b2clogin.com/contoso.onmicrosoft.com/b2c_1_signin/v2.0") + + def test_issuer_ciam_pattern_trusted_host(self): + _validate_issuer( + "https://login.microsoftonline.com/12345678-1234-1234-1234-123456789012/v2.0", + "https://contoso.ciamlogin.com/12345678-1234-1234-1234-123456789012") + + # ===== Edge Cases - Version Paths ===== + + def test_issuer_v1_endpoint(self): + _validate_issuer( + "https://sts.windows.net/tenant-id/", + "https://example.com/tenant-id") + + def test_issuer_v2_endpoint(self): + _validate_issuer( + "https://login.microsoftonline.com/tenant-id/v2.0", + "https://example.com/tenant-id") + + # ===== Boundary Tests ===== + + def test_issuer_very_long_tenant_id(self): + long_tenant = "a" * 500 + _validate_issuer( + "https://login.microsoftonline.com/{}".format(long_tenant), + "https://example.com/other") + + def test_issuer_minimal_path(self): + _validate_issuer( + "https://login.microsoftonline.com/", + "https://example.com/tenant") + + def test_issuer_special_characters_in_tenant(self): + _validate_issuer( + "https://login.microsoftonline.com/tenant-with-dash_and_underscore.onmicrosoft.com", + "https://example.com/tenant") +