feat: [NPM] Support pod grace period for v2 (#1095)
* Use the official k8s Equals function for labels instead of our own implementation
* Update the v2 pod controller to support graceful shutdown of pods
This commit is contained in:
Parent
cc68c342fa
Commit
76e7531a08
@@ -21,6 +21,7 @@ import (
 	coreinformer "k8s.io/client-go/informers/core/v1"
 	corelisters "k8s.io/client-go/listers/core/v1"
 
+	k8slabels "k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
@@ -88,7 +89,7 @@ func (nPod *NpmPod) noUpdate(podObj *corev1.Pod) bool {
 		nPod.Name == podObj.ObjectMeta.Name &&
 		nPod.Phase == podObj.Status.Phase &&
 		nPod.PodIP == podObj.Status.PodIP &&
-		util.IsSameLabels(nPod.Labels, podObj.ObjectMeta.Labels) &&
+		k8slabels.Equals(nPod.Labels, podObj.ObjectMeta.Labels) &&
 		// TODO(jungukcho) to avoid using DeepEqual for ContainerPorts,
 		// it needs a precise sorting. Will optimize it later if needed.
 		reflect.DeepEqual(nPod.ContainerPorts, getContainerPortList(podObj))
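The swap above replaces NPM's hand-rolled IsSameLabels helper with labels.Equals from k8s.io/apimachinery/pkg/labels. Below is a minimal sketch of how that library function behaves on the cases the old helper covered (nil versus empty maps, key ordering, mismatches); the sample label values are made up for illustration.

package main

import (
	"fmt"

	k8slabels "k8s.io/apimachinery/pkg/labels"
)

func main() {
	var nilLabels map[string]string

	// A nil map and an empty map compare as equal, matching the removed IsSameLabels helper.
	fmt.Println(k8slabels.Equals(nilLabels, map[string]string{})) // true

	// Go maps are unordered, so insertion order never affects the comparison.
	a := map[string]string{"app": "test-pod", "tier": "backend"}
	b := map[string]string{"tier": "backend", "app": "test-pod"}
	fmt.Println(k8slabels.Equals(a, b)) // true

	// A missing key or a differing value makes the maps unequal.
	fmt.Println(k8slabels.Equals(a, map[string]string{"app": "test-pod"})) // false
}

This is why the behaviors exercised by the removed TestIsSameLabels cases, including the nil-versus-empty comparison, should remain covered by the library function.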
@@ -13,6 +13,7 @@ import (
 	dpmocks "github.com/Azure/azure-container-networking/npm/pkg/dataplane/mocks"
 	gomock "github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -85,7 +86,7 @@ func (f *podFixture) newPodController(_ chan struct{}) {
 	// f.kubeInformer.Start(stopCh)
 }
 
-func createPod(name, ns, rv, podIP string, labels map[string]string, isHostNewtwork bool, podPhase corev1.PodPhase) *corev1.Pod {
+func createPod(name, ns, rv, podIP string, labels map[string]string, isHostNetwork bool, podPhase corev1.PodPhase) *corev1.Pod {
 	return &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: name,
@@ -94,7 +95,7 @@ func createPod(name, ns, rv, podIP string, labels map[string]string, isHostNewtw
 			ResourceVersion: rv,
 		},
 		Spec: corev1.PodSpec{
-			HostNetwork: isHostNewtwork,
+			HostNetwork: isHostNetwork,
 			Containers: []corev1.Container{
 				{
 					Ports: []corev1.ContainerPort{
@@ -734,6 +735,88 @@ func TestHasValidPodIP(t *testing.T) {
 	}
 }
 
+func TestIsCompletePod(t *testing.T) {
+	var zeroGracePeriod int64
+	var defaultGracePeriod int64 = 30
+
+	type podState struct {
+		phase                      corev1.PodPhase
+		deletionTimestamp          *metav1.Time
+		deletionGracePeriodSeconds *int64
+	}
+
+	tests := []struct {
+		name                 string
+		podState             podState
+		expectedCompletedPod bool
+	}{
+
+		{
+			name: "pod is in running status",
+			podState: podState{
+				phase:                      corev1.PodRunning,
+				deletionTimestamp:          nil,
+				deletionGracePeriodSeconds: nil,
+			},
+			expectedCompletedPod: false,
+		},
+		{
+			name: "pod is in completely terminating states after graceful shutdown period",
+			podState: podState{
+				phase:                      corev1.PodRunning,
+				deletionTimestamp:          &metav1.Time{},
+				deletionGracePeriodSeconds: &zeroGracePeriod,
+			},
+			expectedCompletedPod: true,
+		},
+		{
+			name: "pod is in terminating states, but in graceful shutdown period",
+			podState: podState{
+				phase:                      corev1.PodRunning,
+				deletionTimestamp:          &metav1.Time{},
+				deletionGracePeriodSeconds: &defaultGracePeriod,
+			},
+			expectedCompletedPod: false,
+		},
+		{
+			name: "pod is in PodSucceeded status",
+			podState: podState{
+				phase:                      corev1.PodSucceeded,
+				deletionTimestamp:          nil,
+				deletionGracePeriodSeconds: nil,
+			},
+			expectedCompletedPod: true,
+		},
+		{
+			name: "pod is in PodFailed status",
+			podState: podState{
+				phase:                      corev1.PodFailed,
+				deletionTimestamp:          nil,
+				deletionGracePeriodSeconds: nil,
+			},
+			expectedCompletedPod: true,
+		},
+	}
+
+	for _, tt := range tests {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+			corev1Pod := &corev1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					DeletionTimestamp:          tt.podState.deletionTimestamp,
+					DeletionGracePeriodSeconds: tt.podState.deletionGracePeriodSeconds,
+				},
+				Status: corev1.PodStatus{
+					Phase: tt.podState.phase,
+				},
+			}
+			isPodCompleted := isCompletePod(corev1Pod)
+			require.Equal(t, tt.expectedCompletedPod, isPodCompleted)
+		})
+	}
+}
+
 // Extra unit test which is not quite related to PodController,
 // but helps to understand how the workqueue works to make the event handler logic lock-free.
 // If the same key is queued into the workqueue multiple times,
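The comment above refers to how the client-go workqueue keeps the event-handler logic lock-free by deduplicating identical keys. Here is a minimal, self-contained sketch of that behavior, assuming only the k8s.io/client-go/util/workqueue package; the key string is a made-up example, not one from the repository.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.New()

	// Adding the same key several times before it is processed
	// results in a single queued item.
	for i := 0; i < 3; i++ {
		queue.Add("test-namespace/test-pod-1")
	}
	fmt.Println(queue.Len()) // 1

	// Once the item is handed out via Get, the worker must mark it Done;
	// a key re-added while it is being processed is re-queued afterwards.
	key, _ := queue.Get()
	queue.Done(key)
	fmt.Println(key, queue.Len()) // test-namespace/test-pod-1 0
}

This deduplication is why multiple add and update events for the same pod key collapse into a single pending reconcile.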
@@ -768,3 +851,71 @@ func TestWorkQueue(t *testing.T) {
 		}
 	}
 }
+
+func TestNPMPodNoUpdate(t *testing.T) {
+	type podInfo struct {
+		podName       string
+		ns            string
+		rv            string
+		podIP         string
+		labels        map[string]string
+		isHostNetwork bool
+		podPhase      corev1.PodPhase
+	}
+
+	labels := map[string]string{
+		"app": "test-pod",
+	}
+
+	tests := []struct {
+		name string
+		podInfo
+		updatingNPMPod   bool
+		expectedNoUpdate bool
+	}{
+		{
+			"Required update of NPMPod given Pod",
+			podInfo{
+				podName:       "test-pod-1",
+				ns:            "test-namespace",
+				rv:            "0",
+				podIP:         "1.2.3.4",
+				labels:        labels,
+				isHostNetwork: NonHostNetwork,
+				podPhase:      corev1.PodRunning,
+			},
+			false,
+			false,
+		},
+		{
+			"No required update of NPMPod given Pod",
+			podInfo{
+				podName:       "test-pod-2",
+				ns:            "test-namespace",
+				rv:            "0",
+				podIP:         "1.2.3.4",
+				labels:        labels,
+				isHostNetwork: NonHostNetwork,
+				podPhase:      corev1.PodRunning,
+			},
+			true,
+			true,
+		},
+	}
+
+	for _, tt := range tests {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+			corev1Pod := createPod(tt.podName, tt.ns, tt.rv, tt.podIP, tt.labels, tt.isHostNetwork, tt.podPhase)
+			npmPod := newNpmPod(corev1Pod)
+			if tt.updatingNPMPod {
+				npmPod.appendLabels(corev1Pod.Labels, appendToExistingLabels)
+				npmPod.updateNpmPodAttributes(corev1Pod)
+				npmPod.appendContainerPorts(corev1Pod)
+			}
+			noUpdate := npmPod.noUpdate(corev1Pod)
+			require.Equal(t, tt.expectedNoUpdate, noUpdate)
+		})
+	}
+}

@@ -16,6 +16,7 @@ import (
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	k8slabels "k8s.io/apimachinery/pkg/labels"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	coreinformer "k8s.io/client-go/informers/core/v1"
@@ -83,6 +84,18 @@ func (nPod *NpmPod) updateNpmPodAttributes(podObj *corev1.Pod) {
 	}
 }
 
+// noUpdate returns true if the NpmPod does not need to be updated given podObj.
+func (nPod *NpmPod) noUpdate(podObj *corev1.Pod) bool {
+	return nPod.Namespace == podObj.ObjectMeta.Namespace &&
+		nPod.Name == podObj.ObjectMeta.Name &&
+		nPod.Phase == podObj.Status.Phase &&
+		nPod.PodIP == podObj.Status.PodIP &&
+		k8slabels.Equals(nPod.Labels, podObj.ObjectMeta.Labels) &&
+		// TODO(jungukcho) to avoid using DeepEqual for ContainerPorts,
+		// it needs a precise sorting. Will optimize it later if needed.
+		reflect.DeepEqual(nPod.ContainerPorts, getContainerPortList(podObj))
+}
+
 type PodController struct {
 	podLister corelisters.PodLister
 	workqueue workqueue.RateLimitingInterface
@@ -169,7 +182,8 @@ func (c *PodController) addPod(obj interface{}) {
 	}
 	podObj, _ := obj.(*corev1.Pod)
 
-	// If newPodObj status is either corev1.PodSucceeded or corev1.PodFailed or DeletionTimestamp is set, do not need to add it into workqueue.
+	// Check whether this pod needs to be queued.
+	// If the pod is in a completely terminated state, it is not enqueued, to avoid unnecessary computation.
 	if isCompletePod(podObj) {
 		return
 	}
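Here is a simplified sketch of the guard above: pods that are already terminated are never enqueued, so the controller spends no work on them. The isCompletePod helper and the sample pods below mirror this diff but are reconstructed for illustration; this is not the repository's actual informer wiring.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// isCompletePod mirrors the check in this diff: the deletion grace period has
// elapsed, or the pod reached a terminal phase.
func isCompletePod(pod *corev1.Pod) bool {
	if pod.DeletionTimestamp != nil && pod.DeletionGracePeriodSeconds != nil && *pod.DeletionGracePeriodSeconds == 0 {
		return true
	}
	return pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed
}

func main() {
	queue := workqueue.New()

	// Hypothetical add handler: skip pods that are already terminated.
	addPod := func(obj interface{}) {
		pod, ok := obj.(*corev1.Pod)
		if !ok {
			return
		}
		if isCompletePod(pod) {
			return
		}
		if key, err := cache.MetaNamespaceKeyFunc(pod); err == nil {
			queue.Add(key)
		}
	}

	running := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default"},
		Status:     corev1.PodStatus{Phase: corev1.PodRunning},
	}
	succeeded := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "job", Namespace: "default"},
		Status:     corev1.PodStatus{Phase: corev1.PodSucceeded},
	}

	addPod(running)
	addPod(succeeded)
	fmt.Println(queue.Len()) // 1: only the running pod was enqueued
}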
@@ -333,7 +347,9 @@ func (c *PodController) syncPod(key string) error {
 		return err
 	}
 
-	// If newPodObj status is either corev1.PodSucceeded or corev1.PodFailed or DeletionTimestamp is set, start clean-up the lastly applied states.
+	// If this pod is completely in a terminated state (i.e., the pod was gracefully shut down),
+	// NPM starts cleaning up the last applied states, even in update events.
+	// This proactive clean-up avoids keeping stale pod state in case the delete event is missed.
 	if isCompletePod(pod) {
 		if err = c.cleanUpDeletedPod(key); err != nil {
 			return fmt.Errorf("error: %w when pod is in completed state", err)
@@ -346,7 +362,7 @@ func (c *PodController) syncPod(key string) error {
 		// If the pod does not have different states from the lastly applied states stored in cachedNpmPod,
 		// podController does not need to reconcile this update.
 		// In this updatePod event, newPod was updated with states which PodController does not need to reconcile.
-		if isInvalidPodUpdate(cachedNpmPod, pod) {
+		if cachedNpmPod.noUpdate(pod) {
 			return nil
 		}
 	}
@@ -619,13 +635,20 @@ func (c *PodController) manageNamedPortIpsets(portList []corev1.ContainerPort, p
 	return nil
 }
 
+// isCompletePod evaluates whether this pod is completely in a terminated state,
+// which means the pod was gracefully shut down.
 func isCompletePod(podObj *corev1.Pod) bool {
-	if podObj.DeletionTimestamp != nil {
+	// DeletionTimestamp and DeletionGracePeriodSeconds in the pod are both non-nil,
+	// which means the pod is expected to be deleted, and a
+	// DeletionGracePeriodSeconds value of zero means the pod has been gracefully terminated.
+	if podObj.DeletionTimestamp != nil && podObj.DeletionGracePeriodSeconds != nil && *podObj.DeletionGracePeriodSeconds == 0 {
 		return true
 	}
 
-	// K8s categorizes Succeeded and Failed pods as a terminated pod and will not restart them
+	// K8s categorizes Succeeded and Failed pods as terminated pods and will not restart them.
 	// So NPM will ignore adding these pods.
+	// TODO(jungukcho): what are the values of DeletionTimestamp and podObj.DeletionGracePeriodSeconds
+	// in either of the statuses below?
 	if podObj.Status.Phase == corev1.PodSucceeded || podObj.Status.Phase == corev1.PodFailed {
 		return true
 	}
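To make the grace-period semantics concrete, here is a small sketch that re-declares the check locally (it is not imported from the repository) and evaluates the three states the new unit test exercises: running, terminating but still inside the grace period, and terminated once the grace period has elapsed.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// complete mirrors isCompletePod from this diff.
func complete(pod *corev1.Pod) bool {
	if pod.DeletionTimestamp != nil && pod.DeletionGracePeriodSeconds != nil && *pod.DeletionGracePeriodSeconds == 0 {
		return true
	}
	return pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed
}

func main() {
	var zero int64
	var thirty int64 = 30
	now := metav1.Now()

	running := &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodRunning}}

	// Deletion requested, but the 30-second grace period has not elapsed yet:
	// NPM must keep the pod's state applied.
	terminating := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &now, DeletionGracePeriodSeconds: &thirty},
		Status:     corev1.PodStatus{Phase: corev1.PodRunning},
	}

	// Grace period elapsed (zero seconds remaining): safe to clean up.
	terminated := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &now, DeletionGracePeriodSeconds: &zero},
		Status:     corev1.PodStatus{Phase: corev1.PodRunning},
	}

	fmt.Println(complete(running), complete(terminating), complete(terminated)) // false false true
}

Only the last pod is treated as complete, so NPM keeps policies applied for a pod that is still draining.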
@@ -647,15 +670,3 @@ func getContainerPortList(podObj *corev1.Pod) []corev1.ContainerPort {
 	}
 	return portList
 }
-
-// (TODO): better naming?
-func isInvalidPodUpdate(npmPod *NpmPod, newPodObj *corev1.Pod) bool {
-	return npmPod.Namespace == newPodObj.ObjectMeta.Namespace &&
-		npmPod.Name == newPodObj.ObjectMeta.Name &&
-		npmPod.Phase == newPodObj.Status.Phase &&
-		npmPod.PodIP == newPodObj.Status.PodIP &&
-		newPodObj.ObjectMeta.DeletionTimestamp == nil &&
-		newPodObj.ObjectMeta.DeletionGracePeriodSeconds == nil &&
-		reflect.DeepEqual(npmPod.Labels, newPodObj.ObjectMeta.Labels) &&
-		reflect.DeepEqual(npmPod.ContainerPorts, getContainerPortList(newPodObj))
-}

@@ -338,18 +338,3 @@ func CompareSlices(list1, list2 []string) bool {
 func SliceToString(list []string) string {
 	return strings.Join(list, SetPolicyDelimiter)
 }
-
-// IsSameLabels return if all pairs of key and value in two maps are same.
-// Otherwise, it returns false.
-func IsSameLabels(labelA, labelB map[string]string) bool {
-	if len(labelA) != len(labelB) {
-		return false
-	}
-
-	for labelKey, labelVal := range labelA {
-		if val, exist := labelB[labelKey]; !exist || labelVal != val {
-			return false
-		}
-	}
-	return true
-}

@@ -4,7 +4,6 @@ import (
 	"reflect"
 	"testing"
 
-	"github.com/stretchr/testify/require"
 	"k8s.io/apimachinery/pkg/version"
 )
 
@@ -324,108 +323,3 @@ func TestCompareSlices(t *testing.T) {
 		t.Errorf("TestCompareSlices failed @ slice comparison 4")
 	}
 }
-
-func TestIsSameLabels(t *testing.T) {
-	var nilLabel map[string]string
-	tests := []struct {
-		name                string
-		labelA              map[string]string
-		labelB              map[string]string
-		expectedIsSameLabel bool
-	}{
-		{
-			name:                "Empty labels",
-			labelA:              map[string]string{},
-			labelB:              map[string]string{},
-			expectedIsSameLabel: true,
-		},
-		{
-			name:                "Empty label and Nil label",
-			labelA:              map[string]string{},
-			labelB:              nilLabel,
-			expectedIsSameLabel: true,
-		},
-		{
-			name: "Same labels",
-			labelA: map[string]string{
-				"e": "f",
-				"c": "d",
-				"a": "b",
-			},
-			labelB: map[string]string{
-				"e": "f",
-				"c": "d",
-				"a": "b",
-			},
-			expectedIsSameLabel: true,
-		},
-		{
-			name: "Same labels with different ordered addition",
-			labelA: map[string]string{
-				"e": "f",
-				"c": "d",
-				"a": "b",
-			},
-			labelB: map[string]string{
-				"c": "d",
-				"e": "f",
-				"a": "b",
-			},
-			expectedIsSameLabel: true,
-		},
-		{
-			name: "Different length",
-			labelA: map[string]string{
-				"e": "f",
-			},
-			labelB: map[string]string{
-				"e": "f",
-				"a": "b",
-			},
-			expectedIsSameLabel: false,
-		},
-		{
-			name:   "Different (empty map and non-empty map)",
-			labelA: map[string]string{},
-			labelB: map[string]string{
-				"e": "f",
-				"c": "d",
-				"a": "b",
-			},
-			expectedIsSameLabel: false,
-		},
-		{
-			name:   "Different (nil map and non-empty map)",
-			labelA: nilLabel,
-			labelB: map[string]string{
-				"e": "f",
-				"c": "d",
-				"a": "b",
-			},
-			expectedIsSameLabel: false,
-		},
-		{
-			name: "Have a different one pair of key and value",
-			labelA: map[string]string{
-				"e": "f",
-				"d": "c",
-				"a": "b",
-			},
-			labelB: map[string]string{
-				"e": "f",
-				"c": "d",
-				"a": "b",
-			},
-			expectedIsSameLabel: false,
-		},
-	}
-
-	for _, tt := range tests {
-		tt := tt
-		t.Run(tt.name, func(t *testing.T) {
-			t.Parallel()
-			got := IsSameLabels(tt.labelA, tt.labelB)
-			require.Equal(t, tt.expectedIsSameLabel, got)
-		})
-	}
-}