Resolved device_flow style issues.

This commit is contained in:
Nate McFeters 2022-12-30 16:55:19 -06:00
Родитель 8171cd7702
Коммит 1892538b04
2 изменённых файлов: 25 добавлений и 40 удалений

Просмотреть файл

@ -17,6 +17,7 @@ from tenacity import retry, retry_if_exception, stop_after_attempt, wait_random_
from simple_ado.exceptions import ADOException, ADOHTTPException
from simple_ado.models import PatchOperation
# pylint: disable=invalid-name
ADOThread = Dict[str, Any]
ADOResponse = Any
@ -58,7 +59,7 @@ class ADOHTTPClient:
log: logging.Logger
tenant: str
extra_headers: Dict[str, str]
credentials = None # Modify away from tuple to expand for device_flow
credentials: Tuple[str, str]
_not_before: Optional[datetime.datetime]
_session: requests.Session
@ -66,7 +67,7 @@ class ADOHTTPClient:
self,
*,
tenant: str,
credentials: None, # Modify away from tuple to support device_flow
credentials: Tuple[str, str],
user_agent: str,
log: logging.Logger,
extra_headers: Optional[Dict[str, str]] = None,
@ -77,7 +78,6 @@ class ADOHTTPClient:
self.tenant = tenant
self.credentials = credentials
self._not_before = None
self._session = requests.Session()
@ -200,27 +200,12 @@ class ADOHTTPClient:
:returns: The raw response object from the API
"""
self._wait()
response = None
# Modified to support PAT-based Authentication
if isinstance(self.credentials, tuple) and len(self.credentials) == 2:
headers = self.construct_headers(additional_headers=additional_headers)
response = self._session.get(
request_url, auth=self.credentials, headers=headers, stream=stream
)
# Since requests does not support BearerAuth, this is a hacky way to do it.
elif isinstance(self.credentials, dict) and len(self.credentials) == 3:
if additional_headers:
additional_headers['Authorization'] = 'Bearer ' + self.credentials["access_token"]
else:
additional_headers = {"Authorization":"Bearer " + self.credentials["access_token"]}
headers = self.construct_headers(additional_headers=additional_headers)
response = self._session.get(
request_url, auth=None, headers=headers, stream=stream
)
else:
raise ValueError("Unknown authentication type. Modify simple_ado/http_client.py to support.")
if not response:
raise ADOHTTPException("Unable to get a response from authentication phase. See simple_ado/http_client.py.")
headers = self.construct_headers(additional_headers=additional_headers)
response = self._session.get(
request_url, auth=self.credentials, headers=headers, stream=stream
)
self._track_rate_limit(response)
if response.status_code == 429:

Просмотреть файл

@ -21,22 +21,22 @@ from simple_ado.types import TeamFoundationId
class ADOBranchPermission(enum.IntEnum):
"""Possible types of git branch permissions."""
ADMINISTER: int = 2 ** 0
READ: int = 2 ** 1
CONTRIBUTE: int = 2 ** 2
FORCE_PUSH: int = 2 ** 3
CREATE_BRANCH: int = 2 ** 4
CREATE_TAG: int = 2 ** 5
MANAGE_NOTES: int = 2 ** 6
BYPASS_PUSH_POLICIES: int = 2 ** 7
CREATE_REPOSITORY: int = 2 ** 8
DELETE_REPOSITORY: int = 2 ** 9
RENAME_REPOSITORY: int = 2 ** 10
EDIT_POLICIES: int = 2 ** 11
REMOVE_OTHERS_LOCKS: int = 2 ** 12
MANAGE_PERMISSIONS: int = 2 ** 13
CONTRIBUTE_TO_PULL_REQUESTS: int = 2 ** 14
BYPASS_PULL_REQUEST_POLICIES: int = 2 ** 15
ADMINISTER: int = 2**0
READ: int = 2**1
CONTRIBUTE: int = 2**2
FORCE_PUSH: int = 2**3
CREATE_BRANCH: int = 2**4
CREATE_TAG: int = 2**5
MANAGE_NOTES: int = 2**6
BYPASS_PUSH_POLICIES: int = 2**7
CREATE_REPOSITORY: int = 2**8
DELETE_REPOSITORY: int = 2**9
RENAME_REPOSITORY: int = 2**10
EDIT_POLICIES: int = 2**11
REMOVE_OTHERS_LOCKS: int = 2**12
MANAGE_PERMISSIONS: int = 2**13
CONTRIBUTE_TO_PULL_REQUESTS: int = 2**14
BYPASS_PULL_REQUEST_POLICIES: int = 2**15
class ADOBranchPermissionLevel(enum.IntEnum):