List CRDs via cache and avoid extra work on pod update (#232)
* Two main items: - Utilize the client cache for listing of CRDs - Avoids going to API Server for every list call. - Resolve the part of #182 related to the API Server calls. - Avoid calling sync loop at every pod update. Only call where there is a change in scheduled node name. * Address review comments - rename watcher variables to informer which is more closer to functionality
This commit is contained in:
Родитель
3d3fe6bf22
Коммит
ca7ec2a455
|
@ -144,9 +144,9 @@ const (
|
|||
)
|
||||
|
||||
const (
|
||||
AzureIDResource = "azureidentities"
|
||||
AzureIDBindingResource = "azureidentitybindings"
|
||||
AureAssignedIDResource = "azureassignedidentities"
|
||||
AzureIDResource = "azureidentities"
|
||||
AzureIDBindingResource = "azureidentitybindings"
|
||||
AzureAssignedIDResource = "azureassignedidentities"
|
||||
)
|
||||
|
||||
// AzureIdentityBindingSpec matches the pod with the Identity.
|
||||
|
|
|
@ -8,7 +8,7 @@ import (
|
|||
"github.com/Azure/aad-pod-identity/pkg/stats"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
|
@ -18,9 +18,12 @@ import (
|
|||
)
|
||||
|
||||
type Client struct {
|
||||
rest *rest.RESTClient
|
||||
BindingWatcher cache.SharedInformer
|
||||
IdWatcher cache.SharedInformer
|
||||
rest *rest.RESTClient
|
||||
BindingListWatch *cache.ListWatch
|
||||
BindingInformer cache.SharedInformer
|
||||
IDListWatch *cache.ListWatch
|
||||
IDInformer cache.SharedInformer
|
||||
AssignedIDListWatch *cache.ListWatch
|
||||
}
|
||||
|
||||
type ClientInt interface {
|
||||
|
@ -52,22 +55,31 @@ func NewCRDClient(config *rest.Config, eventCh chan aadpodid.EventType) (crdClie
|
|||
return nil, err
|
||||
}
|
||||
|
||||
bindingWatcher, err := newBindingWatcher(restClient, eventCh)
|
||||
bindingListWatch := newBindingListWatch(restClient)
|
||||
|
||||
bindingInformer, err := newBindingInformer(restClient, eventCh, bindingListWatch)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
idWatcher, err := newIdWatcher(restClient, eventCh)
|
||||
idListWatch := newIDListWatch(restClient)
|
||||
|
||||
idInformer, err := newIDInformer(restClient, eventCh, idListWatch)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
assignedIDListWatch := newAssignedIDListWatch(restClient)
|
||||
|
||||
return &Client{
|
||||
rest: restClient,
|
||||
BindingWatcher: bindingWatcher,
|
||||
IdWatcher: idWatcher,
|
||||
rest: restClient,
|
||||
BindingListWatch: bindingListWatch,
|
||||
BindingInformer: bindingInformer,
|
||||
IDInformer: idInformer,
|
||||
IDListWatch: idListWatch,
|
||||
AssignedIDListWatch: assignedIDListWatch,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -96,15 +108,19 @@ func newRestClient(config *rest.Config) (r *rest.RESTClient, err error) {
|
|||
return restClient, nil
|
||||
}
|
||||
|
||||
func newBindingWatcher(r *rest.RESTClient, eventCh chan aadpodid.EventType) (cache.SharedInformer, error) {
|
||||
azBindingWatcher := cache.NewSharedInformer(
|
||||
cache.NewListWatchFromClient(r, aadpodid.AzureIDBindingResource, v1.NamespaceAll, fields.Everything()),
|
||||
func newBindingListWatch(r *rest.RESTClient) *cache.ListWatch {
|
||||
return cache.NewListWatchFromClient(r, aadpodid.AzureIDBindingResource, v1.NamespaceAll, fields.Everything())
|
||||
}
|
||||
|
||||
func newBindingInformer(r *rest.RESTClient, eventCh chan aadpodid.EventType, lw *cache.ListWatch) (cache.SharedInformer, error) {
|
||||
azBindingInformer := cache.NewSharedInformer(
|
||||
lw,
|
||||
&aadpodid.AzureIdentityBinding{},
|
||||
time.Minute*10)
|
||||
if azBindingWatcher == nil {
|
||||
if azBindingInformer == nil {
|
||||
return nil, fmt.Errorf("Could not create watcher for %s", aadpodid.AzureIDBindingResource)
|
||||
}
|
||||
azBindingWatcher.AddEventHandler(
|
||||
azBindingInformer.AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
glog.V(6).Infof("Binding created")
|
||||
|
@ -120,18 +136,22 @@ func newBindingWatcher(r *rest.RESTClient, eventCh chan aadpodid.EventType) (cac
|
|||
},
|
||||
},
|
||||
)
|
||||
return azBindingWatcher, nil
|
||||
return azBindingInformer, nil
|
||||
}
|
||||
|
||||
func newIdWatcher(r *rest.RESTClient, eventCh chan aadpodid.EventType) (cache.SharedInformer, error) {
|
||||
azIdWatcher := cache.NewSharedInformer(
|
||||
cache.NewListWatchFromClient(r, aadpodid.AzureIDResource, v1.NamespaceAll, fields.Everything()),
|
||||
func newIDListWatch(r *rest.RESTClient) *cache.ListWatch {
|
||||
return cache.NewListWatchFromClient(r, aadpodid.AzureIDResource, v1.NamespaceAll, fields.Everything())
|
||||
}
|
||||
|
||||
func newIDInformer(r *rest.RESTClient, eventCh chan aadpodid.EventType, lw *cache.ListWatch) (cache.SharedInformer, error) {
|
||||
azIDInformer := cache.NewSharedInformer(
|
||||
lw,
|
||||
&aadpodid.AzureIdentity{},
|
||||
time.Minute*10)
|
||||
if azIdWatcher == nil {
|
||||
if azIDInformer == nil {
|
||||
return nil, fmt.Errorf("Could not create Identity watcher for %s", aadpodid.AzureIDResource)
|
||||
}
|
||||
azIdWatcher.AddEventHandler(
|
||||
azIDInformer.AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
glog.V(6).Infof("Identity created")
|
||||
|
@ -147,12 +167,16 @@ func newIdWatcher(r *rest.RESTClient, eventCh chan aadpodid.EventType) (cache.Sh
|
|||
},
|
||||
},
|
||||
)
|
||||
return azIdWatcher, nil
|
||||
return azIDInformer, nil
|
||||
}
|
||||
|
||||
func newAssignedIDListWatch(r *rest.RESTClient) *cache.ListWatch {
|
||||
return cache.NewListWatchFromClient(r, aadpodid.AzureAssignedIDResource, v1.NamespaceAll, fields.Everything())
|
||||
}
|
||||
|
||||
func (c *Client) Start(exit <-chan struct{}) {
|
||||
go c.BindingWatcher.Run(exit)
|
||||
go c.IdWatcher.Run(exit)
|
||||
go c.BindingInformer.Run(exit)
|
||||
go c.IDInformer.Run(exit)
|
||||
glog.Info("CRD watchers started")
|
||||
}
|
||||
|
||||
|
@ -191,50 +215,48 @@ func (c *Client) CreateAssignedIdentity(assignedIdentity *aadpodid.AzureAssigned
|
|||
|
||||
func (c *Client) ListBindings() (res *[]aadpodid.AzureIdentityBinding, err error) {
|
||||
begin := time.Now()
|
||||
var ret aadpodid.AzureIdentityBindingList
|
||||
err = c.rest.Get().Namespace(v1.NamespaceAll).Resource("azureidentitybindings").Do().Into(&ret)
|
||||
|
||||
ret, err := c.BindingListWatch.List(v1.ListOptions{})
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
stats.Update(stats.BindingList, time.Since(begin))
|
||||
return &ret.Items, nil
|
||||
return &ret.(*aadpodid.AzureIdentityBindingList).Items, nil
|
||||
}
|
||||
|
||||
func (c *Client) ListAssignedIDs() (res *[]aadpodid.AzureAssignedIdentity, err error) {
|
||||
begin := time.Now()
|
||||
var ret aadpodid.AzureAssignedIdentityList
|
||||
err = c.rest.Get().Namespace(v1.NamespaceAll).Resource("azureassignedidentities").Do().Into(&ret)
|
||||
ret, err := c.AssignedIDListWatch.List(v1.ListOptions{})
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
stats.Update(stats.AssignedIDList, time.Since(begin))
|
||||
return &ret.Items, nil
|
||||
return &ret.(*aadpodid.AzureAssignedIdentityList).Items, nil
|
||||
}
|
||||
|
||||
func (c *Client) ListIds() (res *[]aadpodid.AzureIdentity, err error) {
|
||||
begin := time.Now()
|
||||
var ret aadpodid.AzureIdentityList
|
||||
err = c.rest.Get().Namespace(v1.NamespaceAll).Resource("azureidentities").Do().Into(&ret)
|
||||
ret, err := c.IDListWatch.List(v1.ListOptions{})
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
stats.Update(stats.IDList, time.Since(begin))
|
||||
return &ret.Items, nil
|
||||
return &ret.(*aadpodid.AzureIdentityList).Items, nil
|
||||
}
|
||||
|
||||
//ListPodIds - given a pod with pod name space
|
||||
func (c *Client) ListPodIds(podns, podname string) (*[]aadpodid.AzureIdentity, error) {
|
||||
var azAssignedIDList aadpodid.AzureAssignedIdentityList
|
||||
var matchedIds []aadpodid.AzureIdentity
|
||||
err := c.rest.Get().Namespace(v1.NamespaceAll).Resource("azureassignedidentities").Do().Into(&azAssignedIDList)
|
||||
azAssignedIDList, err := c.AssignedIDListWatch.List(v1.ListOptions{})
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
for _, v := range azAssignedIDList.Items {
|
||||
|
||||
var matchedIds []aadpodid.AzureIdentity
|
||||
for _, v := range azAssignedIDList.(*aadpodid.AzureAssignedIdentityList).Items {
|
||||
if v.Spec.Pod == podname && v.Spec.PodNamespace == podns {
|
||||
matchedIds = append(matchedIds, *v.Spec.AzureIdentityRef)
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/golang/glog"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/informers"
|
||||
informersv1 "k8s.io/client-go/informers/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
@ -49,8 +50,13 @@ func addPodHandler(i informersv1.PodInformer, eventCh chan aadpodid.EventType) {
|
|||
|
||||
},
|
||||
UpdateFunc: func(OldObj, newObj interface{}) {
|
||||
glog.V(6).Infof("Pod Updated")
|
||||
eventCh <- aadpodid.PodUpdated
|
||||
// We are only interested in updates to pod if the node changes.
|
||||
// Having this check will ensure that mic sync loop does not do extra work
|
||||
// for every pod update.
|
||||
if (OldObj.(*v1.Pod)).Spec.NodeName != (newObj.(*v1.Pod)).Spec.NodeName {
|
||||
glog.V(6).Infof("Pod Updated")
|
||||
eventCh <- aadpodid.PodUpdated
|
||||
}
|
||||
},
|
||||
},
|
||||
)
|
||||
|
|
Загрузка…
Ссылка в новой задаче