DAG workflow reversal on err or delete (#103)

Closes #93

Revert the DAG workflow by deleting the HelmReleases in the reverse
order while honoring the dependency order.

Signed-off-by: nitishm <nitishm@microsoft.com>

Co-authored-by: nitishm <nitishm@microsoft.com>
This commit is contained in:
Nitish Malhotra 2021-03-17 10:32:11 -07:00 коммит произвёл GitHub
Родитель 681d02e127
Коммит 5e2620c0a6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 209 добавлений и 38 удалений

Просмотреть файл

@ -102,12 +102,24 @@ func (r *ApplicationGroupReconciler) Reconcile(req ctrl.Request) (ctrl.Result, e
if !appGroup.DeletionTimestamp.IsZero() {
// If finalizer is found, remove it and requeue
if appGroup.Finalizers != nil {
defer func() {
appGroup.Finalizers = nil
_ = r.Update(ctx, &appGroup)
}()
logr.Info("cleaning up the applicationgroup resource")
// TODO: Take remediation action
// Reverse the entire workflow to remove all the Helm Releases
appGroup.Finalizers = nil
_ = r.Update(ctx, &appGroup)
return ctrl.Result{Requeue: true}, nil
r.lastSuccessfulApplicationGroup = nil
if _, ok := appGroup.Annotations[lastSuccessfulApplicationGroupKey]; ok {
appGroup.Annotations[lastSuccessfulApplicationGroupKey] = ""
}
requeue = false
err = r.cleanupWorkflow(ctx, logr, appGroup)
if err != nil {
logr.Error(err, "failed to clean up workflow")
return r.handleResponseAndEvent(ctx, logr, appGroup, requeue, err)
}
}
// Do nothing
return ctrl.Result{Requeue: false}, nil
@ -290,10 +302,6 @@ func (r *ApplicationGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *ApplicationGroupReconciler) handleResponseAndEvent(ctx context.Context, logr logr.Logger, grp orkestrav1alpha1.ApplicationGroup, requeue bool, err error) (ctrl.Result, error) {
var errStr string
if err != nil {
errStr = err.Error()
}
grp.Status.Error = errStr
_ = r.Status().Update(ctx, &grp)
@ -309,14 +317,14 @@ func (r *ApplicationGroupReconciler) handleResponseAndEvent(ctx context.Context,
}
if errStr != "" {
r.Recorder.Event(&grp, "Warning", "ReconcileError", fmt.Sprintf("Failed to reconcile ApplicationGroup %s with Error %s", grp.Name, errStr))
r.Recorder.Event(&grp, "Warning", "ReconcileError", fmt.Sprintf("Failed to reconcile ApplicationGroup %s with Error : %s", grp.Name, errStr))
}
if err != nil {
return r.handleRemediation(ctx, logr, grp, err)
}
if !requeue {
return reconcile.Result{Requeue: true, RequeueAfter: requeueAfter * 6}, err
return reconcile.Result{Requeue: true, RequeueAfter: requeueAfter * 3}, err
}
return reconcile.Result{Requeue: requeue}, err
}
@ -370,6 +378,7 @@ func initApplications(appGroup *orkestrav1alpha1.ApplicationGroup) {
}
func (r *ApplicationGroupReconciler) handleRemediation(ctx context.Context, logr logr.Logger, g orkestrav1alpha1.ApplicationGroup, err error) (ctrl.Result, error) {
// Rollback to previous successful spec
if r.lastSuccessfulApplicationGroup != nil {
if errors.Is(err, ErrHelmReleaseInFailureStatus) {
// Delete the HelmRelease(s) - parent and subchart(s)
@ -404,7 +413,14 @@ func (r *ApplicationGroupReconciler) handleRemediation(ctx context.Context, logr
}
}
}
return reconcile.Result{Requeue: false}, err
}
// Reverse and cleanup the workflow and associated helmreleases
err2 := r.cleanupWorkflow(ctx, logr, g)
if err2 != nil {
err = fmt.Errorf("failed to clean up workflow : %w", err2)
}
return reconcile.Result{Requeue: false}, err
}
@ -431,3 +447,47 @@ func (r *ApplicationGroupReconciler) rollbackFailedHelmReleases(ctx context.Cont
}
return nil
}
func (r *ApplicationGroupReconciler) cleanupWorkflow(ctx context.Context, logr logr.Logger, g orkestrav1alpha1.ApplicationGroup) error {
nodes := make(map[string]v1alpha12.NodeStatus)
wfs := v1alpha12.WorkflowList{}
listOption := client.MatchingLabels{
workflow.OwnershipLabel: g.Name,
workflow.HeritageLabel: workflow.Project,
}
err := r.List(ctx, &wfs, listOption)
if err != nil {
return err
}
if wfs.Items.Len() != 0 {
logr.Info("Reversing the workflow")
wf := wfs.Items[0]
for _, node := range wf.Status.Nodes {
nodes[node.ID] = node
}
graph, err := workflow.Build(g.Name, nodes)
if err != nil {
logr.Error(err, "failed to build the wf status DAG")
return err
}
rev := graph.Reverse()
for _, bucket := range rev {
for _, hr := range bucket {
// HACK - to be fixed
_ = r.Client.Delete(ctx, &hr)
time.Sleep(time.Second * 2)
// TODO (nitishm) Use the helm package to delete the release and wait for it to be cleaned up
}
}
err = r.Client.Delete(ctx, &wf)
if err != nil {
return err
}
}
return nil
}

Просмотреть файл

@ -4,30 +4,31 @@ metadata:
name: dev
spec:
applications:
- name: kafka-dev
dependencies:
- redis-dev
spec:
namespace: "orkestra"
repo: bitnami
groupID: "dev"
subcharts:
# subchart ordering
- name: zookeeper
dependencies: []
# HelmRelease spec fields
# https://docs.fluxcd.io/projects/helm-operator/en/1.0.0-rc9/references/helmrelease-custom-resource.html#helmrelease-custom-resource
chart:
repository: "https://charts.bitnami.com/bitnami"
name: kafka
version: 12.4.1
overlays:
global:
imagePullSecrets: []
zookeerper:
enabled: true
targetNamespace: "kafka-dev-ns"
forceUpgrade: true
# ERROR: Kafka fails on kind cluster causing the workflow to stall
# - name: kafka-dev
# dependencies:
# - redis-dev
# spec:
# namespace: "orkestra"
# repo: bitnami
# groupID: "dev"
# subcharts:
# # subchart ordering
# - name: zookeeper
# dependencies: []
# # HelmRelease spec fields
# # https://docs.fluxcd.io/projects/helm-operator/en/1.0.0-rc9/references/helmrelease-custom-resource.html#helmrelease-custom-resource
# chart:
# repository: "https://charts.bitnami.com/bitnami"
# name: kafka
# version: 12.4.1
# overlays:
# global:
# imagePullSecrets: []
# zookeerper:
# enabled: true
# targetNamespace: "kafka-dev-ns"
# forceUpgrade: true
- name: redis-dev
spec:
namespace: "orkestra"

Просмотреть файл

@ -313,6 +313,7 @@ func (a *argo) generateAppDAGTemplates(ctx context.Context, g *v1alpha1.Applicat
Spec: app.DeepCopy().Spec.HelmReleaseSpec,
}
hr.Spec.Wait = boolToBoolPtr(true)
hr.Spec.ReleaseName = convertToDNS1123(app.Name)
hr.Labels = map[string]string{
@ -406,6 +407,7 @@ func (a *argo) generateSubchartAndAppDAGTasks(ctx context.Context, g *v1alpha1.A
}
hr.Spec.ReleaseName = convertToDNS1123(app.Name)
hr.Spec.Wait = boolToBoolPtr(true)
hr.Labels = map[string]string{
ChartLabelKey: app.Name,
@ -478,10 +480,6 @@ func defaultExecutor() v1alpha12.Template {
}
}
func strToStrPtr(s string) *string {
return &s
}
func hrToYAML(hr helmopv1.HelmRelease) string {
b, err := yaml.Marshal(hr)
if err != nil {
@ -502,6 +500,7 @@ func generateSubchartHelmRelease(a helmopv1.HelmReleaseSpec, appName, scName, ve
Namespace: targetNS,
},
Spec: helmopv1.HelmReleaseSpec{
Wait: boolToBoolPtr(true),
ReleaseName: convertToDNS1123(scName),
ChartSource: helmopv1.ChartSource{
RepoChartSource: &helmopv1.RepoChartSource{},
@ -563,3 +562,11 @@ func convertSliceToDNS1123(in []string) []string {
}
return out
}
func boolToBoolPtr(in bool) *bool {
return &in
}
func strToStrPtr(in string) *string {
return &in
}

103
pkg/workflow/graph.go Normal file
Просмотреть файл

@ -0,0 +1,103 @@
package workflow
import (
"bytes"
"fmt"
v1alpha12 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
helmopv1 "github.com/fluxcd/helm-operator/pkg/apis/helm.fluxcd.io/v1"
k8Yaml "k8s.io/apimachinery/pkg/util/yaml"
)
type Graph struct {
nodes map[string]v1alpha12.NodeStatus
releases map[int][]helmopv1.HelmRelease
maxLevel int
}
type Node struct {
Status v1alpha12.NodeStatus
Level int
}
func Build(entry string, nodes map[string]v1alpha12.NodeStatus) (*Graph, error) {
if nodes == nil || len(nodes) == 0 {
return nil, fmt.Errorf("no nodes found in the graph")
}
g := &Graph{
nodes: nodes,
releases: make(map[int][]helmopv1.HelmRelease),
}
e, ok := nodes[entry]
if !ok {
return nil, fmt.Errorf("\"entry\" node not found")
}
err := g.bft(e)
if err != nil {
return nil, err
}
return g, nil
}
// bft performs the Breath First Traversal of the DAG
func (g *Graph) bft(node v1alpha12.NodeStatus) error {
visited := make(map[string]*Node)
level := 0
q := []v1alpha12.NodeStatus{}
q = append(q, node)
visited[node.ID] = &Node{
Status: node,
Level: level,
}
for len(q) > 0 {
level++
n := q[0]
for _, c := range n.Children {
ch := g.nodes[c]
if _, ok := visited[ch.ID]; !ok {
visited[ch.ID] = &Node{
Status: ch,
Level: level,
}
q = append(q, ch)
}
}
q = q[1:]
}
for _, v := range visited {
if v.Status.Type != v1alpha12.NodeTypePod {
continue
}
hrStr := v.Status.Inputs.Parameters[0].Value
hr := helmopv1.HelmRelease{}
dec := k8Yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(*hrStr)), 1000)
if err := dec.Decode(&hr); err != nil {
return err
}
if _, ok := g.releases[v.Level]; !ok {
g.releases[v.Level] = make([]helmopv1.HelmRelease, 0)
}
g.releases[v.Level] = append(g.releases[v.Level], hr)
}
g.maxLevel = level
return nil
}
func (g *Graph) Reverse() [][]helmopv1.HelmRelease {
reverseSlice := make([][]helmopv1.HelmRelease, 0)
for i := g.maxLevel; i >= 0; i-- {
if _, ok := g.releases[i]; ok {
reverseSlice = append(reverseSlice, g.releases[i])
}
}
return reverseSlice
}