diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go index 94e0a21e..5a9533b2 100644 --- a/internal/workflow/workflow.go +++ b/internal/workflow/workflow.go @@ -35,8 +35,8 @@ // definition rather than producing an output. Unlike Actions and Tasks, they // execute multiple times and must produce exactly the same workflow // modifications each time. As such, they should be pure functions of their -// inputs. Producing different modifications, or running multiple expansions -// concurrently, is an error that will corrupt the workflow's state. +// inputs. Producing different modifications is an error that will corrupt +// the workflow's state. A workflow will run at most one expansion at a time. // // Once a Definition is complete, call Start to set its parameters and // instantiate it into a Workflow. Call Run to execute the workflow until @@ -486,8 +486,7 @@ func (d *dependency) ready(w *Workflow) bool { // Unlike normal tasks, expansions may run multiple times and must produce // the exact same changes to the definition each time. // -// Running more than one expansion concurrently is an error and will corrupt -// the workflow. +// A workflow will run at most one expansion at a time. func Expand0[O1 any](d *Definition, name string, f func(*Definition) (Value[O1], error), opts ...TaskOption) Value[O1] { return addExpansion[O1](d, name, f, nil, opts) } @@ -807,6 +806,7 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter doneOnce := ctx.Done() for { running := 0 + runningExpansion := false // Whether an expansion is running, and hasn't completed yet. allDone := true for _, task := range w.tasks { if !task.created { @@ -834,11 +834,16 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter if !ready { continue } + if task.def.isExpansion && runningExpansion { + // Don't start a new expansion until the currently running one completes. 
+ continue + } task.started = true running++ listener.TaskStateChanged(w.ID, task.def.name, task.toExported()) taskCopy := *task if task.def.isExpansion { + runningExpansion = true defCopy := w.def.shallowClone() go func() { stateChan <- runExpansion(defCopy, taskCopy, args) }() } else { @@ -861,6 +866,7 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter case state := <-stateChan: if state.def.isExpansion && state.finished && state.err == nil { state.err = w.expand(state.expanded) + runningExpansion = false } listener.TaskStateChanged(w.ID, state.def.name, state.toExported()) w.tasks[state.def] = &state diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go index b85be29c..61276fba 100644 --- a/internal/workflow/workflow_test.go +++ b/internal/workflow/workflow_test.go @@ -332,6 +332,74 @@ func TestManualRetry(t *testing.T) { } } +// Test that manual retry works on tasks that come from different expansions. +// +// This is similar to how the Go minor release workflow plans builders for +// both releases. It previously failed due to expansions racing with each other, +// leading to "unknown task" errors when retrying. See go.dev/issue/70249. +func TestManualRetryMultipleExpansions(t *testing.T) { + // Create two sub-workflows, each one with an expansion that adds one work task. + // The work tasks fail on the first try, and require being successfully restarted + // for the workflow to complete. 
+ var counters, retried [2]int + wd := wf.New(wf.ACL{}) + sub1 := wd.Sub("sub1") + sub2 := wd.Sub("sub2") + for i, wd := range []*wf.Definition{sub1, sub2} { + out := wf.Expand0(wd, fmt.Sprintf("expand %d", i+1), func(wd *wf.Definition) (wf.Value[string], error) { + return wf.Task0(wd, fmt.Sprintf("work %d", i+1), func(ctx *wf.TaskContext) (string, error) { + ctx.DisableRetries() + counters[i]++ + if counters[i] == 1 { + return "", fmt.Errorf("first try fail") + } + return "", nil + }), nil + }) + wf.Output(wd, "out", out) + } + + w := startWorkflow(t, wd, nil) + listener := &errorListener{ + taskName: "work 1", + callback: func(string) { + go func() { + retried[0]++ + err := w.RetryTask(context.Background(), "work 1") + if err != nil { + t.Errorf(`RetryTask("work 1") failed: %v`, err) + } + }() + }, + Listener: &errorListener{ + taskName: "work 2", + callback: func(string) { + go func() { + retried[1]++ + err := w.RetryTask(context.Background(), "work 2") + if err != nil { + t.Errorf(`RetryTask("work 2") failed: %v`, err) + } + }() + }, + Listener: &verboseListener{t}, + }, + } + runWorkflow(t, w, listener) + if counters[0] != 2 { + t.Errorf("sub1 task ran %v times, wanted 2", counters[0]) + } + if retried[0] != 1 { + t.Errorf("sub1 task was retried %v times, wanted 1", retried[0]) + } + if counters[1] != 2 { + t.Errorf("sub2 task ran %v times, wanted 2", counters[1]) + } + if retried[1] != 1 { + t.Errorf("sub2 task was retried %v times, wanted 1", retried[1]) + } +} + func TestAutomaticRetry(t *testing.T) { counter := 0 needsRetry := func(ctx *wf.TaskContext) (string, error) {