зеркало из https://github.com/mozilla/mig.git
[minor] better validation and error handling for entities
Validate entities as they are loaded. This also marks failed jobs as stale and will try to reload the job. Prior to this if a job failed it would silently not run again.
This commit is contained in:
Родитель
7b77f39083
Коммит
802843d6dc
|
@ -21,6 +21,7 @@ type entity struct {
|
|||
confPath string
|
||||
modTime time.Time
|
||||
|
||||
deadChan chan bool
|
||||
abortRun chan bool
|
||||
cfg entityConfig
|
||||
}
|
||||
|
@ -32,6 +33,17 @@ type entityConfig struct {
|
|||
}
|
||||
}
|
||||
|
||||
func (e *entityConfig) validate() error {
|
||||
if e.Configuration.Schedule == "" {
|
||||
return fmt.Errorf("missing schedule")
|
||||
}
|
||||
_, err := cronexpr.Parse(e.Configuration.Schedule)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cron expression: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *entity) launchAction() (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
|
@ -80,11 +92,16 @@ func (e *entity) launchAction() (err error) {
|
|||
}
|
||||
|
||||
func (e *entity) start() {
|
||||
xr := func(s string, args ...interface{}) {
|
||||
mlog(s, args...)
|
||||
e.deadChan <- true
|
||||
}
|
||||
|
||||
e.abortRun = make(chan bool, 1)
|
||||
for {
|
||||
cexpr, err := cronexpr.Parse(e.cfg.Configuration.Schedule)
|
||||
if err != nil {
|
||||
mlog("%v: %v", e.name, err)
|
||||
xr("%v: %v", e.name, err)
|
||||
return
|
||||
}
|
||||
nrun := cexpr.Next(time.Now())
|
||||
|
@ -99,10 +116,12 @@ func (e *entity) start() {
|
|||
mlog("%v: running", e.name)
|
||||
err = e.launchAction()
|
||||
if err != nil {
|
||||
mlog("%v: %v", e.name, err)
|
||||
xr("%v: %v", e.name, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
mlog("%v: job exiting", e.name)
|
||||
e.deadChan <- true
|
||||
}
|
||||
|
||||
func (e *entity) stop() {
|
||||
|
@ -119,5 +138,18 @@ func (e *entity) load() (err error) {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Make sure an action file exists and is valid before we
|
||||
// schedule this.
|
||||
actpath := path.Join(e.baseDir, "action.json")
|
||||
_, err = mig.ActionFromFile(actpath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = e.cfg.validate()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -69,6 +69,7 @@ func procDir(dirpath string) (err error) {
|
|||
mlog("%v: %v", ename, err)
|
||||
return nil
|
||||
}
|
||||
ent.deadChan = make(chan bool, 1)
|
||||
ctx.Entities[ename] = ent
|
||||
mlog("added entity %v", ename)
|
||||
go ent.start()
|
||||
|
@ -76,7 +77,8 @@ func procDir(dirpath string) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Remove entities that are no longer present in the runner directory
|
||||
// Remove entities that are no longer present in the runner directory, or are
|
||||
// stale
|
||||
func procReap(ents []string) error {
|
||||
for k := range ctx.Entities {
|
||||
found := false
|
||||
|
@ -91,6 +93,20 @@ func procReap(ents []string) error {
|
|||
mlog("removed entity %v", k)
|
||||
}
|
||||
}
|
||||
// Also remove stale entities that are still present in the runner
|
||||
// directory but have encountered an error.
|
||||
for k := range ctx.Entities {
|
||||
dead := false
|
||||
select {
|
||||
case <-ctx.Entities[k].deadChan:
|
||||
dead = true
|
||||
default:
|
||||
}
|
||||
if dead {
|
||||
delete(ctx.Entities, k)
|
||||
mlog("removed stale entity %v", k)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче