Mirror of https://github.com/Azure/orkestra.git
Use a Workflow Status Thread Rather than Short Requeue (#382)
* Add tracking for previous applications
* Add purging logic with list
* Fix failing test case
* Fix failing test case
* Add new func test cases
* Add get diff functionality to graph
* Create a rollback new application workflow
* Combine the purge of newer applications with rollback workflow
* Create an executor interface for creating templates
* Increase default workflow executor version
* Use a workflow reconciler to update app group status
* Add finalizer to catch deletion
* Fix calling both rollback and reverse
* Add rolledback status to json printer
* Fix reverse workflow not working properly
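The item that gives the commit its title is "Use a workflow reconciler to update app group status": instead of requeueing the ApplicationGroup reconciler on a short interval to poll workflow progress, a second controller now watches the Argo Workflow objects themselves and pushes their phase back onto the owning ApplicationGroup. A minimal sketch of that watch-based pattern follows; it assumes controller-runtime and the Argo Workflow v3 API types, and the statusWatcher type and the label key shown are illustrative stand-ins, not the exact Orkestra wiring from the diff further down.

    // Illustrative sketch only; not part of the Orkestra source.
    package controllers

    import (
        "context"

        v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/predicate"
    )

    // statusWatcher is driven by Workflow create/update/delete events
    // instead of a fixed RequeueAfter interval.
    type statusWatcher struct {
        client.Client
    }

    func (r *statusWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        wf := &v1alpha13.Workflow{}
        if err := r.Get(ctx, req.NamespacedName, wf); err != nil {
            // A deleted workflow is not an error for a status watcher.
            return ctrl.Result{}, client.IgnoreNotFound(err)
        }
        // The real controller maps wf.Status.Phase onto the owning
        // ApplicationGroup's conditions and triggers rollback/reverse on failure.
        return ctrl.Result{}, nil
    }

    func (r *statusWatcher) SetupWithManager(mgr ctrl.Manager) error {
        // Only react to workflows that carry an Orkestra ownership label
        // (the label key here is an assumption for the sketch).
        owned := predicate.NewPredicateFuncs(func(obj client.Object) bool {
            _, ok := obj.GetLabels()["orkestra.azure.microsoft.com/owner"]
            return ok
        })
        return ctrl.NewControllerManagedBy(mgr).
            For(&v1alpha13.Workflow{}).
            WithEventFilter(owned).
            Complete(r)
    }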
This commit is contained in:
Parent: e22ff10fdb
Commit: 363b8a8946
@@ -33,16 +33,18 @@ const (
    DefaultSucceededRequeue = 5 * time.Minute

    AppGroupNameKey = "appgroup"
    AppGroupFinalizer = "application-group-finalizer"
    AppGroupFinalizer = "orkestra.azure.microsoft.com/finalizer"

    LastSuccessfulAnnotation = "orkestra.azure.microsoft.com/last-successful-applicationgroup"
    LastSuccessfulAnnotation = "orkestra.azure.microsoft.com/last-successful-appgroup"
    ParentChartAnnotation = "orkestra.azure.microsoft.com/parent-chart"

    HeritageLabel = "orkestra.azure.microsoft.com/heritage"
    HeritageValue = "orkestra"

    OwnershipLabel = "orkestra.azure.microsoft.com/owner"
    ChartLabel = "orkestra.azure.microsoft.com/chart"
    OwnershipLabel = "orkestra.azure.microsoft.com/owner"
    WorkflowTypeLabel = "orkestra.azure.microsoft.com/workflow-type"
    WorkflowAppGroupGenerationLabel = "orkestra.azure.microsoft.com/appgroup-generation"
    ChartLabel = "orkestra.azure.microsoft.com/chart"

    ForwardWorkflow WorkflowType = "forward"
    ReverseWorkflow WorkflowType = "reverse"
@@ -305,6 +307,21 @@ func (in *Release) SetValues(values map[string]interface{}) error {
    return nil
}

// ReadyProgressing sets the meta.ReadyCondition to 'Unknown', with the given
// meta.Progressing reason and message
func (in *ApplicationGroup) ReadyProgressing() {
    in.Status.Conditions = []metav1.Condition{}
    meta.SetResourceCondition(in, meta.ReadyCondition, metav1.ConditionUnknown, meta.ProgressingReason, "workflow is reconciling...")
}

// ReadyTerminating sets the meta.ReadyCondition to 'False', with the given
// meta.Terminating reason and message
func (in *ApplicationGroup) ReadyTerminating() {
    in.Status.Conditions = []metav1.Condition{}
    in.Status.ObservedGeneration = in.Generation
    meta.SetResourceCondition(in, meta.ReadyCondition, metav1.ConditionFalse, meta.TerminatingReason, "application group is terminating...")
}

// ReadySucceeded sets the meta.ReadyCondition to 'True', with the given
// meta.Succeeded reason and message
func (in *ApplicationGroup) ReadySucceeded() {

@@ -387,6 +404,7 @@ func (in *ApplicationGroup) SetLastSuccessful() {
// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].status"
// +kubebuilder:printcolumn:name="Reason",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].reason"
// +kubebuilder:printcolumn:name="Message",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].message"
// +kubebuilder:printcolumn:name="RolledBack",type="string",JSONPath=".status.conditions[?(@.type==\"RollbackWorkflowSucceeded\")].status"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// ApplicationGroup is the Schema for the applicationgroups API
@@ -29,6 +29,9 @@ spec:
    - jsonPath: .status.conditions[?(@.type=="Ready")].message
      name: Message
      type: string
    - jsonPath: .status.conditions[?(@.type=="RollbackWorkflowSucceeded")].status
      name: RolledBack
      type: string
    - jsonPath: .metadata.creationTimestamp
      name: Age
      type: date

@@ -29,6 +29,9 @@ spec:
    - jsonPath: .status.conditions[?(@.type=="Ready")].message
      name: Message
      type: string
    - jsonPath: .status.conditions[?(@.type=="RollbackWorkflowSucceeded")].status
      name: RolledBack
      type: string
    - jsonPath: .metadata.creationTimestamp
      name: Age
      type: date
@@ -6,6 +6,26 @@ metadata:
  creationTimestamp: null
  name: manager-role
rules:
- apiGroups:
  - argoproj.io
  resources:
  - workflows
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - update
  - watch
- apiGroups:
  - argoproj.io
  resources:
  - workflows/status
  verbs:
  - get
  - patch
  - update
- apiGroups:
  - orkestra.azure.microsoft.com
  resources:
@@ -5,6 +5,8 @@ package controllers

import (
    "context"
    "errors"
    "github.com/Azure/Orkestra/pkg/meta"

    "github.com/Azure/Orkestra/pkg/helpers"

@@ -71,11 +73,10 @@ func (r *ApplicationGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    patch := client.MergeFrom(appGroup.DeepCopy())

    statusHelper := &helpers.StatusHelper{
        Client: r.Client,
        Logger: logr,
        PatchFrom: patch,
        Recorder: r.Recorder,
        WorkflowClientBuilder: r.WorkflowClientBuilder,
        Client: r.Client,
        Logger: logr,
        PatchFrom: patch,
        Recorder: r.Recorder,
    }
    reconcileHelper := helpers.ReconcileHelper{
        Client: r.Client,

@@ -102,15 +103,17 @@ func (r *ApplicationGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

    if !appGroup.DeletionTimestamp.IsZero() {
        statusHelper.MarkTerminating(appGroup)
        result, err := reconcileHelper.Reverse(ctx)
        if !result.Requeue && err == nil {
            // Remove the finalizer because we have finished reversing
        if err := reconcileHelper.Reverse(ctx); errors.Is(err, meta.ErrForwardWorkflowNotFound) {
            controllerutil.RemoveFinalizer(appGroup, v1alpha1.AppGroupFinalizer)
            if err := r.Patch(ctx, appGroup, patch); err != nil {
                return result, err
                logr.Error(err, "failed to patch the release to remove the appgroup finalizer")
                return ctrl.Result{}, err
            }
        } else if err != nil {
            logr.Error(err, "failed to generate the reverse workflow")
            return ctrl.Result{}, err
        }
        return result, err
        return ctrl.Result{}, nil
    }
    // Add finalizer if it doesn't already exist
    if !controllerutil.ContainsFinalizer(appGroup, v1alpha1.AppGroupFinalizer) {

@@ -125,32 +128,14 @@ func (r *ApplicationGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // Only do this if we have successfully completed a rollback
    if appGroup.Generation != appGroup.Status.ObservedGeneration {
        // Change the app group spec into a progressing state
        if err := statusHelper.MarkProgressing(ctx, appGroup); err != nil {
            logr.Error(err, "failed to patch the status into a progressing state")
            return ctrl.Result{}, err
        }
        statusHelper.MarkProgressing(appGroup)
        if err := reconcileHelper.CreateOrUpdate(ctx); err != nil {
            logr.Error(err, "failed to reconcile creating or updating the appgroup")
            return ctrl.Result{}, err
        }
        appGroup.Status.ObservedGeneration = appGroup.Generation
    }

    // Update the status based on the current state of the helm charts
    // and the status of the workflows
    result, err = statusHelper.UpdateStatus(ctx, appGroup)
    if err != nil {
        logr.Error(err, "failed to update the status of the app group")
        return ctrl.Result{}, err
    }
    if shouldRemediate, err := r.ShouldRemediate(ctx, appGroup); err != nil {
        return ctrl.Result{}, err
    } else if shouldRemediate {
        if lastSuccessfulSpec := appGroup.GetLastSuccessful(); lastSuccessfulSpec != nil {
            return reconcileHelper.Rollback(ctx, patch)
        }
        return reconcileHelper.Reverse(ctx)
    }
    return result, nil
    return ctrl.Result{}, nil
}

func (r *ApplicationGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {

@@ -159,13 +144,3 @@ func (r *ApplicationGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
        WithEventFilter(predicate.GenerationChangedPredicate{}).
        Complete(r)
}

func (r *ApplicationGroupReconciler) ShouldRemediate(ctx context.Context, instance *v1alpha1.ApplicationGroup) (bool, error) {
    forwardClient := r.WorkflowClientBuilder.Forward(instance).Build()

    isFailed, err := workflow.IsFailed(ctx, forwardClient)
    if err != nil {
        return false, err
    }
    return isFailed && !r.DisableRemediation, nil
}
@@ -2,6 +2,7 @@ package controllers_test

import (
    "context"
    "fmt"
    "time"

    "github.com/Azure/Orkestra/api/v1alpha1"

@@ -60,7 +61,7 @@ var _ = Describe("ApplicationGroup Controller", func() {
        _ = k8sClient.DeleteAllOf(ctx, &v1alpha13.Workflow{}, client.InNamespace(name))
    })

    It("Should create Bookinfo spec successfully", func() {
    It("Should create and delete Bookinfo spec successfully", func() {
        By("Applying the bookinfo object to the cluster")
        err := k8sClient.Create(ctx, appGroup)
        Expect(err).ToNot(HaveOccurred())

@@ -103,7 +104,8 @@ var _ = Describe("ApplicationGroup Controller", func() {
            if err := k8sClient.List(ctx, helmReleases, client.InNamespace(name)); err != nil {
                return false
            }
            return len(helmReleases.Items) == 0
            getErr := k8sClient.Get(ctx, key, appGroup)
            return len(helmReleases.Items) == 0 && errors.IsNotFound(getErr)
        }, defaultTimeout, time.Second).Should(BeTrue())
    })

@@ -409,18 +411,17 @@ var _ = Describe("ApplicationGroup Controller", func() {
        By("Deleting the application group and deleting the workflow")
        err = k8sClient.Delete(ctx, appGroup)
        Expect(err).To(BeNil())
        wf := &v1alpha13.Workflow{
            ObjectMeta: metav1.ObjectMeta{
                Name: name,
                Namespace: defaultNamespace,
            },
        }

        wf := &v1alpha13.Workflow{}
        Eventually(func() error {
            return k8sClient.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%s-reverse", name), Namespace: defaultNamespace}, wf)
        }, time.Second*20, time.Second).Should(BeNil())
        err = k8sClient.Delete(ctx, wf)
        Expect(err).To(BeNil())

        Eventually(func() bool {
            return errors.IsNotFound(k8sClient.Get(ctx, key, appGroup))
        }, time.Second*30, time.Second).Should(BeTrue())
        }, time.Second*20, time.Second).Should(BeTrue())
    })

    It("should succeed to remove the new helm releases on application rollback", func() {

@@ -119,6 +119,15 @@ var _ = BeforeSuite(func() {
    }).SetupWithManager(k8sManager)
    Expect(err).ToNot(HaveOccurred())

    err = (&controllers.WorkflowStatusReconciler{
        Client: k8sManager.GetClient(),
        Log: baseLogger,
        Scheme: k8sManager.GetScheme(),
        WorkflowClientBuilder: workflowClientBuilder,
        Recorder: k8sManager.GetEventRecorderFor("appgroup-controller"),
    }).SetupWithManager(k8sManager)
    Expect(err).ToNot(HaveOccurred())

    go func() {
        err = k8sManager.Start(ctrl.SetupSignalHandler())
        Expect(err).ToNot(HaveOccurred())
@@ -0,0 +1,240 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package controllers

import (
    "context"
    "fmt"
    "strconv"

    "github.com/Azure/Orkestra/pkg/meta"
    v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/tools/record"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/predicate"

    "github.com/Azure/Orkestra/pkg/helpers"

    "github.com/Azure/Orkestra/api/v1alpha1"
    workflowpkg "github.com/Azure/Orkestra/pkg/workflow"
    "github.com/go-logr/logr"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// WorkflowStatusReconciler reconciles workflows and their status
type WorkflowStatusReconciler struct {
    client.Client
    Log logr.Logger
    Scheme *runtime.Scheme
    WorkflowClientBuilder *workflowpkg.Builder

    // Recorder generates kubernetes events
    Recorder record.EventRecorder
}

// +kubebuilder:rbac:groups=argoproj.io,resources=workflows,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=argoproj.io,resources=workflows/status,verbs=get;update;patch

func (r *WorkflowStatusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    workflow := &v1alpha13.Workflow{}
    logr := r.Log.WithValues("workflow_name", req.NamespacedName.Name)

    if err := r.Get(ctx, req.NamespacedName, workflow); err != nil {
        if errors.IsNotFound(err) {
            logr.V(3).Info("workflow not found in the cluster")
            return ctrl.Result{}, nil
        }
        logr.Error(err, "failed to fetch workflow instance")
        return ctrl.Result{}, err
    }
    if !workflow.DeletionTimestamp.IsZero() {
        workflowPatch := client.MergeFrom(workflow.DeepCopy())
        logr.V(2).Info("got delete event for the workflow")
        controllerutil.RemoveFinalizer(workflow, v1alpha1.AppGroupFinalizer)
        if err := r.Patch(ctx, workflow, workflowPatch); err != nil {
            logr.Error(err, "failed to remove the finalizer from the workflow object on deletion")
            return ctrl.Result{}, err
        }

        // If the parent is in a deleting state, then we need to check if this workflow deletion is the
        // reverse workflow for this application group. If it is, then we remove the finalizer
        parent, workflowType, err := r.getParentAndWorkflowType(ctx, workflow)
        if err != nil {
            if errors.IsNotFound(err) {
                logr.V(3).Info("parent application group not found in the cluster")
                return ctrl.Result{}, nil
            }
            return ctrl.Result{}, err
        }
        logr = logr.WithValues("workflow_type", workflowType, "parent", parent.Name)
        if !parent.DeletionTimestamp.IsZero() && workflowType == v1alpha1.ReverseWorkflow {
            patch := client.MergeFrom(parent.DeepCopy())
            logr.V(2).Info("removing the finalizer from the parent due to us losing the reverse workflow")
            controllerutil.RemoveFinalizer(parent, v1alpha1.AppGroupFinalizer)
            if err := r.Patch(ctx, parent, patch); err != nil {
                return ctrl.Result{}, err
            }
        }
        return ctrl.Result{}, nil
    }

    parent, workflowType, err := r.getParentAndWorkflowType(ctx, workflow)
    if err != nil {
        if errors.IsNotFound(err) {
            logr.V(3).Info("parent application group not found in the cluster")
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }
    logr = logr.WithValues("workflow_type", workflowType, "parent", parent.Name)

    patch := client.MergeFrom(parent.DeepCopy())
    statusHelper := &helpers.StatusHelper{
        Client: r.Client,
        Logger: logr,
        PatchFrom: patch,
        Recorder: r.Recorder,
    }
    reconcileHelper := helpers.ReconcileHelper{
        Client: r.Client,
        Logger: logr,
        Instance: parent,
        WorkflowClientBuilder: r.WorkflowClientBuilder,
        StatusHelper: statusHelper,
    }

    workflowAppGeneration, err := getWorkflowAppGeneration(workflow)
    if err != nil {
        logr.Error(err, "failed to get the workflow app generation")
        return ctrl.Result{}, nil
    }

    // If the parent generation doesn't match this workflow's generation
    // we can ignore updates to this workflow
    if parent.Generation != workflowAppGeneration {
        logr.V(3).Info("workflow generation does not match parent generation")
        return ctrl.Result{}, nil
    }

    if parent.Generation != parent.Status.ObservedGeneration {
        logr.V(3).Info("parent is still reconciling")
        return ctrl.Result{Requeue: true}, nil
    }

    // Remove the finalizer from the parent application group if we have just completed reversing
    if !parent.DeletionTimestamp.IsZero() && workflowType == v1alpha1.ReverseWorkflow &&
        (workflowpkg.ToConditionReason(workflow.Status.Phase) == meta.SucceededReason || workflowpkg.ToConditionReason(workflow.Status.Phase) == meta.FailedReason) {
        // Remove the finalizer because we have finished reversing
        controllerutil.RemoveFinalizer(parent, v1alpha1.AppGroupFinalizer)
        if err := r.Patch(ctx, parent, patch); err != nil {
            return ctrl.Result{}, err
        }
    }

    // Update the status based on the current state of the helm charts
    // and the status of the workflows
    if err := statusHelper.UpdateStatus(ctx, parent, workflow, workflowType); err != nil {
        logr.Error(err, "failed to update the chart status of the app group")
        return ctrl.Result{}, err
    }
    if err := statusHelper.UpdateFromWorkflowStatus(parent, workflow, workflowType); err != nil {
        logr.Error(err, "failed to update the workflow status of the app group")
        return ctrl.Result{}, err
    }
    if err := statusHelper.PatchStatus(ctx, parent); err != nil {
        logr.Error(err, "failed to patch the status of the parent based on the workflow")
        return ctrl.Result{}, err
    }
    if workflowType == v1alpha1.ForwardWorkflow &&
        workflowpkg.ToConditionReason(workflow.Status.Phase) == meta.FailedReason {
        if lastSuccessfulSpec := parent.GetLastSuccessful(); lastSuccessfulSpec != nil {
            if err := reconcileHelper.Rollback(ctx); err != nil {
                logr.Error(err, "failed to generate the rollback workflow")
                return ctrl.Result{}, err
            }
        } else {
            if err := reconcileHelper.Reverse(ctx); err != nil {
                logr.Error(err, "failed to generate the reverse workflow")
                return ctrl.Result{}, err
            }
        }
    }
    return ctrl.Result{}, nil
}

func getWorkflowAppGeneration(workflow *v1alpha13.Workflow) (int64, error) {
    workflowAppGroupGenerationStr, ok := workflow.GetLabels()[v1alpha1.WorkflowAppGroupGenerationLabel]
    if !ok {
        return -1, fmt.Errorf("workflow app group generation not found for the child workflow")
    }
    workflowAppGroupGeneration, err := strconv.ParseInt(workflowAppGroupGenerationStr, 10, 64)
    if err != nil {
        return -1, fmt.Errorf("workflow app group generation not able to be parsed for child workflow")
    }
    return workflowAppGroupGeneration, nil
}

func orkestraOwnedPredicate() predicate.Predicate {
    return predicate.Funcs{
        CreateFunc: func(e event.CreateEvent) bool {
            workflow := e.Object.(*v1alpha13.Workflow)
            return hasValidOrkestraLabels(workflow)
        },
        UpdateFunc: func(e event.UpdateEvent) bool {
            workflow := e.ObjectNew.(*v1alpha13.Workflow)
            return hasValidOrkestraLabels(workflow)
        },
        DeleteFunc: func(e event.DeleteEvent) bool {
            workflow := e.Object.(*v1alpha13.Workflow)
            return hasValidOrkestraLabels(workflow)
        },
    }
}

func hasValidOrkestraLabels(workflow *v1alpha13.Workflow) bool {
    return workflow.GetLabels()[v1alpha1.OwnershipLabel] != "" &&
        workflow.GetLabels()[v1alpha1.WorkflowTypeLabel] != "" &&
        workflow.GetLabels()[v1alpha1.WorkflowAppGroupGenerationLabel] != ""
}

func (r *WorkflowStatusReconciler) getParentApplicationGroup(ctx context.Context, workflow *v1alpha13.Workflow) (*v1alpha1.ApplicationGroup, error) {
    parent := &v1alpha1.ApplicationGroup{}
    if appGroupName, ok := workflow.GetLabels()[v1alpha1.OwnershipLabel]; ok {
        if err := r.Get(ctx, types.NamespacedName{Name: appGroupName}, parent); err != nil {
            return nil, err
        }
    }
    return parent, nil
}

func (r *WorkflowStatusReconciler) getParentAndWorkflowType(ctx context.Context, workflow *v1alpha13.Workflow) (*v1alpha1.ApplicationGroup, v1alpha1.WorkflowType, error) {
    parent := &v1alpha1.ApplicationGroup{}
    appGroupName, ok := workflow.GetLabels()[v1alpha1.OwnershipLabel]
    if !ok {
        err := fmt.Errorf("ownership label not found for the child workflow")
        return nil, "", err
    }
    if err := r.Get(ctx, types.NamespacedName{Name: appGroupName}, parent); err != nil {
        return nil, "", err
    }
    workflowTypeStr, ok := workflow.GetLabels()[v1alpha1.WorkflowTypeLabel]
    workflowType := v1alpha1.WorkflowType(workflowTypeStr)
    if !ok {
        err := fmt.Errorf("workflow type label not found for the child workflow")
        return nil, "", err
    }
    return parent, workflowType, nil
}

func (r *WorkflowStatusReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&v1alpha13.Workflow{}).
        WithEventFilter(orkestraOwnedPredicate()).
        Complete(r)
}
main.go
@@ -174,6 +174,18 @@ func main() {
        setupLog.Error(err, "unable to create controller", "controller", "ApplicationGroup")
        os.Exit(1)
    }

    if err = (&controllers.WorkflowStatusReconciler{
        Client: mgr.GetClient(),
        Log: baseLogger,
        Scheme: mgr.GetScheme(),
        WorkflowClientBuilder: workflow.NewBuilder(mgr.GetClient(), baseLogger).WithStagingRepo(workflowHelmURL).WithParallelism(workflowParallelism).InNamespace(workflow.GetNamespace()),
        Recorder: mgr.GetEventRecorderFor("appgroup-controller"),
    }).SetupWithManager(mgr); err != nil {
        setupLog.Error(err, "unable to create controller", "controller", "WorkflowStatus")
        os.Exit(1)
    }

    // +kubebuilder:scaffold:builder

    setupLog.Info("starting manager")
@ -85,35 +85,42 @@ func NewForwardGraph(appGroup *v1alpha1.ApplicationGroup) *Graph {
|
|||
applicationNode.Tasks[application.Name] = NewTaskNode(&application)
|
||||
appValues := application.Spec.Release.GetValues()
|
||||
|
||||
// Iterate through the subchart nodes
|
||||
for _, subChart := range application.Spec.Subcharts {
|
||||
subChartVersion := appGroup.Status.Applications[i].Subcharts[subChart.Name].Version
|
||||
chartName := utils.GetSubchartName(application.Name, subChart.Name)
|
||||
// We need to know that the subcharts were staged in order to build this graph
|
||||
if len(appGroup.Status.Applications) > i {
|
||||
// Iterate through the subchart nodes
|
||||
for _, subChart := range application.Spec.Subcharts {
|
||||
subChartStatus, ok := appGroup.Status.Applications[i].Subcharts[subChart.Name]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
subChartVersion := subChartStatus.Version
|
||||
chartName := utils.GetSubchartName(application.Name, subChart.Name)
|
||||
|
||||
// Get the sub-chart values and assign that to the release
|
||||
values, _ := SubChartValues(subChart.Name, application.GetValues())
|
||||
release := application.Spec.Release.DeepCopy()
|
||||
release.Values = values
|
||||
// Get the sub-chart values and assign that to the release
|
||||
values, _ := SubChartValues(subChart.Name, application.GetValues())
|
||||
release := application.Spec.Release.DeepCopy()
|
||||
release.Values = values
|
||||
|
||||
subChartNode := &TaskNode{
|
||||
Name: subChart.Name,
|
||||
ChartName: chartName,
|
||||
ChartVersion: subChartVersion,
|
||||
Release: release,
|
||||
Parent: application.Name,
|
||||
Dependencies: subChart.Dependencies,
|
||||
Executors: []executor.Executor{executor.DefaultForward{}},
|
||||
subChartNode := &TaskNode{
|
||||
Name: subChart.Name,
|
||||
ChartName: chartName,
|
||||
ChartVersion: subChartVersion,
|
||||
Release: release,
|
||||
Parent: application.Name,
|
||||
Dependencies: subChart.Dependencies,
|
||||
Executors: []executor.Executor{executor.DefaultForward{}},
|
||||
}
|
||||
|
||||
applicationNode.Tasks[subChart.Name] = subChartNode
|
||||
|
||||
// Disable the sub-chart dependencies in the values of the parent chart
|
||||
appValues[subChart.Name] = map[string]interface{}{
|
||||
"enabled": false,
|
||||
}
|
||||
|
||||
// Add the node to the set of parent node dependencies
|
||||
applicationNode.Tasks[application.Name].Dependencies = append(applicationNode.Tasks[application.Name].Dependencies, subChart.Name)
|
||||
}
|
||||
|
||||
applicationNode.Tasks[subChart.Name] = subChartNode
|
||||
|
||||
// Disable the sub-chart dependencies in the values of the parent chart
|
||||
appValues[subChart.Name] = map[string]interface{}{
|
||||
"enabled": false,
|
||||
}
|
||||
|
||||
// Add the node to the set of parent node dependencies
|
||||
applicationNode.Tasks[application.Name].Dependencies = append(applicationNode.Tasks[application.Name].Dependencies, subChart.Name)
|
||||
}
|
||||
_ = applicationNode.Tasks[application.Name].Release.SetValues(appValues)
|
||||
|
||||
|
|
|
@ -14,9 +14,7 @@ import (
|
|||
"github.com/go-logr/logr"
|
||||
"github.com/jinzhu/copier"
|
||||
"helm.sh/helm/v3/pkg/chart"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -59,7 +57,7 @@ func (helper *ReconcileHelper) CreateOrUpdate(ctx context.Context) error {
|
|||
return fmt.Errorf("failed to reconcile the applications with: %w", err)
|
||||
}
|
||||
// Generate the Workflow object to submit to Argo
|
||||
forwardClient := helper.WorkflowClientBuilder.Forward(helper.Instance).Build()
|
||||
forwardClient := helper.WorkflowClientBuilder.Build(v1alpha1.ForwardWorkflow, helper.Instance)
|
||||
if err := workflow.Run(ctx, forwardClient); err != nil {
|
||||
helper.StatusHelper.MarkWorkflowTemplateGenerationFailed(helper.Instance, err)
|
||||
return fmt.Errorf("failed to run forward workflow with: %w", err)
|
||||
|
@ -67,49 +65,37 @@ func (helper *ReconcileHelper) CreateOrUpdate(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (helper *ReconcileHelper) Rollback(ctx context.Context, patch client.Patch) (ctrl.Result, error) {
|
||||
func (helper *ReconcileHelper) Rollback(ctx context.Context) error {
|
||||
helper.Info("Rolling back to last successful application group spec")
|
||||
rollbackClient := helper.WorkflowClientBuilder.Rollback(helper.Instance).Build()
|
||||
rollbackClient := helper.WorkflowClientBuilder.Build(v1alpha1.RollbackWorkflow, helper.Instance)
|
||||
|
||||
// Re-running the workflow will not re-generate it since we check if we have already started it
|
||||
if err := workflow.Run(ctx, rollbackClient); err != nil {
|
||||
helper.Error(err, "failed to create the workflow for rollback")
|
||||
return ctrl.Result{}, err
|
||||
return err
|
||||
}
|
||||
if isSucceeded, err := workflow.IsSucceeded(ctx, rollbackClient); err != nil {
|
||||
helper.Error(err, "failed to validate if the workflow is succeeded")
|
||||
return ctrl.Result{}, err
|
||||
} else if isSucceeded {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
return reconcile.Result{RequeueAfter: v1alpha1.DefaultProgressingRequeue}, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (helper *ReconcileHelper) Reverse(ctx context.Context) (ctrl.Result, error) {
|
||||
reverseClient := helper.WorkflowClientBuilder.Reverse(helper.Instance).Build()
|
||||
forwardClient := helper.WorkflowClientBuilder.Forward(helper.Instance).Build()
|
||||
func (helper *ReconcileHelper) Reverse(ctx context.Context) error {
|
||||
reverseClient := helper.WorkflowClientBuilder.Build(v1alpha1.ReverseWorkflow, helper.Instance)
|
||||
forwardClient := helper.WorkflowClientBuilder.Build(v1alpha1.ForwardWorkflow, helper.Instance)
|
||||
helper.Info("Reversing the workflow")
|
||||
|
||||
// Re-running the workflow will not re-generate it since we check if we have already started it
|
||||
if err := workflow.Run(ctx, reverseClient); errors.Is(err, meta.ErrForwardWorkflowNotFound) {
|
||||
// Forward workflow wasn't found so we just return
|
||||
return ctrl.Result{}, nil
|
||||
// Forward workflow wasn't found so we just return the error
|
||||
return err
|
||||
} else if err != nil {
|
||||
helper.Error(err, "failed to generate reverse workflow")
|
||||
// if generation of reverse workflow failed, delete the forward workflow and return
|
||||
if err := workflow.DeleteWorkflow(ctx, forwardClient); err != nil {
|
||||
helper.Error(err, "failed to delete workflow CRO")
|
||||
return ctrl.Result{}, err
|
||||
return err
|
||||
}
|
||||
return ctrl.Result{}, nil
|
||||
return nil
|
||||
}
|
||||
if isSucceeded, err := workflow.IsSucceeded(ctx, reverseClient); err != nil {
|
||||
helper.Error(err, "failed to validate if the workflow is succeeded")
|
||||
return ctrl.Result{}, err
|
||||
} else if isSucceeded {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
return ctrl.Result{Requeue: true, RequeueAfter: v1alpha1.DefaultProgressingRequeue}, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (helper *ReconcileHelper) reconcileApplications() error {
|
||||
|
|
|
@ -3,6 +3,7 @@ package helpers
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
|
||||
|
||||
"github.com/Azure/Orkestra/api/v1alpha1"
|
||||
"github.com/Azure/Orkestra/pkg/meta"
|
||||
|
@ -11,66 +12,54 @@ import (
|
|||
"github.com/go-logr/logr"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
type StatusHelper struct {
|
||||
client.Client
|
||||
logr.Logger
|
||||
PatchFrom client.Patch
|
||||
WorkflowClientBuilder *workflow.Builder
|
||||
Recorder record.EventRecorder
|
||||
PatchFrom client.Patch
|
||||
Recorder record.EventRecorder
|
||||
}
|
||||
|
||||
func (helper *StatusHelper) UpdateStatus(ctx context.Context, instance *v1alpha1.ApplicationGroup) (ctrl.Result, error) {
|
||||
chartConditionMap, subChartConditionMap, err := helper.marshallChartStatus(ctx, instance)
|
||||
func (helper *StatusHelper) UpdateStatus(ctx context.Context, parent *v1alpha1.ApplicationGroup, instance *v1alpha13.Workflow, wfType v1alpha1.WorkflowType) error {
|
||||
chartConditionMap, subChartConditionMap, err := helper.marshallChartStatus(ctx, parent)
|
||||
if err != nil {
|
||||
return ctrl.Result{}, err
|
||||
return err
|
||||
}
|
||||
instance.Status.Applications = getAppStatus(instance, chartConditionMap, subChartConditionMap)
|
||||
|
||||
// update the workflow status
|
||||
result, err := helper.updateWorkflowStatus(ctx, instance)
|
||||
if err != nil {
|
||||
return result, err
|
||||
parent.Status.Applications = getAppStatus(parent, chartConditionMap, subChartConditionMap)
|
||||
if wfType == v1alpha1.ForwardWorkflow {
|
||||
if workflow.ToConditionReason(instance.Status.Phase) == meta.FailedReason {
|
||||
helper.Info("workflow rollout is in a failed state")
|
||||
helper.MarkFailed(parent, fmt.Errorf("workflow in failed state"))
|
||||
} else if workflow.ToConditionReason(instance.Status.Phase) == meta.SucceededReason {
|
||||
helper.Info("workflow rollout is in a succeeded state")
|
||||
return helper.MarkSucceeded(ctx, parent)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (helper *StatusHelper) updateWorkflowStatus(ctx context.Context, instance *v1alpha1.ApplicationGroup) (ctrl.Result, error) {
|
||||
forwardClient := helper.WorkflowClientBuilder.Forward(instance).Build()
|
||||
reverseClient := helper.WorkflowClientBuilder.Reverse(instance).Build()
|
||||
rollbackClient := helper.WorkflowClientBuilder.Rollback(instance).Build()
|
||||
|
||||
for _, wfClient := range []workflow.Client{forwardClient, reverseClient, rollbackClient} {
|
||||
if err := workflow.UpdateStatus(ctx, wfClient); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
func (helper *StatusHelper) UpdateFromWorkflowStatus(parent *v1alpha1.ApplicationGroup, instance *v1alpha13.Workflow, wfType v1alpha1.WorkflowType) error {
|
||||
switch workflow.ToConditionReason(instance.Status.Phase) {
|
||||
case meta.FailedReason:
|
||||
helper.Logger.Info("workflow node is in failed state")
|
||||
workflow.SetFailed(parent, wfType, "workflow node is in failed state")
|
||||
case meta.SucceededReason:
|
||||
helper.Logger.Info("workflow has succeeded")
|
||||
workflow.SetSucceeded(parent, wfType)
|
||||
default:
|
||||
helper.Logger.Info("workflow is still progressing")
|
||||
workflow.SetProgressing(parent, wfType)
|
||||
}
|
||||
if isFailed, err := workflow.IsFailed(ctx, forwardClient); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
} else if isFailed {
|
||||
// TODO: make this error come from the node itself
|
||||
helper.MarkFailed(instance, fmt.Errorf("workflow in failed state"))
|
||||
return ctrl.Result{RequeueAfter: v1alpha1.GetInterval(instance)}, nil
|
||||
}
|
||||
if isSucceeded, err := workflow.IsSucceeded(ctx, forwardClient); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
} else if isSucceeded {
|
||||
if err := helper.MarkSucceeded(ctx, instance); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
return ctrl.Result{RequeueAfter: v1alpha1.GetInterval(instance)}, nil
|
||||
}
|
||||
return ctrl.Result{RequeueAfter: v1alpha1.DefaultProgressingRequeue}, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (helper *StatusHelper) MarkSucceeded(ctx context.Context, instance *v1alpha1.ApplicationGroup) error {
|
||||
// Set the last successful annotation for rollback scenarios
|
||||
instanceCopy := instance.DeepCopy()
|
||||
instanceCopy.SetLastSuccessful()
|
||||
if err := helper.Patch(ctx, instanceCopy, helper.PatchFrom); err != nil {
|
||||
patch := client.MergeFrom(instance.DeepCopy())
|
||||
instance.SetLastSuccessful()
|
||||
if err := helper.Patch(ctx, instance, patch); err != nil {
|
||||
helper.V(1).Error(err, "failed to patch the application group annotations")
|
||||
return err
|
||||
}
|
||||
|
@ -84,18 +73,14 @@ func (helper *StatusHelper) MarkSucceeded(ctx context.Context, instance *v1alpha
|
|||
// MarkProgressing resets the conditions of the ApplicationGroup to
|
||||
// metav1.Condition of type meta.ReadyCondition with status 'Unknown' and
|
||||
// meta.StartingReason reason and message.
|
||||
func (helper *StatusHelper) MarkProgressing(ctx context.Context, instance *v1alpha1.ApplicationGroup) error {
|
||||
instance.Status.Conditions = []metav1.Condition{}
|
||||
instance.Status.ObservedGeneration = instance.Generation
|
||||
meta.SetResourceCondition(instance, meta.ReadyCondition, metav1.ConditionUnknown, meta.ProgressingReason, "workflow is reconciling...")
|
||||
|
||||
return helper.PatchStatus(ctx, instance)
|
||||
func (helper *StatusHelper) MarkProgressing(instance *v1alpha1.ApplicationGroup) {
|
||||
instance.ReadyProgressing()
|
||||
}
|
||||
|
||||
// MarkTerminating sets the meta.ReadyCondition to 'False', with the given
|
||||
// meta.Terminating reason and message
|
||||
func (helper *StatusHelper) MarkTerminating(instance *v1alpha1.ApplicationGroup) {
|
||||
meta.SetResourceCondition(instance, meta.ReadyCondition, metav1.ConditionFalse, meta.TerminatingReason, "application group is terminating...")
|
||||
instance.ReadyTerminating()
|
||||
}
|
||||
|
||||
// MarkFailed sets the meta.ReadyCondition to 'False', with a failed reason
|
||||
|
@ -156,9 +141,9 @@ func (helper *StatusHelper) marshallChartStatus(ctx context.Context, appGroup *v
|
|||
|
||||
for _, hr := range helmReleases.Items {
|
||||
parent := hr.Name
|
||||
if v, ok := hr.GetAnnotations()["orkestra/parent-chart"]; ok {
|
||||
if annotation, ok := hr.GetAnnotations()[v1alpha1.ParentChartAnnotation]; ok {
|
||||
// Use the parent chart's name
|
||||
parent = v
|
||||
parent = annotation
|
||||
}
|
||||
|
||||
// Add the associated conditions for that helm chart to the helm chart condition
|
||||
|
|
|
@ -3,6 +3,10 @@ package workflow
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
"strconv"
|
||||
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
|
||||
"github.com/Azure/Orkestra/pkg/meta"
|
||||
|
@ -14,8 +18,6 @@ import (
|
|||
"github.com/go-logr/logr"
|
||||
)
|
||||
|
||||
type ClientType string
|
||||
|
||||
var _ = ForwardWorkflowClient{}
|
||||
var _ = ReverseWorkflowClient{}
|
||||
var _ = RollbackWorkflowClient{}
|
||||
|
@ -45,6 +47,9 @@ type Client interface {
|
|||
// GetClient returns the k8s client associated with the workflow
|
||||
GetClient() client.Client
|
||||
|
||||
// GetWorkflow returns the workflow associated with the client
|
||||
GetWorkflow() *v1alpha13.Workflow
|
||||
|
||||
// GetAppGroup returns the app group from the workflow client
|
||||
GetAppGroup() *v1alpha1.ApplicationGroup
|
||||
}
|
||||
|
@ -56,10 +61,9 @@ type ClientOptions struct {
|
|||
}
|
||||
|
||||
type Builder struct {
|
||||
client client.Client
|
||||
clientType v1alpha1.WorkflowType
|
||||
options ClientOptions
|
||||
logger logr.Logger
|
||||
client client.Client
|
||||
options ClientOptions
|
||||
logger logr.Logger
|
||||
|
||||
workflow *v1alpha13.Workflow
|
||||
appGroup *v1alpha1.ApplicationGroup
|
||||
|
@ -100,31 +104,13 @@ func NewBuilder(client client.Client, logger logr.Logger) *Builder {
|
|||
}
|
||||
}
|
||||
|
||||
func NewBuilderFromClient(client Client) *Builder {
|
||||
return &Builder{
|
||||
client: client.GetClient(),
|
||||
options: client.GetOptions(),
|
||||
appGroup: client.GetAppGroup(),
|
||||
logger: client.GetLogger(),
|
||||
func NewClientFromClient(client Client, clientType v1alpha1.WorkflowType) Client {
|
||||
builder := &Builder{
|
||||
client: client.GetClient(),
|
||||
options: client.GetOptions(),
|
||||
logger: client.GetLogger(),
|
||||
}
|
||||
}
|
||||
|
||||
func (builder *Builder) Forward(appGroup *v1alpha1.ApplicationGroup) *Builder {
|
||||
builder.clientType = v1alpha1.Forward
|
||||
builder.appGroup = appGroup
|
||||
return builder
|
||||
}
|
||||
|
||||
func (builder *Builder) Reverse(appGroup *v1alpha1.ApplicationGroup) *Builder {
|
||||
builder.clientType = v1alpha1.Reverse
|
||||
builder.appGroup = appGroup
|
||||
return builder
|
||||
}
|
||||
|
||||
func (builder *Builder) Rollback(appGroup *v1alpha1.ApplicationGroup) *Builder {
|
||||
builder.clientType = v1alpha1.Rollback
|
||||
builder.appGroup = appGroup
|
||||
return builder
|
||||
return builder.Build(clientType, client.GetAppGroup())
|
||||
}
|
||||
|
||||
func (builder *Builder) WithParallelism(numNodes int64) *Builder {
|
||||
|
@ -142,14 +128,14 @@ func (builder *Builder) InNamespace(namespace string) *Builder {
|
|||
return builder
|
||||
}
|
||||
|
||||
func (builder *Builder) Build() Client {
|
||||
switch builder.clientType {
|
||||
func (builder *Builder) Build(clientType v1alpha1.WorkflowType, appGroup *v1alpha1.ApplicationGroup) Client {
|
||||
switch clientType {
|
||||
case v1alpha1.Forward:
|
||||
forwardClient := &ForwardWorkflowClient{
|
||||
Client: builder.client,
|
||||
Logger: builder.logger,
|
||||
ClientOptions: builder.options,
|
||||
appGroup: builder.appGroup,
|
||||
appGroup: appGroup,
|
||||
}
|
||||
return forwardClient
|
||||
case v1alpha1.Reverse:
|
||||
|
@ -157,7 +143,7 @@ func (builder *Builder) Build() Client {
|
|||
Client: builder.client,
|
||||
Logger: builder.logger,
|
||||
ClientOptions: builder.options,
|
||||
appGroup: builder.appGroup,
|
||||
appGroup: appGroup,
|
||||
}
|
||||
return reverseClient
|
||||
default:
|
||||
|
@ -165,7 +151,7 @@ func (builder *Builder) Build() Client {
|
|||
Client: builder.client,
|
||||
Logger: builder.logger,
|
||||
ClientOptions: builder.options,
|
||||
appGroup: builder.appGroup,
|
||||
appGroup: appGroup,
|
||||
}
|
||||
return rollbackClient
|
||||
}
|
||||
|
@ -182,6 +168,30 @@ func Run(ctx context.Context, wfClient Client) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Submit calls the base submit function for the workflow client
|
||||
func Submit(ctx context.Context, wfClient Client) error {
|
||||
controllerutil.AddFinalizer(wfClient.GetWorkflow(), v1alpha1.AppGroupFinalizer)
|
||||
wfClient.GetWorkflow().GetLabels()[v1alpha1.OwnershipLabel] = wfClient.GetAppGroup().Name
|
||||
wfClient.GetWorkflow().GetLabels()[v1alpha1.WorkflowAppGroupGenerationLabel] = strconv.FormatInt(wfClient.GetAppGroup().Generation, 10)
|
||||
|
||||
if err := wfClient.GetClient().Create(ctx, wfClient.GetWorkflow()); !errors.IsAlreadyExists(err) && err != nil {
|
||||
return fmt.Errorf("failed to CREATE argo workflow object: %w", err)
|
||||
} else if errors.IsAlreadyExists(err) {
|
||||
// If the workflow needs an update, delete the previous workflow and apply the new one
|
||||
// Argo Workflow does not rerun the workflow on UPDATE, so instead we clean up and re-apply
|
||||
if err := DeleteWorkflow(ctx, wfClient); err != nil {
|
||||
return fmt.Errorf("failed to DELETE argo workflow object: %w", err)
|
||||
}
|
||||
if err := controllerutil.SetControllerReference(wfClient.GetAppGroup(), wfClient.GetWorkflow(), wfClient.GetClient().Scheme()); err != nil {
|
||||
return fmt.Errorf("unable to set ApplicationGroup as owner of Argo Workflow: %w", err)
|
||||
}
|
||||
if err := wfClient.GetClient().Create(ctx, wfClient.GetWorkflow()); err != nil {
|
||||
return fmt.Errorf("failed to CREATE argo workflow object: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Suspend sets the suspend flag on the workflow associated with the workflow client
|
||||
// if the workflow still exists on the cluster
|
||||
func Suspend(ctx context.Context, wfClient Client) error {
|
||||
|
@ -196,7 +206,6 @@ func Suspend(ctx context.Context, wfClient Client) error {
|
|||
if workflow.Spec.Suspend == nil || !*workflow.Spec.Suspend {
|
||||
wfClient.GetLogger().Info("suspending the workflow")
|
||||
patch := client.MergeFrom(workflow.DeepCopy())
|
||||
|
||||
suspend := true
|
||||
workflow.Spec.Suspend = &suspend
|
||||
if err := wfClient.GetClient().Patch(ctx, workflow, patch); err != nil {
|
||||
|
@ -216,57 +225,32 @@ func GetWorkflow(ctx context.Context, wc Client) (*v1alpha13.Workflow, error) {
|
|||
// DeleteWorkflow removes the workflow from the api server associated with
|
||||
// the workflow client
|
||||
func DeleteWorkflow(ctx context.Context, wfClient Client) error {
|
||||
workflow, err := GetWorkflow(ctx, wfClient)
|
||||
if client.IgnoreNotFound(err) != nil {
|
||||
workflow := &v1alpha13.Workflow{}
|
||||
if err := wfClient.GetClient().Get(ctx, types.NamespacedName{Name: wfClient.GetName(), Namespace: wfClient.GetNamespace()}, workflow); err != nil {
|
||||
return err
|
||||
} else if err != nil {
|
||||
return nil
|
||||
}
|
||||
return wfClient.GetClient().Delete(ctx, workflow)
|
||||
}
|
||||
|
||||
// UpdateStatus updates the status of the owning appGroup with the workflow condition type
|
||||
// of the workflow client
|
||||
func UpdateStatus(ctx context.Context, wfClient Client) error {
|
||||
wf, err := GetWorkflow(ctx, wfClient)
|
||||
if client.IgnoreNotFound(err) != nil {
|
||||
return err
|
||||
} else if err != nil {
|
||||
// We just return and don't update if we don't find the workflow
|
||||
return nil
|
||||
}
|
||||
switch toConditionReason(wf.Status.Phase) {
|
||||
case meta.FailedReason:
|
||||
wfClient.GetLogger().Info("workflow node is in failed state")
|
||||
SetFailed(wfClient, "workflow node is in failed state")
|
||||
case meta.SucceededReason:
|
||||
wfClient.GetLogger().Info("workflow has succeeded")
|
||||
SetSucceeded(wfClient)
|
||||
default:
|
||||
wfClient.GetLogger().Info("workflow is still progressing")
|
||||
SetProgressing(wfClient)
|
||||
}
|
||||
return nil
|
||||
deletePropagation := metav1.DeletePropagationForeground
|
||||
return wfClient.GetClient().Delete(ctx, workflow, &client.DeleteOptions{PropagationPolicy: &deletePropagation})
|
||||
}
|
||||
|
||||
// SetProgressing sets one of the workflow conditions in the progressing state
|
||||
func SetProgressing(wfClient Client) {
|
||||
if condition, ok := v1alpha1.WorkflowConditionMap[wfClient.GetType()]; ok {
|
||||
meta.SetResourceCondition(wfClient.GetAppGroup(), condition, metav1.ConditionUnknown, meta.ProgressingReason, "workflow is progressing...")
|
||||
func SetProgressing(parent *v1alpha1.ApplicationGroup, wfType v1alpha1.WorkflowType) {
|
||||
if condition, ok := v1alpha1.WorkflowConditionMap[wfType]; ok {
|
||||
meta.SetResourceCondition(parent, condition, metav1.ConditionUnknown, meta.ProgressingReason, "workflow is progressing...")
|
||||
}
|
||||
}
|
||||
|
||||
// SetSucceeded sets one of the workflow conditions in the succeeded state
|
||||
func SetSucceeded(wfClient Client) {
|
||||
if condition, ok := v1alpha1.WorkflowConditionMap[wfClient.GetType()]; ok {
|
||||
meta.SetResourceCondition(wfClient.GetAppGroup(), condition, metav1.ConditionTrue, meta.SucceededReason, "workflow succeeded")
|
||||
func SetSucceeded(parent *v1alpha1.ApplicationGroup, wfType v1alpha1.WorkflowType) {
|
||||
if condition, ok := v1alpha1.WorkflowConditionMap[wfType]; ok {
|
||||
meta.SetResourceCondition(parent, condition, metav1.ConditionTrue, meta.SucceededReason, "workflow succeeded")
|
||||
}
|
||||
}
|
||||
|
||||
// SetFailed sets one of the workflow conditions in the failed state
|
||||
func SetFailed(wfClient Client, message string) {
|
||||
if condition, ok := v1alpha1.WorkflowConditionMap[wfClient.GetType()]; ok {
|
||||
meta.SetResourceCondition(wfClient.GetAppGroup(), condition, metav1.ConditionFalse, meta.FailedReason, message)
|
||||
func SetFailed(parent *v1alpha1.ApplicationGroup, wfType v1alpha1.WorkflowType, message string) {
|
||||
if condition, ok := v1alpha1.WorkflowConditionMap[wfType]; ok {
|
||||
meta.SetResourceCondition(parent, condition, metav1.ConditionFalse, meta.FailedReason, message)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -283,7 +267,7 @@ func IsFailed(ctx context.Context, wfClient Client) (bool, error) {
|
|||
if client.IgnoreNotFound(err) != nil {
|
||||
return false, fmt.Errorf("failed to get workflow: %w", err)
|
||||
}
|
||||
if toConditionReason(wf.Status.Phase) == meta.FailedReason {
|
||||
if ToConditionReason(wf.Status.Phase) == meta.FailedReason {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
|
@ -295,13 +279,13 @@ func IsSucceeded(ctx context.Context, wfClient Client) (bool, error) {
|
|||
if client.IgnoreNotFound(err) != nil {
|
||||
return false, fmt.Errorf("failed to get workflow: %w", err)
|
||||
}
|
||||
if toConditionReason(wf.Status.Phase) == meta.SucceededReason {
|
||||
if ToConditionReason(wf.Status.Phase) == meta.SucceededReason {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func toConditionReason(nodePhase v1alpha13.WorkflowPhase) string {
|
||||
func ToConditionReason(nodePhase v1alpha13.WorkflowPhase) string {
|
||||
switch nodePhase {
|
||||
case v1alpha13.WorkflowFailed:
|
||||
return meta.FailedReason
|
||||
|
|
|
@ -3,6 +3,8 @@ package workflow
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
|
||||
|
||||
"github.com/Azure/Orkestra/pkg/graph"
|
||||
"github.com/Azure/Orkestra/pkg/templates"
|
||||
|
||||
|
@ -43,14 +45,18 @@ func (wc *ForwardWorkflowClient) GetAppGroup() *v1alpha1.ApplicationGroup {
|
|||
return wc.appGroup
|
||||
}
|
||||
|
||||
func (wc *ForwardWorkflowClient) GetWorkflow() *v1alpha13.Workflow {
|
||||
return wc.workflow
|
||||
}
|
||||
|
||||
func (wc *ForwardWorkflowClient) Generate(ctx context.Context) error {
|
||||
if wc.appGroup == nil {
|
||||
return fmt.Errorf("applicationGroup object cannot be nil")
|
||||
}
|
||||
|
||||
// Suspend the rollback or reverse workflows if they are running
|
||||
reverseClient := NewBuilderFromClient(wc).Reverse(wc.appGroup).Build()
|
||||
rollbackClient := NewBuilderFromClient(wc).Rollback(wc.appGroup).Build()
|
||||
reverseClient := NewClientFromClient(wc, v1alpha1.ReverseWorkflow)
|
||||
rollbackClient := NewClientFromClient(wc, v1alpha1.RollbackWorkflow)
|
||||
if err := Suspend(ctx, reverseClient); err != nil {
|
||||
return fmt.Errorf("failed to suspend reverse workflow: %w", err)
|
||||
}
|
||||
|
@ -76,38 +82,15 @@ func (wc *ForwardWorkflowClient) Generate(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (wc *ForwardWorkflowClient) Submit(ctx context.Context) error {
|
||||
if wc.workflow == nil {
|
||||
return fmt.Errorf("workflow object cannot be nil")
|
||||
}
|
||||
if wc.appGroup == nil {
|
||||
return fmt.Errorf("applicationGroup object cannot be nil")
|
||||
}
|
||||
|
||||
if err := wc.createTargetNamespaces(ctx); err != nil {
|
||||
return fmt.Errorf("failed to create the target namespaces: %w", err)
|
||||
}
|
||||
|
||||
// Create the Workflow
|
||||
wc.workflow.Labels[v1alpha1.OwnershipLabel] = wc.appGroup.Name
|
||||
wc.workflow.Labels[v1alpha1.WorkflowTypeLabel] = string(v1alpha1.ForwardWorkflow)
|
||||
if err := controllerutil.SetControllerReference(wc.appGroup, wc.workflow, wc.Scheme()); err != nil {
|
||||
return fmt.Errorf("unable to set ApplicationGroup as owner of Argo Workflow: %w", err)
|
||||
}
|
||||
if err := wc.Create(ctx, wc.workflow); !errors.IsAlreadyExists(err) && err != nil {
|
||||
return fmt.Errorf("failed to CREATE argo workflow object: %w", err)
|
||||
} else if errors.IsAlreadyExists(err) {
|
||||
// If the workflow needs an update, delete the previous workflow and apply the new one
|
||||
// Argo Workflow does not rerun the workflow on UPDATE, so instead we clean up and re-apply
|
||||
if err := wc.Delete(ctx, wc.workflow); err != nil {
|
||||
return fmt.Errorf("failed to DELETE argo workflow object: %w", err)
|
||||
}
|
||||
if err := controllerutil.SetControllerReference(wc.appGroup, wc.workflow, wc.Scheme()); err != nil {
|
||||
return fmt.Errorf("unable to set ApplicationGroup as owner of Argo Workflow: %w", err)
|
||||
}
|
||||
// If the argo Workflow object is NotFound and not AlreadyExists on the cluster
|
||||
// create a new object and submit it to the cluster
|
||||
if err := wc.Create(ctx, wc.workflow); err != nil {
|
||||
return fmt.Errorf("failed to CREATE argo workflow object: %w", err)
|
||||
}
|
||||
if err := Submit(ctx, wc); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -3,15 +3,14 @@ package workflow
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/Azure/Orkestra/pkg/graph"
|
||||
"github.com/Azure/Orkestra/pkg/templates"
|
||||
|
||||
"github.com/Azure/Orkestra/api/v1alpha1"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"github.com/Azure/Orkestra/pkg/graph"
|
||||
"github.com/Azure/Orkestra/pkg/meta"
|
||||
"github.com/Azure/Orkestra/pkg/templates"
|
||||
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
|
||||
"github.com/go-logr/logr"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
)
|
||||
|
@ -32,6 +31,10 @@ func (wc *ReverseWorkflowClient) GetAppGroup() *v1alpha1.ApplicationGroup {
|
|||
return wc.appGroup
|
||||
}
|
||||
|
||||
func (wc *ReverseWorkflowClient) GetWorkflow() *v1alpha13.Workflow {
|
||||
return wc.workflow
|
||||
}
|
||||
|
||||
func (wc *ReverseWorkflowClient) GetOptions() ClientOptions {
|
||||
return wc.ClientOptions
|
||||
}
|
||||
|
@ -47,8 +50,8 @@ func (wc *ReverseWorkflowClient) GetNamespace() string {
|
|||
func (wc *ReverseWorkflowClient) Generate(ctx context.Context) error {
|
||||
var err error
|
||||
|
||||
forwardClient := NewBuilderFromClient(wc).Forward(wc.appGroup).Build()
|
||||
rollbackClient := NewBuilderFromClient(wc).Rollback(wc.appGroup).Build()
|
||||
forwardClient := NewClientFromClient(wc, v1alpha1.ForwardWorkflow)
|
||||
rollbackClient := NewClientFromClient(wc, v1alpha1.RollbackWorkflow)
|
||||
|
||||
if err := Suspend(ctx, forwardClient); err != nil {
|
||||
return fmt.Errorf("failed to suspend forward workflow: %w", err)
|
||||
|
@ -74,28 +77,19 @@ func (wc *ReverseWorkflowClient) Generate(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (wc *ReverseWorkflowClient) Submit(ctx context.Context) error {
|
||||
forwardClient := NewBuilderFromClient(wc).Forward(wc.appGroup).Build()
|
||||
forwardClient := NewClientFromClient(wc, v1alpha1.ForwardWorkflow)
|
||||
forwardWorkflow, err := GetWorkflow(ctx, forwardClient)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
return meta.ErrForwardWorkflowNotFound
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
obj := &v1alpha13.Workflow{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: wc.workflow.Name,
|
||||
Namespace: wc.workflow.Namespace,
|
||||
},
|
||||
wc.workflow.Labels[v1alpha1.WorkflowTypeLabel] = string(v1alpha1.ReverseWorkflow)
|
||||
if err := controllerutil.SetControllerReference(forwardWorkflow, wc.workflow, wc.Scheme()); err != nil {
|
||||
return fmt.Errorf("unable to set forward workflow as owner of Argo reverse Workflow: %w", err)
|
||||
}
|
||||
if err := wc.Get(ctx, client.ObjectKeyFromObject(obj), obj); client.IgnoreNotFound(err) != nil {
|
||||
return fmt.Errorf("failed to GET workflow object with an unrecoverable error: %w", err)
|
||||
} else if err != nil {
|
||||
if err := controllerutil.SetControllerReference(forwardWorkflow, wc.workflow, wc.Scheme()); err != nil {
|
||||
return fmt.Errorf("unable to set forward workflow as owner of Argo reverse Workflow: %w", err)
|
||||
}
|
||||
// If the argo Workflow object is NotFound and not AlreadyExists on the cluster
|
||||
// create a new object and submit it to the cluster
|
||||
if err = wc.Create(ctx, wc.workflow); err != nil {
|
||||
return fmt.Errorf("failed to CREATE argo workflow object: %w", err)
|
||||
}
|
||||
if err := Submit(ctx, wc); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -3,13 +3,14 @@ package workflow
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
|
||||
|
||||
"github.com/Azure/Orkestra/pkg/graph"
|
||||
"github.com/Azure/Orkestra/pkg/templates"
|
||||
|
||||
"github.com/Azure/Orkestra/api/v1alpha1"
|
||||
"github.com/Azure/Orkestra/pkg/meta"
|
||||
"github.com/go-logr/logr"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
)
|
||||
|
@ -42,6 +43,10 @@ func (wc *RollbackWorkflowClient) GetAppGroup() *v1alpha1.ApplicationGroup {
|
|||
return wc.appGroup
|
||||
}
|
||||
|
||||
func (wc *RollbackWorkflowClient) GetWorkflow() *v1alpha13.Workflow {
|
||||
return wc.workflow
|
||||
}
|
||||
|
||||
func (wc *RollbackWorkflowClient) Generate(ctx context.Context) error {
|
||||
if wc.appGroup == nil {
|
||||
return fmt.Errorf("applicationGroup object cannot be nil")
|
||||
|
@ -75,13 +80,12 @@ func (wc *RollbackWorkflowClient) Generate(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (wc *RollbackWorkflowClient) Submit(ctx context.Context) error {
|
||||
// Create the new workflow, only if there is not already a rollback workflow that has been created
|
||||
wc.workflow.Labels[v1alpha1.OwnershipLabel] = wc.appGroup.Name
|
||||
wc.workflow.Labels[v1alpha1.WorkflowTypeLabel] = string(v1alpha1.RollbackWorkflow)
|
||||
if err := controllerutil.SetControllerReference(wc.appGroup, wc.workflow, wc.Scheme()); err != nil {
|
||||
return fmt.Errorf("unable to set ApplicationGroup as owner of Argo Workflow: %w", err)
|
||||
}
|
||||
if err := wc.Create(ctx, wc.workflow); !errors.IsAlreadyExists(err) && err != nil {
|
||||
return fmt.Errorf("failed to CREATE argo workflow object: %w", err)
|
||||
if err := Submit(ctx, wc); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|