diff --git a/azure_functions_devops_build/service_endpoint/service_endpoint_manager.py b/azure_functions_devops_build/service_endpoint/service_endpoint_manager.py index 582c230..c717ef1 100644 --- a/azure_functions_devops_build/service_endpoint/service_endpoint_manager.py +++ b/azure_functions_devops_build/service_endpoint/service_endpoint_manager.py @@ -4,7 +4,11 @@ # -------------------------------------------------------------------------------------------- import json -import subprocess +try: + from subprocess import DEVNULL +except ImportError: + DEVNULL = open(os.devnull, 'w') +from subprocess import check_output, check_call, CalledProcessError import vsts.service_endpoint.v4_1.models as models from vsts.exceptions import VstsClientRequestError from ..base.base_manager import BaseManager @@ -22,6 +26,28 @@ class ServiceEndpointManager(BaseManager): super(ServiceEndpointManager, self).__init__(creds, organization_name=organization_name, project_name=project_name) + @staticmethod + def check_service_endpoint_assignment_permission(): + """Try to create a service principle and perform a role assignment on it""" + command = "az ad sp create-for-rbac" + result = None + + try: + result = check_output(command, shell=True, stderr=DEVNULL).decode() + except CalledProcessError: + return False + + # Clean up service principle, even if it fails, we still consider the operator has role assignment permission + if result: + service_principle = json.loads(result) + command = "az ad sp delete --id {app_id}".format(app_id=service_principle["appId"]) + try: + check_call(command, shell=True, stdout=DEVNULL, stderr=DEVNULL) + except CalledProcessError as e: + return True + + return True + # Get the details of a service endpoint # If endpoint does not exist, return None def get_service_endpoints(self, repository_name): @@ -59,7 +85,7 @@ class ServiceEndpointManager(BaseManager): project = self._get_project_by_name(self._project_name) command = "az account show --o json" - token_resp = subprocess.check_output(command, shell=True).decode() + token_resp = check_output(command, shell=True).decode() account = json.loads(token_resp) data = {} @@ -74,7 +100,7 @@ class ServiceEndpointManager(BaseManager): # A service principal name has to include the http/https to be valid command = "az ad sp create-for-rbac --o json --name http://" + service_principle_name - token_resp = subprocess.check_output(command, shell=True).decode() + token_resp = check_output(command, shell=True).decode() token_resp_dict = json.loads(token_resp) auth = models.endpoint_authorization.EndpointAuthorization( parameters={