зеркало из https://github.com/Azure/ipam.git
Added additional support for other Azure clouds
This commit is contained in:
Родитель
b11b4e8ce1
Коммит
bd7a79a34d
|
@ -9,29 +9,61 @@ from azure.identity import AzureAuthorityHosts
|
|||
AZURE_ENV_MAP = {
|
||||
'AZURE_PUBLIC': {
|
||||
'AZURE_ARM': 'management.azure.com',
|
||||
'AZURE_MGMT': 'management.core.windows.net',
|
||||
'AUTH_HOST': AzureAuthorityHosts.AZURE_PUBLIC_CLOUD
|
||||
},
|
||||
'AZURE_US_GOV': {
|
||||
'AZURE_ARM': 'management.usgovcloudapi.net',
|
||||
'AZURE_MGMT': 'management.core.usgovcloudapi.net',
|
||||
'AUTH_HOST': AzureAuthorityHosts.AZURE_GOVERNMENT
|
||||
},
|
||||
'AZURE_GERMANY': {
|
||||
'AZURE_ARM': 'management.microsoftazure.de',
|
||||
'AZURE_MGMT': 'management.core.cloudapi.de',
|
||||
'AUTH_HOST': AzureAuthorityHosts.AZURE_GERMANY
|
||||
},
|
||||
'AZURE_CHINA': {
|
||||
'AZURE_ARM': 'management.chinacloudapi.cn',
|
||||
'AZURE_MGMT': 'management.core.chinacloudapi.cn',
|
||||
'AUTH_HOST': AzureAuthorityHosts.AZURE_CHINA
|
||||
}
|
||||
}
|
||||
|
||||
class Globals:
|
||||
def __init__(self):
|
||||
client_creds = ClientSecretCredential(self.TENANT_ID, self.CLIENT_ID, self.CLIENT_SECRET, authority=self.AUTHORITY_HOST)
|
||||
mgmt_group_api = ManagementGroupsAPI(client_creds)
|
||||
target_group = mgmt_group_api.management_groups.get(os.environ.get('TENANT_ID'))
|
||||
# def __init__(self):
|
||||
# azure_env = os.environ.get('AZURE_ENV')
|
||||
# azure_arm_host = AZURE_ENV_MAP[azure_env]['AZURE_ARM'] if azure_env in AZURE_ENV_MAP else AZURE_ENV_MAP['AZURE_PUBLIC']['AZURE_ARM']
|
||||
# azure_auth_host = AZURE_ENV_MAP[azure_env]['AUTH_HOST'] if azure_env in AZURE_ENV_MAP else AZURE_ENV_MAP['AZURE_PUBLIC']['AUTH_HOST']
|
||||
# azure_arm_url = 'https://{}'.format(azure_arm_host)
|
||||
# azure_arm_scope = '{}/.default'.format(azure_arm_url)
|
||||
# mgmt_group_id = '/providers/Microsoft.Management/managementGroups/{}'.format(os.environ.get('TENANT_ID'))
|
||||
|
||||
self.root_mgmt_group = target_group.name
|
||||
# print("---------------------------")
|
||||
# print("********GLOBAL INIT********")
|
||||
# print("---------------------------")
|
||||
# print("TENANT_ID: {}".format(os.environ.get('TENANT_ID')))
|
||||
# print("CLIENT_ID: {}".format(os.environ.get('CLIENT_ID')))
|
||||
# print("CLIENT_SECRET: {}".format(os.environ.get('CLIENT_SECRET')))
|
||||
# print("AUTHORITY_HOST: {}".format(azure_auth_host))
|
||||
# print("AZURE_MGMT_URL: {}".format(azure_arm_url))
|
||||
# print("---------------------------")
|
||||
|
||||
# client_creds = ClientSecretCredential(
|
||||
# tenant_id=os.environ.get('TENANT_ID'),
|
||||
# client_id=os.environ.get('CLIENT_ID'),
|
||||
# client_secret=os.environ.get('CLIENT_SECRET'),
|
||||
# authority=azure_auth_host
|
||||
# )
|
||||
|
||||
# mgmt_group_api = ManagementGroupsAPI(
|
||||
# credential=client_creds,
|
||||
# base_url=azure_arm_url,
|
||||
# credential_scopes=[azure_arm_scope]
|
||||
# )
|
||||
|
||||
# target_group = mgmt_group_api.management_groups.get(os.environ.get('TENANT_ID'))
|
||||
|
||||
# self.root_mgmt_group = target_group.name
|
||||
|
||||
@property
|
||||
def CLIENT_ID(self):
|
||||
|
@ -57,9 +89,9 @@ class Globals:
|
|||
def KEYVAULT_URL(self):
|
||||
return os.environ.get('KEYVAULT_URL')
|
||||
|
||||
@property
|
||||
def ROOT_MGMT_GROUP(self):
|
||||
return self.root_mgmt_group
|
||||
# @property
|
||||
# def ROOT_MGMT_GROUP(self):
|
||||
# return self.root_mgmt_group
|
||||
|
||||
@property
|
||||
def AZURE_ARM_URL(self):
|
||||
|
@ -67,7 +99,8 @@ class Globals:
|
|||
|
||||
azure_arm_url = AZURE_ENV_MAP[azure_env]['AZURE_ARM'] if azure_env in AZURE_ENV_MAP else AZURE_ENV_MAP['AZURE_PUBLIC']['AZURE_ARM']
|
||||
|
||||
return 'https://{}/user_impersonation'.format(azure_arm_url)
|
||||
# return 'https://{}/user_impersonation'.format(azure_arm_url)
|
||||
return azure_arm_url
|
||||
|
||||
@property
|
||||
def AUTHORITY_HOST(self):
|
||||
|
|
|
@ -45,8 +45,10 @@ def get_user_id_from_jwt(token):
|
|||
async def get_obo_token(assertion):
|
||||
"""DOCSTRING"""
|
||||
|
||||
azure_arm_url = 'https://{}/user_impersonation'.format(globals.AZURE_ARM_URL)
|
||||
|
||||
credential = OnBehalfOfCredential(globals.TENANT_ID, globals.CLIENT_ID, client_secret=globals.CLIENT_SECRET, user_assertion=assertion)
|
||||
obo_token = await credential.get_token(globals.AZURE_ARM_URL)
|
||||
obo_token = await credential.get_token(azure_arm_url)
|
||||
await credential.close()
|
||||
|
||||
return obo_token
|
||||
|
@ -226,6 +228,8 @@ async def arg_query(auth, admin, query):
|
|||
except ClientAuthenticationError:
|
||||
raise HTTPException(status_code=401, detail="Token has expired.")
|
||||
except HttpResponseError as e:
|
||||
print("IsAdmin: {}".format(admin))
|
||||
print(e)
|
||||
raise HTTPException(status_code=403, detail="Access denied.")
|
||||
finally:
|
||||
await creds.close()
|
||||
|
@ -269,7 +273,14 @@ async def arg_query_helper(credentials, query):
|
|||
|
||||
results = []
|
||||
|
||||
resource_graph_client = ResourceGraphClient(credentials)
|
||||
azure_arm_url = 'https://{}'.format(globals.AZURE_ARM_URL)
|
||||
azure_arm_scope = '{}/.default'.format(azure_arm_url)
|
||||
|
||||
resource_graph_client = ResourceGraphClient(
|
||||
credential=credentials,
|
||||
base_url=azure_arm_url,
|
||||
credential_scopes=[azure_arm_scope]
|
||||
)
|
||||
|
||||
try:
|
||||
skip_token = None
|
||||
|
@ -277,7 +288,7 @@ async def arg_query_helper(credentials, query):
|
|||
while True:
|
||||
query_request = QueryRequest(
|
||||
query=query,
|
||||
management_groups=[globals.ROOT_MGMT_GROUP],
|
||||
# management_groups=[globals.TENANT_ID],
|
||||
options=QueryRequestOptions(
|
||||
result_format=ResultFormat.object_array,
|
||||
skip_token=skip_token
|
||||
|
|
Загрузка…
Ссылка в новой задаче