diff --git a/cmd/relui/main.go b/cmd/relui/main.go index 8d1325eb..1aaf6b62 100644 --- a/cmd/relui/main.go +++ b/cmd/relui/main.go @@ -73,6 +73,10 @@ func main() { addressVarFlag(&annMail.From, "announce-mail-from", "The From address to use for the (pre-)announcement mail.") addressVarFlag(&annMail.To, "announce-mail-to", "The To address to use for the (pre-)announcement mail.") addressListVarFlag(&annMail.BCC, "announce-mail-bcc", "The BCC address list to use for the (pre-)announcement mail.") + var schedMail task.MailHeader + addressVarFlag(&schedMail.From, "schedule-mail-from", "The From address to use for the scheduled workflow failure mail.") + addressVarFlag(&schedMail.To, "schedule-mail-to", "The To address to use for the the scheduled workflow failure mail.") + addressListVarFlag(&schedMail.BCC, "schedule-mail-bcc", "The BCC address list to use for the scheduled workflow failure mail.") var twitterAPI secret.TwitterCredentials secret.JSONVarFlag(&twitterAPI, "twitter-api-secret", "Twitter API secret to use for workflows involving tweeting.") masterKey := secret.Flag("builder-master-key", "Builder master key") @@ -212,11 +216,6 @@ func main() { } dh.RegisterDefinition("Tag x/ repos", tagTasks.NewDefinition()) - w := relui.NewWorker(dh, dbPool, relui.NewPGListener(dbPool)) - go w.Run(ctx) - if err := w.ResumeAll(ctx); err != nil { - log.Printf("w.ResumeAll() = %v", err) - } var base *url.URL if *baseURL != "" { base, err = url.Parse(*baseURL) @@ -224,6 +223,17 @@ func main() { log.Fatalf("url.Parse(%q) = %v, %v", *baseURL, base, err) } } + l := &relui.PGListener{ + DB: dbPool, + BaseURL: base, + ScheduleFailureMailHeader: schedMail, + SendMail: relui.LogOnlyMailer, + } + w := relui.NewWorker(dh, dbPool, l) + go w.Run(ctx) + if err := w.ResumeAll(ctx); err != nil { + log.Printf("w.ResumeAll() = %v", err) + } s := relui.NewServer(dbPool, w, base, siteHeader, ms) log.Fatalln(https.ListenAndServe(ctx, &ochttp.Handler{Handler: GRPCHandler(grpcServer, s)})) } diff --git a/internal/relui/buildrelease_test.go b/internal/relui/buildrelease_test.go index e096e581..c6be4368 100644 --- a/internal/relui/buildrelease_test.go +++ b/internal/relui/buildrelease_test.go @@ -544,6 +544,11 @@ type verboseListener struct { outputListener func(string, interface{}) } +func (l *verboseListener) WorkflowStalled(workflowID uuid.UUID) error { + l.t.Logf("workflow %q: stalled", workflowID.String()) + return nil +} + func (l *verboseListener) TaskStateChanged(_ uuid.UUID, _ string, st *workflow.TaskState) error { switch { case !st.Finished: diff --git a/internal/relui/content.go b/internal/relui/content.go index 242d425b..6d3a19dd 100644 --- a/internal/relui/content.go +++ b/internal/relui/content.go @@ -4,7 +4,9 @@ package relui -import "embed" +import ( + "embed" +) // static is our static web server content. // diff --git a/internal/relui/listener.go b/internal/relui/listener.go index 82f85880..b839cf35 100644 --- a/internal/relui/listener.go +++ b/internal/relui/listener.go @@ -5,26 +5,51 @@ package relui import ( + "bytes" "context" "database/sql" "encoding/json" "fmt" + "html/template" "log" + "net/url" "time" "github.com/google/uuid" "github.com/jackc/pgx/v4" "golang.org/x/build/internal/relui/db" + "golang.org/x/build/internal/task" "golang.org/x/build/internal/workflow" ) -func NewPGListener(db db.PGDBTX) *PGListener { - return &PGListener{db} -} - // PGListener implements workflow.Listener for recording workflow state. type PGListener struct { - db db.PGDBTX + DB db.PGDBTX + + BaseURL *url.URL + + ScheduleFailureMailHeader task.MailHeader + SendMail func(task.MailHeader, task.MailContent) error + + templ *template.Template +} + +// WorkflowStalled is called when no tasks are runnable. +func (l *PGListener) WorkflowStalled(workflowID uuid.UUID) error { + wf, err := db.New(l.DB).Workflow(context.Background(), workflowID) + if err != nil || wf.ScheduleID.Int32 == 0 { + return err + } + var buf bytes.Buffer + body := scheduledFailureEmailBody{Workflow: wf} + if err := l.template("scheduled_workflow_failure_email.txt").Execute(&buf, body); err != nil { + log.Printf("WorkflowFinished: Execute(_, %v) = %q", body, err) + return err + } + return l.SendMail(l.ScheduleFailureMailHeader, task.MailContent{ + Subject: fmt.Sprintf("[relui] Scheduled workflow %q failed", wf.Name.String), + BodyText: buf.String(), + }) } // TaskStateChanged is called whenever a task is updated by the @@ -38,7 +63,7 @@ func (l *PGListener) TaskStateChanged(workflowID uuid.UUID, taskName string, sta if err != nil { return err } - err = l.db.BeginFunc(ctx, func(tx pgx.Tx) error { + err = l.DB.BeginFunc(ctx, func(tx pgx.Tx) error { q := db.New(tx) updated := time.Now() _, err := q.UpsertTask(ctx, db.UpsertTaskParams{ @@ -62,7 +87,7 @@ func (l *PGListener) TaskStateChanged(workflowID uuid.UUID, taskName string, sta // WorkflowStarted persists a new workflow execution in the database. func (l *PGListener) WorkflowStarted(ctx context.Context, workflowID uuid.UUID, name string, params map[string]interface{}, scheduleID int) error { - q := db.New(l.db) + q := db.New(l.DB) m, err := json.Marshal(params) if err != nil { return err @@ -80,11 +105,16 @@ func (l *PGListener) WorkflowStarted(ctx context.Context, workflowID uuid.UUID, return err } +type scheduledFailureEmailBody struct { + Workflow db.Workflow + Err error +} + // WorkflowFinished saves the final state of a workflow after its run // has completed. func (l *PGListener) WorkflowFinished(ctx context.Context, workflowID uuid.UUID, outputs map[string]interface{}, workflowErr error) error { - log.Printf("WorkflowCompleted(%q, %v, %q)", workflowID, outputs, workflowErr) - q := db.New(l.db) + log.Printf("WorkflowFinished(%q, %v, %q)", workflowID, outputs, workflowErr) + q := db.New(l.DB) m, err := json.Marshal(outputs) if err != nil { return err @@ -102,9 +132,21 @@ func (l *PGListener) WorkflowFinished(ctx context.Context, workflowID uuid.UUID, return err } +func (l *PGListener) template(name string) *template.Template { + if l.templ == nil { + helpers := map[string]any{"baseLink": l.baseLink} + l.templ = template.Must(template.New("").Funcs(helpers).ParseFS(templates, "templates/*.txt")) + } + return l.templ.Lookup(name) +} + +func (l *PGListener) baseLink(target string, extras ...string) string { + return BaseLink(l.BaseURL)(target, extras...) +} + func (l *PGListener) Logger(workflowID uuid.UUID, taskName string) workflow.Logger { return &postgresLogger{ - db: l.db, + db: l.DB, workflowID: workflowID, taskName: taskName, } @@ -136,3 +178,8 @@ func (l *postgresLogger) Printf(format string, v ...interface{}) { log.Printf("l.Printf(%q, %v) = %v", format, v, err) } } + +func LogOnlyMailer(header task.MailHeader, content task.MailContent) error { + log.Println("Logging but not sending mail:", header, content) + return nil +} diff --git a/internal/relui/listener_test.go b/internal/relui/listener_test.go index b014e5ad..9cefc132 100644 --- a/internal/relui/listener_test.go +++ b/internal/relui/listener_test.go @@ -7,6 +7,9 @@ package relui import ( "context" "database/sql" + "errors" + "fmt" + "net/mail" "testing" "time" @@ -14,6 +17,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" "golang.org/x/build/internal/relui/db" + "golang.org/x/build/internal/task" "golang.org/x/build/internal/workflow" ) @@ -76,7 +80,7 @@ func TestListenerTaskStateChanged(t *testing.T) { t.Fatalf("q.CreateWorkflow(%v, %v) = %v, wanted no error", ctx, wfp, err) } - l := &PGListener{db: dbp} + l := &PGListener{DB: dbp} err = l.TaskStateChanged(wf.ID, "TestTask", c.state) if err != nil { t.Fatalf("l.TaskStateChanged(%v, %q, %v) = %v, wanted no error", wf.ID, "TestTask", c.state, err) @@ -110,7 +114,7 @@ func TestListenerLogger(t *testing.T) { t.Fatalf("q.UpsertTask(%v, %v) = %v, wanted no error", ctx, params, err) } - l := &PGListener{db: dbp} + l := &PGListener{DB: dbp} l.Logger(wf.ID, "TestTask").Printf("A fancy log line says %q", "hello") logs, err := q.TaskLogs(ctx) @@ -128,3 +132,99 @@ func TestListenerLogger(t *testing.T) { t.Errorf("q.TaskLogs(_, %q) mismatch (-want +got):\n%s", wf.ID, diff) } } + +func TestPGListenerWorkflowStalledNotification(t *testing.T) { + cases := []struct { + desc string + schedule bool + taskErr bool + }{ + { + desc: "scheduled workflow failure sends notification", + schedule: true, + taskErr: true, + }, + { + desc: "scheduled workflow success sends nothing", + schedule: true, + }, + { + desc: "unscheduled workflow success sends nothing", + }, + { + desc: "unscheduled workflow failure sends nothing", + taskErr: true, + }, + } + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + p := testDB(ctx, t) + var schedID int + if c.schedule { + sched, err := db.New(p).CreateSchedule(ctx, db.CreateScheduleParams{WorkflowName: c.desc}) + if err != nil { + t.Fatalf("CreateSchedule() = %v, wanted no error", err) + } + schedID = int(sched.ID) + } + wd := workflow.New() + complete := workflow.Task0(wd, "complete", func(ctx context.Context) (string, error) { + if c.taskErr { + return "", fmt.Errorf("c.taskErr: %t", c.taskErr) + } + return "done", nil + }) + workflow.Output(wd, "finished", complete) + dh := NewDefinitionHolder() + dh.RegisterDefinition(c.desc, wd) + + header := task.MailHeader{ + From: mail.Address{Address: "from-address@golang.test"}, + To: mail.Address{Address: "to-address@golang.test"}, + } + var gotHeader task.MailHeader + var gotContent task.MailContent + pgl := &PGListener{ + DB: p, + SendMail: func(h task.MailHeader, c task.MailContent) error { + gotHeader, gotContent = h, c + return nil + }, + ScheduleFailureMailHeader: header, + } + listener := &testWorkflowListener{ + Listener: pgl, + onFinished: cancel, + } + w := NewWorker(dh, p, listener) + + id, err := w.StartWorkflow(ctx, c.desc, nil, schedID) + if err != nil { + t.Fatalf("w.StartWorkflow(_, %q, %v, %d) = %v, %v, wanted no error", c.desc, nil, schedID, id, err) + } + listener.onStalled = func() { + w.cancelWorkflow(id) + } + if err := w.Run(ctx); !errors.Is(err, context.Canceled) { + t.Errorf("w.Run() = %v, wanted %v", err, context.Canceled) + } + + wantSend := c.taskErr && c.schedule + if (gotContent.Subject == "") == wantSend { + t.Errorf("gotContent.Subject = %q, wanted empty: %t", gotContent.Subject, !c.taskErr) + } + if (gotContent.BodyText == "") == wantSend { + t.Errorf("gotContent.BodyText = %q, wanted empty: %t", gotContent.BodyText, !c.taskErr) + } + var wantHeader task.MailHeader + if wantSend { + wantHeader = header + } + if diff := cmp.Diff(wantHeader, gotHeader); diff != "" { + t.Errorf("WorkflowFinished(_, %q) mismatch (-want +got):\n%s", id, diff) + } + }) + } +} diff --git a/internal/relui/schedule_test.go b/internal/relui/schedule_test.go index e6695adf..c6c5cb67 100644 --- a/internal/relui/schedule_test.go +++ b/internal/relui/schedule_test.go @@ -120,7 +120,7 @@ func TestSchedulerCreate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() p := testDB(ctx, t) - s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p})) + s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p})) row, err := s.Create(ctx, c.sched, c.workflowName, c.params) if (err != nil) != c.wantErr { t.Fatalf("s.Create(_, %v, %q, %v) = %v, %v, wantErr: %t", c.sched, c.workflowName, c.params, row, err, c.wantErr) @@ -243,7 +243,7 @@ func TestSchedulerResume(t *testing.T) { defer cancel() p := testDB(ctx, t) q := db.New(p) - s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p})) + s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p})) for _, csp := range c.scheds { if _, err := q.CreateSchedule(ctx, csp); err != nil { @@ -294,7 +294,7 @@ func TestScheduleDelete(t *testing.T) { defer cancel() p := testDB(ctx, t) q := db.New(p) - s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p})) + s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p})) row, err := s.Create(ctx, c.sched, c.workflowName, c.params) if err != nil { t.Fatalf("s.Create(_, %v, %q, %v) = %v, %v, wanted no error", c.sched, c.workflowName, c.params, row, err) diff --git a/internal/relui/templates/scheduled_workflow_failure_email.txt b/internal/relui/templates/scheduled_workflow_failure_email.txt new file mode 100644 index 00000000..3832dd96 --- /dev/null +++ b/internal/relui/templates/scheduled_workflow_failure_email.txt @@ -0,0 +1,6 @@ +{{- /*gotype: golang.org/x/build/internal/relui.scheduledFailureEmailBody*/ -}} +The scheduled workflow "{{.Workflow.Name.String}}" ({{.Workflow.ID.String}}) failed at {{.Workflow.UpdatedAt.UTC.Format "2006/01/02 15:04 MST"}}. + +See the following page for details: + +{{baseLink "/workflows/" .Workflow.ID.String}} diff --git a/internal/relui/web.go b/internal/relui/web.go index 27e69832..7a90b18a 100644 --- a/internal/relui/web.go +++ b/internal/relui/web.go @@ -151,20 +151,26 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.m.ServeHTTP(w, r) } -func (s *Server) BaseLink(target string, extras ...string) string { - u, err := url.Parse(target) - if err != nil { - log.Printf("BaseLink: url.Parse(%q) = %v, %v", target, u, err) - return path.Join(append([]string{target}, extras...)...) - } - u.Path = path.Join(append([]string{u.Path}, extras...)...) - if s.baseURL == nil || u.IsAbs() { +func BaseLink(baseURL *url.URL) func(target string, extras ...string) string { + return func(target string, extras ...string) string { + u, err := url.Parse(target) + if err != nil { + log.Printf("BaseLink: url.Parse(%q) = %v, %v", target, u, err) + return path.Join(append([]string{target}, extras...)...) + } + u.Path = path.Join(append([]string{u.Path}, extras...)...) + if baseURL == nil || u.IsAbs() { + return u.String() + } + u.Scheme = baseURL.Scheme + u.Host = baseURL.Host + u.Path = path.Join(baseURL.Path, u.Path) return u.String() } - u.Scheme = s.baseURL.Scheme - u.Host = s.baseURL.Host - u.Path = path.Join(s.baseURL.Path, u.Path) - return u.String() +} + +func (s *Server) BaseLink(target string, extras ...string) string { + return BaseLink(s.baseURL)(target, extras...) } type homeResponse struct { @@ -215,7 +221,7 @@ func (s *Server) homeHandler(w http.ResponseWriter, r *http.Request) { return } for _, w := range ws { - if _, ok := s.w.running[w.ID.String()]; ok { + if ok := s.w.workflowRunning(w.ID); ok { hr.ActiveWorkflows = append(hr.ActiveWorkflows, w) continue } diff --git a/internal/relui/web_test.go b/internal/relui/web_test.go index ebfc4a63..3daf7830 100644 --- a/internal/relui/web_test.go +++ b/internal/relui/web_test.go @@ -121,7 +121,8 @@ func TestServerHomeHandler(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/", nil) w := httptest.NewRecorder() - s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}), nil, SiteHeader{}, nil) + s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}), nil, SiteHeader{}, nil) + s.homeHandler(w, req) resp := w.Result() @@ -292,7 +293,7 @@ func TestServerCreateWorkflowHandler(t *testing.T) { rec := httptest.NewRecorder() q := db.New(p) - s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}), nil, SiteHeader{}, nil) + s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}), nil, SiteHeader{}, nil) s.createWorkflowHandler(rec, req) resp := rec.Result() @@ -626,7 +627,7 @@ func TestServerApproveTaskHandler(t *testing.T) { req := httptest.NewRequest(http.MethodPost, path.Join("/workflows/", c.params["id"], "tasks", url.PathEscape(c.params["name"]), "approve"), nil) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") rec := httptest.NewRecorder() - s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}), nil, SiteHeader{}, nil) + s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}), nil, SiteHeader{}, nil) s.m.ServeHTTP(rec, req) resp := rec.Result() diff --git a/internal/relui/worker.go b/internal/relui/worker.go index 305a58f6..c35a7b13 100644 --- a/internal/relui/worker.go +++ b/internal/relui/worker.go @@ -132,6 +132,13 @@ func (w *Worker) run(wf *workflow.Workflow) error { } } +func (w *Worker) workflowRunning(id uuid.UUID) bool { + w.mu.Lock() + defer w.mu.Unlock() + _, ok := w.running[id.String()] + return ok +} + // StartWorkflow persists and starts running a workflow. func (w *Worker) StartWorkflow(ctx context.Context, name string, params map[string]interface{}, scheduleID int) (uuid.UUID, error) { d := w.dh.Definition(name) diff --git a/internal/relui/worker_test.go b/internal/relui/worker_test.go index 3e8f71a6..4ab39be1 100644 --- a/internal/relui/worker_test.go +++ b/internal/relui/worker_test.go @@ -9,7 +9,6 @@ import ( "database/sql" "errors" "fmt" - "log" "strings" "sync" "testing" @@ -30,7 +29,7 @@ func TestWorkerStartWorkflow(t *testing.T) { wg := sync.WaitGroup{} dh := NewDefinitionHolder() w := NewWorker(dh, dbp, &testWorkflowListener{ - Listener: &PGListener{dbp}, + Listener: &PGListener{DB: dbp}, onFinished: wg.Done, }) @@ -91,7 +90,7 @@ func TestWorkerResume(t *testing.T) { wg := sync.WaitGroup{} dh := NewDefinitionHolder() w := NewWorker(dh, dbp, &testWorkflowListener{ - Listener: &PGListener{dbp}, + Listener: &PGListener{DB: dbp}, onFinished: wg.Done, }) @@ -130,7 +129,7 @@ func TestWorkerResumeMissingDefinition(t *testing.T) { defer cancel() dbp := testDB(ctx, t) q := db.New(dbp) - w := NewWorker(NewDefinitionHolder(), dbp, &PGListener{dbp}) + w := NewWorker(NewDefinitionHolder(), dbp, &PGListener{DB: dbp}) cwp := db.CreateWorkflowParams{ID: uuid.New(), Name: nullString(t.Name()), Params: nullString("{}")} if wf, err := q.CreateWorkflow(ctx, cwp); err != nil { @@ -150,7 +149,7 @@ func TestWorkflowResumeAll(t *testing.T) { wg := sync.WaitGroup{} dh := NewDefinitionHolder() w := NewWorker(dh, dbp, &testWorkflowListener{ - Listener: &PGListener{dbp}, + Listener: &PGListener{DB: dbp}, onFinished: wg.Done, }) @@ -207,7 +206,7 @@ func TestWorkflowResumeRetry(t *testing.T) { defer cancel() dbp := testDB(ctx, t) dh := NewDefinitionHolder() - w := NewWorker(dh, dbp, &PGListener{dbp}) + w := NewWorker(dh, dbp, &PGListener{DB: dbp}) counter := 0 blockingChan := make(chan bool) @@ -252,7 +251,7 @@ func TestWorkflowResumeRetry(t *testing.T) { defer cancel() wfDone := make(chan bool, 1) w = NewWorker(dh, dbp, &testWorkflowListener{ - Listener: &PGListener{dbp}, + Listener: &PGListener{DB: dbp}, onFinished: func() { wfDone <- true }, }) @@ -300,28 +299,21 @@ type testWorkflowListener struct { Listener onFinished func() + onStalled func() } -func (t *testWorkflowListener) WorkflowFinished(ctx context.Context, wfid uuid.UUID, outputs map[string]interface{}, err error) error { - defer t.onFinished() - return t.Listener.WorkflowFinished(ctx, wfid, outputs, err) +func (t *testWorkflowListener) WorkflowFinished(ctx context.Context, wfid uuid.UUID, outputs map[string]interface{}, wferr error) error { + err := t.Listener.WorkflowFinished(ctx, wfid, outputs, wferr) + if t.onFinished != nil { + t.onFinished() + } + return err } -type unimplementedListener struct { -} - -func (u *unimplementedListener) TaskStateChanged(uuid.UUID, string, *workflow.TaskState) error { - return errors.New("method TaskStateChanged not implemented") -} - -func (u *unimplementedListener) Logger(uuid.UUID, string) workflow.Logger { - return log.Default() -} - -func (u *unimplementedListener) WorkflowStarted(context.Context, uuid.UUID, string, map[string]interface{}, int) error { - return errors.New("method WorkflowStarted not implemented") -} - -func (u *unimplementedListener) WorkflowFinished(context.Context, uuid.UUID, map[string]interface{}, error) error { - return errors.New("method WorkflowFinished not implemented") +func (t *testWorkflowListener) WorkflowStalled(workflowID uuid.UUID) error { + err := t.Listener.WorkflowStalled(workflowID) + if t.onStalled != nil { + t.onStalled() + } + return err } diff --git a/internal/task/announce.go b/internal/task/announce.go index 4da34285..d5423b39 100644 --- a/internal/task/announce.go +++ b/internal/task/announce.go @@ -109,7 +109,7 @@ type AnnounceMailTasks struct { // Email delivery happens asynchronously, so SendMail returns a nil error // if the transmission was started successfully, but that error value // doesn't indicate anything about the status of the delivery. - SendMail func(MailHeader, mailContent) error + SendMail func(MailHeader, MailContent) error // AnnounceMailHeader is the header to use for the release (pre-)announcement email. AnnounceMailHeader MailHeader @@ -282,8 +282,8 @@ type MailHeader struct { BCC []mail.Address } -// A mailContent holds the content of an email. -type mailContent struct { +// A MailContent holds the content of an email. +type MailContent struct { Subject string BodyHTML string BodyText string @@ -293,7 +293,7 @@ type mailContent struct { // which must be one of these types: // - releaseAnnouncement for a release announcement // - releasePreAnnouncement for a release pre-announcement -func announcementMail(data any) (mailContent, error) { +func announcementMail(data any) (MailContent, error) { // Select the appropriate template name. var name string switch r := data.(type) { @@ -307,7 +307,7 @@ func announcementMail(data any) (mailContent, error) { } else if strings.Count(r.Version, ".") == 2 { // Minor release like "go1.X.Y". name = "announce-minor.md" } else { - return mailContent{}, fmt.Errorf("unknown version format: %q", r.Version) + return MailContent{}, fmt.Errorf("unknown version format: %q", r.Version) } if len(r.Security) > 0 && name != "announce-minor.md" { @@ -317,14 +317,14 @@ func announcementMail(data any) (mailContent, error) { // Note: Maybe in the future we'd want to consider support for including sentences like // "This beta release includes the same security fixes as in Go X.Y.Z and Go A.B.C.", // but we'll have a better idea after these initial templates get more practical use. - return mailContent{}, fmt.Errorf("email template %q doesn't support the Security field; this field can only be used in minor releases", name) + return MailContent{}, fmt.Errorf("email template %q doesn't support the Security field; this field can only be used in minor releases", name) } else if r.SecondaryVersion != "" && name != "announce-minor.md" { - return mailContent{}, fmt.Errorf("email template %q doesn't support more than one release; the SecondaryVersion field can only be used in minor releases", name) + return MailContent{}, fmt.Errorf("email template %q doesn't support more than one release; the SecondaryVersion field can only be used in minor releases", name) } case releasePreAnnouncement: name = "pre-announce-minor.md" default: - return mailContent{}, fmt.Errorf("unknown template data type %T", data) + return MailContent{}, fmt.Errorf("unknown template data type %T", data) } // Render the (pre-)announcement email template. @@ -332,28 +332,28 @@ func announcementMail(data any) (mailContent, error) { // It'll produce a valid message with a MIME header and a body, so parse it as such. var buf bytes.Buffer if err := announceTmpl.ExecuteTemplate(&buf, name, data); err != nil { - return mailContent{}, err + return MailContent{}, err } m, err := mail.ReadMessage(&buf) if err != nil { - return mailContent{}, fmt.Errorf(`email template must be formatted like a mail message, but reading it failed: %v`, err) + return MailContent{}, fmt.Errorf(`email template must be formatted like a mail message, but reading it failed: %v`, err) } // Get the email subject (it's a plain string, no further processing needed). if _, ok := m.Header["Subject"]; !ok { - return mailContent{}, fmt.Errorf(`email template must have a "Subject" key in its MIME header, but it's not found`) + return MailContent{}, fmt.Errorf(`email template must have a "Subject" key in its MIME header, but it's not found`) } else if n := len(m.Header["Subject"]); n != 1 { - return mailContent{}, fmt.Errorf(`email template must have a single "Subject" value in its MIME header, but have %d values`, n) + return MailContent{}, fmt.Errorf(`email template must have a single "Subject" value in its MIME header, but have %d values`, n) } subject := m.Header.Get("Subject") // Render the email body, in Markdown format at this point, to HTML and plain text. html, text, err := renderMarkdown(m.Body) if err != nil { - return mailContent{}, err + return MailContent{}, err } - return mailContent{subject, html, text}, nil + return MailContent{subject, html, text}, nil } // announceTmpl holds templates for Go release announcement emails. @@ -428,7 +428,7 @@ func NewSendGridMailClient(sendgridAPIKey string) realSendGridMailClient { } // SendMail sends an email by making an authenticated request to the SendGrid API. -func (c realSendGridMailClient) SendMail(h MailHeader, m mailContent) error { +func (c realSendGridMailClient) SendMail(h MailHeader, m MailContent) error { from, to := sendgridmail.Email(h.From), sendgridmail.Email(h.To) req := sendgridmail.NewSingleEmail(&from, m.Subject, &to, m.BodyText, m.BodyHTML) if len(req.Personalizations) != 1 { diff --git a/internal/task/announce_test.go b/internal/task/announce_test.go index 1f5cdb36..8d7986fb 100644 --- a/internal/task/announce_test.go +++ b/internal/task/announce_test.go @@ -240,7 +240,7 @@ Heschi and Dmitri for the Go team` + "\n", To: mail.Address{Address: "to-address@golang.test"}, } tasks := AnnounceMailTasks{ - SendMail: func(h MailHeader, c mailContent) error { + SendMail: func(h MailHeader, c MailContent) error { if diff := cmp.Diff(annMail, h); diff != "" { t.Errorf("mail header mismatch (-want +got):\n%s", diff) } @@ -315,7 +315,7 @@ Tatiana for the Go team` + "\n", for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { tasks := AnnounceMailTasks{ - SendMail: func(h MailHeader, c mailContent) error { return nil }, + SendMail: func(h MailHeader, c MailContent) error { return nil }, testHookNow: func() time.Time { return time.Date(2022, time.July, 7, 0, 0, 0, 0, time.UTC) }, } var buf bytes.Buffer diff --git a/internal/task/tagx_test.go b/internal/task/tagx_test.go index c20f7221..24ceb8eb 100644 --- a/internal/task/tagx_test.go +++ b/internal/task/tagx_test.go @@ -483,6 +483,11 @@ type verboseListener struct { outputListener func(string, interface{}) } +func (l *verboseListener) WorkflowStalled(workflowID uuid.UUID) error { + l.t.Logf("workflow %q: stalled", workflowID.String()) + return nil +} + func (l *verboseListener) TaskStateChanged(_ uuid.UUID, _ string, st *workflow.TaskState) error { switch { case !st.Finished: diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go index c68e147a..93c16e1a 100644 --- a/internal/workflow/workflow.go +++ b/internal/workflow/workflow.go @@ -478,6 +478,8 @@ type Listener interface { TaskStateChanged(workflowID uuid.UUID, taskID string, state *TaskState) error // Logger is called to obtain a Logger for a particular task. Logger(workflowID uuid.UUID, taskID string) Logger + // WorkflowStalled is called when there are no runnable tasks. + WorkflowStalled(workflowID uuid.UUID) error } // TaskState contains the state of a task in a running workflow. Once Finished @@ -707,6 +709,8 @@ func unmarshalNew(t reflect.Type, data []byte) (interface{}, error) { // and when they finish. It should be used only for monitoring and persistence // purposes. Register Outputs to read task results. func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]interface{}, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() if listener == nil { listener = &defaultListener{} } @@ -761,6 +765,7 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter case <-ctx.Done(): return nil, ctx.Err() default: + listener.WorkflowStalled(w.ID) } } @@ -896,6 +901,10 @@ func (w *Workflow) expand(expanded *Definition) error { type defaultListener struct{} +func (s *defaultListener) WorkflowStalled(workflowID uuid.UUID) error { + return nil +} + func (s *defaultListener) TaskStateChanged(_ uuid.UUID, _ string, _ *TaskState) error { return nil } diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go index 503a5bcb..997609ec 100644 --- a/internal/workflow/workflow_test.go +++ b/internal/workflow/workflow_test.go @@ -593,6 +593,11 @@ func runWorkflow(t *testing.T, w *wf.Workflow, listener wf.Listener) map[string] type verboseListener struct{ t *testing.T } +func (l *verboseListener) WorkflowStalled(workflowID uuid.UUID) error { + l.t.Logf("workflow %q: stalled", workflowID.String()) + return nil +} + func (l *verboseListener) TaskStateChanged(_ uuid.UUID, _ string, st *wf.TaskState) error { switch { case !st.Started: