зеркало из
1
0
Форкнуть 0
* Add refresh token logic when exists.
* Use AAD constants
This commit is contained in:
toshetah 2018-08-05 11:52:15 +03:00 коммит произвёл GitHub
Родитель c7c78cf262
Коммит 9fb686987d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 29 добавлений и 15 удалений

Просмотреть файл

@ -3,7 +3,9 @@
from datetime import timedelta, datetime
import webbrowser
import dateutil.parser
from adal import AuthenticationContext
from adal.constants import TokenResponseFields, OAuth2DeviceCodeResponseParameters, AADConstants
class _AadHelper(object):
@ -17,7 +19,9 @@ class _AadHelper(object):
authority=None,
):
self.adal_context = AuthenticationContext(
"https://login.windows.net/{0}".format(authority or "microsoft.com")
"https://{0}/{1}".format(
AADConstants.WORLD_WIDE_AUTHORITY, authority or "microsoft.com"
)
)
self.kusto_cluster = kusto_cluster
self.client_id = client_id or "db662dc1-0cfe-4e1c-a843-19a68e65be58"
@ -27,28 +31,38 @@ class _AadHelper(object):
def acquire_token(self):
"""A method to acquire tokens from AAD."""
token_response = self.adal_context.acquire_token(
self.kusto_cluster, self.username, self.client_id
token = self.adal_context.acquire_token(self.kusto_cluster, self.username, self.client_id)
if token is not None:
expiration_date = dateutil.parser.parse(token[TokenResponseFields.EXPIRES_ON])
if expiration_date > datetime.now() + timedelta(minutes=5):
return _get_header(token)
elif TokenResponseFields.REFRESH_TOKEN in token:
token = self.adal_context.acquire_token_with_refresh_token(
token[TokenResponseFields.REFRESH_TOKEN], self.client_id, self.kusto_cluster
)
if token_response is not None:
expiration_date = dateutil.parser.parse(token_response["expiresOn"])
if expiration_date > datetime.utcnow() + timedelta(minutes=5):
return token_response["accessToken"]
if token is not None:
return _get_header(token)
if self.client_secret is not None and self.client_id is not None:
token_response = self.adal_context.acquire_token_with_client_credentials(
token = self.adal_context.acquire_token_with_client_credentials(
self.kusto_cluster, self.client_id, self.client_secret
)
elif self.username is not None and self.password is not None:
token_response = self.adal_context.acquire_token_with_username_password(
token = self.adal_context.acquire_token_with_username_password(
self.kusto_cluster, self.username, self.password, self.client_id
)
else:
code = self.adal_context.acquire_user_code(self.kusto_cluster, self.client_id)
print(code["message"])
webbrowser.open(code["verification_url"])
token_response = self.adal_context.acquire_token_with_device_code(
print(code[OAuth2DeviceCodeResponseParameters.MESSAGE])
webbrowser.open(code[OAuth2DeviceCodeResponseParameters.VERIFICATION_URL])
token = self.adal_context.acquire_token_with_device_code(
self.kusto_cluster, code, self.client_id
)
return _get_header(token)
return token_response["accessToken"]
@staticmethod
def _get_header(token):
return "{0} {1}".format(
token[TokenResponseFields.TOKEN_TYPE], token[TokenResponseFields.ACCESS_TOKEN]
)

Просмотреть файл

@ -369,7 +369,7 @@ class KustoClient(object):
access_token = self._aad_helper.acquire_token()
request_headers = {
"Authorization": "Bearer {0}".format(access_token),
"Authorization": access_token,
"Accept": "application/json",
"Accept-Encoding": "gzip,deflate",
"Content-Type": "application/json; charset=utf-8",