{Core} Workaround empty and multiple scopes provided by old Track 2 SDKs (#15806)

This commit is contained in:
Jiashuo Li 2020-11-06 14:10:46 +08:00 коммит произвёл GitHub
Родитель 6291b001f2
Коммит dbf49d6ef5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 57 добавлений и 9 удалений

Просмотреть файл

@ -68,14 +68,7 @@ class AdalAuthentication(Authentication): # pylint: disable=too-few-public-meth
def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
logger.debug("AdalAuthentication.get_token invoked by Track 2 SDK with scopes=%s", scopes)
# Deal with an old Track 2 SDK issue where the default credential_scopes is extended with
# custom credential_scopes. Instead, credential_scopes should be replaced by custom credential_scopes.
# https://github.com/Azure/azure-sdk-for-python/issues/12947
# We simply remove the first one if there are multiple scopes provided.
if len(scopes) > 1:
scopes = scopes[1:]
_, token, full_token, _ = self._get_token(scopes_to_resource(scopes))
_, token, full_token, _ = self._get_token(_try_scopes_to_resource(scopes))
try:
return AccessToken(token, int(full_token['expiresIn'] + time.time()))
except KeyError: # needed to deal with differing unserialized MSI token payload
@ -106,7 +99,10 @@ class MSIAuthenticationWrapper(MSIAuthentication):
# This method is exposed for Azure Core. Add *scopes, **kwargs to fit azure.core requirement
def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
logger.debug("MSIAuthenticationWrapper.get_token invoked by Track 2 SDK with scopes=%s", scopes)
self.resource = scopes_to_resource(scopes)
resource = _try_scopes_to_resource(scopes)
if resource:
# If available, use resource provided by SDK
self.resource = resource
self.set_token()
return AccessToken(self.token['access_token'], int(self.token['expires_on']))
@ -135,3 +131,25 @@ class MSIAuthenticationWrapper(MSIAuthentication):
def signed_session(self, session=None):
logger.debug("MSIAuthenticationWrapper.signed_session invoked by Track 1 SDK")
super().signed_session(session)
def _try_scopes_to_resource(scopes):
"""Wrap scopes_to_resource to workaround some SDK issues."""
# Track 2 SDKs generated before https://github.com/Azure/autorest.python/pull/239 don't maintain
# credential_scopes and call `get_token` with empty scopes.
# As a workaround, return None so that the CLI-managed resource is used.
if not scopes:
logger.debug("No scope is provided by the SDK, use the CLI-managed resource.")
return None
# Track 2 SDKs generated before https://github.com/Azure/autorest.python/pull/745 extend default
# credential_scopes with custom credential_scopes. Instead, credential_scopes should be replaced by
# custom credential_scopes. https://github.com/Azure/azure-sdk-for-python/issues/12947
# As a workaround, remove the first one if there are multiple scopes provided.
if len(scopes) > 1:
logger.debug("Multiple scopes are provided by the SDK, discarding the first one: %s", scopes[0])
return scopes_to_resource(scopes[1:])
# Exactly only one scope is provided
return scopes_to_resource(scopes)

Просмотреть файл

@ -0,0 +1,30 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# pylint: disable=line-too-long
import unittest
from azure.cli.core.adal_authentication import _try_scopes_to_resource
class TestUtils(unittest.TestCase):
def test_try_scopes_to_resource(self):
# Test no scopes
self.assertIsNone(_try_scopes_to_resource(()))
self.assertIsNone(_try_scopes_to_resource([]))
self.assertIsNone(_try_scopes_to_resource(None))
# Test multiple scopes, with the first one discarded
resource = _try_scopes_to_resource(("https://management.core.windows.net//.default",
"https://management.core.chinacloudapi.cn//.default"))
self.assertEqual(resource, "https://management.core.chinacloudapi.cn/")
# Test single scopes (the correct usage)
resource = _try_scopes_to_resource(("https://management.core.chinacloudapi.cn//.default",))
self.assertEqual(resource, "https://management.core.chinacloudapi.cn/")
if __name__ == '__main__':
unittest.main()