зеркало из https://github.com/golang/build.git
internal/relui: email on schedule failure
The user who scheduled a workflow may not be closely watching at the time of failure. This adds functionality to send an email if a scheduled workflow cannot progress. The current implementation logs on stall, but it's trivial to configure to email as some tasks do. For golang/go#54476 Change-Id: Id0deefd3c1b07f569585600a583ba4e04f8f7be1 Reviewed-on: https://go-review.googlesource.com/c/build/+/444695 Run-TryBot: Jenny Rakoczy <jenny@golang.org> TryBot-Result: Gopher Robot <gobot@golang.org> Reviewed-by: Heschi Kreinick <heschi@google.com>
This commit is contained in:
Родитель
3dfc64c7bd
Коммит
67d624370c
|
@ -73,6 +73,10 @@ func main() {
|
|||
addressVarFlag(&annMail.From, "announce-mail-from", "The From address to use for the (pre-)announcement mail.")
|
||||
addressVarFlag(&annMail.To, "announce-mail-to", "The To address to use for the (pre-)announcement mail.")
|
||||
addressListVarFlag(&annMail.BCC, "announce-mail-bcc", "The BCC address list to use for the (pre-)announcement mail.")
|
||||
var schedMail task.MailHeader
|
||||
addressVarFlag(&schedMail.From, "schedule-mail-from", "The From address to use for the scheduled workflow failure mail.")
|
||||
addressVarFlag(&schedMail.To, "schedule-mail-to", "The To address to use for the the scheduled workflow failure mail.")
|
||||
addressListVarFlag(&schedMail.BCC, "schedule-mail-bcc", "The BCC address list to use for the scheduled workflow failure mail.")
|
||||
var twitterAPI secret.TwitterCredentials
|
||||
secret.JSONVarFlag(&twitterAPI, "twitter-api-secret", "Twitter API secret to use for workflows involving tweeting.")
|
||||
masterKey := secret.Flag("builder-master-key", "Builder master key")
|
||||
|
@ -212,11 +216,6 @@ func main() {
|
|||
}
|
||||
dh.RegisterDefinition("Tag x/ repos", tagTasks.NewDefinition())
|
||||
|
||||
w := relui.NewWorker(dh, dbPool, relui.NewPGListener(dbPool))
|
||||
go w.Run(ctx)
|
||||
if err := w.ResumeAll(ctx); err != nil {
|
||||
log.Printf("w.ResumeAll() = %v", err)
|
||||
}
|
||||
var base *url.URL
|
||||
if *baseURL != "" {
|
||||
base, err = url.Parse(*baseURL)
|
||||
|
@ -224,6 +223,17 @@ func main() {
|
|||
log.Fatalf("url.Parse(%q) = %v, %v", *baseURL, base, err)
|
||||
}
|
||||
}
|
||||
l := &relui.PGListener{
|
||||
DB: dbPool,
|
||||
BaseURL: base,
|
||||
ScheduleFailureMailHeader: schedMail,
|
||||
SendMail: relui.LogOnlyMailer,
|
||||
}
|
||||
w := relui.NewWorker(dh, dbPool, l)
|
||||
go w.Run(ctx)
|
||||
if err := w.ResumeAll(ctx); err != nil {
|
||||
log.Printf("w.ResumeAll() = %v", err)
|
||||
}
|
||||
s := relui.NewServer(dbPool, w, base, siteHeader, ms)
|
||||
log.Fatalln(https.ListenAndServe(ctx, &ochttp.Handler{Handler: GRPCHandler(grpcServer, s)}))
|
||||
}
|
||||
|
|
|
@ -544,6 +544,11 @@ type verboseListener struct {
|
|||
outputListener func(string, interface{})
|
||||
}
|
||||
|
||||
func (l *verboseListener) WorkflowStalled(workflowID uuid.UUID) error {
|
||||
l.t.Logf("workflow %q: stalled", workflowID.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *verboseListener) TaskStateChanged(_ uuid.UUID, _ string, st *workflow.TaskState) error {
|
||||
switch {
|
||||
case !st.Finished:
|
||||
|
|
|
@ -4,7 +4,9 @@
|
|||
|
||||
package relui
|
||||
|
||||
import "embed"
|
||||
import (
|
||||
"embed"
|
||||
)
|
||||
|
||||
// static is our static web server content.
|
||||
//
|
||||
|
|
|
@ -5,26 +5,51 @@
|
|||
package relui
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"html/template"
|
||||
"log"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v4"
|
||||
"golang.org/x/build/internal/relui/db"
|
||||
"golang.org/x/build/internal/task"
|
||||
"golang.org/x/build/internal/workflow"
|
||||
)
|
||||
|
||||
func NewPGListener(db db.PGDBTX) *PGListener {
|
||||
return &PGListener{db}
|
||||
}
|
||||
|
||||
// PGListener implements workflow.Listener for recording workflow state.
|
||||
type PGListener struct {
|
||||
db db.PGDBTX
|
||||
DB db.PGDBTX
|
||||
|
||||
BaseURL *url.URL
|
||||
|
||||
ScheduleFailureMailHeader task.MailHeader
|
||||
SendMail func(task.MailHeader, task.MailContent) error
|
||||
|
||||
templ *template.Template
|
||||
}
|
||||
|
||||
// WorkflowStalled is called when no tasks are runnable.
|
||||
func (l *PGListener) WorkflowStalled(workflowID uuid.UUID) error {
|
||||
wf, err := db.New(l.DB).Workflow(context.Background(), workflowID)
|
||||
if err != nil || wf.ScheduleID.Int32 == 0 {
|
||||
return err
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
body := scheduledFailureEmailBody{Workflow: wf}
|
||||
if err := l.template("scheduled_workflow_failure_email.txt").Execute(&buf, body); err != nil {
|
||||
log.Printf("WorkflowFinished: Execute(_, %v) = %q", body, err)
|
||||
return err
|
||||
}
|
||||
return l.SendMail(l.ScheduleFailureMailHeader, task.MailContent{
|
||||
Subject: fmt.Sprintf("[relui] Scheduled workflow %q failed", wf.Name.String),
|
||||
BodyText: buf.String(),
|
||||
})
|
||||
}
|
||||
|
||||
// TaskStateChanged is called whenever a task is updated by the
|
||||
|
@ -38,7 +63,7 @@ func (l *PGListener) TaskStateChanged(workflowID uuid.UUID, taskName string, sta
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = l.db.BeginFunc(ctx, func(tx pgx.Tx) error {
|
||||
err = l.DB.BeginFunc(ctx, func(tx pgx.Tx) error {
|
||||
q := db.New(tx)
|
||||
updated := time.Now()
|
||||
_, err := q.UpsertTask(ctx, db.UpsertTaskParams{
|
||||
|
@ -62,7 +87,7 @@ func (l *PGListener) TaskStateChanged(workflowID uuid.UUID, taskName string, sta
|
|||
|
||||
// WorkflowStarted persists a new workflow execution in the database.
|
||||
func (l *PGListener) WorkflowStarted(ctx context.Context, workflowID uuid.UUID, name string, params map[string]interface{}, scheduleID int) error {
|
||||
q := db.New(l.db)
|
||||
q := db.New(l.DB)
|
||||
m, err := json.Marshal(params)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -80,11 +105,16 @@ func (l *PGListener) WorkflowStarted(ctx context.Context, workflowID uuid.UUID,
|
|||
return err
|
||||
}
|
||||
|
||||
type scheduledFailureEmailBody struct {
|
||||
Workflow db.Workflow
|
||||
Err error
|
||||
}
|
||||
|
||||
// WorkflowFinished saves the final state of a workflow after its run
|
||||
// has completed.
|
||||
func (l *PGListener) WorkflowFinished(ctx context.Context, workflowID uuid.UUID, outputs map[string]interface{}, workflowErr error) error {
|
||||
log.Printf("WorkflowCompleted(%q, %v, %q)", workflowID, outputs, workflowErr)
|
||||
q := db.New(l.db)
|
||||
log.Printf("WorkflowFinished(%q, %v, %q)", workflowID, outputs, workflowErr)
|
||||
q := db.New(l.DB)
|
||||
m, err := json.Marshal(outputs)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -102,9 +132,21 @@ func (l *PGListener) WorkflowFinished(ctx context.Context, workflowID uuid.UUID,
|
|||
return err
|
||||
}
|
||||
|
||||
func (l *PGListener) template(name string) *template.Template {
|
||||
if l.templ == nil {
|
||||
helpers := map[string]any{"baseLink": l.baseLink}
|
||||
l.templ = template.Must(template.New("").Funcs(helpers).ParseFS(templates, "templates/*.txt"))
|
||||
}
|
||||
return l.templ.Lookup(name)
|
||||
}
|
||||
|
||||
func (l *PGListener) baseLink(target string, extras ...string) string {
|
||||
return BaseLink(l.BaseURL)(target, extras...)
|
||||
}
|
||||
|
||||
func (l *PGListener) Logger(workflowID uuid.UUID, taskName string) workflow.Logger {
|
||||
return &postgresLogger{
|
||||
db: l.db,
|
||||
db: l.DB,
|
||||
workflowID: workflowID,
|
||||
taskName: taskName,
|
||||
}
|
||||
|
@ -136,3 +178,8 @@ func (l *postgresLogger) Printf(format string, v ...interface{}) {
|
|||
log.Printf("l.Printf(%q, %v) = %v", format, v, err)
|
||||
}
|
||||
}
|
||||
|
||||
func LogOnlyMailer(header task.MailHeader, content task.MailContent) error {
|
||||
log.Println("Logging but not sending mail:", header, content)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -7,6 +7,9 @@ package relui
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/mail"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -14,6 +17,7 @@ import (
|
|||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/google/uuid"
|
||||
"golang.org/x/build/internal/relui/db"
|
||||
"golang.org/x/build/internal/task"
|
||||
"golang.org/x/build/internal/workflow"
|
||||
)
|
||||
|
||||
|
@ -76,7 +80,7 @@ func TestListenerTaskStateChanged(t *testing.T) {
|
|||
t.Fatalf("q.CreateWorkflow(%v, %v) = %v, wanted no error", ctx, wfp, err)
|
||||
}
|
||||
|
||||
l := &PGListener{db: dbp}
|
||||
l := &PGListener{DB: dbp}
|
||||
err = l.TaskStateChanged(wf.ID, "TestTask", c.state)
|
||||
if err != nil {
|
||||
t.Fatalf("l.TaskStateChanged(%v, %q, %v) = %v, wanted no error", wf.ID, "TestTask", c.state, err)
|
||||
|
@ -110,7 +114,7 @@ func TestListenerLogger(t *testing.T) {
|
|||
t.Fatalf("q.UpsertTask(%v, %v) = %v, wanted no error", ctx, params, err)
|
||||
}
|
||||
|
||||
l := &PGListener{db: dbp}
|
||||
l := &PGListener{DB: dbp}
|
||||
l.Logger(wf.ID, "TestTask").Printf("A fancy log line says %q", "hello")
|
||||
|
||||
logs, err := q.TaskLogs(ctx)
|
||||
|
@ -128,3 +132,99 @@ func TestListenerLogger(t *testing.T) {
|
|||
t.Errorf("q.TaskLogs(_, %q) mismatch (-want +got):\n%s", wf.ID, diff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPGListenerWorkflowStalledNotification(t *testing.T) {
|
||||
cases := []struct {
|
||||
desc string
|
||||
schedule bool
|
||||
taskErr bool
|
||||
}{
|
||||
{
|
||||
desc: "scheduled workflow failure sends notification",
|
||||
schedule: true,
|
||||
taskErr: true,
|
||||
},
|
||||
{
|
||||
desc: "scheduled workflow success sends nothing",
|
||||
schedule: true,
|
||||
},
|
||||
{
|
||||
desc: "unscheduled workflow success sends nothing",
|
||||
},
|
||||
{
|
||||
desc: "unscheduled workflow failure sends nothing",
|
||||
taskErr: true,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.desc, func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
p := testDB(ctx, t)
|
||||
var schedID int
|
||||
if c.schedule {
|
||||
sched, err := db.New(p).CreateSchedule(ctx, db.CreateScheduleParams{WorkflowName: c.desc})
|
||||
if err != nil {
|
||||
t.Fatalf("CreateSchedule() = %v, wanted no error", err)
|
||||
}
|
||||
schedID = int(sched.ID)
|
||||
}
|
||||
wd := workflow.New()
|
||||
complete := workflow.Task0(wd, "complete", func(ctx context.Context) (string, error) {
|
||||
if c.taskErr {
|
||||
return "", fmt.Errorf("c.taskErr: %t", c.taskErr)
|
||||
}
|
||||
return "done", nil
|
||||
})
|
||||
workflow.Output(wd, "finished", complete)
|
||||
dh := NewDefinitionHolder()
|
||||
dh.RegisterDefinition(c.desc, wd)
|
||||
|
||||
header := task.MailHeader{
|
||||
From: mail.Address{Address: "from-address@golang.test"},
|
||||
To: mail.Address{Address: "to-address@golang.test"},
|
||||
}
|
||||
var gotHeader task.MailHeader
|
||||
var gotContent task.MailContent
|
||||
pgl := &PGListener{
|
||||
DB: p,
|
||||
SendMail: func(h task.MailHeader, c task.MailContent) error {
|
||||
gotHeader, gotContent = h, c
|
||||
return nil
|
||||
},
|
||||
ScheduleFailureMailHeader: header,
|
||||
}
|
||||
listener := &testWorkflowListener{
|
||||
Listener: pgl,
|
||||
onFinished: cancel,
|
||||
}
|
||||
w := NewWorker(dh, p, listener)
|
||||
|
||||
id, err := w.StartWorkflow(ctx, c.desc, nil, schedID)
|
||||
if err != nil {
|
||||
t.Fatalf("w.StartWorkflow(_, %q, %v, %d) = %v, %v, wanted no error", c.desc, nil, schedID, id, err)
|
||||
}
|
||||
listener.onStalled = func() {
|
||||
w.cancelWorkflow(id)
|
||||
}
|
||||
if err := w.Run(ctx); !errors.Is(err, context.Canceled) {
|
||||
t.Errorf("w.Run() = %v, wanted %v", err, context.Canceled)
|
||||
}
|
||||
|
||||
wantSend := c.taskErr && c.schedule
|
||||
if (gotContent.Subject == "") == wantSend {
|
||||
t.Errorf("gotContent.Subject = %q, wanted empty: %t", gotContent.Subject, !c.taskErr)
|
||||
}
|
||||
if (gotContent.BodyText == "") == wantSend {
|
||||
t.Errorf("gotContent.BodyText = %q, wanted empty: %t", gotContent.BodyText, !c.taskErr)
|
||||
}
|
||||
var wantHeader task.MailHeader
|
||||
if wantSend {
|
||||
wantHeader = header
|
||||
}
|
||||
if diff := cmp.Diff(wantHeader, gotHeader); diff != "" {
|
||||
t.Errorf("WorkflowFinished(_, %q) mismatch (-want +got):\n%s", id, diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -120,7 +120,7 @@ func TestSchedulerCreate(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
p := testDB(ctx, t)
|
||||
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}))
|
||||
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}))
|
||||
row, err := s.Create(ctx, c.sched, c.workflowName, c.params)
|
||||
if (err != nil) != c.wantErr {
|
||||
t.Fatalf("s.Create(_, %v, %q, %v) = %v, %v, wantErr: %t", c.sched, c.workflowName, c.params, row, err, c.wantErr)
|
||||
|
@ -243,7 +243,7 @@ func TestSchedulerResume(t *testing.T) {
|
|||
defer cancel()
|
||||
p := testDB(ctx, t)
|
||||
q := db.New(p)
|
||||
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}))
|
||||
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}))
|
||||
|
||||
for _, csp := range c.scheds {
|
||||
if _, err := q.CreateSchedule(ctx, csp); err != nil {
|
||||
|
@ -294,7 +294,7 @@ func TestScheduleDelete(t *testing.T) {
|
|||
defer cancel()
|
||||
p := testDB(ctx, t)
|
||||
q := db.New(p)
|
||||
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}))
|
||||
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}))
|
||||
row, err := s.Create(ctx, c.sched, c.workflowName, c.params)
|
||||
if err != nil {
|
||||
t.Fatalf("s.Create(_, %v, %q, %v) = %v, %v, wanted no error", c.sched, c.workflowName, c.params, row, err)
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
{{- /*gotype: golang.org/x/build/internal/relui.scheduledFailureEmailBody*/ -}}
|
||||
The scheduled workflow "{{.Workflow.Name.String}}" ({{.Workflow.ID.String}}) failed at {{.Workflow.UpdatedAt.UTC.Format "2006/01/02 15:04 MST"}}.
|
||||
|
||||
See the following page for details:
|
||||
|
||||
{{baseLink "/workflows/" .Workflow.ID.String}}
|
|
@ -151,20 +151,26 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
s.m.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
func (s *Server) BaseLink(target string, extras ...string) string {
|
||||
u, err := url.Parse(target)
|
||||
if err != nil {
|
||||
log.Printf("BaseLink: url.Parse(%q) = %v, %v", target, u, err)
|
||||
return path.Join(append([]string{target}, extras...)...)
|
||||
}
|
||||
u.Path = path.Join(append([]string{u.Path}, extras...)...)
|
||||
if s.baseURL == nil || u.IsAbs() {
|
||||
func BaseLink(baseURL *url.URL) func(target string, extras ...string) string {
|
||||
return func(target string, extras ...string) string {
|
||||
u, err := url.Parse(target)
|
||||
if err != nil {
|
||||
log.Printf("BaseLink: url.Parse(%q) = %v, %v", target, u, err)
|
||||
return path.Join(append([]string{target}, extras...)...)
|
||||
}
|
||||
u.Path = path.Join(append([]string{u.Path}, extras...)...)
|
||||
if baseURL == nil || u.IsAbs() {
|
||||
return u.String()
|
||||
}
|
||||
u.Scheme = baseURL.Scheme
|
||||
u.Host = baseURL.Host
|
||||
u.Path = path.Join(baseURL.Path, u.Path)
|
||||
return u.String()
|
||||
}
|
||||
u.Scheme = s.baseURL.Scheme
|
||||
u.Host = s.baseURL.Host
|
||||
u.Path = path.Join(s.baseURL.Path, u.Path)
|
||||
return u.String()
|
||||
}
|
||||
|
||||
func (s *Server) BaseLink(target string, extras ...string) string {
|
||||
return BaseLink(s.baseURL)(target, extras...)
|
||||
}
|
||||
|
||||
type homeResponse struct {
|
||||
|
@ -215,7 +221,7 @@ func (s *Server) homeHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
for _, w := range ws {
|
||||
if _, ok := s.w.running[w.ID.String()]; ok {
|
||||
if ok := s.w.workflowRunning(w.ID); ok {
|
||||
hr.ActiveWorkflows = append(hr.ActiveWorkflows, w)
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -121,7 +121,8 @@ func TestServerHomeHandler(t *testing.T) {
|
|||
req := httptest.NewRequest(http.MethodGet, "/", nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}), nil, SiteHeader{}, nil)
|
||||
s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}), nil, SiteHeader{}, nil)
|
||||
|
||||
s.homeHandler(w, req)
|
||||
resp := w.Result()
|
||||
|
||||
|
@ -292,7 +293,7 @@ func TestServerCreateWorkflowHandler(t *testing.T) {
|
|||
rec := httptest.NewRecorder()
|
||||
q := db.New(p)
|
||||
|
||||
s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}), nil, SiteHeader{}, nil)
|
||||
s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}), nil, SiteHeader{}, nil)
|
||||
s.createWorkflowHandler(rec, req)
|
||||
resp := rec.Result()
|
||||
|
||||
|
@ -626,7 +627,7 @@ func TestServerApproveTaskHandler(t *testing.T) {
|
|||
req := httptest.NewRequest(http.MethodPost, path.Join("/workflows/", c.params["id"], "tasks", url.PathEscape(c.params["name"]), "approve"), nil)
|
||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||
rec := httptest.NewRecorder()
|
||||
s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}), nil, SiteHeader{}, nil)
|
||||
s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}), nil, SiteHeader{}, nil)
|
||||
|
||||
s.m.ServeHTTP(rec, req)
|
||||
resp := rec.Result()
|
||||
|
|
|
@ -132,6 +132,13 @@ func (w *Worker) run(wf *workflow.Workflow) error {
|
|||
}
|
||||
}
|
||||
|
||||
func (w *Worker) workflowRunning(id uuid.UUID) bool {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
_, ok := w.running[id.String()]
|
||||
return ok
|
||||
}
|
||||
|
||||
// StartWorkflow persists and starts running a workflow.
|
||||
func (w *Worker) StartWorkflow(ctx context.Context, name string, params map[string]interface{}, scheduleID int) (uuid.UUID, error) {
|
||||
d := w.dh.Definition(name)
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -30,7 +29,7 @@ func TestWorkerStartWorkflow(t *testing.T) {
|
|||
wg := sync.WaitGroup{}
|
||||
dh := NewDefinitionHolder()
|
||||
w := NewWorker(dh, dbp, &testWorkflowListener{
|
||||
Listener: &PGListener{dbp},
|
||||
Listener: &PGListener{DB: dbp},
|
||||
onFinished: wg.Done,
|
||||
})
|
||||
|
||||
|
@ -91,7 +90,7 @@ func TestWorkerResume(t *testing.T) {
|
|||
wg := sync.WaitGroup{}
|
||||
dh := NewDefinitionHolder()
|
||||
w := NewWorker(dh, dbp, &testWorkflowListener{
|
||||
Listener: &PGListener{dbp},
|
||||
Listener: &PGListener{DB: dbp},
|
||||
onFinished: wg.Done,
|
||||
})
|
||||
|
||||
|
@ -130,7 +129,7 @@ func TestWorkerResumeMissingDefinition(t *testing.T) {
|
|||
defer cancel()
|
||||
dbp := testDB(ctx, t)
|
||||
q := db.New(dbp)
|
||||
w := NewWorker(NewDefinitionHolder(), dbp, &PGListener{dbp})
|
||||
w := NewWorker(NewDefinitionHolder(), dbp, &PGListener{DB: dbp})
|
||||
|
||||
cwp := db.CreateWorkflowParams{ID: uuid.New(), Name: nullString(t.Name()), Params: nullString("{}")}
|
||||
if wf, err := q.CreateWorkflow(ctx, cwp); err != nil {
|
||||
|
@ -150,7 +149,7 @@ func TestWorkflowResumeAll(t *testing.T) {
|
|||
wg := sync.WaitGroup{}
|
||||
dh := NewDefinitionHolder()
|
||||
w := NewWorker(dh, dbp, &testWorkflowListener{
|
||||
Listener: &PGListener{dbp},
|
||||
Listener: &PGListener{DB: dbp},
|
||||
onFinished: wg.Done,
|
||||
})
|
||||
|
||||
|
@ -207,7 +206,7 @@ func TestWorkflowResumeRetry(t *testing.T) {
|
|||
defer cancel()
|
||||
dbp := testDB(ctx, t)
|
||||
dh := NewDefinitionHolder()
|
||||
w := NewWorker(dh, dbp, &PGListener{dbp})
|
||||
w := NewWorker(dh, dbp, &PGListener{DB: dbp})
|
||||
|
||||
counter := 0
|
||||
blockingChan := make(chan bool)
|
||||
|
@ -252,7 +251,7 @@ func TestWorkflowResumeRetry(t *testing.T) {
|
|||
defer cancel()
|
||||
wfDone := make(chan bool, 1)
|
||||
w = NewWorker(dh, dbp, &testWorkflowListener{
|
||||
Listener: &PGListener{dbp},
|
||||
Listener: &PGListener{DB: dbp},
|
||||
onFinished: func() { wfDone <- true },
|
||||
})
|
||||
|
||||
|
@ -300,28 +299,21 @@ type testWorkflowListener struct {
|
|||
Listener
|
||||
|
||||
onFinished func()
|
||||
onStalled func()
|
||||
}
|
||||
|
||||
func (t *testWorkflowListener) WorkflowFinished(ctx context.Context, wfid uuid.UUID, outputs map[string]interface{}, err error) error {
|
||||
defer t.onFinished()
|
||||
return t.Listener.WorkflowFinished(ctx, wfid, outputs, err)
|
||||
func (t *testWorkflowListener) WorkflowFinished(ctx context.Context, wfid uuid.UUID, outputs map[string]interface{}, wferr error) error {
|
||||
err := t.Listener.WorkflowFinished(ctx, wfid, outputs, wferr)
|
||||
if t.onFinished != nil {
|
||||
t.onFinished()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
type unimplementedListener struct {
|
||||
}
|
||||
|
||||
func (u *unimplementedListener) TaskStateChanged(uuid.UUID, string, *workflow.TaskState) error {
|
||||
return errors.New("method TaskStateChanged not implemented")
|
||||
}
|
||||
|
||||
func (u *unimplementedListener) Logger(uuid.UUID, string) workflow.Logger {
|
||||
return log.Default()
|
||||
}
|
||||
|
||||
func (u *unimplementedListener) WorkflowStarted(context.Context, uuid.UUID, string, map[string]interface{}, int) error {
|
||||
return errors.New("method WorkflowStarted not implemented")
|
||||
}
|
||||
|
||||
func (u *unimplementedListener) WorkflowFinished(context.Context, uuid.UUID, map[string]interface{}, error) error {
|
||||
return errors.New("method WorkflowFinished not implemented")
|
||||
func (t *testWorkflowListener) WorkflowStalled(workflowID uuid.UUID) error {
|
||||
err := t.Listener.WorkflowStalled(workflowID)
|
||||
if t.onStalled != nil {
|
||||
t.onStalled()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -109,7 +109,7 @@ type AnnounceMailTasks struct {
|
|||
// Email delivery happens asynchronously, so SendMail returns a nil error
|
||||
// if the transmission was started successfully, but that error value
|
||||
// doesn't indicate anything about the status of the delivery.
|
||||
SendMail func(MailHeader, mailContent) error
|
||||
SendMail func(MailHeader, MailContent) error
|
||||
|
||||
// AnnounceMailHeader is the header to use for the release (pre-)announcement email.
|
||||
AnnounceMailHeader MailHeader
|
||||
|
@ -282,8 +282,8 @@ type MailHeader struct {
|
|||
BCC []mail.Address
|
||||
}
|
||||
|
||||
// A mailContent holds the content of an email.
|
||||
type mailContent struct {
|
||||
// A MailContent holds the content of an email.
|
||||
type MailContent struct {
|
||||
Subject string
|
||||
BodyHTML string
|
||||
BodyText string
|
||||
|
@ -293,7 +293,7 @@ type mailContent struct {
|
|||
// which must be one of these types:
|
||||
// - releaseAnnouncement for a release announcement
|
||||
// - releasePreAnnouncement for a release pre-announcement
|
||||
func announcementMail(data any) (mailContent, error) {
|
||||
func announcementMail(data any) (MailContent, error) {
|
||||
// Select the appropriate template name.
|
||||
var name string
|
||||
switch r := data.(type) {
|
||||
|
@ -307,7 +307,7 @@ func announcementMail(data any) (mailContent, error) {
|
|||
} else if strings.Count(r.Version, ".") == 2 { // Minor release like "go1.X.Y".
|
||||
name = "announce-minor.md"
|
||||
} else {
|
||||
return mailContent{}, fmt.Errorf("unknown version format: %q", r.Version)
|
||||
return MailContent{}, fmt.Errorf("unknown version format: %q", r.Version)
|
||||
}
|
||||
|
||||
if len(r.Security) > 0 && name != "announce-minor.md" {
|
||||
|
@ -317,14 +317,14 @@ func announcementMail(data any) (mailContent, error) {
|
|||
// Note: Maybe in the future we'd want to consider support for including sentences like
|
||||
// "This beta release includes the same security fixes as in Go X.Y.Z and Go A.B.C.",
|
||||
// but we'll have a better idea after these initial templates get more practical use.
|
||||
return mailContent{}, fmt.Errorf("email template %q doesn't support the Security field; this field can only be used in minor releases", name)
|
||||
return MailContent{}, fmt.Errorf("email template %q doesn't support the Security field; this field can only be used in minor releases", name)
|
||||
} else if r.SecondaryVersion != "" && name != "announce-minor.md" {
|
||||
return mailContent{}, fmt.Errorf("email template %q doesn't support more than one release; the SecondaryVersion field can only be used in minor releases", name)
|
||||
return MailContent{}, fmt.Errorf("email template %q doesn't support more than one release; the SecondaryVersion field can only be used in minor releases", name)
|
||||
}
|
||||
case releasePreAnnouncement:
|
||||
name = "pre-announce-minor.md"
|
||||
default:
|
||||
return mailContent{}, fmt.Errorf("unknown template data type %T", data)
|
||||
return MailContent{}, fmt.Errorf("unknown template data type %T", data)
|
||||
}
|
||||
|
||||
// Render the (pre-)announcement email template.
|
||||
|
@ -332,28 +332,28 @@ func announcementMail(data any) (mailContent, error) {
|
|||
// It'll produce a valid message with a MIME header and a body, so parse it as such.
|
||||
var buf bytes.Buffer
|
||||
if err := announceTmpl.ExecuteTemplate(&buf, name, data); err != nil {
|
||||
return mailContent{}, err
|
||||
return MailContent{}, err
|
||||
}
|
||||
m, err := mail.ReadMessage(&buf)
|
||||
if err != nil {
|
||||
return mailContent{}, fmt.Errorf(`email template must be formatted like a mail message, but reading it failed: %v`, err)
|
||||
return MailContent{}, fmt.Errorf(`email template must be formatted like a mail message, but reading it failed: %v`, err)
|
||||
}
|
||||
|
||||
// Get the email subject (it's a plain string, no further processing needed).
|
||||
if _, ok := m.Header["Subject"]; !ok {
|
||||
return mailContent{}, fmt.Errorf(`email template must have a "Subject" key in its MIME header, but it's not found`)
|
||||
return MailContent{}, fmt.Errorf(`email template must have a "Subject" key in its MIME header, but it's not found`)
|
||||
} else if n := len(m.Header["Subject"]); n != 1 {
|
||||
return mailContent{}, fmt.Errorf(`email template must have a single "Subject" value in its MIME header, but have %d values`, n)
|
||||
return MailContent{}, fmt.Errorf(`email template must have a single "Subject" value in its MIME header, but have %d values`, n)
|
||||
}
|
||||
subject := m.Header.Get("Subject")
|
||||
|
||||
// Render the email body, in Markdown format at this point, to HTML and plain text.
|
||||
html, text, err := renderMarkdown(m.Body)
|
||||
if err != nil {
|
||||
return mailContent{}, err
|
||||
return MailContent{}, err
|
||||
}
|
||||
|
||||
return mailContent{subject, html, text}, nil
|
||||
return MailContent{subject, html, text}, nil
|
||||
}
|
||||
|
||||
// announceTmpl holds templates for Go release announcement emails.
|
||||
|
@ -428,7 +428,7 @@ func NewSendGridMailClient(sendgridAPIKey string) realSendGridMailClient {
|
|||
}
|
||||
|
||||
// SendMail sends an email by making an authenticated request to the SendGrid API.
|
||||
func (c realSendGridMailClient) SendMail(h MailHeader, m mailContent) error {
|
||||
func (c realSendGridMailClient) SendMail(h MailHeader, m MailContent) error {
|
||||
from, to := sendgridmail.Email(h.From), sendgridmail.Email(h.To)
|
||||
req := sendgridmail.NewSingleEmail(&from, m.Subject, &to, m.BodyText, m.BodyHTML)
|
||||
if len(req.Personalizations) != 1 {
|
||||
|
|
|
@ -240,7 +240,7 @@ Heschi and Dmitri for the Go team` + "\n",
|
|||
To: mail.Address{Address: "to-address@golang.test"},
|
||||
}
|
||||
tasks := AnnounceMailTasks{
|
||||
SendMail: func(h MailHeader, c mailContent) error {
|
||||
SendMail: func(h MailHeader, c MailContent) error {
|
||||
if diff := cmp.Diff(annMail, h); diff != "" {
|
||||
t.Errorf("mail header mismatch (-want +got):\n%s", diff)
|
||||
}
|
||||
|
@ -315,7 +315,7 @@ Tatiana for the Go team` + "\n",
|
|||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
tasks := AnnounceMailTasks{
|
||||
SendMail: func(h MailHeader, c mailContent) error { return nil },
|
||||
SendMail: func(h MailHeader, c MailContent) error { return nil },
|
||||
testHookNow: func() time.Time { return time.Date(2022, time.July, 7, 0, 0, 0, 0, time.UTC) },
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
|
|
|
@ -483,6 +483,11 @@ type verboseListener struct {
|
|||
outputListener func(string, interface{})
|
||||
}
|
||||
|
||||
func (l *verboseListener) WorkflowStalled(workflowID uuid.UUID) error {
|
||||
l.t.Logf("workflow %q: stalled", workflowID.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *verboseListener) TaskStateChanged(_ uuid.UUID, _ string, st *workflow.TaskState) error {
|
||||
switch {
|
||||
case !st.Finished:
|
||||
|
|
|
@ -478,6 +478,8 @@ type Listener interface {
|
|||
TaskStateChanged(workflowID uuid.UUID, taskID string, state *TaskState) error
|
||||
// Logger is called to obtain a Logger for a particular task.
|
||||
Logger(workflowID uuid.UUID, taskID string) Logger
|
||||
// WorkflowStalled is called when there are no runnable tasks.
|
||||
WorkflowStalled(workflowID uuid.UUID) error
|
||||
}
|
||||
|
||||
// TaskState contains the state of a task in a running workflow. Once Finished
|
||||
|
@ -707,6 +709,8 @@ func unmarshalNew(t reflect.Type, data []byte) (interface{}, error) {
|
|||
// and when they finish. It should be used only for monitoring and persistence
|
||||
// purposes. Register Outputs to read task results.
|
||||
func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]interface{}, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
if listener == nil {
|
||||
listener = &defaultListener{}
|
||||
}
|
||||
|
@ -761,6 +765,7 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
|
|||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
listener.WorkflowStalled(w.ID)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -896,6 +901,10 @@ func (w *Workflow) expand(expanded *Definition) error {
|
|||
|
||||
type defaultListener struct{}
|
||||
|
||||
func (s *defaultListener) WorkflowStalled(workflowID uuid.UUID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *defaultListener) TaskStateChanged(_ uuid.UUID, _ string, _ *TaskState) error {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -593,6 +593,11 @@ func runWorkflow(t *testing.T, w *wf.Workflow, listener wf.Listener) map[string]
|
|||
|
||||
type verboseListener struct{ t *testing.T }
|
||||
|
||||
func (l *verboseListener) WorkflowStalled(workflowID uuid.UUID) error {
|
||||
l.t.Logf("workflow %q: stalled", workflowID.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *verboseListener) TaskStateChanged(_ uuid.UUID, _ string, st *wf.TaskState) error {
|
||||
switch {
|
||||
case !st.Started:
|
||||
|
|
Загрузка…
Ссылка в новой задаче