Merge pull request #63 from Azure/cs_auth
Add Cognitive Services auth class
This commit is contained in:
Коммит
4f0d10354d
|
@ -135,3 +135,51 @@ class OAuthTokenAuthentication(BasicTokenAuthentication):
|
|||
:rtype: requests.Session.
|
||||
"""
|
||||
return oauth.OAuth2Session(self.id, token=self.token)
|
||||
|
||||
class ApiKeyCredentials(Authentication):
|
||||
"""Represent the ApiKey feature of Swagger.
|
||||
|
||||
Dict should be dict[str, str] to be accepted by requests.
|
||||
|
||||
:param dict[str, str] in_headers: Headers part of the ApiKey
|
||||
:param dict[str, str] in_query: ApiKey in the query as parameters.
|
||||
"""
|
||||
def __init__(self, in_headers=None, in_query=None):
|
||||
if in_headers is None:
|
||||
in_headers = {}
|
||||
if in_query is None:
|
||||
in_query = {}
|
||||
|
||||
if not in_headers and not in_query:
|
||||
raise ValueError("You need to define in_headers or in_query")
|
||||
|
||||
self.in_headers = in_headers
|
||||
self.in_query = in_query
|
||||
|
||||
def signed_session(self):
|
||||
"""Create requests session with ApiKey.
|
||||
|
||||
:rtype: requests.Session.
|
||||
"""
|
||||
session = super(ApiKeyCredentials, self).signed_session()
|
||||
session.headers.update(self.in_headers)
|
||||
session.params.update(self.in_query)
|
||||
return session
|
||||
|
||||
class CognitiveServicesCredentials(ApiKeyCredentials):
|
||||
"""Cognitive Services authentication.
|
||||
|
||||
:param str subscription_key: The CS subscription key
|
||||
"""
|
||||
|
||||
_subscription_key_header = 'Ocp-Apim-Subscription-Key'
|
||||
|
||||
def __init__(self, subscription_key):
|
||||
if not subscription_key:
|
||||
raise ValueError("Subscription key cannot be None")
|
||||
super(CognitiveServicesCredentials, self).__init__(
|
||||
in_headers={
|
||||
self._subscription_key_header: subscription_key,
|
||||
'X-BingApis-SDK-Client': 'Python-SDK'
|
||||
}
|
||||
)
|
||||
|
|
|
@ -40,7 +40,10 @@ except ImportError:
|
|||
from msrest.authentication import (
|
||||
BasicAuthentication,
|
||||
BasicTokenAuthentication,
|
||||
OAuthTokenAuthentication)
|
||||
OAuthTokenAuthentication,
|
||||
ApiKeyCredentials,
|
||||
CognitiveServicesCredentials
|
||||
)
|
||||
|
||||
from requests import Request
|
||||
|
||||
|
@ -93,7 +96,32 @@ class TestAuthentication(unittest.TestCase):
|
|||
|
||||
self.assertEqual(session.token, token)
|
||||
|
||||
|
||||
def test_apikey_auth(self):
|
||||
auth = ApiKeyCredentials(
|
||||
in_headers={
|
||||
'testheader' : 'testheadervalue'
|
||||
}
|
||||
)
|
||||
session = auth.signed_session()
|
||||
prep_req = session.prepare_request(self.request)
|
||||
self.assertDictContainsSubset({'testheader' : 'testheadervalue'}, prep_req.headers)
|
||||
|
||||
auth = ApiKeyCredentials(
|
||||
in_query={
|
||||
'testquery' : 'testparamvalue'
|
||||
}
|
||||
)
|
||||
session = auth.signed_session()
|
||||
prep_req = session.prepare_request(self.request)
|
||||
self.assertIn("testquery=testparamvalue", prep_req.path_url)
|
||||
|
||||
def test_cs_auth(self):
|
||||
auth = CognitiveServicesCredentials("mysubkey")
|
||||
session = auth.signed_session()
|
||||
prep_req = session.prepare_request(self.request)
|
||||
self.assertDictContainsSubset({'Ocp-Apim-Subscription-Key' : 'mysubkey'}, prep_req.headers)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче