* attempt to implement waitAll

* WaitAll with options

* cover context cancel case in WaitAll

* update test comments

* waitAll

* continueWith

* test continue with

* unittest for ContinueWith

* tweaks
This commit is contained in:
Haitao Chen 2020-10-06 00:27:33 -07:00 коммит произвёл GitHub
Родитель 84f3eb2f19
Коммит e0e3198f1a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 95 добавлений и 0 удалений

18
continue_with.go Normal file
Просмотреть файл

@ -0,0 +1,18 @@
package asynctask
import "context"
// ContinueFunc is a function that can be connected to previous task with ContinueWith
type ContinueFunc func(context.Context, interface{}) (interface{}, error)
// ContinueWith start the function when current task is done.
// result from previous task will be passed in, if no error.
func (tsk *TaskStatus) ContinueWith(ctx context.Context, next ContinueFunc) *TaskStatus {
return Start(ctx, func(fCtx context.Context) (interface{}, error) {
result, err := tsk.Wait(fCtx)
if err != nil {
return nil, err
}
return next(fCtx, result)
})
}

77
continue_with_test.go Normal file
Просмотреть файл

@ -0,0 +1,77 @@
package asynctask_test
import (
"context"
"testing"
"time"
"github.com/Azure/go-asynctask"
"github.com/stretchr/testify/assert"
)
func getAdvancedCountingTask(countFrom, step int, sleepInterval time.Duration) asynctask.AsyncFunc {
return func(ctx context.Context) (interface{}, error) {
t := ctx.Value(testContextKey).(*testing.T)
result := countFrom
for i := 0; i < step; i++ {
select {
case <-time.After(sleepInterval):
t.Logf(" working %d", i)
result++
case <-ctx.Done():
t.Log("work canceled")
return result, nil
}
}
return result, nil
}
}
func TestContinueWith(t *testing.T) {
t.Parallel()
ctx := newTestContext(t)
t1 := asynctask.Start(ctx, getAdvancedCountingTask(0, 10, 20*time.Millisecond))
t2 := t1.ContinueWith(ctx, func(fCtx context.Context, input interface{}) (interface{}, error) {
fromPrevTsk := input.(int)
return getAdvancedCountingTask(fromPrevTsk, 10, 20*time.Millisecond)(fCtx)
})
t3 := t1.ContinueWith(ctx, func(fCtx context.Context, input interface{}) (interface{}, error) {
fromPrevTsk := input.(int)
return getAdvancedCountingTask(fromPrevTsk, 12, 20*time.Millisecond)(fCtx)
})
result, err := t2.Wait(ctx)
assert.NoError(t, err)
assert.Equal(t, asynctask.StateCompleted, t2.State(), "Task should complete with no error")
assert.Equal(t, result, 20)
result, err = t3.Wait(ctx)
assert.NoError(t, err)
assert.Equal(t, asynctask.StateCompleted, t3.State(), "Task should complete with no error")
assert.Equal(t, result, 22)
}
func TestContinueWithFailureCase(t *testing.T) {
t.Parallel()
ctx := newTestContext(t)
t1 := asynctask.Start(ctx, getErrorTask("devide by 0", 10*time.Millisecond))
t2 := t1.ContinueWith(ctx, func(fCtx context.Context, input interface{}) (interface{}, error) {
fromPrevTsk := input.(int)
return getAdvancedCountingTask(fromPrevTsk, 10, 20*time.Millisecond)(fCtx)
})
t3 := t1.ContinueWith(ctx, func(fCtx context.Context, input interface{}) (interface{}, error) {
fromPrevTsk := input.(int)
return getAdvancedCountingTask(fromPrevTsk, 12, 20*time.Millisecond)(fCtx)
})
_, err := t2.Wait(ctx)
assert.Error(t, err)
assert.Equal(t, asynctask.StateFailed, t2.State(), "Task2 should fail since preceeding task failed")
assert.Equal(t, "devide by 0", err.Error())
_, err = t3.Wait(ctx)
assert.Error(t, err)
assert.Equal(t, asynctask.StateFailed, t3.State(), "Task3 should fail since preceeding task failed")
assert.Equal(t, "devide by 0", err.Error())
}