diff --git a/mig-runner/entity.go b/mig-runner/entity.go index fdcc3a50..e39943ba 100644 --- a/mig-runner/entity.go +++ b/mig-runner/entity.go @@ -21,6 +21,7 @@ type entity struct { confPath string modTime time.Time + deadChan chan bool abortRun chan bool cfg entityConfig } @@ -32,6 +33,17 @@ type entityConfig struct { } } +func (e *entityConfig) validate() error { + if e.Configuration.Schedule == "" { + return fmt.Errorf("missing schedule") + } + _, err := cronexpr.Parse(e.Configuration.Schedule) + if err != nil { + return fmt.Errorf("cron expression: %v", err) + } + return nil +} + func (e *entity) launchAction() (err error) { defer func() { if e := recover(); e != nil { @@ -80,11 +92,16 @@ func (e *entity) launchAction() (err error) { } func (e *entity) start() { + xr := func(s string, args ...interface{}) { + mlog(s, args...) + e.deadChan <- true + } + e.abortRun = make(chan bool, 1) for { cexpr, err := cronexpr.Parse(e.cfg.Configuration.Schedule) if err != nil { - mlog("%v: %v", e.name, err) + xr("%v: %v", e.name, err) return } nrun := cexpr.Next(time.Now()) @@ -99,10 +116,12 @@ func (e *entity) start() { mlog("%v: running", e.name) err = e.launchAction() if err != nil { - mlog("%v: %v", e.name, err) + xr("%v: %v", e.name, err) return } } + mlog("%v: job exiting", e.name) + e.deadChan <- true } func (e *entity) stop() { @@ -119,5 +138,18 @@ func (e *entity) load() (err error) { if err != nil { panic(err) } + + // Make sure an action file exists and is valid before we + // schedule this. + actpath := path.Join(e.baseDir, "action.json") + _, err = mig.ActionFromFile(actpath) + if err != nil { + panic(err) + } + + err = e.cfg.validate() + if err != nil { + panic(err) + } return nil } diff --git a/mig-runner/main.go b/mig-runner/main.go index d1c013b8..18880115 100644 --- a/mig-runner/main.go +++ b/mig-runner/main.go @@ -69,6 +69,7 @@ func procDir(dirpath string) (err error) { mlog("%v: %v", ename, err) return nil } + ent.deadChan = make(chan bool, 1) ctx.Entities[ename] = ent mlog("added entity %v", ename) go ent.start() @@ -76,7 +77,8 @@ func procDir(dirpath string) (err error) { return nil } -// Remove entities that are no longer present in the runner directory +// Remove entities that are no longer present in the runner directory, or are +// stale func procReap(ents []string) error { for k := range ctx.Entities { found := false @@ -91,6 +93,20 @@ func procReap(ents []string) error { mlog("removed entity %v", k) } } + // Also remove stale entities that are still present in the runner + // directory but have encountered an error. + for k := range ctx.Entities { + dead := false + select { + case <-ctx.Entities[k].deadChan: + dead = true + default: + } + if dead { + delete(ctx.Entities, k) + mlog("removed stale entity %v", k) + } + } return nil }