internal: track starting of tasks

Update exported TaskState to include a "started" boolean. Record the
task started state in the database and show a different icon in the UI.

Tracking started enables several improvements. This change improves
resuming of workflows to mark tasks that started without finishing as
incomplete. This will prevent workflows resuming from an unknown state,
where a task may not be safe to resume by default, such as sending a
public announcement.

Also, this change makes minor tweaks to the UI for task ordering,
putting the most recently updated tasks at the top of the list. It also
fixes a minor but confusing bug around retrying "approval" tasks.

Updates golang/go#53382

Change-Id: Icff1c0df541a6e11a7bb0ab55beef60cd18a074a
Reviewed-on: https://go-review.googlesource.com/c/build/+/417221
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
Reviewed-by: Heschi Kreinick <heschi@google.com>
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Jenny Rakoczy <jenny@golang.org>
Auto-Submit: Jenny Rakoczy <jenny@golang.org>
This commit is contained in:
Alex Rakoczy 2022-07-12 18:11:43 -04:00 коммит произвёл Jenny Rakoczy
Родитель 78590fbed9
Коммит 876116a7e6
13 изменённых файлов: 143 добавлений и 56 удалений

Просмотреть файл

@ -21,6 +21,7 @@ type Task struct {
UpdatedAt time.Time
ApprovedAt sql.NullTime
ReadyForApproval bool
Started bool
}
type TaskLog struct {

Просмотреть файл

@ -19,7 +19,7 @@ SET approved_at = $3,
updated_at = $3
WHERE workflow_id = $1
AND name = $2
RETURNING workflow_id, name, finished, result, error, created_at, updated_at, approved_at, ready_for_approval
RETURNING workflow_id, name, finished, result, error, created_at, updated_at, approved_at, ready_for_approval, started
`
type ApproveTaskParams struct {
@ -41,14 +41,16 @@ func (q *Queries) ApproveTask(ctx context.Context, arg ApproveTaskParams) (Task,
&i.UpdatedAt,
&i.ApprovedAt,
&i.ReadyForApproval,
&i.Started,
)
return i, err
}
const createTask = `-- name: CreateTask :one
INSERT INTO tasks (workflow_id, name, finished, result, error, created_at, updated_at, approved_at, ready_for_approval)
INSERT INTO tasks (workflow_id, name, finished, result, error, created_at, updated_at, approved_at,
ready_for_approval)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING workflow_id, name, finished, result, error, created_at, updated_at, approved_at, ready_for_approval
RETURNING workflow_id, name, finished, result, error, created_at, updated_at, approved_at, ready_for_approval, started
`
type CreateTaskParams struct {
@ -86,6 +88,7 @@ func (q *Queries) CreateTask(ctx context.Context, arg CreateTaskParams) (Task, e
&i.UpdatedAt,
&i.ApprovedAt,
&i.ReadyForApproval,
&i.Started,
)
return i, err
}
@ -154,13 +157,15 @@ func (q *Queries) CreateWorkflow(ctx context.Context, arg CreateWorkflowParams)
const resetTask = `-- name: ResetTask :one
UPDATE tasks
SET finished = FALSE,
result = DEFAULT,
error = DEFAULT,
updated_at = $3
SET finished = FALSE,
started = FALSE,
approved_at = DEFAULT,
result = DEFAULT,
error = DEFAULT,
updated_at = $3
WHERE workflow_id = $1
AND name = $2
RETURNING workflow_id, name, finished, result, error, created_at, updated_at, approved_at, ready_for_approval
RETURNING workflow_id, name, finished, result, error, created_at, updated_at, approved_at, ready_for_approval, started
`
type ResetTaskParams struct {
@ -182,6 +187,7 @@ func (q *Queries) ResetTask(ctx context.Context, arg ResetTaskParams) (Task, err
&i.UpdatedAt,
&i.ApprovedAt,
&i.ReadyForApproval,
&i.Started,
)
return i, err
}
@ -218,7 +224,7 @@ func (q *Queries) ResetWorkflow(ctx context.Context, arg ResetWorkflowParams) (W
}
const task = `-- name: Task :one
SELECT tasks.workflow_id, tasks.name, tasks.finished, tasks.result, tasks.error, tasks.created_at, tasks.updated_at, tasks.approved_at, tasks.ready_for_approval
SELECT tasks.workflow_id, tasks.name, tasks.finished, tasks.result, tasks.error, tasks.created_at, tasks.updated_at, tasks.approved_at, tasks.ready_for_approval, tasks.started
FROM tasks
WHERE workflow_id = $1
AND name = $2
@ -243,6 +249,7 @@ func (q *Queries) Task(ctx context.Context, arg TaskParams) (Task, error) {
&i.UpdatedAt,
&i.ApprovedAt,
&i.ReadyForApproval,
&i.Started,
)
return i, err
}
@ -321,20 +328,42 @@ func (q *Queries) TaskLogsForTask(ctx context.Context, arg TaskLogsForTaskParams
}
const tasks = `-- name: Tasks :many
SELECT tasks.workflow_id, tasks.name, tasks.finished, tasks.result, tasks.error, tasks.created_at, tasks.updated_at, tasks.approved_at, tasks.ready_for_approval
WITH most_recent_logs AS (
SELECT workflow_id, task_name, MAX(updated_at) AS updated_at
FROM task_logs
GROUP BY workflow_id, task_name
)
SELECT tasks.workflow_id, tasks.name, tasks.finished, tasks.result, tasks.error, tasks.created_at, tasks.updated_at, tasks.approved_at, tasks.ready_for_approval, tasks.started,
GREATEST(most_recent_logs.updated_at, tasks.updated_at)::timestamptz AS most_recent_update
FROM tasks
ORDER BY updated_at
LEFT JOIN most_recent_logs ON tasks.workflow_id = most_recent_logs.workflow_id AND
tasks.name = most_recent_logs.task_name
ORDER BY most_recent_update DESC
`
func (q *Queries) Tasks(ctx context.Context) ([]Task, error) {
type TasksRow struct {
WorkflowID uuid.UUID
Name string
Finished bool
Result sql.NullString
Error sql.NullString
CreatedAt time.Time
UpdatedAt time.Time
ApprovedAt sql.NullTime
ReadyForApproval bool
Started bool
MostRecentUpdate time.Time
}
func (q *Queries) Tasks(ctx context.Context) ([]TasksRow, error) {
rows, err := q.db.Query(ctx, tasks)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Task
var items []TasksRow
for rows.Next() {
var i Task
var i TasksRow
if err := rows.Scan(
&i.WorkflowID,
&i.Name,
@ -345,6 +374,8 @@ func (q *Queries) Tasks(ctx context.Context) ([]Task, error) {
&i.UpdatedAt,
&i.ApprovedAt,
&i.ReadyForApproval,
&i.Started,
&i.MostRecentUpdate,
); err != nil {
return nil, err
}
@ -357,7 +388,7 @@ func (q *Queries) Tasks(ctx context.Context) ([]Task, error) {
}
const tasksForWorkflow = `-- name: TasksForWorkflow :many
SELECT tasks.workflow_id, tasks.name, tasks.finished, tasks.result, tasks.error, tasks.created_at, tasks.updated_at, tasks.approved_at, tasks.ready_for_approval
SELECT tasks.workflow_id, tasks.name, tasks.finished, tasks.result, tasks.error, tasks.created_at, tasks.updated_at, tasks.approved_at, tasks.ready_for_approval, tasks.started
FROM tasks
WHERE workflow_id = $1
ORDER BY created_at
@ -382,6 +413,7 @@ func (q *Queries) TasksForWorkflow(ctx context.Context, workflowID uuid.UUID) ([
&i.UpdatedAt,
&i.ApprovedAt,
&i.ReadyForApproval,
&i.Started,
); err != nil {
return nil, err
}
@ -433,7 +465,7 @@ UPDATE tasks
SET ready_for_approval = $3
WHERE workflow_id = $1
AND name = $2
RETURNING workflow_id, name, finished, result, error, created_at, updated_at, approved_at, ready_for_approval
RETURNING workflow_id, name, finished, result, error, created_at, updated_at, approved_at, ready_for_approval, started
`
type UpdateTaskReadyForApprovalParams struct {
@ -455,26 +487,29 @@ func (q *Queries) UpdateTaskReadyForApproval(ctx context.Context, arg UpdateTask
&i.UpdatedAt,
&i.ApprovedAt,
&i.ReadyForApproval,
&i.Started,
)
return i, err
}
const upsertTask = `-- name: UpsertTask :one
INSERT INTO tasks (workflow_id, name, finished, result, error, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
INSERT INTO tasks (workflow_id, name, started, finished, result, error, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (workflow_id, name) DO UPDATE
SET workflow_id = excluded.workflow_id,
name = excluded.name,
started = excluded.started,
finished = excluded.finished,
result = excluded.result,
error = excluded.error,
updated_at = excluded.updated_at
RETURNING workflow_id, name, finished, result, error, created_at, updated_at, approved_at, ready_for_approval
RETURNING workflow_id, name, finished, result, error, created_at, updated_at, approved_at, ready_for_approval, started
`
type UpsertTaskParams struct {
WorkflowID uuid.UUID
Name string
Started bool
Finished bool
Result sql.NullString
Error sql.NullString
@ -486,6 +521,7 @@ func (q *Queries) UpsertTask(ctx context.Context, arg UpsertTaskParams) (Task, e
row := q.db.QueryRow(ctx, upsertTask,
arg.WorkflowID,
arg.Name,
arg.Started,
arg.Finished,
arg.Result,
arg.Error,
@ -503,6 +539,7 @@ func (q *Queries) UpsertTask(ctx context.Context, arg UpsertTaskParams) (Task, e
&i.UpdatedAt,
&i.ApprovedAt,
&i.ReadyForApproval,
&i.Started,
)
return i, err
}

Просмотреть файл

@ -45,6 +45,7 @@ func (l *PGListener) TaskStateChanged(workflowID uuid.UUID, taskName string, sta
_, err := q.UpsertTask(ctx, db.UpsertTaskParams{
WorkflowID: workflowID,
Name: taskName,
Started: state.Started,
Finished: state.Finished,
Result: sql.NullString{String: string(result), Valid: len(result) > 0},
Error: sql.NullString{String: state.Error, Valid: state.Error != ""},

Просмотреть файл

@ -0,0 +1,6 @@
-- Copyright 2022 The Go Authors. All rights reserved.
-- Use of this source code is governed by a BSD-style
-- license that can be found in the LICENSE file.
ALTER TABLE tasks
DROP COLUMN started;

Просмотреть файл

@ -0,0 +1,6 @@
-- Copyright 2022 The Go Authors. All rights reserved.
-- Use of this source code is governed by a BSD-style
-- license that can be found in the LICENSE file.
ALTER TABLE tasks
ADD COLUMN started bool NOT NULL DEFAULT FALSE;

Просмотреть файл

@ -18,16 +18,18 @@ VALUES ($1, $2, $3, $4, $5)
RETURNING *;
-- name: CreateTask :one
INSERT INTO tasks (workflow_id, name, finished, result, error, created_at, updated_at, approved_at, ready_for_approval)
INSERT INTO tasks (workflow_id, name, finished, result, error, created_at, updated_at, approved_at,
ready_for_approval)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING *;
-- name: UpsertTask :one
INSERT INTO tasks (workflow_id, name, finished, result, error, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
INSERT INTO tasks (workflow_id, name, started, finished, result, error, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (workflow_id, name) DO UPDATE
SET workflow_id = excluded.workflow_id,
name = excluded.name,
started = excluded.started,
finished = excluded.finished,
result = excluded.result,
error = excluded.error,
@ -35,9 +37,17 @@ ON CONFLICT (workflow_id, name) DO UPDATE
RETURNING *;
-- name: Tasks :many
SELECT tasks.*
WITH most_recent_logs AS (
SELECT workflow_id, task_name, MAX(updated_at) AS updated_at
FROM task_logs
GROUP BY workflow_id, task_name
)
SELECT tasks.*,
GREATEST(most_recent_logs.updated_at, tasks.updated_at)::timestamptz AS most_recent_update
FROM tasks
ORDER BY updated_at;
LEFT JOIN most_recent_logs ON tasks.workflow_id = most_recent_logs.workflow_id AND
tasks.name = most_recent_logs.task_name
ORDER BY most_recent_update DESC;
-- name: TasksForWorkflow :many
SELECT tasks.*
@ -85,10 +95,12 @@ RETURNING *;
-- name: ResetTask :one
UPDATE tasks
SET finished = FALSE,
result = DEFAULT,
error = DEFAULT,
updated_at = $3
SET finished = FALSE,
started = FALSE,
approved_at = DEFAULT,
result = DEFAULT,
error = DEFAULT,
updated_at = $3
WHERE workflow_id = $1
AND name = $2
RETURNING *;

Просмотреть файл

@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" enable-background="new 0 0 24 24" height="24px" viewBox="0 0 24 24" width="24px" fill="#bdc1c6"><g><rect fill="none" height="24" width="24"/></g><g><path d="M12,2C6.48,2,2,6.48,2,12c0,5.52,4.48,10,10,10s10-4.48,10-10C22,6.48,17.52,2,12,2z M7,13.5c-0.83,0-1.5-0.67-1.5-1.5 c0-0.83,0.67-1.5,1.5-1.5s1.5,0.67,1.5,1.5C8.5,12.83,7.83,13.5,7,13.5z M12,13.5c-0.83,0-1.5-0.67-1.5-1.5 c0-0.83,0.67-1.5,1.5-1.5s1.5,0.67,1.5,1.5C13.5,12.83,12.83,13.5,12,13.5z M17,13.5c-0.83,0-1.5-0.67-1.5-1.5 c0-0.83,0.67-1.5,1.5-1.5s1.5,0.67,1.5,1.5C18.5,12.83,17.83,13.5,17,13.5z"/></g></svg>

После

Ширина:  |  Высота:  |  Размер: 609 B

Просмотреть файл

@ -37,11 +37,16 @@
class="TaskList-itemStateIcon"
alt="finished"
src="{{baseLink "/static/images/check_circle_green_24dp.svg"}}" />
{{else if $task.Started}}
<img
class="TaskList-itemStateIcon"
alt="started"
src="{{baseLink "/static/images/pending_yellow_24dp.svg"}}" />
{{else}}
<img
class="TaskList-itemStateIcon"
alt="pending"
src="{{baseLink "/static/images/pending_yellow_24dp.svg"}}" />
src="{{baseLink "/static/images/pending_grey_24dp.svg"}}" />
{{end}}
</td>
<td class="TaskList-itemCol TaskList-itemName">
@ -51,7 +56,7 @@
{{$task.CreatedAt.UTC.Format "Mon Jan _2 2006 15:04:05"}}
</td>
<td class="TaskList-itemCol TaskList-itemUpdated">
{{$task.UpdatedAt.UTC.Format "Mon Jan _2 2006 15:04:05"}}
{{$task.MostRecentUpdate.UTC.Format "Mon Jan _2 2006 15:04:05"}}
</td>
<td class="TaskList-itemCol TaskList-itemResult">
{{if $task.ApprovedAt.Valid}}
@ -68,8 +73,7 @@
<input class="Button Button--small" name="task.reset" type="submit" value="Retry" onclick="return this.form.reportValidity() && confirm('This will retry the task and clear workflow errors.\n\nReady to proceed?')" />
</form>
</div>
{{end}}
{{if and (not $task.ApprovedAt.Valid) ($task.ReadyForApproval)}}
{{else if and (not $task.ApprovedAt.Valid) ($task.ReadyForApproval)}}
<div class="TaskList-approveTask">
<form action="{{baseLink (printf "/workflows/%s/tasks/%s/approve" $workflow.ID $task.Name)}}" method="post">
<input type="hidden" id="workflow.id" name="workflow.id" value="{{$workflow.ID}}" />

Просмотреть файл

@ -139,7 +139,7 @@ func (s *Server) BaseLink(target string) string {
type workflowDetail struct {
Workflow db.Workflow
Tasks []db.Task
Tasks []db.TasksRow
// TaskLogs is a map of all logs for a db.Task, keyed on
// (db.Task).Name
TaskLogs map[string][]db.TaskLog

Просмотреть файл

@ -192,14 +192,25 @@ func (w *Worker) Resume(ctx context.Context, id uuid.UUID) error {
}
taskStates := make(map[string]*workflow.TaskState)
for _, t := range tasks {
taskStates[t.Name] = &workflow.TaskState{
ts := &workflow.TaskState{
Name: t.Name,
Finished: t.Finished,
Error: t.Error.String,
}
if t.Result.Valid {
taskStates[t.Name].SerializedResult = []byte(t.Result.String)
// The worker may have crashed, or been re-deployed. Any
// started but unfinished tasks are in an unknown state.
// Mark them as such for human review.
if t.Started && !t.Finished {
ts.Finished = true
ts.Error = "task interrupted before completion"
if t.Error.Valid {
ts.Error = fmt.Sprintf("%s. Previous error: %s", ts.Error, t.Error.String)
}
}
if t.Result.Valid {
ts.SerializedResult = []byte(t.Result.String)
}
taskStates[t.Name] = ts
}
res, err := workflow.Resume(d, state, taskStates)
if err != nil {

Просмотреть файл

@ -68,6 +68,7 @@ func TestWorkerStartWorkflow(t *testing.T) {
{
WorkflowID: wfid,
Name: "echo",
Started: true,
Finished: true,
Result: nullString(`"greetings"`),
Error: sql.NullString{},
@ -110,6 +111,7 @@ func TestWorkerResume(t *testing.T) {
want := []db.Task{{
WorkflowID: wfid,
Name: "echo",
Started: true,
Finished: true,
Result: nullString(`"hello"`),
Error: sql.NullString{},
@ -166,27 +168,31 @@ func TestWorkflowResumeAll(t *testing.T) {
if err != nil {
t.Fatalf("q.Tasks() = %v, %v, wanted no error", tasks, err)
}
want := []db.Task{
want := []db.TasksRow{
{
WorkflowID: wfid1,
Name: "echo",
Finished: true,
Result: nullString(`"hello"`),
Error: sql.NullString{},
CreatedAt: time.Now(), // cmpopts.EquateApproxTime
UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
WorkflowID: wfid1,
Name: "echo",
Started: true,
Finished: true,
Result: nullString(`"hello"`),
Error: sql.NullString{},
CreatedAt: time.Now(), // cmpopts.EquateApproxTime
UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
MostRecentUpdate: time.Now(),
},
{
WorkflowID: wfid2,
Name: "echo",
Finished: true,
Result: nullString(`"hello"`),
Error: sql.NullString{},
CreatedAt: time.Now(), // cmpopts.EquateApproxTime
UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
WorkflowID: wfid2,
Name: "echo",
Started: true,
Finished: true,
Result: nullString(`"hello"`),
Error: sql.NullString{},
CreatedAt: time.Now(), // cmpopts.EquateApproxTime
UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
MostRecentUpdate: time.Now(),
},
}
sort := cmpopts.SortSlices(func(x db.Task, y db.Task) bool {
sort := cmpopts.SortSlices(func(x, y db.TasksRow) bool {
return x.WorkflowID.String() < y.WorkflowID.String()
})
if diff := cmp.Diff(want, tasks, cmpopts.EquateApproxTime(time.Minute), sort); diff != "" {

Просмотреть файл

@ -346,6 +346,7 @@ type Listener interface {
// is true, either Result or Error will be populated.
type TaskState struct {
Name string
Started bool
Finished bool
Result interface{}
SerializedResult []byte
@ -425,6 +426,7 @@ func (t *taskState) toExported() *TaskState {
Finished: t.finished,
Result: t.result,
SerializedResult: append([]byte(nil), t.serializedResult...),
Started: t.started,
}
if t.err != nil {
state.Error = t.err.Error()

Просмотреть файл

@ -318,8 +318,8 @@ func TestResume(t *testing.T) {
t.Fatalf("canceled workflow returned error %v, wanted Canceled", err)
}
storage.assertState(t, w, map[string]*workflow.TaskState{
"run once": {Name: "run once", Finished: true, Result: "ran"},
"block": {Name: "block", Finished: true, Error: "context canceled"}, // We cancelled the workflow before it could save its state.
"run once": {Name: "run once", Started: true, Finished: true, Result: "ran"},
"block": {Name: "block", Started: true, Finished: true, Error: "context canceled"}, // We cancelled the workflow before it could save its state.
})
block = false
@ -338,8 +338,8 @@ func TestResume(t *testing.T) {
t.Errorf("runOnlyOnce ran %v times, wanted 1", runs)
}
storage.assertState(t, w, map[string]*workflow.TaskState{
"run once": {Name: "run once", Finished: true, Result: "ran"},
"block": {Name: "block", Finished: true, Result: "not blocked"},
"run once": {Name: "run once", Started: true, Finished: true, Result: "ran"},
"block": {Name: "block", Started: true, Finished: true, Result: "not blocked"},
})
}