Refactor the DAG application logic into Graph (#371)

* Refactor the DAG application logic into the graph creation

* Refactor the DAG application logic into the graph creation

* Add graph testing
This commit is contained in:
Jonathan Innis 2021-08-26 14:00:35 -07:00 коммит произвёл GitHub
Родитель 3989de1777
Коммит 5da5590041
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
11 изменённых файлов: 1035 добавлений и 1284 удалений

Просмотреть файл

@ -22,8 +22,7 @@ endif
all: manager
# Create a local docker registry, start kind cluster, and install Orkestra
dev:
make kind-create
dev: kind-create
helm upgrade --install orkestra chart/orkestra --wait --atomic -n orkestra --create-namespace --values ${CI_VALUES}
debug: dev

Просмотреть файл

@ -244,12 +244,15 @@ func GetJSON(values map[string]interface{}) (*apiextensionsv1.JSON, error) {
}
// SetValues marshals the raw values into the JSON values
func (in *Application) SetValues(values map[string]interface{}) error {
func (in *Release) SetValues(values map[string]interface{}) error {
bytes, err := json.Marshal(values)
if err != nil {
return err
}
in.Spec.Release.Values.Raw = bytes
if in.Values == nil {
in.Values = &apiextensionsv1.JSON{}
}
in.Values.Raw = bytes
return nil
}

Просмотреть файл

@ -1,162 +1,152 @@
package workflow
import (
"bytes"
"encoding/base64"
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
fluxhelmv2beta1 "github.com/fluxcd/helm-controller/api/v2beta1"
k8Yaml "k8s.io/apimachinery/pkg/util/yaml"
"github.com/Azure/Orkestra/api/v1alpha1"
"github.com/Azure/Orkestra/pkg/utils"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
)
type Graph struct {
nodes map[string]v1alpha13.NodeStatus
releases map[int][]fluxhelmv2beta1.HelmRelease
maxLevel int
Name string
Nodes map[string]*AppNode
}
type Node struct {
Status v1alpha13.NodeStatus
Level int
type AppNode struct {
Name string
Dependencies []string
Tasks map[string]*TaskNode
}
func Build(entry string, nodes map[string]v1alpha13.NodeStatus) (*Graph, error) {
if len(nodes) == 0 {
return nil, ErrNoNodesFound
}
type TaskNode struct {
Name string
ChartName string
ChartVersion string
Parent string
Release *v1alpha1.Release
Dependencies []string
}
func NewForwardGraph(appGroup *v1alpha1.ApplicationGroup) *Graph {
g := &Graph{
nodes: nodes,
releases: make(map[int][]fluxhelmv2beta1.HelmRelease),
Name: appGroup.Name,
Nodes: make(map[string]*AppNode),
}
e, ok := nodes[entry]
if !ok {
return nil, ErrEntryNodeNotFound
}
for i, application := range appGroup.Spec.Applications {
applicationNode := NewAppNode(&application)
applicationNode.Tasks[application.Name] = NewTaskNode(&application)
appValues := application.Spec.Release.GetValues()
err := g.bft(e)
if err != nil {
return nil, err
}
// Iterate through the subchart nodes
for _, subChart := range application.Spec.Subcharts {
subChartVersion := appGroup.Status.Applications[i].Subcharts[subChart.Name].Version
chartName := utils.GetSubchartName(application.Name, subChart.Name)
return g, nil
// Get the sub-chart values and assign that ot the release
values, _ := subChartValues(subChart.Name, application.GetValues())
release := application.Spec.Release.DeepCopy()
release.Values = values
subChartNode := &TaskNode{
Name: subChart.Name,
ChartName: chartName,
ChartVersion: subChartVersion,
Release: release,
Parent: application.Name,
Dependencies: subChart.Dependencies,
}
applicationNode.Tasks[subChart.Name] = subChartNode
// Disable the sub-chart dependencies in the values of the parent chart
appValues[subChart.Name] = map[string]interface{}{
"enabled": false,
}
// Add the node to the set of parent node dependencies
applicationNode.Tasks[application.Name].Dependencies = append(applicationNode.Tasks[application.Name].Dependencies, subChart.Name)
}
_ = applicationNode.Tasks[application.Name].Release.SetValues(appValues)
g.Nodes[applicationNode.Name] = applicationNode
}
return g
}
// bft performs the Breath First Traversal of the DAG
func (g *Graph) bft(node v1alpha13.NodeStatus) error {
visited := make(map[string]*Node)
level := 0
q := []v1alpha13.NodeStatus{}
q = append(q, node)
func NewReverseGraph(appGroup *v1alpha1.ApplicationGroup) *Graph {
g := NewForwardGraph(appGroup).clearDependencies()
visited[node.ID] = &Node{
Status: node,
Level: level,
}
for _, application := range appGroup.Spec.Applications {
// Iterate through the application dependencies and reverse the dependency relationship
for _, dep := range application.Dependencies {
if node, ok := g.Nodes[dep]; ok {
node.Dependencies = append(node.Dependencies, application.Name)
}
}
for _, subChart := range application.Spec.Subcharts {
subChartNode := g.Nodes[application.Name].Tasks[subChart.Name]
for len(q) > 0 {
level++
n := q[0]
for _, c := range n.Children {
ch := g.nodes[c]
if _, ok := visited[ch.ID]; !ok {
// don't visit the child if it is reachable indirectly
if !g.isIndirectChild(ch.ID, n) {
// don't visit failed nodes
if ch.Phase != v1alpha13.NodeSkipped &&
ch.Phase != v1alpha13.NodeFailed &&
ch.Phase != v1alpha13.NodeError {
visited[ch.ID] = &Node{
Status: ch,
Level: level,
}
q = append(q, ch)
}
// Sub-chart dependencies now depend on this sub-chart to reverse
for _, dep := range subChart.Dependencies {
if node, ok := g.Nodes[application.Name].Tasks[dep]; ok {
node.Dependencies = append(node.Dependencies, subChartNode.Name)
}
}
// Sub-chart now depends on the parent application chart to reverse
subChartNode.Dependencies = append(subChartNode.Dependencies, application.Name)
}
q = q[1:]
}
for _, v := range visited {
if v.Status.Type != v1alpha13.NodeTypePod {
continue
}
if v.Status.Inputs == nil {
return ErrInvalidInputsPtr
}
if len(v.Status.Inputs.Parameters) == 0 {
return ErrNilParametersSlice
}
if v.Status.Inputs.Parameters[0].Value == nil {
return ErrInvalidValuePtr
}
hrStr := v.Status.Inputs.Parameters[0].Value
hrBytes, err := base64.StdEncoding.DecodeString(string(*hrStr))
if err != nil {
return err
}
hr := fluxhelmv2beta1.HelmRelease{}
dec := k8Yaml.NewYAMLOrJSONDecoder(bytes.NewReader(hrBytes), 1000)
if err := dec.Decode(&hr); err != nil {
return err
}
if _, ok := g.releases[v.Level]; !ok {
g.releases[v.Level] = make([]fluxhelmv2beta1.HelmRelease, 0)
}
g.releases[v.Level] = append(g.releases[v.Level], hr)
}
g.maxLevel = level
return nil
return g
}
func (g *Graph) isIndirectChild(nodeID string, node v1alpha13.NodeStatus) bool {
for _, c := range node.Children {
ch := g.nodes[c]
if ch.ID != nodeID && g.isChild(nodeID, ch) {
return true
func (g *Graph) clearDependencies() *Graph {
for _, node := range g.Nodes {
node.Dependencies = nil
for _, task := range node.Tasks {
task.Dependencies = nil
}
}
return false
return g
}
func (g *Graph) isChild(nodeID string, node v1alpha13.NodeStatus) bool {
visited := make(map[string]bool)
q := []v1alpha13.NodeStatus{}
q = append(q, node)
func NewAppNode(application *v1alpha1.Application) *AppNode {
return &AppNode{
Name: application.Name,
Dependencies: application.Dependencies,
Tasks: make(map[string]*TaskNode),
}
}
visited[node.ID] = true
func NewTaskNode(application *v1alpha1.Application) *TaskNode {
return &TaskNode{
Name: application.Name,
ChartName: application.Spec.Chart.Name,
ChartVersion: application.Spec.Chart.Version,
Release: application.Spec.Release,
}
}
for len(q) > 0 {
n := q[0]
for _, c := range n.Children {
ch := g.nodes[c]
if ch.ID == nodeID {
return true
}
if !visited[ch.ID] {
visited[ch.ID] = true
q = append(q, ch)
func subChartValues(subChartName string, values map[string]interface{}) (*apiextensionsv1.JSON, error) {
data := make(map[string]interface{})
if scVals, ok := values[subChartName]; ok {
if vv, ok := scVals.(map[string]interface{}); ok {
for k, val := range vv {
data[k] = val
}
}
q = q[1:]
}
return false
}
func (g *Graph) Reverse() [][]fluxhelmv2beta1.HelmRelease {
reverseSlice := make([][]fluxhelmv2beta1.HelmRelease, 0)
for i := g.maxLevel; i >= 0; i-- {
if _, ok := g.releases[i]; ok {
reverseSlice = append(reverseSlice, g.releases[i])
if vv, ok := scVals.(map[string]string); ok {
for k, val := range vv {
data[k] = val
}
}
}
return reverseSlice
if gVals, ok := values[ValuesKeyGlobal]; ok {
if vv, ok := gVals.(map[string]interface{}); ok {
data[ValuesKeyGlobal] = vv
}
if vv, ok := gVals.(map[string]string); ok {
data[ValuesKeyGlobal] = vv
}
}
return v1alpha1.GetJSON(data)
}

Просмотреть файл

@ -1,55 +1,601 @@
package workflow
import (
"encoding/json"
"testing"
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/Azure/Orkestra/api/v1alpha1"
"github.com/Azure/Orkestra/pkg/utils"
"github.com/google/go-cmp/cmp"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestBuild(t *testing.T) {
func Test_NewForwardGraph(t *testing.T) {
type args struct {
entry string
nodes map[string]v1alpha13.NodeStatus
appGroup *v1alpha1.ApplicationGroup
}
testmap := make(map[string]v1alpha13.NodeStatus)
testmap["dummy"] = v1alpha13.NodeStatus{}
tests := []struct {
name string
args args
want *Graph
err error
}{
//
// TODO: Add more test cases.
//
{
name: "testing nil nodes",
name: "Basic Ordered Set of Applications",
args: args{
entry: "",
nodes: nil,
appGroup: &v1alpha1.ApplicationGroup{
ObjectMeta: v1.ObjectMeta{
Name: "application",
},
Spec: v1alpha1.ApplicationGroupSpec{
Applications: []v1alpha1.Application{
{
DAG: v1alpha1.DAG{
Name: "application1",
},
Spec: v1alpha1.ApplicationSpec{
Chart: &v1alpha1.ChartRef{
Name: "application1",
Version: "0.1.0",
},
Release: &v1alpha1.Release{},
},
},
{
DAG: v1alpha1.DAG{
Name: "application2",
Dependencies: []string{"application1"},
},
Spec: v1alpha1.ApplicationSpec{
Chart: &v1alpha1.ChartRef{
Name: "application2",
Version: "0.1.0",
},
Release: &v1alpha1.Release{},
},
},
{
DAG: v1alpha1.DAG{
Name: "application3",
Dependencies: []string{"application2"},
},
Spec: v1alpha1.ApplicationSpec{
Chart: &v1alpha1.ChartRef{
Name: "application3",
Version: "0.1.0",
},
Release: &v1alpha1.Release{},
},
},
},
},
},
},
want: &Graph{
Name: "application",
Nodes: map[string]*AppNode{
"application1": {
Name: "application1",
Tasks: map[string]*TaskNode{
"application1": {
Name: "application1",
ChartName: "application1",
ChartVersion: "0.1.0",
Release: &v1alpha1.Release{
Values: &apiextensionsv1.JSON{
Raw: []byte(`{}`),
},
},
},
},
},
"application2": {
Name: "application2",
Dependencies: []string{"application1"},
Tasks: map[string]*TaskNode{
"application2": {
Name: "application2",
ChartName: "application2",
ChartVersion: "0.1.0",
Release: &v1alpha1.Release{
Values: &apiextensionsv1.JSON{
Raw: []byte(`{}`),
},
},
},
},
},
"application3": {
Name: "application3",
Dependencies: []string{"application2"},
Tasks: map[string]*TaskNode{
"application3": {
Name: "application3",
ChartName: "application3",
ChartVersion: "0.1.0",
Release: &v1alpha1.Release{
Values: &apiextensionsv1.JSON{
Raw: []byte(`{}`),
},
},
},
},
},
},
},
want: nil,
err: ErrNoNodesFound,
},
{
name: "testing unknown entry",
name: "Applications with Subcharts",
args: args{
entry: "unknown",
nodes: testmap,
appGroup: &v1alpha1.ApplicationGroup{
ObjectMeta: v1.ObjectMeta{
Name: "application",
},
Spec: v1alpha1.ApplicationGroupSpec{
Applications: []v1alpha1.Application{
{
DAG: v1alpha1.DAG{
Name: "application1",
},
Spec: v1alpha1.ApplicationSpec{
Chart: &v1alpha1.ChartRef{
Name: "application1",
Version: "0.1.0",
},
Release: &v1alpha1.Release{},
Subcharts: []v1alpha1.DAG{
{
Name: "subchart1",
Dependencies: []string{"subchart2"},
},
{
Name: "subchart2",
},
{
Name: "subchart3",
Dependencies: []string{"subchart1", "subchart2"},
},
},
},
},
{
DAG: v1alpha1.DAG{
Name: "application2",
Dependencies: []string{"application1"},
},
Spec: v1alpha1.ApplicationSpec{
Chart: &v1alpha1.ChartRef{
Name: "application2",
Version: "0.1.0",
},
Release: &v1alpha1.Release{},
Subcharts: []v1alpha1.DAG{
{
Name: "subchart1",
},
},
},
},
{
DAG: v1alpha1.DAG{
Name: "application3",
Dependencies: []string{"application2"},
},
Spec: v1alpha1.ApplicationSpec{
Chart: &v1alpha1.ChartRef{
Name: "application3",
Version: "0.1.0",
},
Release: &v1alpha1.Release{},
},
},
},
},
Status: v1alpha1.ApplicationGroupStatus{
Applications: []v1alpha1.ApplicationStatus{
{
Subcharts: map[string]v1alpha1.ChartStatus{
"subchart1": {
Version: "0.1.0",
},
"subchart2": {
Version: "0.1.0",
},
"subchart3": {
Version: "0.1.0",
},
},
},
{
Subcharts: map[string]v1alpha1.ChartStatus{
"subchart1": {
Version: "0.1.0",
},
},
},
},
},
},
},
want: &Graph{
Name: "application",
Nodes: map[string]*AppNode{
"application1": {
Name: "application1",
Tasks: map[string]*TaskNode{
"application1": {
Name: "application1",
ChartName: "application1",
ChartVersion: "0.1.0",
Release: &v1alpha1.Release{
Values: &apiextensionsv1.JSON{
Raw: []byte(`{"subchart1":{"enabled":false},"subchart2":{"enabled":false},"subchart3":{"enabled":false}}`),
},
},
Dependencies: []string{"subchart1", "subchart2", "subchart3"},
},
"subchart1": {
Name: "subchart1",
ChartName: utils.GetSubchartName("application1", "subchart1"),
ChartVersion: "0.1.0",
Release: &v1alpha1.Release{
Values: &apiextensionsv1.JSON{
Raw: []byte(`{}`),
},
},
Parent: "application1",
Dependencies: []string{"subchart2"},
},
"subchart2": {
Name: "subchart2",
ChartName: utils.GetSubchartName("application1", "subchart2"),
ChartVersion: "0.1.0",
Release: &v1alpha1.Release{
Values: &apiextensionsv1.JSON{
Raw: []byte(`{}`),
},
},
Parent: "application1",
},
"subchart3": {
Name: "subchart3",
ChartName: utils.GetSubchartName("application1", "subchart3"),
ChartVersion: "0.1.0",
Release: &v1alpha1.Release{
Values: &apiextensionsv1.JSON{
Raw: []byte(`{}`),
},
},
Parent: "application1",
Dependencies: []string{"subchart1", "subchart2"},
},
},
},
"application2": {
Name: "application2",
Dependencies: []string{"application1"},
Tasks: map[string]*TaskNode{
"application2": {
Name: "application2",
ChartName: "application2",
ChartVersion: "0.1.0",
Release: &v1alpha1.Release{
Values: &apiextensionsv1.JSON{
Raw: []byte(`{"subchart1":{"enabled":false}}`),
},
},
Dependencies: []string{"subchart1"},
},
"subchart1": {
Name: "subchart1",
ChartName: utils.GetSubchartName("application2", "subchart1"),
ChartVersion: "0.1.0",
Release: &v1alpha1.Release{
Values: &apiextensionsv1.JSON{
Raw: []byte(`{}`),
},
},
Parent: "application2",
},
},
},
"application3": {
Name: "application3",
Dependencies: []string{"application2"},
Tasks: map[string]*TaskNode{
"application3": {
Name: "application3",
ChartName: "application3",
ChartVersion: "0.1.0",
Release: &v1alpha1.Release{
Values: &apiextensionsv1.JSON{
Raw: []byte(`{}`),
},
},
},
},
},
},
},
want: nil,
err: ErrEntryNodeNotFound,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := Build(tt.args.entry, tt.args.nodes)
if err != tt.err {
t.Errorf("Build() error = %v, want %v", err, tt.err)
}
if got != tt.want {
t.Errorf("Build() = %v, want %v", got, tt.want)
got := NewForwardGraph(tt.args.appGroup)
if !cmp.Equal(got, tt.want) {
t.Errorf("NewForwardGraph() = %v", cmp.Diff(got, tt.want))
}
})
}
}
func Test_NewReverseGraph(t *testing.T) {
type args struct {
appGroup *v1alpha1.ApplicationGroup
}
tests := []struct {
name string
args args
want *Graph
}{
{
name: "Reverse Basic Ordered Graph",
args: args{
appGroup: &v1alpha1.ApplicationGroup{
ObjectMeta: v1.ObjectMeta{
Name: "application",
},
Spec: v1alpha1.ApplicationGroupSpec{
Applications: []v1alpha1.Application{
{
DAG: v1alpha1.DAG{
Name: "application1",
},
Spec: v1alpha1.ApplicationSpec{
Chart: &v1alpha1.ChartRef{
Name: "application1",
Version: "0.1.0",
},
Release: &v1alpha1.Release{},
},
},
{
DAG: v1alpha1.DAG{
Name: "application2",
Dependencies: []string{"application1"},
},
Spec: v1alpha1.ApplicationSpec{
Chart: &v1alpha1.ChartRef{
Name: "application2",
Version: "0.1.0",
},
Release: &v1alpha1.Release{},
},
},
{
DAG: v1alpha1.DAG{
Name: "application3",
Dependencies: []string{"application2"},
},
Spec: v1alpha1.ApplicationSpec{
Chart: &v1alpha1.ChartRef{
Name: "application3",
Version: "0.1.0",
},
Release: &v1alpha1.Release{},
},
},
},
},
},
},
want: &Graph{
Name: "application",
Nodes: map[string]*AppNode{
"application1": {
Name: "application1",
Dependencies: []string{"application2"},
Tasks: map[string]*TaskNode{
"application1": {
Name: "application1",
ChartName: "application1",
ChartVersion: "0.1.0",
Release: &v1alpha1.Release{
Values: &apiextensionsv1.JSON{
Raw: []byte(`{}`),
},
},
},
},
},
"application2": {
Name: "application2",
Dependencies: []string{"application3"},
Tasks: map[string]*TaskNode{
"application2": {
Name: "application2",
ChartName: "application2",
ChartVersion: "0.1.0",
Release: &v1alpha1.Release{
Values: &apiextensionsv1.JSON{
Raw: []byte(`{}`),
},
},
},
},
},
"application3": {
Name: "application3",
Tasks: map[string]*TaskNode{
"application3": {
Name: "application3",
ChartName: "application3",
ChartVersion: "0.1.0",
Release: &v1alpha1.Release{
Values: &apiextensionsv1.JSON{
Raw: []byte(`{}`),
},
},
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := NewReverseGraph(tt.args.appGroup)
if !cmp.Equal(got, tt.want) {
t.Errorf("NewReverseGraph() = %v", cmp.Diff(got, tt.want))
}
})
}
}
func Test_clearDependencies(t *testing.T) {
type args struct {
graph *Graph
}
tests := []struct {
name string
args args
want *Graph
}{
{
name: "Clear Dependencies at Each Level",
args: args{
graph: &Graph{
Name: "application",
Nodes: map[string]*AppNode{
"application1": {
Dependencies: []string{"application2"},
Tasks: map[string]*TaskNode{
"application1": {
Dependencies: []string{"application2", "application1"},
},
},
},
"application2": {
Name: "application2",
Dependencies: []string{"application3"},
Tasks: map[string]*TaskNode{
"application2": {
Dependencies: []string{},
},
},
},
"application3": {
Name: "application3",
Tasks: map[string]*TaskNode{
"application3": {
Dependencies: []string{"here", "there"},
},
},
},
},
},
},
want: &Graph{
Name: "application",
Nodes: map[string]*AppNode{
"application1": {
Tasks: map[string]*TaskNode{
"application1": {},
},
},
"application2": {
Name: "application2",
Tasks: map[string]*TaskNode{
"application2": {},
},
},
"application3": {
Name: "application3",
Tasks: map[string]*TaskNode{
"application3": {},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.args.graph.clearDependencies()
if !cmp.Equal(got, tt.want) {
t.Errorf("NewReverseGraph() = %v", cmp.Diff(got, tt.want))
}
})
}
}
func Test_subChartValues(t *testing.T) {
type args struct {
sc string
av apiextensionsv1.JSON
}
tests := []struct {
name string
args args
want apiextensionsv1.JSON
}{
{
name: "withGlobalSuchart",
args: args{
sc: "subchart",
av: apiextensionsv1.JSON{
Raw: []byte(`{"global":{"keyG":"valueG"},"subchart":{"keySC":"valueSC"}}`),
},
},
want: apiextensionsv1.JSON{
Raw: []byte(`{"global":{"keyG":"valueG"},"keySC":"valueSC"}`),
},
},
{
name: "withOnlyGlobal",
args: args{
sc: "subchart",
av: apiextensionsv1.JSON{
Raw: []byte(`{"global":{"keyG":"valueG"}}`),
},
},
want: apiextensionsv1.JSON{
Raw: []byte(`{"global":{"keyG":"valueG"}}`),
},
},
{
name: "withOnlySubchart",
args: args{
sc: "subchart",
av: apiextensionsv1.JSON{
Raw: []byte(`{"subchart":{"keySC":"valueSC"}}`),
},
},
want: apiextensionsv1.JSON{
Raw: []byte(`{"keySC":"valueSC"}`),
},
},
{
name: "withNone",
args: args{
sc: "subchart",
av: apiextensionsv1.JSON{
Raw: []byte(""),
},
},
want: apiextensionsv1.JSON{
Raw: []byte("{}"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
values := make(map[string]interface{})
_ = json.Unmarshal(tt.args.av.Raw, &values)
if got, _ := subChartValues(tt.args.sc, values); !cmp.Equal(*got, tt.want) {
t.Errorf("subchartValues() = %v", cmp.Diff(*got, tt.want))
}
})
}

Просмотреть файл

@ -8,155 +8,67 @@ import (
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
fluxhelmv2beta1 "github.com/fluxcd/helm-controller/api/v2beta1"
fluxsourcev1beta1 "github.com/fluxcd/source-controller/api/v1beta1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func generateTemplates(instance *v1alpha1.ApplicationGroup, options ClientOptions) (*v1alpha13.Template, []v1alpha13.Template, error) {
if instance == nil {
return nil, nil, fmt.Errorf("applicationGroup cannot be nil")
}
templates, err := generateAppDAGTemplates(instance, options.namespace, options.parallelism)
func generateTemplates(graph *Graph, options ClientOptions) (*v1alpha13.Template, []v1alpha13.Template, error) {
templateMap, err := generateAppDAGTemplates(graph, options.namespace, options.parallelism)
if err != nil {
return nil, nil, fmt.Errorf("failed to generate application DAG templates : %w", err)
}
// Create the entry template from the app dag templates
entryTemplate := &v1alpha13.Template{
Name: EntrypointTemplateName,
DAG: &v1alpha13.DAGTemplate{
Tasks: make([]v1alpha13.DAGTask, len(instance.Spec.Applications)),
},
Name: EntrypointTemplateName,
DAG: &v1alpha13.DAGTemplate{},
Parallelism: options.parallelism,
}
for i, tpl := range templates {
entryTemplate.DAG.Tasks[i] = v1alpha13.DAGTask{
Name: utils.ConvertToDNS1123(tpl.Name),
Template: utils.ConvertToDNS1123(tpl.Name),
Dependencies: utils.ConvertSliceToDNS1123(instance.Spec.Applications[i].Dependencies),
}
var templateSlice []v1alpha13.Template
for name, template := range templateMap {
entryTemplate.DAG.Tasks = append(entryTemplate.DAG.Tasks, v1alpha13.DAGTask{
Name: utils.ConvertToDNS1123(template.Name),
Template: utils.ConvertToDNS1123(template.Name),
Dependencies: utils.ConvertSliceToDNS1123(graph.Nodes[name].Dependencies),
})
templateSlice = append(templateSlice, template)
}
return entryTemplate, templates, nil
return entryTemplate, templateSlice, nil
}
func generateAppDAGTemplates(appGroup *v1alpha1.ApplicationGroup, namespace string, parallelism *int64) ([]v1alpha13.Template, error) {
ts := make([]v1alpha13.Template, 0)
func generateAppDAGTemplates(graph *Graph, namespace string, parallelism *int64) (map[string]v1alpha13.Template, error) {
templateMap := make(map[string]v1alpha13.Template, 0)
for i, app := range appGroup.Spec.Applications {
var hasSubcharts bool
scStatus := appGroup.Status.Applications[i].Subcharts
// Create Subchart DAG only when the application chart has dependencies
if len(app.Spec.Subcharts) > 0 {
hasSubcharts = true
t := v1alpha13.Template{
Name: utils.ConvertToDNS1123(app.Name),
Parallelism: parallelism,
}
t.DAG = &v1alpha13.DAGTemplate{}
tasks, err := generateSubchartAndAppDAGTasks(appGroup.Name, namespace, &app, scStatus)
if err != nil {
return nil, fmt.Errorf("failed to generate Application Template DAG tasks : %w", err)
}
t.DAG.Tasks = tasks
ts = append(ts, t)
for name, node := range graph.Nodes {
template := v1alpha13.Template{
Name: utils.ConvertToDNS1123(node.Name),
Parallelism: parallelism,
DAG: &v1alpha13.DAGTemplate{
Tasks: []v1alpha13.DAGTask{},
},
}
if !hasSubcharts {
hr := helmReleaseBuilder(app.Spec.Release, namespace, app.Name, app.Spec.Chart.Name, app.Name, app.Spec.Chart.Version)
hr.Spec.Interval = app.Spec.Release.Interval
hr.Spec.Values = app.Spec.Release.Values
for _, task := range node.Tasks {
hr := createHelmRelease(task.Release, namespace, task.ChartName, task.ChartVersion)
hr.Labels = map[string]string{
ChartLabelKey: app.Name,
OwnershipLabel: appGroup.Name,
ChartLabelKey: task.ChartName,
OwnershipLabel: graph.Name,
HeritageLabel: Project,
}
hrStr := utils.HrToB64AnyStringPtr(hr)
tApp := v1alpha13.Template{
Name: utils.ConvertToDNS1123(app.Name),
Parallelism: parallelism,
DAG: &v1alpha13.DAGTemplate{
Tasks: []v1alpha13.DAGTask{
appDAGTaskBuilder(app.Name, getTimeout(app.Spec.Release.Timeout), hrStr),
},
},
if task.Parent != "" {
hr.Annotations = map[string]string{
v1alpha1.ParentChartAnnotation: task.Parent,
}
}
ts = append(ts, tApp)
hrStr := utils.HrToB64AnyStringPtr(hr)
template.DAG.Tasks = append(template.DAG.Tasks, appDAGTaskBuilder(task.Name, task.Dependencies, getTimeout(task.Release.Timeout), hrStr))
}
templateMap[name] = template
}
return ts, nil
return templateMap, nil
}
func generateSubchartAndAppDAGTasks(appGroupName, namespace string, app *v1alpha1.Application, subchartsStatus map[string]v1alpha1.ChartStatus) ([]v1alpha13.DAGTask, error) {
// XXX (nitishm) Should this be set to nil if no subcharts are found??
tasks := make([]v1alpha13.DAGTask, 0, len(app.Spec.Subcharts)+1)
for _, sc := range app.Spec.Subcharts {
subchartName := sc.Name
subchartVersion := subchartsStatus[subchartName].Version
hr, err := generateSubchartHelmRelease(app.Spec.Release, namespace, app.Spec.Chart.Name, subchartName, subchartVersion)
if err != nil {
return nil, err
}
hr.Annotations = map[string]string{
v1alpha1.ParentChartAnnotation: app.Name,
}
hr.Labels = map[string]string{
ChartLabelKey: app.Name,
OwnershipLabel: appGroupName,
HeritageLabel: Project,
}
hrStr := utils.HrToB64AnyStringPtr(hr)
task := appDAGTaskBuilder(subchartName, getTimeout(app.Spec.Release.Timeout), hrStr)
task.Dependencies = utils.ConvertSliceToDNS1123(sc.Dependencies)
tasks = append(tasks, task)
}
hr := helmReleaseBuilder(app.Spec.Release, namespace, app.Name, app.Spec.Chart.Name, app.Name, app.Spec.Chart.Version)
hr.Spec.Interval = app.Spec.Release.Interval
hr.Spec.Values = app.Spec.Release.Values
hr.Labels = map[string]string{
ChartLabelKey: app.Name,
OwnershipLabel: appGroupName,
HeritageLabel: Project,
}
// Force disable all subchart for the staged application chart
// to prevent duplication and possible collision of deployed resources
// Since the subchart should have been deployed in a prior DAG step,
// we must not redeploy it along with the parent application chart.
values := app.GetValues()
for _, d := range app.Spec.Subcharts {
values[d.Name] = map[string]interface{}{
"enabled": false,
}
}
if err := app.SetValues(values); err != nil {
return nil, err
}
hrStr := utils.HrToB64AnyStringPtr(hr)
task := appDAGTaskBuilder(app.Name, getTimeout(app.Spec.Release.Timeout), hrStr)
task.Dependencies = func() (out []string) {
for _, t := range tasks {
out = append(out, utils.ConvertToDNS1123(t.Name))
}
return out
}()
tasks = append(tasks, task)
return tasks, nil
}
func appDAGTaskBuilder(name string, timeout, hrStr *v1alpha13.AnyString) v1alpha13.DAGTask {
func appDAGTaskBuilder(name string, dependencies []string, timeout, hrStr *v1alpha13.AnyString) v1alpha13.DAGTask {
task := v1alpha13.DAGTask{
Name: utils.ConvertToDNS1123(name),
Template: HelmReleaseExecutorName,
@ -172,43 +84,25 @@ func appDAGTaskBuilder(name string, timeout, hrStr *v1alpha13.AnyString) v1alpha
},
},
},
Dependencies: dependencies,
}
return task
}
func generateSubchartHelmRelease(r *v1alpha1.Release, namespace, appChartName, subchartName, version string) (*fluxhelmv2beta1.HelmRelease, error) {
chName := utils.GetSubchartName(appChartName, subchartName)
hr := helmReleaseBuilder(r, namespace, chName, chName, subchartName, version)
var releaseValues map[string]interface{}
if r != nil {
releaseValues = r.GetValues()
}
val, err := subchartValues(subchartName, releaseValues)
if err != nil {
return nil, err
}
hr.Spec.Values = val
return hr, nil
}
func helmReleaseBuilder(r *v1alpha1.Release, namespace, objMetaName, chName, releaseName, version string) *fluxhelmv2beta1.HelmRelease {
if r == nil {
r = &v1alpha1.Release{}
}
hr := &fluxhelmv2beta1.HelmRelease{
func createHelmRelease(r *v1alpha1.Release, namespace, name, version string) *fluxhelmv2beta1.HelmRelease {
return &fluxhelmv2beta1.HelmRelease{
TypeMeta: v1.TypeMeta{
Kind: fluxhelmv2beta1.HelmReleaseKind,
APIVersion: fluxhelmv2beta1.GroupVersion.String(),
},
ObjectMeta: v1.ObjectMeta{
Name: utils.ConvertToDNS1123(objMetaName),
Name: utils.ConvertToDNS1123(name),
Namespace: r.TargetNamespace,
},
Spec: fluxhelmv2beta1.HelmReleaseSpec{
Chart: fluxhelmv2beta1.HelmChartTemplate{
Spec: fluxhelmv2beta1.HelmChartTemplateSpec{
Chart: utils.ConvertToDNS1123(chName),
Chart: utils.ConvertToDNS1123(name),
Version: version,
SourceRef: fluxhelmv2beta1.CrossNamespaceObjectReference{
Kind: fluxsourcev1beta1.HelmRepositoryKind,
@ -217,42 +111,15 @@ func helmReleaseBuilder(r *v1alpha1.Release, namespace, objMetaName, chName, rel
},
},
},
ReleaseName: utils.ConvertToDNS1123(releaseName),
ReleaseName: utils.ConvertToDNS1123(name),
TargetNamespace: r.TargetNamespace,
Timeout: r.Timeout,
Install: r.Install,
Upgrade: r.Upgrade,
Rollback: r.Rollback,
Uninstall: r.Uninstall,
Interval: r.Interval,
Values: r.Values,
},
}
return hr
}
func subchartValues(sc string, values map[string]interface{}) (*apiextensionsv1.JSON, error) {
data := make(map[string]interface{})
if scVals, ok := values[sc]; ok {
if vv, ok := scVals.(map[string]interface{}); ok {
for k, val := range vv {
data[k] = val
}
}
}
if gVals, ok := values[ValuesKeyGlobal]; ok {
if vv, ok := gVals.(map[string]interface{}); ok {
data[ValuesKeyGlobal] = vv
}
}
return v1alpha1.GetJSON(data)
}
func getTaskNamesFromHelmReleases(bucket []fluxhelmv2beta1.HelmRelease) []string {
out := []string{}
for _, hr := range bucket {
out = append(out, utils.ConvertToDNS1123(hr.GetReleaseName()+"-"+(hr.Namespace)))
}
return out
}

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -328,14 +328,6 @@ func toConditionReason(nodePhase v1alpha13.WorkflowPhase) string {
}
}
func getNodes(wf *v1alpha13.Workflow) map[string]v1alpha13.NodeStatus {
nodes := make(map[string]v1alpha13.NodeStatus)
for _, node := range wf.Status.Nodes {
nodes[node.ID] = node
}
return nodes
}
func initWorkflowObject(name, namespace string, parallelism *int64) *v1alpha13.Workflow {
return &v1alpha13.Workflow{
ObjectMeta: v1.ObjectMeta{

Просмотреть файл

@ -61,7 +61,8 @@ func (wc *ForwardWorkflowClient) Generate(ctx context.Context) error {
}
wc.workflow = initWorkflowObject(wc.appGroup.Name, wc.namespace, wc.parallelism)
entryTemplate, templates, err := generateTemplates(wc.GetAppGroup(), wc.GetOptions())
graph := NewForwardGraph(wc.GetAppGroup())
entryTemplate, templates, err := generateTemplates(graph, wc.GetOptions())
if err != nil {
return fmt.Errorf("failed to generate workflow: %w", err)
}

Просмотреть файл

@ -2,7 +2,6 @@ package workflow
import (
"context"
"encoding/base64"
"fmt"
"github.com/Azure/Orkestra/api/v1alpha1"
@ -11,9 +10,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/Azure/Orkestra/pkg/utils"
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
fluxhelmv2beta1 "github.com/fluxcd/helm-controller/api/v2beta1"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@ -68,12 +65,16 @@ func (wc *ReverseWorkflowClient) Generate(ctx context.Context) error {
}
wc.reverseWorkflow = initWorkflowObject(wc.getReverseName(), wc.namespace, wc.parallelism)
entry, err := wc.generateWorkflow()
graph := NewReverseGraph(wc.GetAppGroup())
entryTemplate, templates, err := generateTemplates(graph, wc.GetOptions())
if err != nil {
return fmt.Errorf("failed to generate argo reverse workflow: %w", err)
return fmt.Errorf("failed to generate workflow: %w", err)
}
updateWorkflowTemplates(wc.reverseWorkflow, *entry, wc.executor(HelmReleaseReverseExecutorName, Delete))
// Update with the app dag templates, entry template, and executor template
updateWorkflowTemplates(wc.reverseWorkflow, templates...)
updateWorkflowTemplates(wc.reverseWorkflow, *entryTemplate, wc.executor(HelmReleaseExecutorName, Delete))
return nil
}
@ -103,50 +104,6 @@ func (wc *ReverseWorkflowClient) Submit(ctx context.Context) error {
return nil
}
func (wc *ReverseWorkflowClient) generateWorkflow() (*v1alpha13.Template, error) {
graph, err := Build(wc.forwardWorkflow.Name, getNodes(wc.forwardWorkflow))
if err != nil {
return nil, fmt.Errorf("failed to build the wf status DAG: %w", err)
}
rev := graph.Reverse()
entry := &v1alpha13.Template{
Name: EntrypointTemplateName,
DAG: &v1alpha13.DAGTemplate{
Tasks: make([]v1alpha13.DAGTask, 0),
},
}
var prevbucket []fluxhelmv2beta1.HelmRelease
for _, bucket := range rev {
for _, hr := range bucket {
task := v1alpha13.DAGTask{
Name: utils.ConvertToDNS1123(hr.GetReleaseName() + "-" + hr.Namespace),
Template: HelmReleaseReverseExecutorName,
Arguments: v1alpha13.Arguments{
Parameters: []v1alpha13.Parameter{
{
Name: HelmReleaseArg,
Value: utils.ToAnyStringPtr(base64.StdEncoding.EncodeToString([]byte(utils.HrToYaml(hr)))),
},
},
},
Dependencies: utils.ConvertSliceToDNS1123(getTaskNamesFromHelmReleases(prevbucket)),
}
entry.DAG.Tasks = append(entry.DAG.Tasks, task)
}
prevbucket = bucket
}
if len(entry.DAG.Tasks) == 0 {
return nil, fmt.Errorf("entry template must have at least one task")
}
return entry, nil
}
func (wc *ReverseWorkflowClient) getReverseName() string {
return fmt.Sprintf("%s-reverse", wc.forwardWorkflow.Name)
}

Просмотреть файл

@ -60,14 +60,16 @@ func (wc *RollbackWorkflowClient) Generate(ctx context.Context) error {
rollbackWorkflowName := fmt.Sprintf("%s-rollback", rollbackInstance.Name)
wc.workflow = initWorkflowObject(rollbackWorkflowName, wc.namespace, wc.parallelism)
entryTemplate, templates, err := generateTemplates(rollbackInstance, wc.GetOptions())
wc.workflow = initWorkflowObject(rollbackWorkflowName, wc.namespace, wc.parallelism)
graph := NewForwardGraph(rollbackInstance)
entryTemplate, templates, err := generateTemplates(graph, wc.GetOptions())
if err != nil {
return fmt.Errorf("failed to generate argo workflow: %w", err)
return fmt.Errorf("failed to generate workflow: %w", err)
}
// Update with the app dag templates, entry template, and executor template
updateWorkflowTemplates(wc.workflow, templates...)
updateWorkflowTemplates(wc.workflow, *entryTemplate, wc.executor(HelmReleaseExecutorName, Install))
return nil
}

Просмотреть файл

@ -1 +0,0 @@
package workflow