internal/workflow: add native retry support

Waiting for the workflow to stop before retrying is very annoying. Add
support for retrying while the workflow runs.

Previously, the workflow stopped when no more tasks were runnable. This
raised difficult questions: does the workflow accept retry commands
while it's stopped? Should there be a way to force it to do at least one
task before giving up? Rather than deal with those, change the behavior:
the Run function now returns only on completion or context cancelation.
Update the tests to match. As far as I know, relui itself doesn't care.
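
For illustration, retrying a failed task while the workflow is still
running now looks roughly like the sketch below. The one-task
definition, the "flaky step" name, the nil listener, and the
sleep-based synchronization are all invented for this example; only
Start, Run, RetryTask, and the cancelation behavior come from the
package.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    wf "golang.org/x/build/internal/workflow"
)

func main() {
    // Made-up definition: one task that fails on its first attempt and
    // disables automatic retries, so it stays failed until retried.
    attempts := 0
    flaky := func(ctx *wf.TaskContext) (string, error) {
        ctx.DisableRetries()
        attempts++
        if attempts == 1 {
            return "", fmt.Errorf("attempt %d failed", attempts)
        }
        return "ok", nil
    }
    wd := wf.New()
    wf.Output(wd, "result", wf.Task0(wd, "flaky step", flaky))

    w, err := wf.Start(wd, nil)
    if err != nil {
        log.Fatal(err)
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Run now returns only when the workflow completes or ctx is
    // canceled, so it keeps accepting retries after the task fails.
    done := make(chan map[string]interface{}, 1)
    go func() {
        // Passing a nil listener assumes the package falls back to its
        // internal no-op listener.
        out, err := w.Run(ctx, nil)
        if err != nil {
            log.Print(err)
        }
        done <- out
    }()

    // Crude synchronization for the sketch: give the first attempt time
    // to fail, then retry it while the workflow is still running.
    time.Sleep(100 * time.Millisecond)
    if err := w.RetryTask(ctx, "flaky step"); err != nil {
        log.Printf("RetryTask: %v", err)
    }
    fmt.Println(<-done) // map[result:ok]
}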

Note, though: there is now no way to resume a stopped workflow. Since
there's little reason for us to stop workflows anymore, I don't expect
that to be a big problem.

Fixes golang/go#54304.

Change-Id: I4de56a6c50d71dddf0eaafce2e9c135c65e4cfec
Reviewed-on: https://go-review.googlesource.com/c/build/+/422098
Reviewed-by: Jenny Rakoczy <jenny@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Auto-Submit: Heschi Kreinick <heschi@google.com>
Run-TryBot: Heschi Kreinick <heschi@google.com>
Heschi Kreinick 2022-08-03 12:07:47 -04:00 committed by Gopher Robot
Parent aa54a98a2a
Commit ac2bd83359
9 changed files with 221 additions and 271 deletions

@ -300,14 +300,14 @@ func testSecurity(t *testing.T, mergeFixes bool) {
if err != nil {
t.Fatal(err)
}
_, err = w.Run(deps.ctx, &verboseListener{t, deps.outputListener})
if mergeFixes && err != nil {
t.Fatal(err)
}
if !mergeFixes {
if err == nil {
t.Fatal("release succeeded without merging fixes to the public repository")
if mergeFixes {
_, err = w.Run(deps.ctx, &verboseListener{t, deps.outputListener})
if err != nil {
t.Fatal(err)
}
} else {
runToFailure(t, deps.ctx, w, "Check branch state matches source archive", &verboseListener{t, deps.outputListener})
return
}
checkTGZ(t, deps.buildTasks.DownloadURL, deps.publishedFiles, "src.tar.gz", &WebsiteFile{
@ -818,6 +818,40 @@ func (l *testLogger) Printf(format string, v ...interface{}) {
l.t.Logf("task %-10v: LOG: %s", l.task, fmt.Sprintf(format, v...))
}
func runToFailure(t *testing.T, ctx context.Context, w *workflow.Workflow, task string, wrap workflow.Listener) string {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
t.Helper()
var message string
listener := &errorListener{
taskName: task,
callback: func(m string) {
message = m
cancel()
},
Listener: wrap,
}
_, err := w.Run(ctx, listener)
if err == nil {
t.Fatalf("workflow unexpectedly succeeded")
}
return message
}
type errorListener struct {
taskName string
callback func(string)
workflow.Listener
}
func (l *errorListener) TaskStateChanged(id uuid.UUID, taskID string, st *workflow.TaskState) error {
if st.Name == l.taskName && st.Finished && st.Error != "" {
l.callback(st.Error)
}
l.Listener.TaskStateChanged(id, taskID, st)
return nil
}
// fakeSign acts like a human running the signbinaries job periodically.
func fakeSign(ctx context.Context, t *testing.T, dir string) {
seen := map[string]bool{}

@ -84,7 +84,7 @@
name="task.reset"
type="submit"
value="Retry"
onclick="return this.form.reportValidity() && confirm('This will retry the task and clear workflow errors.\n\nReady to proceed?')" />
onclick="return this.form.reportValidity() && confirm('This will retry the task.\n\nReady to proceed?')" />
</form>
</div>
{{else if and (not .ApprovedAt.Valid) (.ReadyForApproval)}}

@ -154,7 +154,7 @@ func (s *Server) homeHandler(w http.ResponseWriter, r *http.Request) {
}
hr := &homeResponse{SiteHeader: s.header}
for _, w := range ws {
if s.w.running[w.ID.String()] != nil {
if _, ok := s.w.running[w.ID.String()]; ok {
hr.ActiveWorkflows = append(hr.ActiveWorkflows, w)
continue
}
@ -300,43 +300,10 @@ func (s *Server) retryTaskHandler(w http.ResponseWriter, r *http.Request, params
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
if err := s.retryTask(r.Context(), id, params.ByName("name")); err != nil {
log.Printf("s.retryTask(_, %q, %q): %v", id, params.ByName("id"), err)
if errors.Is(err, sql.ErrNoRows) || errors.Is(err, pgx.ErrNoRows) {
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
if err := s.w.RetryTask(r.Context(), id, params.ByName("name")); err != nil {
log.Printf("s.w.RetryTask(_, %q): %v", id, err)
}
if err := s.w.Resume(r.Context(), id); err != nil {
log.Printf("s.w.Resume(_, %q): %v", id, err)
}
http.Redirect(w, r, s.BaseLink("/"), http.StatusSeeOther)
}
func (s *Server) retryTask(ctx context.Context, id uuid.UUID, name string) error {
return s.db.BeginFunc(ctx, func(tx pgx.Tx) error {
q := db.New(tx)
wf, err := q.Workflow(ctx, id)
if err != nil {
return fmt.Errorf("q.Workflow: %w", err)
}
task, err := q.Task(ctx, db.TaskParams{WorkflowID: id, Name: name})
if err != nil {
return fmt.Errorf("q.Task: %w", err)
}
if _, err := q.ResetTask(ctx, db.ResetTaskParams{WorkflowID: id, Name: name, UpdatedAt: time.Now()}); err != nil {
return fmt.Errorf("q.ResetTask: %w", err)
}
if _, err := q.ResetWorkflow(ctx, db.ResetWorkflowParams{ID: id, UpdatedAt: time.Now()}); err != nil {
return fmt.Errorf("q.ResetWorkflow: %w", err)
}
l := s.w.l.Logger(id, name)
l.Printf("task reset. Previous state: %#v", task)
l.Printf("workflow reset. Previous state: %#v", wf)
return nil
})
http.Redirect(w, r, s.BaseLink("/workflows", id.String()), http.StatusSeeOther)
}
func (s *Server) approveTaskHandler(w http.ResponseWriter, r *http.Request, params httprouter.Params) {

@ -437,161 +437,6 @@ func TestServerBaseLink(t *testing.T) {
}
}
func TestServerRetryTaskHandler(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hourAgo := time.Now().Add(-1 * time.Hour)
wfID := uuid.New()
unchangedWorkflow := db.Workflow{
ID: wfID,
Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
Name: nullString(`echo`),
Finished: true,
Output: `{"some": "thing"}`,
Error: "internal explosion",
CreatedAt: hourAgo, // cmpopts.EquateApproxTime
UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
}
cases := []struct {
desc string
params map[string]string
wantCode int
wantHeaders map[string]string
wantWorkflows []db.Workflow
}{
{
desc: "no params",
wantCode: http.StatusNotFound,
wantWorkflows: []db.Workflow{unchangedWorkflow},
},
{
desc: "invalid workflow id",
params: map[string]string{"id": "invalid", "name": "greeting"},
wantCode: http.StatusBadRequest,
wantWorkflows: []db.Workflow{unchangedWorkflow},
},
{
desc: "wrong workflow id",
params: map[string]string{"id": uuid.New().String(), "name": "greeting"},
wantCode: http.StatusNotFound,
wantWorkflows: []db.Workflow{unchangedWorkflow},
},
{
desc: "invalid task name",
params: map[string]string{"id": wfID.String(), "name": "invalid"},
wantCode: http.StatusNotFound,
wantWorkflows: []db.Workflow{
{
ID: wfID,
Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
Name: nullString(`echo`),
Finished: true,
Output: `{"some": "thing"}`,
Error: "internal explosion",
CreatedAt: hourAgo, // cmpopts.EquateApproxTime
UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
},
},
},
{
desc: "successful reset",
params: map[string]string{"id": wfID.String(), "name": "greeting"},
wantCode: http.StatusSeeOther,
wantHeaders: map[string]string{
"Location": "/",
},
wantWorkflows: []db.Workflow{
{
ID: wfID,
Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
Name: nullString(`echo`),
Output: "{}",
CreatedAt: hourAgo, // cmpopts.EquateApproxTime
UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
},
},
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
p := testDB(ctx, t)
q := db.New(p)
wf := db.CreateWorkflowParams{
ID: wfID,
Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
Name: nullString(`echo`),
CreatedAt: hourAgo,
UpdatedAt: hourAgo,
}
if _, err := q.CreateWorkflow(ctx, wf); err != nil {
t.Fatalf("CreateWorkflow(_, %v) = _, %v, wanted no error", wf, err)
}
wff := db.WorkflowFinishedParams{
ID: wf.ID,
Finished: true,
Output: `{"some": "thing"}`,
Error: "internal explosion",
UpdatedAt: hourAgo,
}
if _, err := q.WorkflowFinished(ctx, wff); err != nil {
t.Fatalf("WorkflowFinished(_, %v) = _, %v, wanted no error", wff, err)
}
gtg := db.CreateTaskParams{
WorkflowID: wf.ID,
Name: "greeting",
Finished: true,
Error: nullString("internal explosion"),
CreatedAt: hourAgo,
UpdatedAt: hourAgo,
}
if _, err := q.CreateTask(ctx, gtg); err != nil {
t.Fatalf("CreateTask(_, %v) = _, %v, wanted no error", gtg, err)
}
fw := db.CreateTaskParams{
WorkflowID: wf.ID,
Name: "farewell",
Finished: true,
Error: nullString("internal explosion"),
CreatedAt: hourAgo,
UpdatedAt: hourAgo,
}
if _, err := q.CreateTask(ctx, fw); err != nil {
t.Fatalf("CreateTask(_, %v) = _, %v, wanted no error", fw, err)
}
req := httptest.NewRequest(http.MethodPost, path.Join("/workflows/", c.params["id"], "tasks", c.params["name"], "retry"), nil)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
rec := httptest.NewRecorder()
s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}), nil, SiteHeader{}, nil)
s.m.ServeHTTP(rec, req)
resp := rec.Result()
if resp.StatusCode != c.wantCode {
t.Errorf("rep.StatusCode = %d, wanted %d", resp.StatusCode, c.wantCode)
}
for k, v := range c.wantHeaders {
if resp.Header.Get(k) != v {
t.Errorf("resp.Header.Get(%q) = %q, wanted %q", k, resp.Header.Get(k), v)
}
}
if c.wantCode == http.StatusBadRequest {
return
}
wfs, err := q.Workflows(ctx)
if err != nil {
t.Fatalf("q.Workflows() = %v, %v, wanted no error", wfs, err)
}
if diff := cmp.Diff(c.wantWorkflows, wfs, SameUUIDVariant(), cmpopts.EquateApproxTime(time.Minute)); diff != "" {
t.Fatalf("q.Workflows() mismatch (-want +got):\n%s", diff)
}
})
}
}
func TestServerApproveTaskHandler(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@ -27,8 +27,6 @@ type Listener interface {
WorkflowFinished(ctx context.Context, workflowID uuid.UUID, outputs map[string]interface{}, err error) error
}
type stopFunc func()
// Worker runs workflows, and persists their state.
type Worker struct {
dh *DefinitionHolder
@ -43,7 +41,12 @@ type Worker struct {
// running is a set of currently running Workflow ids. Run uses
// this set to prevent starting a simultaneous execution of a
// currently running Workflow.
running map[string]stopFunc
running map[string]runningWorkflow
}
type runningWorkflow struct {
w *workflow.Workflow
stop func()
}
// NewWorker returns a Worker ready to accept and run workflows.
@ -54,7 +57,7 @@ func NewWorker(dh *DefinitionHolder, db db.PGDBTX, l Listener) *Worker {
l: l,
done: make(chan struct{}),
pending: make(chan *workflow.Workflow, 1),
running: make(map[string]stopFunc),
running: make(map[string]runningWorkflow),
}
}
@ -98,7 +101,7 @@ func (w *Worker) markRunning(wf *workflow.Workflow, stop func()) error {
if _, ok := w.running[wf.ID.String()]; ok {
return fmt.Errorf("workflow %q already running", wf.ID)
}
w.running[wf.ID.String()] = stop
w.running[wf.ID.String()] = runningWorkflow{wf, stop}
return nil
}
@ -111,11 +114,11 @@ func (w *Worker) markStopped(wf *workflow.Workflow) {
func (w *Worker) cancelWorkflow(id uuid.UUID) bool {
w.mu.Lock()
defer w.mu.Unlock()
stop, ok := w.running[id.String()]
rwf, ok := w.running[id.String()]
if !ok {
return ok
}
stop()
rwf.stop()
return ok
}
@ -236,3 +239,14 @@ func (w *Worker) Resume(ctx context.Context, id uuid.UUID) error {
}
return w.run(res)
}
// RetryTask retries a task in a running workflow.
func (w *Worker) RetryTask(ctx context.Context, id uuid.UUID, name string) error {
w.mu.Lock()
rwf, ok := w.running[id.String()]
w.mu.Unlock()
if !ok {
return fmt.Errorf("no workflow with id %q", id)
}
return rwf.w.RetryTask(ctx, name)
}

@ -222,13 +222,16 @@ func TestWorkflowResumeRetry(t *testing.T) {
counter++
}
}
if counter > 4 {
return "", nil
}
return "", errors.New("expected")
})
workflow.Output(wd, "nothing", nothing)
dh.RegisterDefinition(t.Name(), wd)
// Run the workflow. It will try the task 3 times and then fail; stop the
// worker during its second run, then resume it and verify the task retries.
// Run the workflow. It will try the task up to 3 times. Stop the worker
// during its second run, then resume it and verify the task retries.
go func() {
for i := 0; i < 3; i++ {
<-blockingChan
@ -267,9 +270,6 @@ func TestWorkflowResumeRetry(t *testing.T) {
t.Fatalf("w.Resume(_, %v) = %v, wanted no error", wfid, err)
}
<-wfDone
if counter-3 != 2 {
t.Fatalf("task sent %v more times, wanted 2", counter-3)
}
}
func newTestEchoWorkflow() *workflow.Definition {

@ -73,12 +73,16 @@ func TestAwaitFunc(t *testing.T) {
t.Fatalf("workflow.Start(%v, %v) = %v, %v, wanted no error", wd, nil, w, err)
}
go func() {
outputs, err := runWorkflow(t, ctx, w, nil)
if diff := cmp.Diff(c.want, outputs); diff != "" {
t.Errorf("runWorkflow() mismatch (-want +got):\n%s", diff)
}
if (err != nil) != c.wantErr {
t.Errorf("runworkflow() = _, %v, wantErr: %v", err, c.wantErr)
if c.wantErr {
runToFailure(t, ctx, w, "AwaitFunc", &verboseListener{t, nil})
} else {
outputs, err := runWorkflow(t, ctx, w, nil)
if err != nil {
t.Errorf("runworkflow() = _, %v", err)
}
if diff := cmp.Diff(c.want, outputs); diff != "" {
t.Errorf("runWorkflow() mismatch (-want +got):\n%s", diff)
}
}
close(done)
}()

@ -466,9 +466,10 @@ func (tr *taskResult[T]) dependencies() []*taskDefinition {
// A Workflow is an instantiated workflow instance, ready to run.
type Workflow struct {
ID uuid.UUID
def *Definition
params map[string]interface{}
ID uuid.UUID
def *Definition
params map[string]interface{}
retryCommands chan retryCommand
tasks map[*taskDefinition]*taskState
}
@ -515,10 +516,11 @@ func (t *taskState) toExported() *TaskState {
// Start instantiates a workflow with the given parameters.
func Start(def *Definition, params map[string]interface{}) (*Workflow, error) {
w := &Workflow{
ID: uuid.New(),
def: def,
params: params,
tasks: map[*taskDefinition]*taskState{},
ID: uuid.New(),
def: def,
params: params,
tasks: map[*taskDefinition]*taskState{},
retryCommands: make(chan retryCommand, len(def.tasks)),
}
if err := w.validate(); err != nil {
return nil, err
@ -574,10 +576,11 @@ func (w *Workflow) validate() error {
// need to be populated.
func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskState) (*Workflow, error) {
w := &Workflow{
ID: state.ID,
def: def,
params: state.Params,
tasks: map[*taskDefinition]*taskState{},
ID: state.ID,
def: def,
params: state.Params,
tasks: map[*taskDefinition]*taskState{},
retryCommands: make(chan retryCommand, len(def.tasks)),
}
if err := w.validate(); err != nil {
return nil, err
@ -618,7 +621,7 @@ func unmarshalNew(t reflect.Type, data []byte) (interface{}, error) {
return ptr.Elem().Interface(), nil
}
// Run runs a workflow to completion or quiescence and returns its outputs.
// Run runs a workflow to completion and returns its outputs.
// listener.TaskStateChanged will be called immediately, when each task starts,
// and when they finish. It should be used only for monitoring and persistence
// purposes. Register Outputs to read task results.
@ -632,6 +635,7 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
}
stateChan := make(chan taskState, 2*len(w.def.tasks))
doneOnce := ctx.Done()
for {
// If we have all the outputs, the workflow is done.
outValues := map[string]interface{}{}
@ -670,19 +674,38 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
}
}
// Exit if we've run everything we can given errors.
// Honor context cancellation only after all tasks have exited.
if running == 0 {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return nil, fmt.Errorf("workflow has progressed as far as it can")
}
}
state := <-stateChan
listener.TaskStateChanged(w.ID, state.def.name, state.toExported())
w.tasks[state.def] = &state
select {
case state := <-stateChan:
listener.TaskStateChanged(w.ID, state.def.name, state.toExported())
w.tasks[state.def] = &state
case retry := <-w.retryCommands:
def, ok := w.def.tasks[retry.name]
if !ok {
retry.reply <- fmt.Errorf("unknown task %q", retry.name)
break
}
state := w.tasks[def]
if !state.finished || state.err == nil {
retry.reply <- fmt.Errorf("cannot retry task that did not finish in error")
break
}
listener.Logger(w.ID, def.name).Printf("Manual retry requested")
stateChan <- taskState{def: def, w: w}
retry.reply <- nil
// Don't get stuck when cancellation comes in after all tasks have
// finished, but also don't busy wait if something's still running.
case <-doneOnce:
doneOnce = nil
}
}
}
@ -747,3 +770,19 @@ func (s *defaultListener) Logger(_ uuid.UUID, task string) Logger {
type defaultLogger struct{}
func (l *defaultLogger) Printf(format string, v ...interface{}) {}
type retryCommand struct {
name string
reply chan error
}
func (w *Workflow) RetryTask(ctx context.Context, name string) error {
reply := make(chan error)
w.retryCommands <- retryCommand{name, reply}
select {
case err := <-reply:
return err
case <-ctx.Done():
return ctx.Err()
}
}

@ -81,9 +81,8 @@ func TestDependencyError(t *testing.T) {
dep := wf.Action0(wd, "failing action", action)
wf.Output(wd, "output", wf.Task0(wd, "task", task, wf.After(dep)))
w := startWorkflow(t, wd, nil)
l := &verboseListener{t: t}
if _, err := w.Run(context.Background(), l); err == nil {
t.Errorf("workflow finished successfully, expected an error")
if got, want := runToFailure(t, w, "failing action"), "hardcoded error"; got != want {
t.Errorf("got error %q, want %q", got, want)
}
}
@ -109,22 +108,6 @@ func TestSub(t *testing.T) {
}
}
func TestStuck(t *testing.T) {
fail := func(context.Context) (string, error) {
return "", fmt.Errorf("goodbye world")
}
wd := wf.New()
nothing := wf.Task0(wd, "fail", fail)
wf.Output(wd, "nothing", nothing)
w := startWorkflow(t, wd, nil)
_, err := w.Run(context.Background(), &verboseListener{t: t})
if err == nil || !strings.Contains(err.Error(), "as far as it can") {
t.Errorf("Run of stuck workflow = %v, wanted it to give up early", err)
}
}
func TestSplitJoin(t *testing.T) {
echo := func(ctx context.Context, arg string) (string, error) {
return arg, nil
@ -177,11 +160,7 @@ func TestParallelism(t *testing.T) {
wf.Output(wd, "out2", out2)
w := startWorkflow(t, wd, nil)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := w.Run(ctx, &verboseListener{t}); err != nil {
t.Fatal(err)
}
runWorkflow(t, w, nil)
}
func TestParameters(t *testing.T) {
@ -232,9 +211,41 @@ func TestParameterValue(t *testing.T) {
}
}
func TestRetry(t *testing.T) {
func TestManualRetry(t *testing.T) {
counter := 0
needsRetry := func(ctx context.Context) (string, error) {
needsRetry := func(ctx *wf.TaskContext) (string, error) {
ctx.DisableRetries()
counter++
if counter == 1 {
return "", fmt.Errorf("counter %v too low", counter)
}
return "hi", nil
}
wd := wf.New()
wf.Output(wd, "result", wf.Task0(wd, "needs retry", needsRetry))
w := startWorkflow(t, wd, nil)
retry := func(string) {
go func() {
w.RetryTask(context.Background(), "needs retry")
}()
}
listener := &errorListener{
taskName: "needs retry",
callback: retry,
Listener: &verboseListener{t},
}
runWorkflow(t, w, listener)
if counter != 2 {
t.Errorf("task ran %v times, wanted 2", counter)
}
}
func TestAutomaticRetry(t *testing.T) {
counter := 0
needsRetry := func(ctx *wf.TaskContext) (string, error) {
if counter < 2 {
counter++
return "", fmt.Errorf("counter %v too low", counter)
@ -255,7 +266,7 @@ func TestRetry(t *testing.T) {
}
}
func TestRetryDisabled(t *testing.T) {
func TestAutomaticRetryDisabled(t *testing.T) {
counter := 0
noRetry := func(ctx *wf.TaskContext) (string, error) {
ctx.DisableRetries()
@ -267,11 +278,9 @@ func TestRetryDisabled(t *testing.T) {
wf.Output(wd, "result", wf.Task0(wd, "no retry", noRetry))
w := startWorkflow(t, wd, nil)
_, err := w.Run(context.Background(), &verboseListener{t: t})
if err == nil || !strings.Contains(err.Error(), "as far as it can") {
t.Errorf("Run of failing workflow = %v, wanted it to fail", err)
if got, want := runToFailure(t, w, "no retry"), "do not pass go"; got != want {
t.Errorf("got error %q, want %q", got, want)
}
if counter != 1 {
t.Errorf("task with retries disabled ran %v times, wanted 1", counter)
}
@ -315,9 +324,12 @@ func testWatchdog(t *testing.T, success bool) {
wf.Output(wd, "result", wf.Task0(wd, "sleepy", maybeLog))
w := startWorkflow(t, wd, nil)
_, err := w.Run(context.Background(), &verboseListener{t: t})
if err == nil != success {
t.Errorf("got error %v, wanted success: %v", err, success)
if success {
runWorkflow(t, w, nil)
} else {
if got, want := runToFailure(t, w, "sleepy"), "assumed hung"; !strings.Contains(got, want) {
t.Errorf("got error %q, want %q", got, want)
}
}
}
@ -438,8 +450,8 @@ func TestBadMarshaling(t *testing.T) {
wd := wf.New()
wf.Output(wd, "greeting", wf.Task0(wd, "greet", greet))
w := startWorkflow(t, wd, nil)
if _, err := w.Run(context.Background(), &verboseListener{t}); err == nil {
t.Errorf("running a workflow with bad JSON should give an error, got none")
if got, want := runToFailure(t, w, "greet"), "JSON marshaling"; !strings.Contains(got, want) {
t.Errorf("got error %q, want %q", got, want)
}
}
@ -517,3 +529,38 @@ type testLogger struct {
func (l *testLogger) Printf(format string, v ...interface{}) {
l.t.Logf("task %-10v: LOG: %s", l.task, fmt.Sprintf(format, v...))
}
func runToFailure(t *testing.T, w *wf.Workflow, task string) string {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
t.Helper()
var message string
listener := &errorListener{
taskName: task,
callback: func(m string) {
message = m
// Allow other tasks to run before shutting down the workflow.
time.AfterFunc(50*time.Millisecond, cancel)
},
Listener: &verboseListener{t},
}
_, err := w.Run(ctx, listener)
if err == nil {
t.Fatalf("workflow unexpectedly succeeded")
}
return message
}
type errorListener struct {
taskName string
callback func(string)
wf.Listener
}
func (l *errorListener) TaskStateChanged(id uuid.UUID, taskID string, st *wf.TaskState) error {
if st.Name == l.taskName && st.Finished && st.Error != "" {
l.callback(st.Error)
}
l.Listener.TaskStateChanged(id, taskID, st)
return nil
}