зеркало из https://github.com/golang/build.git
internal/workflow: run at most one expansion at a time
We've inadvertently been running expansions concurrently during minor releases. For better or worse, it happened to be seemingly fine, and problems started to be noticeable only when needing to restart or approve tasks, which failed with puzzling "unknown task" errors. It's useful to be able to plan builders for the two Go releases in parallel, and it's completely fine for it not to happen concurrently. Instead of getting the workflow to arrange for that, it seems we can do it in the workflow package itself. The new TestManualRetryMultipleExpansions test fails before the change, and passes after. Fixes golang/go#70249. Change-Id: Id87323f77f573d9ac364010dfc0b8581e57ce9b8 Reviewed-on: https://go-review.googlesource.com/c/build/+/626335 Auto-Submit: Dmitri Shuralyov <dmitshur@golang.org> Reviewed-by: Dmitri Shuralyov <dmitshur@google.com> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Reviewed-by: Carlos Amedee <carlos@golang.org>
This commit is contained in:
Родитель
0f31c9541f
Коммит
e823c990d7
|
@ -35,8 +35,8 @@
|
|||
// definition rather than producing an output. Unlike Actions and Tasks, they
|
||||
// execute multiple times and must produce exactly the same workflow
|
||||
// modifications each time. As such, they should be pure functions of their
|
||||
// inputs. Producing different modifications, or running multiple expansions
|
||||
// concurrently, is an error that will corrupt the workflow's state.
|
||||
// inputs. Producing different modifications is an error that will corrupt
|
||||
// the workflow's state. A workflow will run at most one expansion at a time.
|
||||
//
|
||||
// Once a Definition is complete, call Start to set its parameters and
|
||||
// instantiate it into a Workflow. Call Run to execute the workflow until
|
||||
|
@ -486,8 +486,7 @@ func (d *dependency) ready(w *Workflow) bool {
|
|||
// Unlike normal tasks, expansions may run multiple times and must produce
|
||||
// the exact same changes to the definition each time.
|
||||
//
|
||||
// Running more than one expansion concurrently is an error and will corrupt
|
||||
// the workflow.
|
||||
// A workflow will run at most one expansion at a time.
|
||||
func Expand0[O1 any](d *Definition, name string, f func(*Definition) (Value[O1], error), opts ...TaskOption) Value[O1] {
|
||||
return addExpansion[O1](d, name, f, nil, opts)
|
||||
}
|
||||
|
@ -807,6 +806,7 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
|
|||
doneOnce := ctx.Done()
|
||||
for {
|
||||
running := 0
|
||||
runningExpansion := false // Whether an expansion is running, and hasn't completed yet.
|
||||
allDone := true
|
||||
for _, task := range w.tasks {
|
||||
if !task.created {
|
||||
|
@ -834,11 +834,16 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
|
|||
if !ready {
|
||||
continue
|
||||
}
|
||||
if task.def.isExpansion && runningExpansion {
|
||||
// Don't start a new expansion until the currently running one completes.
|
||||
continue
|
||||
}
|
||||
task.started = true
|
||||
running++
|
||||
listener.TaskStateChanged(w.ID, task.def.name, task.toExported())
|
||||
taskCopy := *task
|
||||
if task.def.isExpansion {
|
||||
runningExpansion = true
|
||||
defCopy := w.def.shallowClone()
|
||||
go func() { stateChan <- runExpansion(defCopy, taskCopy, args) }()
|
||||
} else {
|
||||
|
@ -861,6 +866,7 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
|
|||
case state := <-stateChan:
|
||||
if state.def.isExpansion && state.finished && state.err == nil {
|
||||
state.err = w.expand(state.expanded)
|
||||
runningExpansion = false
|
||||
}
|
||||
listener.TaskStateChanged(w.ID, state.def.name, state.toExported())
|
||||
w.tasks[state.def] = &state
|
||||
|
|
|
@ -332,6 +332,74 @@ func TestManualRetry(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Test that manual retry works on tasks that come from different expansions.
|
||||
//
|
||||
// This is similar to how the Go minor release workflow plans builders for
|
||||
// both releases. It previously failed due to expansions racing with with other,
|
||||
// leading to "unknown task" errors when retrying. See go.dev/issue/70249.
|
||||
func TestManualRetryMultipleExpansions(t *testing.T) {
|
||||
// Create two sub-workflows, each one with an expansion that adds one work task.
|
||||
// The work tasks fail on the first try, and require being successfully restarted
|
||||
// for the workflow to complete.
|
||||
var counters, retried [2]int
|
||||
wd := wf.New(wf.ACL{})
|
||||
sub1 := wd.Sub("sub1")
|
||||
sub2 := wd.Sub("sub2")
|
||||
for i, wd := range []*wf.Definition{sub1, sub2} {
|
||||
out := wf.Expand0(wd, fmt.Sprintf("expand %d", i+1), func(wd *wf.Definition) (wf.Value[string], error) {
|
||||
return wf.Task0(wd, fmt.Sprintf("work %d", i+1), func(ctx *wf.TaskContext) (string, error) {
|
||||
ctx.DisableRetries()
|
||||
counters[i]++
|
||||
if counters[i] == 1 {
|
||||
return "", fmt.Errorf("first try fail")
|
||||
}
|
||||
return "", nil
|
||||
}), nil
|
||||
})
|
||||
wf.Output(wd, "out", out)
|
||||
}
|
||||
|
||||
w := startWorkflow(t, wd, nil)
|
||||
listener := &errorListener{
|
||||
taskName: "work 1",
|
||||
callback: func(string) {
|
||||
go func() {
|
||||
retried[0]++
|
||||
err := w.RetryTask(context.Background(), "work 1")
|
||||
if err != nil {
|
||||
t.Errorf(`RetryTask("work 1") failed: %v`, err)
|
||||
}
|
||||
}()
|
||||
},
|
||||
Listener: &errorListener{
|
||||
taskName: "work 2",
|
||||
callback: func(string) {
|
||||
go func() {
|
||||
retried[1]++
|
||||
err := w.RetryTask(context.Background(), "work 2")
|
||||
if err != nil {
|
||||
t.Errorf(`RetryTask("work 2") failed: %v`, err)
|
||||
}
|
||||
}()
|
||||
},
|
||||
Listener: &verboseListener{t},
|
||||
},
|
||||
}
|
||||
runWorkflow(t, w, listener)
|
||||
if counters[0] != 2 {
|
||||
t.Errorf("sub1 task ran %v times, wanted 2", counters[0])
|
||||
}
|
||||
if retried[0] != 1 {
|
||||
t.Errorf("sub1 task was retried %v times, wanted 1", retried[0])
|
||||
}
|
||||
if counters[1] != 2 {
|
||||
t.Errorf("sub2 task ran %v times, wanted 2", counters[1])
|
||||
}
|
||||
if retried[1] != 1 {
|
||||
t.Errorf("sub2 task was retried %v times, wanted 1", retried[1])
|
||||
}
|
||||
}
|
||||
|
||||
func TestAutomaticRetry(t *testing.T) {
|
||||
counter := 0
|
||||
needsRetry := func(ctx *wf.TaskContext) (string, error) {
|
||||
|
|
Загрузка…
Ссылка в новой задаче