aks-engine/pkg/operations/cordondrainvm.go

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

package operations

import (
	"context"
	"strings"
	"time"

	"github.com/Azure/aks-engine/pkg/armhelpers"
	"github.com/Azure/aks-engine/pkg/kubernetes"
	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
	interval            = time.Second * 1
	mirrorPodAnnotation = "kubernetes.io/config.mirror"
	// This message is defined in the Kubernetes source tree; it is copied here to
	// avoid vendoring issues.
	kubernetesOptimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
	cordonMaxRetries                 = 5
)
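
// drainOperation holds the state needed to drain a single node: the Kubernetes
// client, the node being drained, a logger, and the overall drain timeout.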
type drainOperation struct {
	client  kubernetes.Client
	node    *v1.Node
	logger  *log.Entry
	timeout time.Duration
}
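
// podFilter reports whether a pod should be kept in the list of pods to delete
// or evict during a drain.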
type podFilter func(v1.Pod) bool

// SafelyDrainNode safely drains a node so that it can be deleted from the cluster
func SafelyDrainNode(az armhelpers.AKSEngineClient, logger *log.Entry, apiserverURL, kubeConfig, nodeName string, timeout time.Duration) error {
	// Get a Kubernetes client using the supplied kubeconfig
	client, err := az.GetKubernetesClient(apiserverURL, kubeConfig, interval, timeout)
	if err != nil {
		return err
	}
	return SafelyDrainNodeWithClient(client, logger, nodeName, timeout)
}

// SafelyDrainNodeWithClient safely drains a node so that it can be deleted from the cluster
func SafelyDrainNodeWithClient(client kubernetes.Client, logger *log.Entry, nodeName string, timeout time.Duration) error {
	nodeName = strings.ToLower(nodeName)

	// Mark the node unschedulable (cordon), retrying on optimistic-concurrency conflicts
	var node *v1.Node
	var err error
	for i := 0; i < cordonMaxRetries; i++ {
		node, err = client.GetNode(nodeName)
		if err != nil {
			return err
		}
		node.Spec.Unschedulable = true
		node, err = client.UpdateNode(node)
		if err != nil {
			// If the update failed because of a concurrent modification, fetch the
			// latest version of the node and reapply the change
			if strings.Contains(err.Error(), kubernetesOptimisticLockErrorMsg) {
				logger.Infof("Node %s got an error suggesting a concurrent modification. Will retry to cordon", nodeName)
				continue
			}
			return err
		}
		break
	}
	logger.Infof("Node %s has been marked unschedulable.", nodeName)

	// Evict the pods scheduled on the node
	drainOp := &drainOperation{client: client, node: node, logger: logger, timeout: timeout}
	return drainOp.deleteOrEvictPodsSimple()
}
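
// deleteOrEvictPodsSimple lists the pods scheduled on the node, then deletes or
// evicts them; if an error occurs, the pods that are still pending are logged.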
func (o *drainOperation) deleteOrEvictPodsSimple() error {
	pods, err := o.getPodsForDeletion()
	if err != nil {
		return err
	}
	if len(pods) > 0 {
		o.logger.WithFields(log.Fields{
			"prefix": "drain",
			"node":   o.node.Name,
		}).Infof("%d pods will be deleted", len(pods))
	} else {
		o.logger.Infof("Node %s has no scheduled pods", o.node.Name)
	}

	err = o.deleteOrEvictPods(pods)
	if err != nil {
		pendingPods, newErr := o.getPodsForDeletion()
		if newErr != nil {
			return newErr
		}
		o.logger.Errorf("There are pending pods after an error occurred: %v\n", err)
		for _, pendingPod := range pendingPods {
			o.logger.Errorf("%s/%s\n", "pod", pendingPod.Name)
		}
	}
	return err
}
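
// mirrorPodFilter excludes mirror pods (static pods managed directly by the
// kubelet), which cannot be deleted or evicted through the API server.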
func mirrorPodFilter(pod v1.Pod) bool {
	if _, found := pod.ObjectMeta.Annotations[mirrorPodAnnotation]; found {
		return false
	}
	return true
}
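
// getControllerRef returns the pod's controlling OwnerReference, or nil if the
// pod has no controller.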
func getControllerRef(pod *v1.Pod) *metav1.OwnerReference {
	for _, ref := range pod.ObjectMeta.OwnerReferences {
		if ref.Controller != nil && *ref.Controller {
			return &ref
		}
	}
	return nil
}
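
// daemonSetPodFilter keeps any pod that is not controlled by a DaemonSet.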
func daemonSetPodFilter(pod v1.Pod) bool {
	controllerRef := getControllerRef(&pod)
	// kubectl also verifies that the controlling DaemonSet still exists in the API
	// server to detect orphaned pods. We delete orphaned pods anyway, so that check
	// is skipped here: any pod that is not controlled by a DaemonSet is kept.
	if controllerRef == nil || controllerRef.Kind != "DaemonSet" {
		return true
	}
	// Don't delete/evict DaemonSet pods: they will just come back, and
	// deleting/evicting them can cause service disruptions
	return false
}

// getPodsForDeletion returns all the pods scheduled on the node that we're going
// to delete, i.e. every pod that is neither a mirror pod nor managed by a DaemonSet.
func (o *drainOperation) getPodsForDeletion() (pods []v1.Pod, err error) {
	podList, err := o.client.ListPods(o.node)
	if err != nil {
		return pods, err
	}
	for _, pod := range podList.Items {
		// A pod is kept only if every filter accepts it
		podOk := true
		for _, filt := range []podFilter{
			mirrorPodFilter,
			daemonSetPodFilter,
		} {
			podOk = podOk && filt(pod)
		}
		if podOk {
			pods = append(pods, pod)
		}
	}
	return pods, nil
}

// deleteOrEvictPods evicts the pods through the eviction API when the API server
// supports it, and falls back to deleting them directly otherwise.
func (o *drainOperation) deleteOrEvictPods(pods []v1.Pod) error {
	if len(pods) == 0 {
		return nil
	}
	policyGroupVersion, err := o.client.SupportEviction()
	if err != nil {
		return err
	}
	if len(policyGroupVersion) > 0 {
		return o.evictPods(pods, policyGroupVersion)
	}
	return o.deletePods(pods)
}
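
// evictPods evicts the given pods concurrently and waits for each of them to be
// deleted, failing if the drain does not finish within the timeout.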
func (o *drainOperation) evictPods(pods []v1.Pod, policyGroupVersion string) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	doneCh := make(chan bool, len(pods))
	errCh := make(chan error, 1)

	for _, pod := range pods {
		go func(ctx context.Context, pod v1.Pod, doneCh chan bool, errCh chan error) {
			var err error
		doneEviction:
			for {
				select {
				case <-ctx.Done():
					return
				default:
					err = o.client.EvictPod(&pod, policyGroupVersion)
					if err == nil {
						break doneEviction
					} else if apierrors.IsNotFound(err) {
						// The pod is already gone
						doneCh <- true
						return
					} else if apierrors.IsTooManyRequests(err) {
						// The eviction was rejected (e.g. a PodDisruptionBudget is
						// blocking it); back off and retry
						time.Sleep(5 * time.Second)
					} else {
						errCh <- errors.Wrapf(err, "error when evicting pod %q", pod.Name)
						return
					}
				}
			}
			// The eviction was accepted; wait for the pod to actually be deleted
			podArray := []v1.Pod{pod}
			_, err = o.client.WaitForDelete(o.logger, podArray, true)
			if err == nil {
				doneCh <- true
			} else {
				errCh <- errors.Wrapf(err, "error when waiting for pod %q to terminate", pod.Name)
			}
		}(ctx, pod, doneCh, errCh)
	}

	doneCount := 0
	for {
		select {
		case err := <-errCh:
			return err
		case <-doneCh:
			doneCount++
			if doneCount == len(pods) {
				return nil
			}
		case <-time.After(o.timeout):
			return errors.Errorf("Drain did not complete within %v", o.timeout)
		}
	}
}
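
// deletePods deletes the given pods directly (without the eviction API) and then
// waits for them to be removed from the API server.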
func (o *drainOperation) deletePods(pods []v1.Pod) error {
	for _, pod := range pods {
		err := o.client.DeletePod(&pod)
		if err != nil && !apierrors.IsNotFound(err) {
			return err
		}
	}
	_, err := o.client.WaitForDelete(o.logger, pods, false)
	return err
}
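
// Usage sketch (illustrative only, not part of the original file): how a caller that
// already has a kubernetes.Client might drain a node before deleting its VM. The
// wrapper name, the logger field, and the five-minute timeout are assumptions.
//
//	func drainBeforeVMDelete(client kubernetes.Client, nodeName string) error {
//		logger := log.WithField("prefix", "drain")
//		// Cordon the node, then evict or delete its pods, waiting up to five minutes
//		return SafelyDrainNodeWithClient(client, logger, nodeName, 5*time.Minute)
//	}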