From 802843d6dcefe6c6d221c53379b23685621b99fd Mon Sep 17 00:00:00 2001 From: Aaron Meihm Date: Thu, 17 Sep 2015 22:57:06 -0500 Subject: [PATCH] [minor] better validation and error handling for entities Validate entities as they are loaded. This also marks failed jobs as stale and will try to reload the job. Prior to this if a job failed it would silently not run again. --- mig-runner/entity.go | 36 ++++++++++++++++++++++++++++++++++-- mig-runner/main.go | 18 +++++++++++++++++- 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/mig-runner/entity.go b/mig-runner/entity.go index fdcc3a50..e39943ba 100644 --- a/mig-runner/entity.go +++ b/mig-runner/entity.go @@ -21,6 +21,7 @@ type entity struct { confPath string modTime time.Time + deadChan chan bool abortRun chan bool cfg entityConfig } @@ -32,6 +33,17 @@ type entityConfig struct { } } +func (e *entityConfig) validate() error { + if e.Configuration.Schedule == "" { + return fmt.Errorf("missing schedule") + } + _, err := cronexpr.Parse(e.Configuration.Schedule) + if err != nil { + return fmt.Errorf("cron expression: %v", err) + } + return nil +} + func (e *entity) launchAction() (err error) { defer func() { if e := recover(); e != nil { @@ -80,11 +92,16 @@ func (e *entity) launchAction() (err error) { } func (e *entity) start() { + xr := func(s string, args ...interface{}) { + mlog(s, args...) + e.deadChan <- true + } + e.abortRun = make(chan bool, 1) for { cexpr, err := cronexpr.Parse(e.cfg.Configuration.Schedule) if err != nil { - mlog("%v: %v", e.name, err) + xr("%v: %v", e.name, err) return } nrun := cexpr.Next(time.Now()) @@ -99,10 +116,12 @@ func (e *entity) start() { mlog("%v: running", e.name) err = e.launchAction() if err != nil { - mlog("%v: %v", e.name, err) + xr("%v: %v", e.name, err) return } } + mlog("%v: job exiting", e.name) + e.deadChan <- true } func (e *entity) stop() { @@ -119,5 +138,18 @@ func (e *entity) load() (err error) { if err != nil { panic(err) } + + // Make sure an action file exists and is valid before we + // schedule this. + actpath := path.Join(e.baseDir, "action.json") + _, err = mig.ActionFromFile(actpath) + if err != nil { + panic(err) + } + + err = e.cfg.validate() + if err != nil { + panic(err) + } return nil } diff --git a/mig-runner/main.go b/mig-runner/main.go index d1c013b8..18880115 100644 --- a/mig-runner/main.go +++ b/mig-runner/main.go @@ -69,6 +69,7 @@ func procDir(dirpath string) (err error) { mlog("%v: %v", ename, err) return nil } + ent.deadChan = make(chan bool, 1) ctx.Entities[ename] = ent mlog("added entity %v", ename) go ent.start() @@ -76,7 +77,8 @@ func procDir(dirpath string) (err error) { return nil } -// Remove entities that are no longer present in the runner directory +// Remove entities that are no longer present in the runner directory, or are +// stale func procReap(ents []string) error { for k := range ctx.Entities { found := false @@ -91,6 +93,20 @@ func procReap(ents []string) error { mlog("removed entity %v", k) } } + // Also remove stale entities that are still present in the runner + // directory but have encountered an error. + for k := range ctx.Entities { + dead := false + select { + case <-ctx.Entities[k].deadChan: + dead = true + default: + } + if dead { + delete(ctx.Entities, k) + mlog("removed stale entity %v", k) + } + } return nil }