Refactor CRD watcher to allow refreshing (drop old, create new) (#142)

* refactor CRD watcher to allow refreshing (drop old, create new), add unit tests

* changes while testing

* make the CRD refresh interval configurable
Duke Harlan 2021-02-17 17:07:38 -08:00 committed by GitHub
Parent be0fa462f3
Commit 6c0fc7bb71
No key matching this signature was found
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 408 additions and 57 deletions
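At a glance, the refactor keys each CRD informer by group/version/resource/kind; on every refresh it reuses informers whose CRDs still exist, starts informers for newly found CRDs, and stops the leftovers. Below is a minimal, dependency-free Go sketch of that reuse/start/stop pattern; the names (key, worker, refresher) are illustrative and not part of the sloop API.

package main

import (
	"fmt"
	"sync"
	"time"
)

type key struct{ group, version, resource, kind string }

type worker struct {
	key  key
	stop chan struct{}
}

type refresher struct {
	mu      sync.Mutex
	workers map[key]*worker
}

// pull detaches the current workers so the caller can decide which to keep.
func (r *refresher) pull() map[key]*worker {
	r.mu.Lock()
	defer r.mu.Unlock()
	old := r.workers
	r.workers = map[key]*worker{}
	return old
}

// keepOrStart reuses an existing worker for k if one is present in old,
// otherwise it starts a new one; anything left in old is stopped later.
func (r *refresher) keepOrStart(k key, old map[key]*worker) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if w, ok := old[k]; ok {
		r.workers[k] = w
		delete(old, k) // keep it out of the "unwanted" set
		return
	}
	w := &worker{key: k, stop: make(chan struct{})}
	r.workers[k] = w
	go func() {
		fmt.Println("started", k)
		<-w.stop // stand-in for informer.Informer().Run(stopChan)
		fmt.Println("stopped", k)
	}()
}

func stopUnwanted(old map[key]*worker) {
	for _, w := range old {
		close(w.stop)
	}
}

func main() {
	r := &refresher{workers: map[key]*worker{}}
	refresh := func(current []key) {
		old := r.pull()
		for _, k := range current {
			r.keepOrStart(k, old)
		}
		stopUnwanted(old)
	}
	refresh([]key{{"g", "v1", "things", "Thing"}})
	refresh([]key{{"g", "v2", "things", "Thing"}}) // v1 worker is stopped, v2 started
	time.Sleep(100 * time.Millisecond)
	stopUnwanted(r.pull())
	time.Sleep(100 * time.Millisecond)
}

The real implementation does the same bookkeeping under kubeWatcherImpl.protection, with dynamic informers in place of the toy goroutines (see pullCrdInformers, existingOrStartNewCrdInformer, and stopUnwantedCrdInformers in the diff below).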

go.mod

@@ -29,6 +29,7 @@ require (
golang.org/x/sys v0.0.0-20200413165638-669c56c373c4 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
google.golang.org/appengine v1.6.5 // indirect
k8s.io/api v0.17.0
k8s.io/apiextensions-apiserver v0.17.0
k8s.io/apimachinery v0.17.0
k8s.io/client-go v0.17.0


@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, salesforce.com, inc.
* Copyright (c) 2021, salesforce.com, inc.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
@@ -10,7 +10,17 @@ package ingress
import (
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/golang/glog"
"github.com/golang/protobuf/ptypes"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/salesforce/sloop/pkg/sloop/kubeextractor"
"github.com/salesforce/sloop/pkg/sloop/store/typed"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -20,15 +30,6 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sync"
"time"
"github.com/golang/glog"
"github.com/golang/protobuf/ptypes"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/salesforce/sloop/pkg/sloop/kubeextractor"
"github.com/salesforce/sloop/pkg/sloop/store/typed"
)
/*
@@ -39,16 +40,6 @@ type KubeWatcher interface {
Stop()
}
type kubeWatcherImpl struct {
informerFactory informers.SharedInformerFactory
stopChan chan struct{}
outchan chan typed.KubeWatchResult
resync time.Duration
outchanlock *sync.Mutex
stopped bool
currentContext string
}
type crdGroupVersionResourceKind struct {
group string
version string
@@ -56,15 +47,40 @@ type crdGroupVersionResourceKind struct {
kind string
}
type crdInformerInfo struct {
crd crdGroupVersionResourceKind
stopChan chan struct{}
}
type kubeWatcherImpl struct {
informerFactory informers.SharedInformerFactory
stopChan chan struct{}
crdInformers map[crdGroupVersionResourceKind]*crdInformerInfo
activeCrdInformer int64
outchan chan typed.KubeWatchResult
resync time.Duration
protection *sync.Mutex
stopped bool
refreshCrd *time.Ticker
currentContext string
}
var (
newCrdClient = func(kubeCfg *rest.Config) (clientset.Interface, error) { return clientset.NewForConfig(kubeCfg) }
metricIngressKubewatchcount = promauto.NewCounterVec(prometheus.CounterOpts{Name: "sloop_ingress_kubewatchcount"}, []string{"kind", "watchtype", "namespace"})
metricIngressKubewatchbytes = promauto.NewCounterVec(prometheus.CounterOpts{Name: "sloop_ingress_kubewatchbytes"}, []string{"kind", "watchtype", "namespace"})
metricCrdInformerStarted = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_crd_informer_started"})
metricCrdInformerRunning = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_crd_informer_running"})
)
// Todo: Add additional parameters for filtering
func NewKubeWatcherSource(kubeClient kubernetes.Interface, outChan chan typed.KubeWatchResult, resync time.Duration, includeCrds bool, masterURL string, kubeContext string) (KubeWatcher, error) {
kw := &kubeWatcherImpl{resync: resync, outchanlock: &sync.Mutex{}}
func NewKubeWatcherSource(kubeClient kubernetes.Interface, outChan chan typed.KubeWatchResult, resync time.Duration, includeCrds bool, crdRefreshInterval time.Duration, masterURL string, kubeContext string) (KubeWatcher, error) {
kw := &kubeWatcherImpl{resync: resync, protection: &sync.Mutex{}}
kw.stopChan = make(chan struct{})
kw.crdInformers = make(map[crdGroupVersionResourceKind]*crdInformerInfo)
kw.outchan = outChan
kw.startWellKnownInformers(kubeClient)
@@ -73,6 +89,9 @@ func NewKubeWatcherSource(kubeClient kubernetes.Interface, outChan chan typed.Ku
if err != nil {
return nil, err
}
kw.refreshCrd = time.NewTicker(crdRefreshInterval)
go kw.refreshCrdInformers(masterURL, kubeContext)
}
return kw, nil
@@ -81,10 +100,14 @@ func NewKubeWatcherSource(kubeClient kubernetes.Interface, outChan chan typed.Ku
func (i *kubeWatcherImpl) startWellKnownInformers(kubeclient kubernetes.Interface) {
i.informerFactory = informers.NewSharedInformerFactory(kubeclient, i.resync)
i.informerFactory.Apps().V1().DaemonSets().Informer().AddEventHandler(i.getEventHandlerForResource("DaemonSet"))
i.informerFactory.Apps().V1().Deployments().Informer().AddEventHandler(i.getEventHandlerForResource("Deployment"))
i.informerFactory.Apps().V1().ReplicaSets().Informer().AddEventHandler(i.getEventHandlerForResource("ReplicaSet"))
i.informerFactory.Apps().V1().StatefulSets().Informer().AddEventHandler(i.getEventHandlerForResource("StatefulSet"))
i.informerFactory.Core().V1().ConfigMaps().Informer().AddEventHandler(i.getEventHandlerForResource("ConfigMap"))
i.informerFactory.Core().V1().Endpoints().Informer().AddEventHandler(i.getEventHandlerForResource("Endpoint"))
i.informerFactory.Core().V1().Events().Informer().AddEventHandler(i.getEventHandlerForResource("Event"))
i.informerFactory.Core().V1().Namespaces().Informer().AddEventHandler(i.getEventHandlerForResource("Namespace"))
i.informerFactory.Core().V1().Nodes().Informer().AddEventHandler(i.getEventHandlerForResource("Node"))
i.informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(i.getEventHandlerForResource("PersistentVolumeClaim"))
@@ -92,66 +115,134 @@ func (i *kubeWatcherImpl) startWellKnownInformers(kubeclient kubernetes.Interfac
i.informerFactory.Core().V1().Pods().Informer().AddEventHandler(i.getEventHandlerForResource("Pod"))
i.informerFactory.Core().V1().Services().Informer().AddEventHandler(i.getEventHandlerForResource("Service"))
i.informerFactory.Core().V1().ReplicationControllers().Informer().AddEventHandler(i.getEventHandlerForResource("ReplicationController"))
i.informerFactory.Extensions().V1beta1().DaemonSets().Informer().AddEventHandler(i.getEventHandlerForResource("DaemonSet"))
i.informerFactory.Extensions().V1beta1().ReplicaSets().Informer().AddEventHandler(i.getEventHandlerForResource("ReplicaSet"))
i.informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(i.getEventHandlerForResource("StorageClass"))
i.informerFactory.Core().V1().Events().Informer().AddEventHandler(i.getEventHandlerForResource("Event"))
i.informerFactory.Start(i.stopChan)
}
func (i *kubeWatcherImpl) startCustomInformers(masterURL string, kubeContext string) error {
clientCfg := getConfig(masterURL, kubeContext)
kubeCfg, err := clientCfg.ClientConfig()
if err != nil {
return errors.Wrap(err, "failed to read config while starting custom informers")
}
crdList, err := getCrdList(kubeCfg)
crdClient, err := newCrdClient(kubeCfg)
if err != nil {
return err
return errors.Wrap(err, "failed to instantiate client for querying CRDs")
}
crdList, err := getCrdList(crdClient)
if err != nil {
return errors.Wrap(err, "failed to query list of CRDs")
}
glog.Infof("Found %d CRD definitions", len(crdList))
dynamicClient, err := dynamic.NewForConfig(kubeCfg)
if err != nil {
return errors.Wrap(err, "failed to instantiate client for custom informers")
}
f := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, i.resync, "", nil)
existing := i.pullCrdInformers()
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, i.resync, "", nil)
for _, crd := range crdList {
resource := schema.GroupVersionResource{Group: crd.group, Version: crd.version, Resource: crd.resource}
resourceKind := crd.kind
informer := f.ForResource(resource)
informer.Informer().AddEventHandler(i.getEventHandlerForResource(resourceKind))
go func() {
glog.V(2).Infof("Starting CRD informer for: %s (%v)", resourceKind, resource)
informer.Informer().Run(i.stopChan)
glog.V(2).Infof("Exited CRD informer for: %s", resourceKind)
}()
i.existingOrStartNewCrdInformer(crd, existing, factory)
}
glog.Infof("Stopping %d CRD Informers", len(existing))
stopUnwantedCrdInformers(existing)
metricCrdInformerStarted.Set(float64(len(i.crdInformers)))
return nil
}
func getCrdList(kubeCfg *rest.Config) ([]crdGroupVersionResourceKind, error) {
crdClient, err := clientset.NewForConfig(kubeCfg)
if err != nil {
return nil, errors.Wrap(err, "failed to instantiate client for querying CRDs")
func (i *kubeWatcherImpl) pullCrdInformers() map[crdGroupVersionResourceKind]*crdInformerInfo {
i.protection.Lock()
defer i.protection.Unlock()
crdInformers := i.crdInformers
i.crdInformers = make(map[crdGroupVersionResourceKind]*crdInformerInfo)
return crdInformers
}
func (i *kubeWatcherImpl) existingOrStartNewCrdInformer(crd crdGroupVersionResourceKind, existing map[crdGroupVersionResourceKind]*crdInformerInfo, factory dynamicinformer.DynamicSharedInformerFactory) {
i.protection.Lock()
defer i.protection.Unlock()
if i.stopped {
return
}
// if there is an existing informer for this crd, then keep using the existing informer
crdInformer, found := existing[crd]
if found {
i.crdInformers[crd] = crdInformer
delete(existing, crd) // remove from existing so it wont get stopped as unwanted
return
}
// need an informer for this crd
crdInformer = &crdInformerInfo{crd: crd, stopChan: make(chan struct{})}
i.crdInformers[crd] = crdInformer
i.startNewCrdInformer(crdInformer, factory)
}
func (i *kubeWatcherImpl) startNewCrdInformer(crdInformer *crdInformerInfo, factory dynamicinformer.DynamicSharedInformerFactory) {
gvr := schema.GroupVersionResource{Group: crdInformer.crd.group, Version: crdInformer.crd.version, Resource: crdInformer.crd.resource}
kind := crdInformer.crd.kind
informer := factory.ForResource(gvr)
informer.Informer().AddEventHandler(i.getEventHandlerForResource(kind))
go func() {
glog.V(2).Infof("Starting CRD informer for: %s (%v)", kind, gvr)
metricCrdInformerRunning.Set(float64(atomic.AddInt64(&i.activeCrdInformer, 1)))
informer.Informer().Run(crdInformer.stopChan)
glog.V(2).Infof("Exited CRD informer for: %s (%v)", kind, gvr)
metricCrdInformerRunning.Set(float64(atomic.AddInt64(&i.activeCrdInformer, -1)))
}()
}
func stopUnwantedCrdInformers(existing map[crdGroupVersionResourceKind]*crdInformerInfo) {
// no lock is needed - all these informers should be disconnected from kubeWatcherImpl
for _, v := range existing {
gvr := schema.GroupVersionResource{Group: v.crd.group, Version: v.crd.version, Resource: v.crd.resource}
glog.V(2).Infof("Stopping CRD informer for: %s (%v)", v.crd.kind, gvr)
close(v.stopChan)
}
}
func getCrdList(crdClient clientset.Interface) ([]crdGroupVersionResourceKind, error) {
crdList, err := crdClient.ApiextensionsV1().CustomResourceDefinitions().List(metav1.ListOptions{})
if err != nil {
glog.Errorf("Failed to get CRD list from ApiextensionsV1, falling back to ApiextensionsV1beta1: %v", err)
return getCrdListV1beta1(crdClient)
}
var resources []crdGroupVersionResourceKind
for _, crd := range crdList.Items {
for _, version := range crd.Spec.Versions {
gvrk := crdGroupVersionResourceKind{group: crd.Spec.Group, version: version.Name, resource: crd.Spec.Names.Plural, kind: crd.Spec.Names.Kind}
glog.V(2).Infof("CRD: group: %s, version: %s, kind: %s, plural:%s, singular:%s, short names:%v", crd.Spec.Group, version.Name, crd.Spec.Names.Kind, crd.Spec.Names.Plural, crd.Spec.Names.Singular, crd.Spec.Names.ShortNames)
resources = append(resources, gvrk)
}
}
return resources, nil
}
func getCrdListV1beta1(crdClient clientset.Interface) ([]crdGroupVersionResourceKind, error) {
crdList, err := crdClient.ApiextensionsV1beta1().CustomResourceDefinitions().List(metav1.ListOptions{})
if err != nil {
return nil, errors.Wrap(err, "failed to query CRDs")
}
glog.Infof("Found %d CRD definitions", len(crdList.Items))
// duplicated code (see getCrdList), the types for crdList are different
var resources []crdGroupVersionResourceKind
for _, crd := range crdList.Items {
gvrk := crdGroupVersionResourceKind{group: crd.Spec.Group, version: crd.Spec.Version, resource: crd.Spec.Names.Plural, kind: crd.Spec.Names.Kind}
glog.V(5).Infof("CRD: group: %s, version: %s, kind: %s, plural:%s, singular:%s, short names:%v", crd.Spec.Group, crd.Spec.Version, crd.Spec.Names.Kind, crd.Spec.Names.Plural, crd.Spec.Names.Singular, crd.Spec.Names.ShortNames)
resources = append(resources, gvrk)
for _, version := range crd.Spec.Versions {
gvrk := crdGroupVersionResourceKind{group: crd.Spec.Group, version: version.Name, resource: crd.Spec.Names.Plural, kind: crd.Spec.Names.Kind}
glog.V(2).Infof("CRD: group: %s, version: %s, kind: %s, plural:%s, singular:%s, short names:%v", crd.Spec.Group, version.Name, crd.Spec.Names.Kind, crd.Spec.Names.Plural, crd.Spec.Names.Singular, crd.Spec.Names.ShortNames)
resources = append(resources, gvrk)
}
}
return resources, nil
}
@@ -208,14 +299,14 @@ func (i *kubeWatcherImpl) processUpdate(kind string, obj interface{}, watchResul
}
kubeMetadata, err := kubeextractor.ExtractMetadata(resourceJson)
if err != nil {
if err != nil || kubeMetadata.Namespace == "" {
// We are only grabbing namespace here for a prometheus metric, so if metadata extract fails we just log and continue
glog.V(2).Infof("No namespace for resource: %v", err)
}
metricIngressKubewatchcount.WithLabelValues(kind, watchResult.WatchType.String(), kubeMetadata.Namespace).Inc()
metricIngressKubewatchbytes.WithLabelValues(kind, watchResult.WatchType.String(), kubeMetadata.Namespace).Add(float64(len(resourceJson)))
glog.V(5).Infof("Informer update - Name: %s, Namespace: %s, ResourceVersion: %s", kubeMetadata.Name, kubeMetadata.Namespace, kubeMetadata.ResourceVersion)
glog.V(5).Infof("Informer update (%s) - Name: %s, Namespace: %s, ResourceVersion: %s", watchResult.WatchType, kubeMetadata.Name, kubeMetadata.Namespace, kubeMetadata.ResourceVersion)
watchResult.Payload = resourceJson
i.writeToOutChan(watchResult)
}
@@ -224,12 +315,12 @@ func (i *kubeWatcherImpl) writeToOutChan(watchResult *typed.KubeWatchResult) {
// We need to ensure that no messages are written to outChan after stop is called
// Kube watch library has a way to tell it to stop, but no way to know it is complete
// Use a lock around output channel for this purpose
i.outchanlock.Lock()
defer i.outchanlock.Unlock()
i.protection.Lock()
defer i.protection.Unlock()
if i.stopped {
return
}
i.outchan <- *watchResult
i.outchan <- *watchResult // WARNING - if this channel gets full, this push will block while holding i.protection in a locked state
}
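
The WARNING above describes a real hazard: a send on a full channel while i.protection is held stalls every other user of that mutex, including Stop(). A standalone toy illustration of the interaction (not sloop code, just the channel/mutex mechanics):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	out := make(chan int, 1)
	out <- 0 // channel is already full

	go func() {
		mu.Lock()
		defer mu.Unlock()
		out <- 1 // blocks with mu held until a reader drains the channel
	}()

	time.Sleep(50 * time.Millisecond)

	done := make(chan struct{})
	go func() {
		mu.Lock() // stuck behind the blocked sender above
		mu.Unlock()
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("lock acquired (unexpected while the channel is full)")
	case <-time.After(200 * time.Millisecond):
		fmt.Println("lock still held by the blocked sender")
	}
	<-out // drain once; the sender completes and releases the lock
	<-out
	<-done
	fmt.Println("done")
}

As long as the consumer keeps draining the channel, the send completes quickly and the lock is released.
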
func (i *kubeWatcherImpl) getResourceAsJsonString(kind string, obj interface{}) (string, error) {
@@ -241,15 +332,30 @@ func (i *kubeWatcherImpl) getResourceAsJsonString(kind string, obj interface{})
return string(bytes), nil
}
func (i *kubeWatcherImpl) refreshCrdInformers(masterURL string, kubeContext string) {
for _ = range i.refreshCrd.C {
glog.Infof("Starting to refresh CRD informers")
err := i.startCustomInformers(masterURL, kubeContext)
if err != nil {
glog.Errorf("Failed to refresh CRD informers: %v", err)
}
}
}
func (i *kubeWatcherImpl) Stop() {
glog.Infof("Stopping kubeWatcher")
i.outchanlock.Lock()
i.protection.Lock()
if i.stopped {
return
}
i.stopped = true
i.outchanlock.Unlock()
i.protection.Unlock()
if i.refreshCrd != nil {
i.refreshCrd.Stop()
}
close(i.stopChan)
stopUnwantedCrdInformers(i.pullCrdInformers())
}


@@ -0,0 +1,242 @@
/*
* Copyright (c) 2021, salesforce.com, inc.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/
package ingress
import (
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/salesforce/sloop/pkg/sloop/store/typed"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
clientsetFake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic/dynamicinformer"
dynamicFake "k8s.io/client-go/dynamic/fake"
kubernetesFake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
k8sTesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
)
type dummyData struct {
Namespace string
}
// when fake client tries to list CRDs, return a list with one defined
func reactionListOfOne(_ k8sTesting.Action) (bool, runtime.Object, error) {
versions := []apiextensionsv1.CustomResourceDefinitionVersion{{Name: "v1"}}
name := apiextensionsv1.CustomResourceDefinitionNames{Plural: "things", Kind: "k"}
spec := apiextensionsv1.CustomResourceDefinitionSpec{Group: "g", Versions: versions, Names: name}
crd := apiextensionsv1.CustomResourceDefinition{Spec: spec}
list := apiextensionsv1.CustomResourceDefinitionList{Items: []apiextensionsv1.CustomResourceDefinition{crd}}
return true, &list, nil
}
// when fake client tries to list CRDs, return an error
func reactionError(_ k8sTesting.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("failed")
}
// newTestCrdClient - provides a function pointer to create a fake clientset
// takes: k8sTesting.Action function pointer - adds the reaction to the fake clientset
// returns a function pointer
// takes: restConfig
// returns: clientset.Interface & error (always nil)
func newTestCrdClient(reaction func(_ k8sTesting.Action) (bool, runtime.Object, error)) func(_ *rest.Config) (clientset.Interface, error) {
return func(_ *rest.Config) (clientset.Interface, error) {
crdClient := &clientsetFake.Clientset{}
crdClient.AddReactor("list", "*", reaction)
return crdClient, nil
}
}
// This test (test-harness) exercises the kubewatcher from the client perspective
// - start a kubewatcher
// - force a k8s event in the system
// - wait for an event
// - cleanup
func Test_bigPicture(t *testing.T) {
newCrdClient = newTestCrdClient(reactionListOfOne) // force startCustomInformers() to use a fake clientset
kubeClient := kubernetesFake.NewSimpleClientset()
outChan := make(chan typed.KubeWatchResult, 5)
resync := 30 * time.Minute
includeCrds := true
masterURL := "url"
kubeContext := "" // empty string makes things work
kw, err := NewKubeWatcherSource(kubeClient, outChan, resync, includeCrds, time.Duration(10*time.Second), masterURL, kubeContext)
assert.NoError(t, err)
// create service and await corresponding event
ns := "ns"
_, err = kubeClient.CoreV1().Namespaces().Create(&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}})
if err != nil {
t.FailNow()
}
svc := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "s"}}
_, err = kubeClient.CoreV1().Services(ns).Create(svc)
if err != nil {
t.Fatalf("Error creating service: %v\n", err)
}
_ = <-outChan
kw.Stop()
}
func Test_getCrdList(t *testing.T) {
crdClient, _ := newTestCrdClient(reactionError)(&rest.Config{})
crdList, err := getCrdList(crdClient)
assert.Error(t, err)
crdClient, _ = newTestCrdClient(reactionListOfOne)(&rest.Config{})
crdList, err = getCrdList(crdClient)
assert.Len(t, crdList, 1)
assert.NoError(t, err)
}
func Test_getEventHandlerForResource(t *testing.T) {
kw := &kubeWatcherImpl{protection: &sync.Mutex{}}
handler, ok := kw.getEventHandlerForResource("k").(cache.ResourceEventHandlerFuncs)
assert.True(t, ok)
assert.NotNil(t, handler)
assert.NotNil(t, handler.AddFunc)
assert.NotNil(t, handler.DeleteFunc)
assert.NotNil(t, handler.UpdateFunc)
}
func Test_reportAdd(t *testing.T) {
outChan := make(chan typed.KubeWatchResult, 5)
kw := &kubeWatcherImpl{protection: &sync.Mutex{}, outchan: outChan}
kind := "a"
report := kw.reportAdd(kind)
assert.NotNil(t, report)
obj := dummyData{Namespace: "n"}
bytes, err := json.Marshal(obj)
assert.Nil(t, err)
report(obj)
result := <-outChan
assert.Equal(t, kind, result.Kind)
assert.Equal(t, typed.KubeWatchResult_ADD, result.WatchType)
assert.Equal(t, string(bytes), result.Payload)
verifyChannelEmpty(t, outChan)
}
func Test_reportDelete(t *testing.T) {
outChan := make(chan typed.KubeWatchResult, 5)
kw := &kubeWatcherImpl{protection: &sync.Mutex{}, outchan: outChan}
kind := "d"
report := kw.reportDelete(kind)
assert.NotNil(t, report)
obj := dummyData{Namespace: "n"}
bytes, err := json.Marshal(obj)
assert.Nil(t, err)
report(obj)
result := <-outChan
assert.Equal(t, kind, result.Kind)
assert.Equal(t, typed.KubeWatchResult_DELETE, result.WatchType)
assert.Equal(t, string(bytes), result.Payload)
verifyChannelEmpty(t, outChan)
}
func Test_reportUpdate(t *testing.T) {
outChan := make(chan typed.KubeWatchResult, 5)
kw := &kubeWatcherImpl{protection: &sync.Mutex{}, outchan: outChan}
kind := "d"
report := kw.reportUpdate(kind)
assert.NotNil(t, report)
prev := dummyData{Namespace: "p"}
new := dummyData{Namespace: "n"}
bytes, err := json.Marshal(new)
assert.Nil(t, err)
report(prev, new)
result := <-outChan
assert.Equal(t, kind, result.Kind)
assert.Equal(t, typed.KubeWatchResult_UPDATE, result.WatchType)
assert.Equal(t, string(bytes), result.Payload)
verifyChannelEmpty(t, outChan)
}
func Test_processUpdate(t *testing.T) {
outChan := make(chan typed.KubeWatchResult, 5)
kw := &kubeWatcherImpl{protection: &sync.Mutex{}, outchan: outChan}
kind := "k"
obj := dummyData{Namespace: "n"}
kw.processUpdate(kind, obj, &typed.KubeWatchResult{Kind: kind})
result := <-outChan
assert.Equal(t, kind, result.Kind)
assert.NotEmpty(t, result.Payload)
verifyChannelEmpty(t, outChan)
}
func verifyChannelEmpty(t *testing.T, outChan chan typed.KubeWatchResult) {
select {
case _, _ = <-outChan:
assert.Fail(t, "expected channel to be empty")
default:
// channel is empty
}
}
func Test_existingOrStartNewCrdInformer(t *testing.T) {
kw := &kubeWatcherImpl{protection: &sync.Mutex{}}
kw.crdInformers = make(map[crdGroupVersionResourceKind]*crdInformerInfo)
client := dynamicFake.NewSimpleDynamicClient(runtime.NewScheme())
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(client, 30*time.Minute, "", nil)
crd := crdGroupVersionResourceKind{group: "g", version: "v", resource: "r", kind: "k"}
existing := make(map[crdGroupVersionResourceKind]*crdInformerInfo)
// start new informer
kw.existingOrStartNewCrdInformer(crd, existing, factory)
assert.Len(t, kw.crdInformers, 1)
for atomic.LoadInt64(&kw.activeCrdInformer) == 0 { // wait for the go routine to start
time.Sleep(time.Millisecond)
}
// refresh - start existing informer
existing = kw.pullCrdInformers()
assert.Len(t, kw.crdInformers, 0)
assert.Len(t, existing, 1)
kw.existingOrStartNewCrdInformer(crd, existing, factory)
assert.Len(t, kw.crdInformers, 1)
assert.Len(t, existing, 0)
assert.Equal(t, int64(1), atomic.LoadInt64(&kw.activeCrdInformer))
// cleanup the informer
stopUnwantedCrdInformers(kw.pullCrdInformers())
for atomic.LoadInt64(&kw.activeCrdInformer) != 0 { // wait for the go routine to exit
time.Sleep(time.Millisecond)
}
}


@@ -53,6 +53,7 @@ type SloopConfig struct {
DisplayContext string `json:"displayContext"`
ApiServerHost string `json:"apiServerHost"`
WatchCrds bool `json:"watchCrds"`
CrdRefreshInterval time.Duration `json:"crdRefreshInterval"`
ThresholdForGC float64 `json:"threshold for GC"`
RestoreDatabaseFile string `json:"restoreDatabaseFile"`
BadgerDiscardRatio float64 `json:"badgerDiscardRatio"`
@@ -99,6 +100,7 @@ func registerFlags(fs *flag.FlagSet, config *SloopConfig) {
fs.StringVar(&config.DisplayContext, "display-context", "", "Use this to override the display context. When running in k8s the context is empty string. This lets you override that (mainly useful if you are running many copies of sloop on different clusters) ")
fs.StringVar(&config.ApiServerHost, "apiserver-host", "", "Kubernetes API server endpoint")
fs.BoolVar(&config.WatchCrds, "watch-crds", true, "Watch for activity for CRDs")
fs.DurationVar(&config.CrdRefreshInterval, "crd-refresh-interval", time.Duration(5*time.Minute), "Frequency between CRD Informer refresh")
fs.StringVar(&config.RestoreDatabaseFile, "restore-database-file", "", "Restore database from backup file into current context.")
fs.Float64Var(&config.BadgerDiscardRatio, "badger-discard-ratio", 0.99, "Badger value log GC uses this value to decide if it wants to compact a vlog file. The lower the value of discardRatio the higher the number of !badger!move keys. And thus more the number of !badger!move keys, the size on disk keeps on increasing over time.")
fs.Float64Var(&config.ThresholdForGC, "gc-threshold", 0.8, "Threshold for GC to start garbage collecting")
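
For reference, the new crd-refresh-interval flag goes through the standard Go flag package, so it accepts time.ParseDuration values such as 30s, 2m, or 1h30m. A small hypothetical sketch of the parsing, mirroring the registerFlags call above (the flag-set name and sample value are made up for illustration):

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	fs := flag.NewFlagSet("sloop", flag.ExitOnError)
	var crdRefreshInterval time.Duration
	fs.DurationVar(&crdRefreshInterval, "crd-refresh-interval", 5*time.Minute, "Frequency between CRD Informer refresh")

	_ = fs.Parse([]string{"--crd-refresh-interval=2m"})
	fmt.Println(crdRefreshInterval) // 2m0s
}

On the command line this corresponds to something like sloop --watch-crds=true --crd-refresh-interval=2m.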


@@ -102,7 +102,7 @@ func RealMain() error {
return errors.Wrap(err, "failed to create kubernetes client")
}
kubeWatcherSource, err = ingress.NewKubeWatcherSource(kubeClient, kubeWatchChan, conf.KubeWatchResyncInterval, conf.WatchCrds, conf.ApiServerHost, kubeContext)
kubeWatcherSource, err = ingress.NewKubeWatcherSource(kubeClient, kubeWatchChan, conf.KubeWatchResyncInterval, conf.WatchCrds, conf.CrdRefreshInterval, conf.ApiServerHost, kubeContext)
if err != nil {
return errors.Wrap(err, "failed to initialize kubeWatcher")
}