diff --git a/cmd/workcontroller/workcontroller.go b/cmd/workcontroller/workcontroller.go index 81054ae..051e112 100644 --- a/cmd/workcontroller/workcontroller.go +++ b/cmd/workcontroller/workcontroller.go @@ -102,10 +102,18 @@ func main() { klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - if err := controllers.Start(ctrl.SetupSignalHandler(), hubConfig, ctrl.GetConfigOrDie(), setupLog, opts); err != nil { + ctx := ctrl.SetupSignalHandler() + hubMgr, _, err := controllers.CreateControllers(ctx, hubConfig, ctrl.GetConfigOrDie(), setupLog, opts) + if err != nil { setupLog.Error(err, "problem running controllers") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } + + klog.Info("starting hub manager") + defer klog.Info("shutting down hub manager") + if err := hubMgr.Start(ctx); err != nil { + setupLog.Error(err, "problem running hub manager") + } } func getKubeConfig(hubkubeconfig string) (*restclient.Config, error) { diff --git a/pkg/apis/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/v1alpha1/zz_generated.deepcopy.go index 515039b..56e2dff 100644 --- a/pkg/apis/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/v1alpha1/zz_generated.deepcopy.go @@ -22,7 +22,7 @@ limitations under the License. package v1alpha1 import ( - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) diff --git a/pkg/controllers/applied_work_syncer.go b/pkg/controllers/applied_work_syncer.go index ff9d61a..416afae 100644 --- a/pkg/controllers/applied_work_syncer.go +++ b/pkg/controllers/applied_work_syncer.go @@ -48,7 +48,7 @@ func (r *ApplyWorkReconciler) generateDiff(ctx context.Context, work *workapi.Wo } } if !resStillExist { - klog.V(5).InfoS("find an orphaned resource in the member cluster", + klog.V(2).InfoS("find an orphaned resource in the member cluster", "parent resource", work.GetName(), "orphaned resource", resourceMeta.ResourceIdentifier) staleRes = append(staleRes, resourceMeta) } @@ -77,7 +77,7 @@ func (r *ApplyWorkReconciler) generateDiff(ctx context.Context, work *workapi.Wo } } if !resRecorded { - klog.V(5).InfoS("discovered a new manifest resource", + klog.V(2).InfoS("discovered a new manifest resource", "parent Work", work.GetName(), "manifest", manifestCond.Identifier) obj, err := r.spokeDynamicClient.Resource(schema.GroupVersionResource{ Group: manifestCond.Identifier.Group, @@ -86,7 +86,7 @@ func (r *ApplyWorkReconciler) generateDiff(ctx context.Context, work *workapi.Wo }).Namespace(manifestCond.Identifier.Namespace).Get(ctx, manifestCond.Identifier.Name, metav1.GetOptions{}) switch { case apierrors.IsNotFound(err): - klog.V(4).InfoS("the new manifest resource is already deleted", "parent Work", work.GetName(), "manifest", manifestCond.Identifier) + klog.V(2).InfoS("the new manifest resource is already deleted", "parent Work", work.GetName(), "manifest", manifestCond.Identifier) continue case err != nil: klog.ErrorS(err, "failed to retrieve the manifest", "parent Work", work.GetName(), "manifest", manifestCond.Identifier) @@ -134,7 +134,7 @@ func (r *ApplyWorkReconciler) deleteStaleManifest(ctx context.Context, staleMani } } if !found { - klog.V(4).InfoS("the stale manifest is not owned by this work, skip", "manifest", staleManifest, "owner", owner) + klog.V(2).InfoS("the stale manifest is not owned by this work, skip", "manifest", staleManifest, "owner", owner) continue } if len(newOwners) == 0 { diff --git a/pkg/controllers/apply_controller.go b/pkg/controllers/apply_controller.go index 2fc4881..b584fb3 100644 --- a/pkg/controllers/apply_controller.go +++ b/pkg/controllers/apply_controller.go @@ -47,8 +47,6 @@ import ( const ( workFieldManagerName = "work-api-agent" - - AnnotationLastAppliedManifest = "fleet.azure.com/last-applied-manifest" ) // ApplyWorkReconciler reconciles a Work object @@ -59,10 +57,12 @@ type ApplyWorkReconciler struct { restMapper meta.RESTMapper recorder record.EventRecorder concurrency int + workNameSpace string joined bool } -func NewApplyWorkReconciler(hubClient client.Client, spokeDynamicClient dynamic.Interface, spokeClient client.Client, restMapper meta.RESTMapper, recorder record.EventRecorder, concurrency int, joined bool) *ApplyWorkReconciler { +func NewApplyWorkReconciler(hubClient client.Client, spokeDynamicClient dynamic.Interface, spokeClient client.Client, + restMapper meta.RESTMapper, recorder record.EventRecorder, concurrency int, workNameSpace string) *ApplyWorkReconciler { return &ApplyWorkReconciler{ client: hubClient, spokeDynamicClient: spokeDynamicClient, @@ -70,7 +70,7 @@ func NewApplyWorkReconciler(hubClient client.Client, spokeDynamicClient dynamic. restMapper: restMapper, recorder: recorder, concurrency: concurrency, - joined: joined, + workNameSpace: workNameSpace, } } @@ -85,7 +85,7 @@ type applyResult struct { // Reconcile implement the control loop logic for Work object. func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { if !r.joined { - klog.V(3).InfoS("work controller is not started yet, requeue the request", "work", req.NamespacedName) + klog.V(2).InfoS("work controller is not started yet, requeue the request", "work", req.NamespacedName) return ctrl.Result{RequeueAfter: time.Second * 5}, nil } klog.InfoS("work apply controller reconcile loop triggered.", "work", req.NamespacedName) @@ -95,7 +95,7 @@ func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( err := r.client.Get(ctx, req.NamespacedName, work) switch { case apierrors.IsNotFound(err): - klog.V(4).InfoS("the work resource is deleted", "work", req.NamespacedName) + klog.V(2).InfoS("the work resource is deleted", "work", req.NamespacedName) return ctrl.Result{}, nil case err != nil: klog.ErrorS(err, "failed to retrieve the work", "work", req.NamespacedName) @@ -150,7 +150,7 @@ func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // we can't proceed to update the applied return ctrl.Result{}, err } else if len(staleRes) > 0 { - klog.V(3).InfoS("successfully garbage-collected all stale manifests", work.Kind, kLogObjRef, "number of GCed res", len(staleRes)) + klog.V(2).InfoS("successfully garbage-collected all stale manifests", work.Kind, kLogObjRef, "number of GCed res", len(staleRes)) for _, res := range staleRes { klog.V(5).InfoS("successfully garbage-collected a stale manifest", work.Kind, kLogObjRef, "res", res) } @@ -187,7 +187,7 @@ func (r *ApplyWorkReconciler) garbageCollectAppliedWork(ctx context.Context, wor err := r.spokeClient.Delete(ctx, &appliedWork, &client.DeleteOptions{PropagationPolicy: &deletePolicy}) switch { case apierrors.IsNotFound(err): - klog.V(4).InfoS("the appliedWork is already deleted", "appliedWork", work.Name) + klog.V(2).InfoS("the appliedWork is already deleted", "appliedWork", work.Name) case err != nil: klog.ErrorS(err, "failed to delete the appliedWork", "appliedWork", work.Name) return ctrl.Result{}, err @@ -273,9 +273,9 @@ func (r *ApplyWorkReconciler) applyManifests(ctx context.Context, manifests []wo if result.err == nil { result.generation = appliedObj.GetGeneration() if result.updated { - klog.V(4).InfoS("manifest upsert succeeded", "gvr", gvr, "manifest", kLogObjRef, "new ObservedGeneration", result.generation) + klog.V(2).InfoS("manifest upsert succeeded", "gvr", gvr, "manifest", kLogObjRef, "new ObservedGeneration", result.generation) } else { - klog.V(5).InfoS("manifest upsert unwarranted", "gvr", gvr, "manifest", kLogObjRef) + klog.V(2).InfoS("manifest upsert unwarranted", "gvr", gvr, "manifest", kLogObjRef) } } else { klog.ErrorS(result.err, "manifest upsert failed", "gvr", gvr, "manifest", kLogObjRef) @@ -322,7 +322,7 @@ func (r *ApplyWorkReconciler) applyUnstructured(ctx context.Context, gvr schema. actual, err := r.spokeDynamicClient.Resource(gvr).Namespace(manifestObj.GetNamespace()).Create( ctx, manifestObj, metav1.CreateOptions{FieldManager: workFieldManagerName}) if err == nil { - klog.V(4).InfoS("successfully created the manifest", "gvr", gvr, "manifest", manifestRef) + klog.V(2).InfoS("successfully created the manifest", "gvr", gvr, "manifest", manifestRef) return actual, true, nil } return nil, false, err @@ -394,7 +394,7 @@ func (r *ApplyWorkReconciler) patchCurrentResource(ctx context.Context, gvr sche klog.ErrorS(patchErr, "failed to patch the manifest", "gvr", gvr, "manifest", manifestRef) return nil, false, patchErr } - klog.V(3).InfoS("manifest patch succeeded", "gvr", gvr, "manifest", manifestRef) + klog.V(2).InfoS("manifest patch succeeded", "gvr", gvr, "manifest", manifestRef) return manifestObj, true, nil } @@ -426,6 +426,47 @@ func (r *ApplyWorkReconciler) generateWorkCondition(results []applyResult, work return errs } +// Join starts to reconcile +func (r *ApplyWorkReconciler) Join(ctx context.Context) error { + if !r.joined { + klog.InfoS("mark the apply work reconciler joined") + } + r.joined = true + return nil +} + +// Leave start +func (r *ApplyWorkReconciler) Leave(ctx context.Context) error { + var works workv1alpha1.WorkList + if r.joined { + klog.InfoS("mark the apply work reconciler left") + } + r.joined = false + // list all the work object we created in the member cluster namespace + listOpts := []client.ListOption{ + client.InNamespace(r.workNameSpace), + } + if err := r.client.List(ctx, &works, listOpts...); err != nil { + klog.ErrorS(err, "failed to list all the work object", "clusterNS", r.workNameSpace) + return client.IgnoreNotFound(err) + } + // we leave the resources on the member cluster for now + for _, work := range works.Items { + staleWork := work.DeepCopy() + if controllerutil.ContainsFinalizer(staleWork, workFinalizer) { + controllerutil.RemoveFinalizer(staleWork, workFinalizer) + if updateErr := r.client.Update(ctx, staleWork, &client.UpdateOptions{}); updateErr != nil { + klog.ErrorS(updateErr, "failed to remove the work finalizer from the work", + "clusterNS", r.workNameSpace, "work", klog.KObj(staleWork)) + return updateErr + } + } + } + klog.V(2).InfoS("successfully removed all the work finalizers in the cluster namespace", + "clusterNS", r.workNameSpace, "number of work", len(works.Items)) + return nil +} + // SetupWithManager wires up the controller. func (r *ApplyWorkReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/pkg/controllers/apply_controller_helper_test.go b/pkg/controllers/apply_controller_helper_test.go index d62cc4e..e4aaf29 100644 --- a/pkg/controllers/apply_controller_helper_test.go +++ b/pkg/controllers/apply_controller_helper_test.go @@ -12,12 +12,14 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" utilrand "k8s.io/apimachinery/pkg/util/rand" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" ) // createWorkWithManifest creates a work given a manifest func createWorkWithManifest(workNamespace string, manifest runtime.Object) *workv1alpha1.Work { + manifestCopy := manifest.DeepCopyObject() newWork := workv1alpha1.Work{ ObjectMeta: metav1.ObjectMeta{ Name: "work-" + utilrand.String(5), @@ -27,7 +29,7 @@ func createWorkWithManifest(workNamespace string, manifest runtime.Object) *work Workload: workv1alpha1.WorkloadTemplate{ Manifests: []workv1alpha1.Manifest{ { - RawExtension: runtime.RawExtension{Object: manifest}, + RawExtension: runtime.RawExtension{Object: manifestCopy}, }, }, }, @@ -78,3 +80,16 @@ func waitForWorkToApply(workName, workNS string) *workv1alpha1.Work { }, timeout, interval).Should(BeTrue()) return &resultWork } + +// waitForWorkToBeHandled waits for a work to have a finalizer +func waitForWorkToBeHandled(workName, workNS string) *workv1alpha1.Work { + var resultWork workv1alpha1.Work + Eventually(func() bool { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: workName, Namespace: workNS}, &resultWork) + if err != nil { + return false + } + return controllerutil.ContainsFinalizer(&resultWork, workFinalizer) + }, timeout, interval).Should(BeTrue()) + return &resultWork +} diff --git a/pkg/controllers/apply_controller_integration_test.go b/pkg/controllers/apply_controller_integration_test.go index 46d5720..0e6ac4f 100644 --- a/pkg/controllers/apply_controller_integration_test.go +++ b/pkg/controllers/apply_controller_integration_test.go @@ -19,6 +19,7 @@ package controllers import ( "context" "encoding/json" + "fmt" "time" "github.com/google/go-cmp/cmp" @@ -33,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" ) @@ -64,7 +66,7 @@ var _ = Describe("Work Controller", func() { Expect(err).ToNot(HaveOccurred()) }) - Context("Test work propagation", func() { + Context("Test single work propagation", func() { It("Should have a configmap deployed correctly", func() { cmName := "testcm" cmNamespace := "default" @@ -225,7 +227,7 @@ var _ = Describe("Work Controller", func() { rawCM, err := json.Marshal(cm) Expect(err).Should(Succeed()) resultWork.Spec.Workload.Manifests[0].Raw = rawCM - Expect(k8sClient.Update(context.Background(), resultWork)).Should(Succeed()) + Expect(k8sClient.Update(ctx, resultWork)).Should(Succeed()) By("wait for the change of the work to be applied") waitForWorkToApply(work.GetName(), work.GetNamespace()) @@ -518,4 +520,133 @@ var _ = Describe("Work Controller", func() { Expect(cmp.Diff(resultWork.Status.ManifestConditions[0].Identifier, expectedResourceId)).Should(BeEmpty()) }) }) + + Context("Test multiple work propagation", func() { + var works []*workv1alpha1.Work + + AfterEach(func() { + for _, staleWork := range works { + err := k8sClient.Delete(context.Background(), staleWork) + Expect(err).ToNot(HaveOccurred()) + } + }) + + It("Test join and leave work correctly", func() { + By("create the works") + var configMap corev1.ConfigMap + cmNamespace := "default" + var cmNames []string + numWork := 10 + data := map[string]string{ + "test-key-1": "test-value-1", + "test-key-2": "test-value-2", + "test-key-3": "test-value-3", + } + + for i := 0; i < numWork; i++ { + cmName := "testcm-" + utilrand.String(10) + cmNames = append(cmNames, cmName) + cm = &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: cmNamespace, + }, + Data: data, + } + // make sure we can call join as many as possible + Expect(workController.Join(ctx)).Should(Succeed()) + work = createWorkWithManifest(workNamespace, cm) + err := k8sClient.Create(ctx, work) + Expect(err).ToNot(HaveOccurred()) + By(fmt.Sprintf("created the work = %s", work.GetName())) + works = append(works, work) + } + + By("make sure the works are handled") + for i := 0; i < numWork; i++ { + waitForWorkToBeHandled(works[i].GetName(), works[i].GetNamespace()) + } + + By("mark the work controller as leave") + Expect(workController.Leave(ctx)).Should(Succeed()) + + By("make sure the manifests have no finalizer and its status match the member cluster") + newData := map[string]string{ + "test-key-1": "test-value-1", + "test-key-2": "test-value-2", + "test-key-3": "test-value-3", + "new-test-key-1": "test-value-4", + "new-test-key-2": "test-value-5", + } + for i := 0; i < numWork; i++ { + var resultWork workv1alpha1.Work + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: works[i].GetName(), Namespace: workNamespace}, &resultWork)).Should(Succeed()) + Expect(controllerutil.ContainsFinalizer(work, workFinalizer)).Should(BeFalse()) + applyCond := meta.FindStatusCondition(resultWork.Status.Conditions, ConditionTypeApplied) + if applyCond != nil && applyCond.Status == metav1.ConditionTrue && applyCond.ObservedGeneration == resultWork.Generation { + By("the work is applied, check if the applied config map is still there") + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cmNames[i], Namespace: cmNamespace}, &configMap)).Should(Succeed()) + Expect(cmp.Diff(configMap.Data, data)).Should(BeEmpty()) + } else { + By("the work is not applied, verify that the applied config map is not there") + err := k8sClient.Get(ctx, types.NamespacedName{Name: cmNames[i], Namespace: cmNamespace}, &configMap) + Expect(apierrors.IsNotFound(err)).Should(BeTrue()) + } + // make sure that leave can be called as many times as possible + Expect(workController.Leave(ctx)).Should(Succeed()) + By(fmt.Sprintf("change the work = %s", work.GetName())) + cm = &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: cmNames[i], + Namespace: cmNamespace, + }, + Data: newData, + } + rawCM, err := json.Marshal(cm) + Expect(err).Should(Succeed()) + resultWork.Spec.Workload.Manifests[0].Raw = rawCM + Expect(k8sClient.Update(ctx, &resultWork)).Should(Succeed()) + } + + By("make sure the update in the work is not picked up") + Consistently(func() bool { + for i := 0; i < numWork; i++ { + By(fmt.Sprintf("updated the work = %s", works[i].GetName())) + var resultWork workv1alpha1.Work + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: works[i].GetName(), Namespace: workNamespace}, &resultWork) + Expect(err).Should(Succeed()) + Expect(controllerutil.ContainsFinalizer(&resultWork, workFinalizer)).Should(BeFalse()) + applyCond := meta.FindStatusCondition(resultWork.Status.Conditions, ConditionTypeApplied) + if applyCond != nil && applyCond.Status == metav1.ConditionTrue && applyCond.ObservedGeneration == resultWork.Generation { + return false + } + By("check if the config map is not changed") + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cmNames[i], Namespace: cmNamespace}, &configMap)).Should(Succeed()) + Expect(cmp.Diff(configMap.Data, data)).Should(BeEmpty()) + } + return true + }, timeout, interval).Should(BeTrue()) + + By("enable the work controller again") + Expect(workController.Join(ctx)).Should(Succeed()) + + By("make sure the work change get picked up") + for i := 0; i < numWork; i++ { + resultWork := waitForWorkToApply(works[i].GetName(), works[i].GetNamespace()) + Expect(len(resultWork.Status.ManifestConditions)).Should(Equal(1)) + Expect(meta.IsStatusConditionTrue(resultWork.Status.ManifestConditions[0].Conditions, ConditionTypeApplied)).Should(BeTrue()) + By("the work is applied, check if the applied config map is updated") + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cmNames[i], Namespace: cmNamespace}, &configMap)).Should(Succeed()) + Expect(cmp.Diff(configMap.Data, newData)).Should(BeEmpty()) + } + }) + }) }) diff --git a/pkg/controllers/manager.go b/pkg/controllers/manager.go index b6c0933..067471b 100644 --- a/pkg/controllers/manager.go +++ b/pkg/controllers/manager.go @@ -23,10 +23,10 @@ import ( "github.com/go-logr/logr" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" - "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/manager" ) const ( @@ -43,8 +43,8 @@ const ( maxWorkConcurrency = 5 ) -// Start the controllers with the supplied config -func Start(ctx context.Context, hubCfg, spokeCfg *rest.Config, setupLog logr.Logger, opts ctrl.Options) error { +// CreateControllers create the controllers with the supplied config +func CreateControllers(ctx context.Context, hubCfg, spokeCfg *rest.Config, setupLog logr.Logger, opts ctrl.Options) (manager.Manager, *ApplyWorkReconciler, error) { hubMgr, err := ctrl.NewManager(hubCfg, opts) if err != nil { setupLog.Error(err, "unable to create hub manager") @@ -71,24 +71,25 @@ func Start(ctx context.Context, hubCfg, spokeCfg *rest.Config, setupLog logr.Log os.Exit(1) } - if err = NewApplyWorkReconciler( + workController := NewApplyWorkReconciler( hubMgr.GetClient(), spokeDynamicClient, spokeClient, restMapper, hubMgr.GetEventRecorderFor("work_controller"), maxWorkConcurrency, - true, - ).SetupWithManager(hubMgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "Work") - return err + opts.Namespace, + ) + + if err = workController.SetupWithManager(hubMgr); err != nil { + setupLog.Error(err, "unable to create the controller", "controller", "Work") + return nil, nil, err } - klog.Info("starting hub manager") - defer klog.Info("shutting down hub manager") - if err := hubMgr.Start(ctx); err != nil { - setupLog.Error(err, "problem running hub manager") + if err = workController.Join(ctx); err != nil { + setupLog.Error(err, "unable to mark the controller joined", "controller", "Work") + return nil, nil, err } - return nil + return hubMgr, workController, nil } diff --git a/pkg/controllers/suite_test.go b/pkg/controllers/suite_test.go index 2a79b0d..eb06b95 100644 --- a/pkg/controllers/suite_test.go +++ b/pkg/controllers/suite_test.go @@ -32,6 +32,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/manager" workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" ) @@ -41,11 +42,12 @@ import ( var ( cfg *rest.Config // TODO: Seperate k8sClient into hub and spoke - k8sClient client.Client - testEnv *envtest.Environment - setupLog = ctrl.Log.WithName("test") - ctx context.Context - cancel context.CancelFunc + k8sClient client.Client + testEnv *envtest.Environment + workController *ApplyWorkReconciler + setupLog = ctrl.Log.WithName("test") + ctx context.Context + cancel context.CancelFunc ) func TestAPIs(t *testing.T) { @@ -73,18 +75,23 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) err = kruisev1alpha1.AddToScheme(scheme.Scheme) Expect(err).NotTo(HaveOccurred()) - + opts := ctrl.Options{ + Scheme: scheme.Scheme, + } k8sClient, err = client.New(cfg, client.Options{ Scheme: scheme.Scheme, }) Expect(err).NotTo(HaveOccurred()) + By("start controllers") + var hubMgr manager.Manager + if hubMgr, workController, err = CreateControllers(ctx, cfg, cfg, setupLog, opts); err != nil { + setupLog.Error(err, "problem creating controllers") + os.Exit(1) + } go func() { - opts := ctrl.Options{ - Scheme: scheme.Scheme, - } ctx, cancel = context.WithCancel(context.Background()) - if err := Start(ctx, cfg, cfg, setupLog, opts); err != nil { + if err = hubMgr.Start(ctx); err != nil { setupLog.Error(err, "problem running controllers") os.Exit(1) }