Merge pull request #117 from ryanzhang-oss/add-join-leave

feat: add work agent join/leave
Fei Guo 2022-09-14 10:47:17 -07:00 committed by GitHub
Parents ba21e65fff b69a93e116
Commit 9f2434d147
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 247 additions and 44 deletions

View file

@@ -102,10 +102,18 @@ func main() {
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
if err := controllers.Start(ctrl.SetupSignalHandler(), hubConfig, ctrl.GetConfigOrDie(), setupLog, opts); err != nil {
ctx := ctrl.SetupSignalHandler()
hubMgr, _, err := controllers.CreateControllers(ctx, hubConfig, ctrl.GetConfigOrDie(), setupLog, opts)
if err != nil {
setupLog.Error(err, "problem running controllers")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
klog.Info("starting hub manager")
defer klog.Info("shutting down hub manager")
if err := hubMgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running hub manager")
}
}
func getKubeConfig(hubkubeconfig string) (*restclient.Config, error) {
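To summarize the new startup flow in main(): controller creation and manager start are now separate steps, so the caller gets a handle to the work reconciler before the manager runs. A consolidated sketch of the added lines above (identifiers as in this diff; main() discards the reconciler, while the test suite keeps it to drive Join/Leave):

ctx := ctrl.SetupSignalHandler()
// CreateControllers wires up the hub manager and the work controller without starting them.
hubMgr, _, err := controllers.CreateControllers(ctx, hubConfig, ctrl.GetConfigOrDie(), setupLog, opts)
if err != nil {
	setupLog.Error(err, "problem running controllers")
	klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
// Starting (and stopping) the manager is now the caller's responsibility.
klog.Info("starting hub manager")
defer klog.Info("shutting down hub manager")
if err := hubMgr.Start(ctx); err != nil {
	setupLog.Error(err, "problem running hub manager")
}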

View file

@@ -22,7 +22,7 @@ limitations under the License.
package v1alpha1
import (
"k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)

View file

@@ -48,7 +48,7 @@ func (r *ApplyWorkReconciler) generateDiff(ctx context.Context, work *workapi.Wo
}
}
if !resStillExist {
klog.V(5).InfoS("find an orphaned resource in the member cluster",
klog.V(2).InfoS("find an orphaned resource in the member cluster",
"parent resource", work.GetName(), "orphaned resource", resourceMeta.ResourceIdentifier)
staleRes = append(staleRes, resourceMeta)
}
@@ -77,7 +77,7 @@ func (r *ApplyWorkReconciler) generateDiff(ctx context.Context, work *workapi.Wo
}
}
if !resRecorded {
klog.V(5).InfoS("discovered a new manifest resource",
klog.V(2).InfoS("discovered a new manifest resource",
"parent Work", work.GetName(), "manifest", manifestCond.Identifier)
obj, err := r.spokeDynamicClient.Resource(schema.GroupVersionResource{
Group: manifestCond.Identifier.Group,
@@ -86,7 +86,7 @@ func (r *ApplyWorkReconciler) generateDiff(ctx context.Context, work *workapi.Wo
}).Namespace(manifestCond.Identifier.Namespace).Get(ctx, manifestCond.Identifier.Name, metav1.GetOptions{})
switch {
case apierrors.IsNotFound(err):
klog.V(4).InfoS("the new manifest resource is already deleted", "parent Work", work.GetName(), "manifest", manifestCond.Identifier)
klog.V(2).InfoS("the new manifest resource is already deleted", "parent Work", work.GetName(), "manifest", manifestCond.Identifier)
continue
case err != nil:
klog.ErrorS(err, "failed to retrieve the manifest", "parent Work", work.GetName(), "manifest", manifestCond.Identifier)
@@ -134,7 +134,7 @@ func (r *ApplyWorkReconciler) deleteStaleManifest(ctx context.Context, staleMani
}
}
if !found {
klog.V(4).InfoS("the stale manifest is not owned by this work, skip", "manifest", staleManifest, "owner", owner)
klog.V(2).InfoS("the stale manifest is not owned by this work, skip", "manifest", staleManifest, "owner", owner)
continue
}
if len(newOwners) == 0 {

View file

@@ -47,8 +47,6 @@ import (
const (
workFieldManagerName = "work-api-agent"
AnnotationLastAppliedManifest = "fleet.azure.com/last-applied-manifest"
)
// ApplyWorkReconciler reconciles a Work object
@@ -59,10 +57,12 @@ type ApplyWorkReconciler struct {
restMapper meta.RESTMapper
recorder record.EventRecorder
concurrency int
workNameSpace string
joined bool
}
func NewApplyWorkReconciler(hubClient client.Client, spokeDynamicClient dynamic.Interface, spokeClient client.Client, restMapper meta.RESTMapper, recorder record.EventRecorder, concurrency int, joined bool) *ApplyWorkReconciler {
func NewApplyWorkReconciler(hubClient client.Client, spokeDynamicClient dynamic.Interface, spokeClient client.Client,
restMapper meta.RESTMapper, recorder record.EventRecorder, concurrency int, workNameSpace string) *ApplyWorkReconciler {
return &ApplyWorkReconciler{
client: hubClient,
spokeDynamicClient: spokeDynamicClient,
@@ -70,7 +70,7 @@ func NewApplyWorkReconciler(hubClient client.Client, spokeDynamicClient dynamic.
restMapper: restMapper,
recorder: recorder,
concurrency: concurrency,
joined: joined,
workNameSpace: workNameSpace,
}
}
@@ -85,7 +85,7 @@ type applyResult struct {
// Reconcile implements the control loop logic for the Work object.
func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
if !r.joined {
klog.V(3).InfoS("work controller is not started yet, requeue the request", "work", req.NamespacedName)
klog.V(2).InfoS("work controller is not started yet, requeue the request", "work", req.NamespacedName)
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
klog.InfoS("work apply controller reconcile loop triggered.", "work", req.NamespacedName)
@@ -95,7 +95,7 @@ func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
err := r.client.Get(ctx, req.NamespacedName, work)
switch {
case apierrors.IsNotFound(err):
klog.V(4).InfoS("the work resource is deleted", "work", req.NamespacedName)
klog.V(2).InfoS("the work resource is deleted", "work", req.NamespacedName)
return ctrl.Result{}, nil
case err != nil:
klog.ErrorS(err, "failed to retrieve the work", "work", req.NamespacedName)
@@ -150,7 +150,7 @@ func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// we can't proceed to update the applied
return ctrl.Result{}, err
} else if len(staleRes) > 0 {
klog.V(3).InfoS("successfully garbage-collected all stale manifests", work.Kind, kLogObjRef, "number of GCed res", len(staleRes))
klog.V(2).InfoS("successfully garbage-collected all stale manifests", work.Kind, kLogObjRef, "number of GCed res", len(staleRes))
for _, res := range staleRes {
klog.V(5).InfoS("successfully garbage-collected a stale manifest", work.Kind, kLogObjRef, "res", res)
}
@@ -187,7 +187,7 @@ func (r *ApplyWorkReconciler) garbageCollectAppliedWork(ctx context.Context, wor
err := r.spokeClient.Delete(ctx, &appliedWork, &client.DeleteOptions{PropagationPolicy: &deletePolicy})
switch {
case apierrors.IsNotFound(err):
klog.V(4).InfoS("the appliedWork is already deleted", "appliedWork", work.Name)
klog.V(2).InfoS("the appliedWork is already deleted", "appliedWork", work.Name)
case err != nil:
klog.ErrorS(err, "failed to delete the appliedWork", "appliedWork", work.Name)
return ctrl.Result{}, err
@@ -273,9 +273,9 @@ func (r *ApplyWorkReconciler) applyManifests(ctx context.Context, manifests []wo
if result.err == nil {
result.generation = appliedObj.GetGeneration()
if result.updated {
klog.V(4).InfoS("manifest upsert succeeded", "gvr", gvr, "manifest", kLogObjRef, "new ObservedGeneration", result.generation)
klog.V(2).InfoS("manifest upsert succeeded", "gvr", gvr, "manifest", kLogObjRef, "new ObservedGeneration", result.generation)
} else {
klog.V(5).InfoS("manifest upsert unwarranted", "gvr", gvr, "manifest", kLogObjRef)
klog.V(2).InfoS("manifest upsert unwarranted", "gvr", gvr, "manifest", kLogObjRef)
}
} else {
klog.ErrorS(result.err, "manifest upsert failed", "gvr", gvr, "manifest", kLogObjRef)
@@ -322,7 +322,7 @@ func (r *ApplyWorkReconciler) applyUnstructured(ctx context.Context, gvr schema.
actual, err := r.spokeDynamicClient.Resource(gvr).Namespace(manifestObj.GetNamespace()).Create(
ctx, manifestObj, metav1.CreateOptions{FieldManager: workFieldManagerName})
if err == nil {
klog.V(4).InfoS("successfully created the manifest", "gvr", gvr, "manifest", manifestRef)
klog.V(2).InfoS("successfully created the manifest", "gvr", gvr, "manifest", manifestRef)
return actual, true, nil
}
return nil, false, err
@@ -394,7 +394,7 @@ func (r *ApplyWorkReconciler) patchCurrentResource(ctx context.Context, gvr sche
klog.ErrorS(patchErr, "failed to patch the manifest", "gvr", gvr, "manifest", manifestRef)
return nil, false, patchErr
}
klog.V(3).InfoS("manifest patch succeeded", "gvr", gvr, "manifest", manifestRef)
klog.V(2).InfoS("manifest patch succeeded", "gvr", gvr, "manifest", manifestRef)
return manifestObj, true, nil
}
@@ -426,6 +426,47 @@ func (r *ApplyWorkReconciler) generateWorkCondition(results []applyResult, work
return errs
}
// Join marks the reconciler as joined so that it starts reconciling Work objects
func (r *ApplyWorkReconciler) Join(ctx context.Context) error {
if !r.joined {
klog.InfoS("mark the apply work reconciler joined")
}
r.joined = true
return nil
}
// Leave marks the reconciler as left and removes the work finalizers from all Work objects in the cluster namespace
func (r *ApplyWorkReconciler) Leave(ctx context.Context) error {
var works workv1alpha1.WorkList
if r.joined {
klog.InfoS("mark the apply work reconciler left")
}
r.joined = false
// list all the work objects we created in the member cluster namespace
listOpts := []client.ListOption{
client.InNamespace(r.workNameSpace),
}
if err := r.client.List(ctx, &works, listOpts...); err != nil {
klog.ErrorS(err, "failed to list all the work object", "clusterNS", r.workNameSpace)
return client.IgnoreNotFound(err)
}
// we leave the resources on the member cluster for now
for _, work := range works.Items {
staleWork := work.DeepCopy()
if controllerutil.ContainsFinalizer(staleWork, workFinalizer) {
controllerutil.RemoveFinalizer(staleWork, workFinalizer)
if updateErr := r.client.Update(ctx, staleWork, &client.UpdateOptions{}); updateErr != nil {
klog.ErrorS(updateErr, "failed to remove the work finalizer from the work",
"clusterNS", r.workNameSpace, "work", klog.KObj(staleWork))
return updateErr
}
}
}
klog.V(2).InfoS("successfully removed all the work finalizers in the cluster namespace",
"clusterNS", r.workNameSpace, "number of work", len(works.Items))
return nil
}
// SetupWithManager wires up the controller.
func (r *ApplyWorkReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
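The Join and Leave methods added above are meant to be idempotent toggles: Join only flips the joined flag on, while Leave flips it off and removes the work finalizers in the cluster namespace (the applied resources are left on the member cluster for now). A minimal usage sketch — the caller here is hypothetical; only workController (the *ApplyWorkReconciler returned by CreateControllers) and its Join/Leave methods come from this change:

// Hypothetical caller toggling the reconciler on and off.
if err := workController.Join(ctx); err != nil {
	klog.ErrorS(err, "unable to mark the work controller joined")
}
// ... while joined, Work objects in the cluster namespace are applied as usual ...
if err := workController.Leave(ctx); err != nil {
	klog.ErrorS(err, "unable to mark the work controller left")
}
// After Leave, Reconcile requeues requests every 5 seconds until Join is called again,
// and the work finalizers in the cluster namespace have been removed.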

View file

@@ -12,12 +12,14 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilrand "k8s.io/apimachinery/pkg/util/rand"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
)
// createWorkWithManifest creates a work given a manifest
func createWorkWithManifest(workNamespace string, manifest runtime.Object) *workv1alpha1.Work {
manifestCopy := manifest.DeepCopyObject()
newWork := workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
Name: "work-" + utilrand.String(5),
@@ -27,7 +29,7 @@ func createWorkWithManifest(workNamespace string, manifest runtime.Object) *work
Workload: workv1alpha1.WorkloadTemplate{
Manifests: []workv1alpha1.Manifest{
{
RawExtension: runtime.RawExtension{Object: manifest},
RawExtension: runtime.RawExtension{Object: manifestCopy},
},
},
},
@@ -78,3 +80,16 @@ func waitForWorkToApply(workName, workNS string) *workv1alpha1.Work {
}, timeout, interval).Should(BeTrue())
return &resultWork
}
// waitForWorkToBeHandled waits for a work to have a finalizer
func waitForWorkToBeHandled(workName, workNS string) *workv1alpha1.Work {
var resultWork workv1alpha1.Work
Eventually(func() bool {
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: workName, Namespace: workNS}, &resultWork)
if err != nil {
return false
}
return controllerutil.ContainsFinalizer(&resultWork, workFinalizer)
}, timeout, interval).Should(BeTrue())
return &resultWork
}

View file

@@ -19,6 +19,7 @@ package controllers
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/google/go-cmp/cmp"
@@ -33,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
utilrand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
)
@@ -64,7 +66,7 @@ var _ = Describe("Work Controller", func() {
Expect(err).ToNot(HaveOccurred())
})
Context("Test work propagation", func() {
Context("Test single work propagation", func() {
It("Should have a configmap deployed correctly", func() {
cmName := "testcm"
cmNamespace := "default"
@@ -225,7 +227,7 @@ var _ = Describe("Work Controller", func() {
rawCM, err := json.Marshal(cm)
Expect(err).Should(Succeed())
resultWork.Spec.Workload.Manifests[0].Raw = rawCM
Expect(k8sClient.Update(context.Background(), resultWork)).Should(Succeed())
Expect(k8sClient.Update(ctx, resultWork)).Should(Succeed())
By("wait for the change of the work to be applied")
waitForWorkToApply(work.GetName(), work.GetNamespace())
@@ -518,4 +520,133 @@ var _ = Describe("Work Controller", func() {
Expect(cmp.Diff(resultWork.Status.ManifestConditions[0].Identifier, expectedResourceId)).Should(BeEmpty())
})
})
Context("Test multiple work propagation", func() {
var works []*workv1alpha1.Work
AfterEach(func() {
for _, staleWork := range works {
err := k8sClient.Delete(context.Background(), staleWork)
Expect(err).ToNot(HaveOccurred())
}
})
It("Test join and leave work correctly", func() {
By("create the works")
var configMap corev1.ConfigMap
cmNamespace := "default"
var cmNames []string
numWork := 10
data := map[string]string{
"test-key-1": "test-value-1",
"test-key-2": "test-value-2",
"test-key-3": "test-value-3",
}
for i := 0; i < numWork; i++ {
cmName := "testcm-" + utilrand.String(10)
cmNames = append(cmNames, cmName)
cm = &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ConfigMap",
},
ObjectMeta: metav1.ObjectMeta{
Name: cmName,
Namespace: cmNamespace,
},
Data: data,
}
// make sure we can call Join repeatedly without error
Expect(workController.Join(ctx)).Should(Succeed())
work = createWorkWithManifest(workNamespace, cm)
err := k8sClient.Create(ctx, work)
Expect(err).ToNot(HaveOccurred())
By(fmt.Sprintf("created the work = %s", work.GetName()))
works = append(works, work)
}
By("make sure the works are handled")
for i := 0; i < numWork; i++ {
waitForWorkToBeHandled(works[i].GetName(), works[i].GetNamespace())
}
By("mark the work controller as leave")
Expect(workController.Leave(ctx)).Should(Succeed())
By("make sure the manifests have no finalizer and its status match the member cluster")
newData := map[string]string{
"test-key-1": "test-value-1",
"test-key-2": "test-value-2",
"test-key-3": "test-value-3",
"new-test-key-1": "test-value-4",
"new-test-key-2": "test-value-5",
}
for i := 0; i < numWork; i++ {
var resultWork workv1alpha1.Work
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: works[i].GetName(), Namespace: workNamespace}, &resultWork)).Should(Succeed())
Expect(controllerutil.ContainsFinalizer(&resultWork, workFinalizer)).Should(BeFalse())
applyCond := meta.FindStatusCondition(resultWork.Status.Conditions, ConditionTypeApplied)
if applyCond != nil && applyCond.Status == metav1.ConditionTrue && applyCond.ObservedGeneration == resultWork.Generation {
By("the work is applied, check if the applied config map is still there")
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cmNames[i], Namespace: cmNamespace}, &configMap)).Should(Succeed())
Expect(cmp.Diff(configMap.Data, data)).Should(BeEmpty())
} else {
By("the work is not applied, verify that the applied config map is not there")
err := k8sClient.Get(ctx, types.NamespacedName{Name: cmNames[i], Namespace: cmNamespace}, &configMap)
Expect(apierrors.IsNotFound(err)).Should(BeTrue())
}
// make sure that Leave can be called repeatedly without error
Expect(workController.Leave(ctx)).Should(Succeed())
By(fmt.Sprintf("change the work = %s", work.GetName()))
cm = &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ConfigMap",
},
ObjectMeta: metav1.ObjectMeta{
Name: cmNames[i],
Namespace: cmNamespace,
},
Data: newData,
}
rawCM, err := json.Marshal(cm)
Expect(err).Should(Succeed())
resultWork.Spec.Workload.Manifests[0].Raw = rawCM
Expect(k8sClient.Update(ctx, &resultWork)).Should(Succeed())
}
By("make sure the update in the work is not picked up")
Consistently(func() bool {
for i := 0; i < numWork; i++ {
By(fmt.Sprintf("updated the work = %s", works[i].GetName()))
var resultWork workv1alpha1.Work
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: works[i].GetName(), Namespace: workNamespace}, &resultWork)
Expect(err).Should(Succeed())
Expect(controllerutil.ContainsFinalizer(&resultWork, workFinalizer)).Should(BeFalse())
applyCond := meta.FindStatusCondition(resultWork.Status.Conditions, ConditionTypeApplied)
if applyCond != nil && applyCond.Status == metav1.ConditionTrue && applyCond.ObservedGeneration == resultWork.Generation {
return false
}
By("check if the config map is not changed")
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cmNames[i], Namespace: cmNamespace}, &configMap)).Should(Succeed())
Expect(cmp.Diff(configMap.Data, data)).Should(BeEmpty())
}
return true
}, timeout, interval).Should(BeTrue())
By("enable the work controller again")
Expect(workController.Join(ctx)).Should(Succeed())
By("make sure the work change get picked up")
for i := 0; i < numWork; i++ {
resultWork := waitForWorkToApply(works[i].GetName(), works[i].GetNamespace())
Expect(len(resultWork.Status.ManifestConditions)).Should(Equal(1))
Expect(meta.IsStatusConditionTrue(resultWork.Status.ManifestConditions[0].Conditions, ConditionTypeApplied)).Should(BeTrue())
By("the work is applied, check if the applied config map is updated")
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cmNames[i], Namespace: cmNamespace}, &configMap)).Should(Succeed())
Expect(cmp.Diff(configMap.Data, newData)).Should(BeEmpty())
}
})
})
})

View file

@@ -23,10 +23,10 @@ import (
"github.com/go-logr/logr"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
)
const (
@@ -43,8 +43,8 @@ const (
maxWorkConcurrency = 5
)
// Start the controllers with the supplied config
func Start(ctx context.Context, hubCfg, spokeCfg *rest.Config, setupLog logr.Logger, opts ctrl.Options) error {
// CreateControllers creates the controllers with the supplied config
func CreateControllers(ctx context.Context, hubCfg, spokeCfg *rest.Config, setupLog logr.Logger, opts ctrl.Options) (manager.Manager, *ApplyWorkReconciler, error) {
hubMgr, err := ctrl.NewManager(hubCfg, opts)
if err != nil {
setupLog.Error(err, "unable to create hub manager")
@@ -71,24 +71,25 @@ func Start(ctx context.Context, hubCfg, spokeCfg *rest.Config, setupLog logr.Log
os.Exit(1)
}
if err = NewApplyWorkReconciler(
workController := NewApplyWorkReconciler(
hubMgr.GetClient(),
spokeDynamicClient,
spokeClient,
restMapper,
hubMgr.GetEventRecorderFor("work_controller"),
maxWorkConcurrency,
true,
).SetupWithManager(hubMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Work")
return err
opts.Namespace,
)
if err = workController.SetupWithManager(hubMgr); err != nil {
setupLog.Error(err, "unable to create the controller", "controller", "Work")
return nil, nil, err
}
klog.Info("starting hub manager")
defer klog.Info("shutting down hub manager")
if err := hubMgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running hub manager")
if err = workController.Join(ctx); err != nil {
setupLog.Error(err, "unable to mark the controller joined", "controller", "Work")
return nil, nil, err
}
return nil
return hubMgr, workController, nil
}

View file

@@ -32,6 +32,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/manager"
workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
)
@@ -41,11 +42,12 @@ import (
var (
cfg *rest.Config
// TODO: Separate k8sClient into hub and spoke
k8sClient client.Client
testEnv *envtest.Environment
setupLog = ctrl.Log.WithName("test")
ctx context.Context
cancel context.CancelFunc
k8sClient client.Client
testEnv *envtest.Environment
workController *ApplyWorkReconciler
setupLog = ctrl.Log.WithName("test")
ctx context.Context
cancel context.CancelFunc
)
func TestAPIs(t *testing.T) {
@@ -73,18 +75,23 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())
err = kruisev1alpha1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
opts := ctrl.Options{
Scheme: scheme.Scheme,
}
k8sClient, err = client.New(cfg, client.Options{
Scheme: scheme.Scheme,
})
Expect(err).NotTo(HaveOccurred())
By("start controllers")
var hubMgr manager.Manager
if hubMgr, workController, err = CreateControllers(ctx, cfg, cfg, setupLog, opts); err != nil {
setupLog.Error(err, "problem creating controllers")
os.Exit(1)
}
go func() {
opts := ctrl.Options{
Scheme: scheme.Scheme,
}
ctx, cancel = context.WithCancel(context.Background())
if err := Start(ctx, cfg, cfg, setupLog, opts); err != nil {
if err = hubMgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running controllers")
os.Exit(1)
}