refactor: delete Azure Stack's KubernetesClient implementation (#3957)

This commit is contained in:
Javier Darsie 2020-10-23 13:54:51 -07:00 коммит произвёл GitHub
Родитель d79bda25bc
Коммит 79de20bd76
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
11 изменённых файлов: 117 добавлений и 291 удалений

Просмотреть файл

@ -28,6 +28,7 @@ import (
"github.com/Azure/aks-engine/pkg/engine/transform"
"github.com/Azure/aks-engine/pkg/helpers"
"github.com/Azure/aks-engine/pkg/i18n"
"github.com/Azure/aks-engine/pkg/kubernetes"
)
const (
@ -342,12 +343,12 @@ func (rcc *rotateCertsCmd) updateKubeconfig() error {
return nil
}
func (rcc *rotateCertsCmd) getKubeClient() (armhelpers.KubernetesClient, error) {
func (rcc *rotateCertsCmd) getKubeClient() (kubernetes.Client, error) {
kubeconfig, err := engine.GenerateKubeConfig(rcc.containerService.Properties, rcc.location)
if err != nil {
return nil, errors.Wrap(err, "generating kubeconfig")
}
var kubeClient armhelpers.KubernetesClient
var kubeClient kubernetes.Client
if rcc.client != nil {
kubeClient, err = rcc.client.GetKubernetesClient("", kubeconfig, time.Second*1, time.Duration(60)*time.Minute)
if err != nil {

Просмотреть файл

@ -17,10 +17,10 @@ import (
"time"
"github.com/Azure/aks-engine/pkg/engine"
"github.com/Azure/aks-engine/pkg/kubernetes"
"github.com/Azure/azure-sdk-for-go/services/apimanagement/mgmt/2017-03-01/apimanagement"
"github.com/Azure/azure-sdk-for-go/services/authorization/mgmt/2015-07-01/authorization"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-12-01/compute"
"github.com/Azure/azure-sdk-for-go/services/graphrbac/1.6/graphrbac"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-08-01/network"
"github.com/Azure/azure-sdk-for-go/services/preview/msi/mgmt/2015-08-31-preview/msi"
@ -80,6 +80,11 @@ type AzureClient struct {
servicePrincipalsClient graphrbac.ServicePrincipalsClient
}
// GetKubernetesClient returns a KubernetesClient hooked up to the api server at the apiserverURL.
func (az *AzureClient) GetKubernetesClient(apiserverURL, kubeConfig string, interval, timeout time.Duration) (kubernetes.Client, error) {
return kubernetes.NewClient(apiserverURL, kubeConfig, interval, timeout)
}
// NewAzureClientWithCLI creates an AzureClient configured from Azure CLI 2.0 for local development scenarios.
func NewAzureClientWithCLI(env azure.Environment, subscriptionID string) (*AzureClient, error) {
_, tenantID, err := getOAuthConfig(env, subscriptionID)

Просмотреть файл

@ -15,6 +15,7 @@ import (
"time"
"github.com/Azure/aks-engine/pkg/engine"
"github.com/Azure/aks-engine/pkg/kubernetes"
"github.com/Azure/azure-sdk-for-go/services/apimanagement/mgmt/2017-03-01/apimanagement"
"github.com/Azure/azure-sdk-for-go/services/authorization/mgmt/2015-07-01/authorization"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-03-30/compute"
@ -74,6 +75,11 @@ type AzureClient struct {
servicePrincipalsClient graphrbac.ServicePrincipalsClient
}
// GetKubernetesClient returns a KubernetesClient hooked up to the api server at the apiserverURL.
func (az *AzureClient) GetKubernetesClient(apiserverURL, kubeConfig string, interval, timeout time.Duration) (kubernetes.Client, error) {
return kubernetes.NewClient(apiserverURL, kubeConfig, interval, timeout)
}
// NewAzureClientWithClientSecret returns an AzureClient via client_id and client_secret
func NewAzureClientWithClientSecret(env azure.Environment, subscriptionID, clientID, clientSecret string) (*AzureClient, error) {
oauthConfig, tenantID, err := getOAuthConfig(env, subscriptionID)

Просмотреть файл

@ -1,204 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
package azurestack
import (
"time"
"github.com/Azure/aks-engine/pkg/armhelpers"
log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
const (
evictionKind = "Eviction"
evictionSubresource = "pods/eviction"
)
//KubernetesClientSetClient is a Kubernetes client hooked up to a live api server.
type KubernetesClientSetClient struct {
clientset *kubernetes.Clientset
interval, timeout time.Duration
}
//GetKubernetesClient returns a KubernetesClient hooked up to the api server at the apiserverURL.
func (az *AzureClient) GetKubernetesClient(apiserverURL, kubeConfig string, interval, timeout time.Duration) (armhelpers.KubernetesClient, error) {
// creates the clientset
config, err := clientcmd.BuildConfigFromKubeconfigGetter(apiserverURL, func() (*clientcmdapi.Config, error) { return clientcmd.Load([]byte(kubeConfig)) })
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return &KubernetesClientSetClient{clientset: clientset, interval: interval, timeout: timeout}, nil
}
// ListPods returns all Pods running on the passed in node.
func (c *KubernetesClientSetClient) ListPods(node *v1.Node) (*v1.PodList, error) {
return c.clientset.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node.Name}).String()})
}
// ListAllPods returns all Pods running.
func (c *KubernetesClientSetClient) ListAllPods() (*v1.PodList, error) {
return c.clientset.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{})
}
// ListNodes returns a list of Nodes registered in the api server.
func (c *KubernetesClientSetClient) ListNodes() (*v1.NodeList, error) {
return c.clientset.CoreV1().Nodes().List(metav1.ListOptions{})
}
// ListServiceAccounts returns a list of Service Accounts in the provided namespace.
func (c *KubernetesClientSetClient) ListServiceAccounts(namespace string) (*v1.ServiceAccountList, error) {
return c.clientset.CoreV1().ServiceAccounts(namespace).List(metav1.ListOptions{})
}
// GetNode returns details about node with passed in name.
func (c *KubernetesClientSetClient) GetNode(name string) (*v1.Node, error) {
return c.clientset.CoreV1().Nodes().Get(name, metav1.GetOptions{})
}
// UpdateNode updates the node in the api server with the passed in info.
func (c *KubernetesClientSetClient) UpdateNode(node *v1.Node) (*v1.Node, error) {
return c.clientset.CoreV1().Nodes().Update(node)
}
// DeleteNode deregisters the node in the api server.
func (c *KubernetesClientSetClient) DeleteNode(name string) error {
return c.clientset.CoreV1().Nodes().Delete(name, &metav1.DeleteOptions{})
}
// DeleteServiceAccount deletes the passed in service account.
func (c *KubernetesClientSetClient) DeleteServiceAccount(sa *v1.ServiceAccount) error {
return c.clientset.CoreV1().ServiceAccounts(sa.Namespace).Delete(sa.Name, &metav1.DeleteOptions{})
}
// SupportEviction queries the api server to discover if it supports eviction, and returns supported type if it is supported.
func (c *KubernetesClientSetClient) SupportEviction() (string, error) {
discoveryClient := c.clientset.Discovery()
groupList, err := discoveryClient.ServerGroups()
if err != nil {
return "", err
}
foundPolicyGroup := false
var policyGroupVersion string
for _, group := range groupList.Groups {
if group.Name == "policy" {
foundPolicyGroup = true
policyGroupVersion = group.PreferredVersion.GroupVersion
break
}
}
if !foundPolicyGroup {
return "", nil
}
resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1")
if err != nil {
return "", err
}
for _, resource := range resourceList.APIResources {
if resource.Name == evictionSubresource && resource.Kind == evictionKind {
return policyGroupVersion, nil
}
}
return "", nil
}
// DeleteClusterRole deletes the passed in cluster role.
func (c *KubernetesClientSetClient) DeleteClusterRole(role *rbacv1.ClusterRole) error {
return c.clientset.RbacV1().ClusterRoles().Delete(role.Name, &metav1.DeleteOptions{})
}
// DeleteDaemonSet deletes the passed in daemonset.
func (c *KubernetesClientSetClient) DeleteDaemonSet(daemonset *appsv1.DaemonSet) error {
return c.clientset.AppsV1().DaemonSets(daemonset.Namespace).Delete(daemonset.Name, &metav1.DeleteOptions{})
}
// DeleteDeployment deletes the passed in daemonset.
func (c *KubernetesClientSetClient) DeleteDeployment(deployment *appsv1.Deployment) error {
return c.clientset.AppsV1().Deployments(deployment.Namespace).Delete(deployment.Name, &metav1.DeleteOptions{})
}
// DeletePod deletes the passed in pod.
func (c *KubernetesClientSetClient) DeletePod(pod *v1.Pod) error {
return c.clientset.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
}
// EvictPod evicts the passed in pod using the passed in api version.
func (c *KubernetesClientSetClient) EvictPod(pod *v1.Pod, policyGroupVersion string) error {
eviction := &policy.Eviction{
TypeMeta: metav1.TypeMeta{
APIVersion: policyGroupVersion,
Kind: evictionKind,
},
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
}
return c.clientset.PolicyV1beta1().Evictions(eviction.Namespace).Evict(eviction)
}
// GetPod returns the pod with the provided name and namespace.
func (c *KubernetesClientSetClient) getPod(namespace, name string) (*v1.Pod, error) {
return c.clientset.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
}
// WaitForDelete waits until all pods are deleted. Returns all pods not deleted and an error on failure.
func (c *KubernetesClientSetClient) WaitForDelete(logger *log.Entry, pods []v1.Pod, usingEviction bool) ([]v1.Pod, error) {
var verbStr string
if usingEviction {
verbStr = "evicted"
} else {
verbStr = "deleted"
}
err := wait.PollImmediate(c.interval, c.timeout, func() (bool, error) {
pendingPods := []v1.Pod{}
for i, pod := range pods {
p, err := c.getPod(pod.Namespace, pod.Name)
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
logger.Infof("%s pod successfully %s", pod.Name, verbStr)
continue
} else if err != nil {
return false, err
} else {
pendingPods = append(pendingPods, pods[i])
}
}
pods = pendingPods
if len(pendingPods) > 0 {
return false, nil
}
return true, nil
})
return pods, err
}
// GetDaemonSet returns a given daemonset in a namespace.
func (c *KubernetesClientSetClient) GetDaemonSet(namespace, name string) (*appsv1.DaemonSet, error) {
return c.clientset.AppsV1().DaemonSets(namespace).Get(name, metav1.GetOptions{})
}
// GetDeployment returns a given deployment in a namespace.
func (c *KubernetesClientSetClient) GetDeployment(namespace, name string) (*appsv1.Deployment, error) {
return c.clientset.AppsV1().Deployments(namespace).Get(name, metav1.GetOptions{})
}
// UpdateDeployment updates a deployment to match the given specification.
func (c *KubernetesClientSetClient) UpdateDeployment(namespace string, deployment *appsv1.Deployment) (*appsv1.Deployment, error) {
return c.clientset.AppsV1().Deployments(namespace).Update(deployment)
}

Просмотреть файл

@ -7,6 +7,7 @@ import (
"context"
"time"
"github.com/Azure/aks-engine/pkg/kubernetes"
"github.com/Azure/azure-sdk-for-go/services/authorization/mgmt/2015-07-01/authorization"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-12-01/compute"
"github.com/Azure/azure-sdk-for-go/services/graphrbac/1.6/graphrbac"
@ -16,10 +17,6 @@ import (
azStorage "github.com/Azure/azure-sdk-for-go/storage"
"github.com/Azure/go-autorest/autorest"
log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
)
// ResourceSkusResultPage
@ -201,7 +198,7 @@ type AKSEngineClient interface {
DeleteManagedDisk(ctx context.Context, resourceGroupName string, diskName string) error
ListManagedDisksByResourceGroup(ctx context.Context, resourceGroupName string) (result DiskListPage, err error)
GetKubernetesClient(apiserverURL, kubeConfig string, interval, timeout time.Duration) (KubernetesClient, error)
GetKubernetesClient(apiserverURL, kubeConfig string, interval, timeout time.Duration) (kubernetes.Client, error)
ListProviders(ctx context.Context) (ProviderListResultPage, error)
@ -231,43 +228,3 @@ type AKSStorageClient interface {
// SaveBlockBlob initializes a block blob by taking the byte
SaveBlockBlob(containerName, blobName string, b []byte, options *azStorage.PutBlobOptions) error
}
// KubernetesClient interface models client for interacting with kubernetes api server
type KubernetesClient interface {
// ListPods returns Pods running on the passed in node.
ListPods(node *v1.Node) (*v1.PodList, error)
// ListPods returns all Pods running
ListAllPods() (*v1.PodList, error)
// ListNodes returns a list of Nodes registered in the api server.
ListNodes() (*v1.NodeList, error)
// ListServiceAccounts returns a list of Service Accounts in a namespace
ListServiceAccounts(namespace string) (*v1.ServiceAccountList, error)
// GetDaemonSet returns details about DaemonSet with passed in name.
GetDaemonSet(namespace, name string) (*appsv1.DaemonSet, error)
// GetDeployment returns a given deployment in a namespace.
GetDeployment(namespace, name string) (*appsv1.Deployment, error)
// GetNode returns details about node with passed in name.
GetNode(name string) (*v1.Node, error)
// UpdateNode updates the node in the api server with the passed in info.
UpdateNode(node *v1.Node) (*v1.Node, error)
// DeleteNode deregisters node in the api server.
DeleteNode(name string) error
// SupportEviction queries the api server to discover if it supports eviction, and returns supported type if it is supported.
SupportEviction() (string, error)
// DeleteClusterRole deletes the passed in ClusterRole.
DeleteClusterRole(role *rbacv1.ClusterRole) error
// DeleteDaemonSet deletes the passed in DaemonSet.
DeleteDaemonSet(ds *appsv1.DaemonSet) error
// DeleteDeployment deletes the passed in Deployment.
DeleteDeployment(ds *appsv1.Deployment) error
// DeletePod deletes the passed in pod.
DeletePod(pod *v1.Pod) error
// DeleteServiceAccount deletes the passed in service account.
DeleteServiceAccount(sa *v1.ServiceAccount) error
// EvictPod evicts the passed in pod using the passed in api version.
EvictPod(pod *v1.Pod, policyGroupVersion string) error
// WaitForDelete waits until all pods are deleted. Returns all pods not deleted and an error on failure.
WaitForDelete(logger *log.Entry, pods []v1.Pod, usingEviction bool) ([]v1.Pod, error)
// UpdateDeployment updates a deployment to match the given specification.
UpdateDeployment(namespace string, deployment *appsv1.Deployment) (*appsv1.Deployment, error)
}

Просмотреть файл

@ -13,6 +13,7 @@ import (
"time"
"github.com/Azure/aks-engine/pkg/api/common"
"github.com/Azure/aks-engine/pkg/kubernetes"
"github.com/Azure/go-autorest/autorest/to"
"github.com/Azure/azure-sdk-for-go/services/authorization/mgmt/2015-07-01/authorization"
@ -926,7 +927,7 @@ func (mc *MockAKSEngineClient) ListManagedDisksByResourceGroup(ctx context.Conte
}
//GetKubernetesClient mock
func (mc *MockAKSEngineClient) GetKubernetesClient(apiserverURL, kubeConfig string, interval, timeout time.Duration) (KubernetesClient, error) {
func (mc *MockAKSEngineClient) GetKubernetesClient(apiserverURL, kubeConfig string, interval, timeout time.Duration) (kubernetes.Client, error) {
if mc.FailGetKubernetesClient {
return nil, errors.New("GetKubernetesClient failed")
}

Просмотреть файл

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
package armhelpers
package kubernetes
import (
"time"
@ -25,16 +25,21 @@ const (
evictionSubresource = "pods/eviction"
)
// KubernetesClientSetClient is a Kubernetes client hooked up to a live api server.
type KubernetesClientSetClient struct {
// kubernetesClientSetClient is a Kubernetes client hooked up to a live api server.
type kubernetesClientSetClient struct {
clientset *kubernetes.Clientset
interval, timeout time.Duration
}
// GetKubernetesClient returns a KubernetesClient hooked up to the api server at the apiserverURL.
func (az *AzureClient) GetKubernetesClient(apiserverURL, kubeConfig string, interval, timeout time.Duration) (KubernetesClient, error) {
// TODO This contructor does not follow best practices
// https://github.com/golang/go/wiki/CodeReviewComments#interfaces
// NewClient returns a KubernetesClient hooked up to the api server at the apiserverURL.
func NewClient(apiserverURL, kubeConfig string, interval, timeout time.Duration) (Client, error) {
// creates the clientset
config, err := clientcmd.BuildConfigFromKubeconfigGetter(apiserverURL, func() (*clientcmdapi.Config, error) { return clientcmd.Load([]byte(kubeConfig)) })
config, err := clientcmd.BuildConfigFromKubeconfigGetter(apiserverURL, func() (*clientcmdapi.Config, error) {
return clientcmd.Load([]byte(kubeConfig))
})
if err != nil {
return nil, err
}
@ -42,52 +47,52 @@ func (az *AzureClient) GetKubernetesClient(apiserverURL, kubeConfig string, inte
if err != nil {
return nil, err
}
return &KubernetesClientSetClient{clientset: clientset, interval: interval, timeout: timeout}, nil
return &kubernetesClientSetClient{clientset: clientset, interval: interval, timeout: timeout}, nil
}
// ListPods returns Pods running on the passed in node.
func (c *KubernetesClientSetClient) ListPods(node *v1.Node) (*v1.PodList, error) {
func (c *kubernetesClientSetClient) ListPods(node *v1.Node) (*v1.PodList, error) {
return c.clientset.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node.Name}).String()})
}
// ListAllPods returns all Pods running.
func (c *KubernetesClientSetClient) ListAllPods() (*v1.PodList, error) {
func (c *kubernetesClientSetClient) ListAllPods() (*v1.PodList, error) {
return c.clientset.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{})
}
// ListNodes returns a list of Nodes registered in the api server.
func (c *KubernetesClientSetClient) ListNodes() (*v1.NodeList, error) {
func (c *kubernetesClientSetClient) ListNodes() (*v1.NodeList, error) {
return c.clientset.CoreV1().Nodes().List(metav1.ListOptions{})
}
// ListServiceAccounts returns a list of Service Accounts in the provided namespace.
func (c *KubernetesClientSetClient) ListServiceAccounts(namespace string) (*v1.ServiceAccountList, error) {
func (c *kubernetesClientSetClient) ListServiceAccounts(namespace string) (*v1.ServiceAccountList, error) {
return c.clientset.CoreV1().ServiceAccounts(namespace).List(metav1.ListOptions{})
}
// GetNode returns details about node with passed in name.
func (c *KubernetesClientSetClient) GetNode(name string) (*v1.Node, error) {
func (c *kubernetesClientSetClient) GetNode(name string) (*v1.Node, error) {
return c.clientset.CoreV1().Nodes().Get(name, metav1.GetOptions{})
}
// UpdateNode updates the node in the api server with the passed in info.
func (c *KubernetesClientSetClient) UpdateNode(node *v1.Node) (*v1.Node, error) {
func (c *kubernetesClientSetClient) UpdateNode(node *v1.Node) (*v1.Node, error) {
return c.clientset.CoreV1().Nodes().Update(node)
}
// DeleteNode deregisters the node in the api server.
func (c *KubernetesClientSetClient) DeleteNode(name string) error {
func (c *kubernetesClientSetClient) DeleteNode(name string) error {
return c.clientset.CoreV1().Nodes().Delete(name, &metav1.DeleteOptions{})
}
// DeleteServiceAccount deletes the passed in service account.
func (c *KubernetesClientSetClient) DeleteServiceAccount(sa *v1.ServiceAccount) error {
func (c *kubernetesClientSetClient) DeleteServiceAccount(sa *v1.ServiceAccount) error {
return c.clientset.CoreV1().ServiceAccounts(sa.Namespace).Delete(sa.Name, &metav1.DeleteOptions{})
}
// SupportEviction queries the api server to discover if it supports eviction, and returns supported type if it is supported.
func (c *KubernetesClientSetClient) SupportEviction() (string, error) {
func (c *kubernetesClientSetClient) SupportEviction() (string, error) {
discoveryClient := c.clientset.Discovery()
groupList, err := discoveryClient.ServerGroups()
if err != nil {
@ -118,27 +123,27 @@ func (c *KubernetesClientSetClient) SupportEviction() (string, error) {
}
// DeleteClusterRole deletes the passed in cluster role.
func (c *KubernetesClientSetClient) DeleteClusterRole(role *rbacv1.ClusterRole) error {
func (c *kubernetesClientSetClient) DeleteClusterRole(role *rbacv1.ClusterRole) error {
return c.clientset.RbacV1().ClusterRoles().Delete(role.Name, &metav1.DeleteOptions{})
}
// DeleteDaemonSet deletes the passed in daemonset.
func (c *KubernetesClientSetClient) DeleteDaemonSet(daemonset *appsv1.DaemonSet) error {
func (c *kubernetesClientSetClient) DeleteDaemonSet(daemonset *appsv1.DaemonSet) error {
return c.clientset.AppsV1().DaemonSets(daemonset.Namespace).Delete(daemonset.Name, &metav1.DeleteOptions{})
}
// DeleteDeployment deletes the passed in daemonset.
func (c *KubernetesClientSetClient) DeleteDeployment(deployment *appsv1.Deployment) error {
func (c *kubernetesClientSetClient) DeleteDeployment(deployment *appsv1.Deployment) error {
return c.clientset.AppsV1().Deployments(deployment.Namespace).Delete(deployment.Name, &metav1.DeleteOptions{})
}
// DeletePod deletes the passed in pod.
func (c *KubernetesClientSetClient) DeletePod(pod *v1.Pod) error {
func (c *kubernetesClientSetClient) DeletePod(pod *v1.Pod) error {
return c.clientset.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
}
// EvictPod evicts the passed in pod using the passed in api version.
func (c *KubernetesClientSetClient) EvictPod(pod *v1.Pod, policyGroupVersion string) error {
func (c *kubernetesClientSetClient) EvictPod(pod *v1.Pod, policyGroupVersion string) error {
eviction := &policy.Eviction{
TypeMeta: metav1.TypeMeta{
APIVersion: policyGroupVersion,
@ -153,17 +158,15 @@ func (c *KubernetesClientSetClient) EvictPod(pod *v1.Pod, policyGroupVersion str
}
// GetPod returns the pod.
func (c *KubernetesClientSetClient) getPod(namespace, name string) (*v1.Pod, error) {
func (c *kubernetesClientSetClient) getPod(namespace, name string) (*v1.Pod, error) {
return c.clientset.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
}
// WaitForDelete waits until all pods are deleted. Returns all pods not deleted and an error on failure.
func (c *KubernetesClientSetClient) WaitForDelete(logger *log.Entry, pods []v1.Pod, usingEviction bool) ([]v1.Pod, error) {
var verbStr string
func (c *kubernetesClientSetClient) WaitForDelete(logger *log.Entry, pods []v1.Pod, usingEviction bool) ([]v1.Pod, error) {
verbStr := "deleted"
if usingEviction {
verbStr = "evicted"
} else {
verbStr = "deleted"
}
err := wait.PollImmediate(c.interval, c.timeout, func() (bool, error) {
pendingPods := []v1.Pod{}
@ -188,16 +191,16 @@ func (c *KubernetesClientSetClient) WaitForDelete(logger *log.Entry, pods []v1.P
}
// GetDaemonSet returns a given daemonset in a namespace.
func (c *KubernetesClientSetClient) GetDaemonSet(namespace, name string) (*appsv1.DaemonSet, error) {
func (c *kubernetesClientSetClient) GetDaemonSet(namespace, name string) (*appsv1.DaemonSet, error) {
return c.clientset.AppsV1().DaemonSets(namespace).Get(name, metav1.GetOptions{})
}
// GetDeployment returns a given deployment in a namespace.
func (c *KubernetesClientSetClient) GetDeployment(namespace, name string) (*appsv1.Deployment, error) {
func (c *kubernetesClientSetClient) GetDeployment(namespace, name string) (*appsv1.Deployment, error) {
return c.clientset.AppsV1().Deployments(namespace).Get(name, metav1.GetOptions{})
}
// UpdateDeployment updates a deployment to match the given specification.
func (c *KubernetesClientSetClient) UpdateDeployment(namespace string, deployment *appsv1.Deployment) (*appsv1.Deployment, error) {
func (c *kubernetesClientSetClient) UpdateDeployment(namespace string, deployment *appsv1.Deployment) (*appsv1.Deployment, error) {
return c.clientset.AppsV1().Deployments(namespace).Update(deployment)
}

Просмотреть файл

@ -0,0 +1,54 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
package kubernetes
import (
log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
)
// TODO This interface does not follow best practices
// https://github.com/golang/go/wiki/CodeReviewComments#interfaces
// Client interface models client for interacting with kubernetes api server
type Client interface {
// ListPods returns Pods running on the passed in node.
ListPods(node *v1.Node) (*v1.PodList, error)
// ListPods returns all Pods running
ListAllPods() (*v1.PodList, error)
// ListNodes returns a list of Nodes registered in the api server.
ListNodes() (*v1.NodeList, error)
// ListServiceAccounts returns a list of Service Accounts in a namespace
ListServiceAccounts(namespace string) (*v1.ServiceAccountList, error)
// GetDaemonSet returns details about DaemonSet with passed in name.
GetDaemonSet(namespace, name string) (*appsv1.DaemonSet, error)
// GetDeployment returns a given deployment in a namespace.
GetDeployment(namespace, name string) (*appsv1.Deployment, error)
// GetNode returns details about node with passed in name.
GetNode(name string) (*v1.Node, error)
// UpdateNode updates the node in the api server with the passed in info.
UpdateNode(node *v1.Node) (*v1.Node, error)
// DeleteNode deregisters node in the api server.
DeleteNode(name string) error
// SupportEviction queries the api server to discover if it supports eviction, and returns supported type if it is supported.
SupportEviction() (string, error)
// DeleteClusterRole deletes the passed in ClusterRole.
DeleteClusterRole(role *rbacv1.ClusterRole) error
// DeleteDaemonSet deletes the passed in DaemonSet.
DeleteDaemonSet(ds *appsv1.DaemonSet) error
// DeleteDeployment deletes the passed in Deployment.
DeleteDeployment(ds *appsv1.Deployment) error
// DeletePod deletes the passed in pod.
DeletePod(pod *v1.Pod) error
// DeleteServiceAccount deletes the passed in service account.
DeleteServiceAccount(sa *v1.ServiceAccount) error
// EvictPod evicts the passed in pod using the passed in api version.
EvictPod(pod *v1.Pod, policyGroupVersion string) error
// WaitForDelete waits until all pods are deleted. Returns all pods not deleted and an error on failure.
WaitForDelete(logger *log.Entry, pods []v1.Pod, usingEviction bool) ([]v1.Pod, error)
// UpdateDeployment updates a deployment to match the given specification.
UpdateDeployment(namespace string, deployment *appsv1.Deployment) (*appsv1.Deployment, error)
}

Просмотреть файл

@ -8,6 +8,7 @@ import (
"time"
"github.com/Azure/aks-engine/pkg/armhelpers"
"github.com/Azure/aks-engine/pkg/kubernetes"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
@ -25,7 +26,7 @@ const (
)
type drainOperation struct {
client armhelpers.KubernetesClient
client kubernetes.Client
node *v1.Node
logger *log.Entry
timeout time.Duration
@ -44,7 +45,7 @@ func SafelyDrainNode(az armhelpers.AKSEngineClient, logger *log.Entry, apiserver
}
// SafelyDrainNodeWithClient safely drains a node so that it can be deleted from the cluster
func SafelyDrainNodeWithClient(client armhelpers.KubernetesClient, logger *log.Entry, nodeName string, timeout time.Duration) error {
func SafelyDrainNodeWithClient(client kubernetes.Client, logger *log.Entry, nodeName string, timeout time.Duration) error {
nodeName = strings.ToLower(nodeName)
//Mark the node unschedulable
var node *v1.Node

Просмотреть файл

@ -15,6 +15,7 @@ import (
"github.com/Azure/aks-engine/pkg/armhelpers"
"github.com/Azure/aks-engine/pkg/armhelpers/utils"
"github.com/Azure/aks-engine/pkg/i18n"
"github.com/Azure/aks-engine/pkg/kubernetes"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-12-01/compute"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -90,7 +91,7 @@ func (uc *UpgradeCluster) UpgradeCluster(az armhelpers.AKSEngineClient, kubeConf
uc.UpgradedMasterVMs = &[]compute.VirtualMachine{}
uc.AgentPools = make(map[string]*AgentPoolTopology)
var kubeClient armhelpers.KubernetesClient
var kubeClient kubernetes.Client
if az != nil {
timeout := time.Duration(60) * time.Minute
k, err := az.GetKubernetesClient("", kubeConfig, interval, timeout)
@ -146,7 +147,7 @@ func (uc *UpgradeCluster) UpgradeCluster(az armhelpers.AKSEngineClient, kubeConf
}
// SetClusterAutoscalerReplicaCount changes the replica count of a cluster-autoscaler deployment.
func (uc *UpgradeCluster) SetClusterAutoscalerReplicaCount(kubeClient armhelpers.KubernetesClient, replicaCount int32) (int32, error) {
func (uc *UpgradeCluster) SetClusterAutoscalerReplicaCount(kubeClient kubernetes.Client, replicaCount int32) (int32, error) {
if kubeClient == nil {
return 0, errors.New("no kubernetes client")
}
@ -184,7 +185,7 @@ func (uc *UpgradeCluster) getUpgradeWorkflow(kubeConfig string, aksEngineVersion
return u
}
func (uc *UpgradeCluster) getClusterNodeStatus(kubeClient armhelpers.KubernetesClient, resourceGroup string) error {
func (uc *UpgradeCluster) getClusterNodeStatus(kubeClient kubernetes.Client, resourceGroup string) error {
goalVersion := uc.DataModel.Properties.OrchestratorProfile.OrchestratorVersion
ctx, cancel := context.WithTimeout(context.Background(), armhelpers.DefaultARMOperationTimeout)
@ -322,7 +323,7 @@ func (uc *UpgradeCluster) upgradable(currentVersion string) error {
// Also, if the latest VMSS model is applied, then we can get the version info from the tags.
// Otherwise, we have to get version via K8s API. This is because VMSS does not support tags
// for individual instances and old/new instances have the same tags.
func (uc *UpgradeCluster) getNodeVersion(client armhelpers.KubernetesClient, name string, tags map[string]*string, getVersionFromTags bool) string {
func (uc *UpgradeCluster) getNodeVersion(client kubernetes.Client, name string, tags map[string]*string, getVersionFromTags bool) string {
if getVersionFromTags {
if tags != nil && tags["orchestrator"] != nil {
parts := strings.Split(*tags["orchestrator"], ":")

Просмотреть файл

@ -19,6 +19,7 @@ import (
"github.com/Azure/aks-engine/pkg/engine/transform"
"github.com/Azure/aks-engine/pkg/helpers"
"github.com/Azure/aks-engine/pkg/i18n"
"github.com/Azure/aks-engine/pkg/kubernetes"
"github.com/Azure/aks-engine/pkg/operations"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -791,7 +792,7 @@ func (ku *Upgrader) getLastVMNameInVMSS(ctx context.Context, resourceGroup strin
return lastVMName, nil
}
func (ku *Upgrader) copyCustomPropertiesToNewNode(client armhelpers.KubernetesClient, oldNodeName string, newNodeName string) error {
func (ku *Upgrader) copyCustomPropertiesToNewNode(client kubernetes.Client, oldNodeName string, newNodeName string) error {
// The new node is created without any taints, Kubernetes might schedule some pods on this newly created node before the taints/annotations/labels
// are copied over from corresponding old node. So drain the new node first before copying over the node properties.
// Note: SafelyDrainNodeWithClient() sets the Unschedulable of the node to true, set Unschedulable to false in copyCustomNodeProperties
@ -846,7 +847,7 @@ func (ku *Upgrader) copyCustomPropertiesToNewNode(client armhelpers.KubernetesCl
}
}
func (ku *Upgrader) copyCustomNodeProperties(client armhelpers.KubernetesClient, oldNodeName string, oldNode *v1.Node, newNodeName string, newNode *v1.Node) error {
func (ku *Upgrader) copyCustomNodeProperties(client kubernetes.Client, oldNodeName string, oldNode *v1.Node, newNodeName string, newNode *v1.Node) error {
// copy additional custom annotations from old node to new node
if oldNode.Annotations != nil {
if newNode.Annotations == nil {
@ -893,7 +894,7 @@ func (ku *Upgrader) copyCustomNodeProperties(client armhelpers.KubernetesClient,
return err
}
func (ku *Upgrader) getKubernetesClient(timeout time.Duration) (armhelpers.KubernetesClient, error) {
func (ku *Upgrader) getKubernetesClient(timeout time.Duration) (kubernetes.Client, error) {
apiserverURL := ku.DataModel.Properties.GetMasterFQDN()
if ku.DataModel.Properties.HostedMasterProfile != nil {
apiServerListeningPort := 443