Add support for token based authentication
This commit is contained in:
Родитель
a00fd567a7
Коммит
eb07dbe7fd
|
@ -11,6 +11,7 @@ import urllib.parse
|
|||
|
||||
import requests
|
||||
|
||||
from simple_ado.auth.ado_auth import ADOAuth
|
||||
from simple_ado.audit import ADOAuditClient
|
||||
from simple_ado.builds import ADOBuildClient
|
||||
from simple_ado.endpoints import ADOEndpointsClient
|
||||
|
@ -33,7 +34,7 @@ class ADOClient:
|
|||
"""Wrapper class around the ADO API.
|
||||
|
||||
:param tenant: The ADO tenant to connect to
|
||||
:param credentials: The credentials to use for the API connection
|
||||
:param auth: The auth details to use for the API connection
|
||||
:param user_agent: The user agent to set
|
||||
:param extra_headers: Any extra headers which should be sent with the API requests
|
||||
:param log: The logger to use for logging (a new one will be used if one is not supplied)
|
||||
|
@ -63,7 +64,7 @@ class ADOClient:
|
|||
self,
|
||||
*,
|
||||
tenant: str,
|
||||
credentials: tuple[str, str],
|
||||
auth: ADOAuth,
|
||||
user_agent: str | None = None,
|
||||
extra_headers: dict[str, str] | None = None,
|
||||
log: logging.Logger | None = None,
|
||||
|
@ -77,7 +78,7 @@ class ADOClient:
|
|||
|
||||
self.http_client = ADOHTTPClient(
|
||||
tenant=tenant,
|
||||
credentials=credentials,
|
||||
auth=auth,
|
||||
user_agent=user_agent if user_agent is not None else tenant,
|
||||
log=self.log,
|
||||
extra_headers=extra_headers,
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
"""Umbrella module for all authentication classes."""
|
||||
|
||||
from .ado_auth import ADOAuth
|
||||
from .ado_basic_auth import ADOBasicAuth
|
||||
from .ado_token_auth import ADOTokenAuth
|
|
@ -0,0 +1,13 @@
|
|||
"""Base auth class."""
|
||||
|
||||
import abc
|
||||
|
||||
|
||||
class ADOAuth(abc.ABC):
|
||||
"""Base class for authentication."""
|
||||
|
||||
def get_authorization_header(self) -> str:
|
||||
"""Get the header value.
|
||||
|
||||
:return: The header value."""
|
||||
raise NotImplementedError()
|
|
@ -0,0 +1,25 @@
|
|||
"""Basic authentication auth class."""
|
||||
|
||||
import base64
|
||||
import functools
|
||||
from simple_ado.auth.ado_auth import ADOAuth
|
||||
|
||||
|
||||
class ADOBasicAuth(ADOAuth):
|
||||
"""Username/password auth. Also supports PATs."""
|
||||
|
||||
username: str
|
||||
password: str
|
||||
|
||||
def __init__(self, username: str, password: str) -> None:
|
||||
self.username = username
|
||||
self.password = password
|
||||
|
||||
@functools.lru_cache(maxsize=1)
|
||||
def get_authorization_header(self) -> str:
|
||||
"""Get the header value.
|
||||
|
||||
:return: The header value."""
|
||||
|
||||
username_password_bytes = (self.username + ":" + self.password).encode("utf-8")
|
||||
return "Basic " + base64.b64encode(username_password_bytes).decode("utf-8")
|
|
@ -0,0 +1,19 @@
|
|||
"""Token authentication auth class."""
|
||||
|
||||
from simple_ado.auth.ado_auth import ADOAuth
|
||||
|
||||
|
||||
class ADOTokenAuth(ADOAuth):
|
||||
"""Token auth."""
|
||||
|
||||
token: str
|
||||
|
||||
def __init__(self, token: str) -> None:
|
||||
self.token = token
|
||||
|
||||
def get_authorization_header(self) -> str:
|
||||
"""Get the header value.
|
||||
|
||||
:return: The header value."""
|
||||
|
||||
return "Bearer " + self.token
|
|
@ -19,6 +19,7 @@ from tenacity import (
|
|||
wait_random_exponential,
|
||||
)
|
||||
|
||||
from simple_ado.auth.ado_auth import ADOAuth
|
||||
from simple_ado.exceptions import ADOException, ADOHTTPException
|
||||
from simple_ado.models import PatchOperation
|
||||
|
||||
|
@ -57,14 +58,14 @@ class ADOHTTPClient:
|
|||
:param tenant: The name of the ADO tenant to connect to
|
||||
:param extra_headers: Any extra headers which should be added to each request
|
||||
:param user_agent: The user agent to set
|
||||
:param credentials: The credentials which should be used for authentication
|
||||
:param auth: The authentication details
|
||||
:param log: The logger to use for logging
|
||||
"""
|
||||
|
||||
log: logging.Logger
|
||||
tenant: str
|
||||
extra_headers: dict[str, str]
|
||||
credentials: tuple[str, str]
|
||||
auth: ADOAuth
|
||||
_not_before: datetime.datetime | None
|
||||
_session: requests.Session
|
||||
|
||||
|
@ -72,7 +73,7 @@ class ADOHTTPClient:
|
|||
self,
|
||||
*,
|
||||
tenant: str,
|
||||
credentials: tuple[str, str],
|
||||
auth: ADOAuth,
|
||||
user_agent: str,
|
||||
log: logging.Logger,
|
||||
extra_headers: dict[str, str] | None = None,
|
||||
|
@ -82,7 +83,7 @@ class ADOHTTPClient:
|
|||
self.log = log.getChild("http")
|
||||
|
||||
self.tenant = tenant
|
||||
self.credentials = credentials
|
||||
self.auth = auth
|
||||
self._not_before = None
|
||||
|
||||
self._session = requests.Session()
|
||||
|
@ -196,7 +197,7 @@ class ADOHTTPClient:
|
|||
additional_headers: dict[str, str] | None = None,
|
||||
stream: bool = False,
|
||||
) -> requests.Response:
|
||||
"""Issue a GET request with the correct credentials and headers.
|
||||
"""Issue a GET request with the correct headers.
|
||||
|
||||
:param request_url: The URL to issue the request to
|
||||
:param additional_headers: Any additional headers to add to the request
|
||||
|
@ -207,9 +208,7 @@ class ADOHTTPClient:
|
|||
self._wait()
|
||||
|
||||
headers = self.construct_headers(additional_headers=additional_headers)
|
||||
response = self._session.get(
|
||||
request_url, auth=self.credentials, headers=headers, stream=stream
|
||||
)
|
||||
response = self._session.get(request_url, headers=headers, stream=stream)
|
||||
|
||||
self._track_rate_limit(response)
|
||||
|
||||
|
@ -232,7 +231,7 @@ class ADOHTTPClient:
|
|||
json_data: Any | None = None,
|
||||
stream: bool = False,
|
||||
) -> requests.Response:
|
||||
"""Issue a POST request with the correct credentials and headers.
|
||||
"""Issue a POST request with the correct headers.
|
||||
|
||||
Note: If `json_data` and `operations` are not None, the latter will take
|
||||
precedence.
|
||||
|
@ -256,7 +255,6 @@ class ADOHTTPClient:
|
|||
headers = self.construct_headers(additional_headers=additional_headers)
|
||||
return self._session.post(
|
||||
request_url,
|
||||
auth=self.credentials,
|
||||
headers=headers,
|
||||
json=json_data,
|
||||
stream=stream,
|
||||
|
@ -275,7 +273,7 @@ class ADOHTTPClient:
|
|||
json_data: Any | None = None,
|
||||
additional_headers: dict[str, Any] | None = None,
|
||||
) -> requests.Response:
|
||||
"""Issue a PATCH request with the correct credentials and headers.
|
||||
"""Issue a PATCH request with the correct headers.
|
||||
|
||||
Note: If `json_data` and `operations` are not None, the latter will take
|
||||
precedence.
|
||||
|
@ -296,9 +294,7 @@ class ADOHTTPClient:
|
|||
additional_headers["Content-Type"] = "application/json-patch+json"
|
||||
|
||||
headers = self.construct_headers(additional_headers=additional_headers)
|
||||
return self._session.patch(
|
||||
request_url, auth=self.credentials, headers=headers, json=json_data
|
||||
)
|
||||
return self._session.patch(request_url, headers=headers, json=json_data)
|
||||
|
||||
@retry(
|
||||
retry=retry_if_exception(_is_connection_failure), # type: ignore
|
||||
|
@ -312,7 +308,7 @@ class ADOHTTPClient:
|
|||
*,
|
||||
additional_headers: dict[str, Any] | None = None,
|
||||
) -> requests.Response:
|
||||
"""Issue a PUT request with the correct credentials and headers.
|
||||
"""Issue a PUT request with the correct headers.
|
||||
|
||||
:param request_url: The URL to issue the request to
|
||||
:param additional_headers: Any additional headers to add to the request
|
||||
|
@ -321,9 +317,7 @@ class ADOHTTPClient:
|
|||
:returns: The raw response object from the API
|
||||
"""
|
||||
headers = self.construct_headers(additional_headers=additional_headers)
|
||||
return self._session.put(
|
||||
request_url, auth=self.credentials, headers=headers, json=json_data
|
||||
)
|
||||
return self._session.put(request_url, headers=headers, json=json_data)
|
||||
|
||||
@retry(
|
||||
retry=retry_if_exception(_is_connection_failure), # type: ignore
|
||||
|
@ -333,7 +327,7 @@ class ADOHTTPClient:
|
|||
def delete(
|
||||
self, request_url: str, *, additional_headers: dict[str, Any] | None = None
|
||||
) -> requests.Response:
|
||||
"""Issue a DELETE request with the correct credentials and headers.
|
||||
"""Issue a DELETE request with the correct headers.
|
||||
|
||||
:param request_url: The URL to issue the request to
|
||||
:param additional_headers: Any additional headers to add to the request
|
||||
|
@ -341,7 +335,7 @@ class ADOHTTPClient:
|
|||
:returns: The raw response object from the API
|
||||
"""
|
||||
headers = self.construct_headers(additional_headers=additional_headers)
|
||||
return self._session.delete(request_url, auth=self.credentials, headers=headers)
|
||||
return self._session.delete(request_url, headers=headers)
|
||||
|
||||
@retry(
|
||||
retry=retry_if_exception(_is_connection_failure), # type: ignore
|
||||
|
@ -369,7 +363,7 @@ class ADOHTTPClient:
|
|||
headers["Content-Length"] = str(file_size)
|
||||
headers["Content-Type"] = "application/json"
|
||||
|
||||
request = requests.Request("POST", request_url, headers=headers, auth=self.credentials)
|
||||
request = requests.Request("POST", request_url, headers=headers)
|
||||
prepped = request.prepare()
|
||||
|
||||
# Send the raw content, not with "Content-Disposition", etc.
|
||||
|
@ -448,6 +442,8 @@ class ADOHTTPClient:
|
|||
|
||||
headers = {"Accept": "application/json"}
|
||||
|
||||
headers["Authorization"] = self.auth.get_authorization_header()
|
||||
|
||||
for header_name, header_value in self.extra_headers.items():
|
||||
headers[header_name] = header_value
|
||||
|
||||
|
|
|
@ -255,7 +255,6 @@ class ADOPullRequestClient(ADOBaseClient):
|
|||
request_url += f"/comments/{comment_id}?api-version=3.0-preview"
|
||||
requests.delete(
|
||||
request_url,
|
||||
auth=self.http_client.credentials,
|
||||
headers=self.http_client.construct_headers(),
|
||||
)
|
||||
|
||||
|
|
|
@ -27,10 +27,9 @@ class LibraryTests(unittest.TestCase):
|
|||
def setUp(self) -> None:
|
||||
"""Set up method."""
|
||||
self.test_config = TestDetails()
|
||||
self.client = simple_ado.ADOClient(
|
||||
tenant=self.test_config.tenant,
|
||||
credentials=(self.test_config.username, self.test_config.token),
|
||||
)
|
||||
# Generate the token using `azureauth ado token --output token`
|
||||
auth = simple_ado.auth.ado_token_auth.ADOTokenAuth(self.test_config.token)
|
||||
self.client = simple_ado.ADOClient(tenant=self.test_config.tenant, auth=auth)
|
||||
|
||||
def test_access(self):
|
||||
"""Test access."""
|
||||
|
|
Загрузка…
Ссылка в новой задаче