[medium/bug] terminate scheduler when heartbeat to relays fails, fixes #146

If RabbitMQ suffers a network partition, the scheduler will remain in a
half-broken state where no message can be consumed or published, but
other goroutines work fine. This patch introduces a new dummy event that
heartbeats the scheduler and terminates it if the publication of the
heartbeat fails. The heartbeat is a normal event that expires in RabbitMQ
after 100 minutes, like any other event. It is published to a queue that
could eventually be consumed for monitoring.
This commit is contained in:
Julien Vehent 2015-11-05 07:29:08 -05:00
Родитель a73c2b4cd1
Коммит 943555b6cc
4 изменённых файлов: 31 добавлений и 7 удалений

Просмотреть файл

@ -18,4 +18,7 @@ const (
Ev_Q_Agt_Auth_Fail = "agent.authentication.failure"
Ev_Q_Agt_New = "agent.new"
Ev_Q_Cmd_Res = "command.results"
// dummy queue for scheduler heartbeats to the relays
Ev_Q_Sched_Hb = "scheduler.heartbeat"
)

Просмотреть файл

@ -10,17 +10,18 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"github.com/streadway/amqp"
"gopkg.in/gcfg.v1"
"io"
"io/ioutil"
"mig.ninja/mig"
migdb "mig.ninja/mig/database"
"net"
"os"
"strings"
"sync"
"time"
"github.com/streadway/amqp"
"gopkg.in/gcfg.v1"
"mig.ninja/mig"
migdb "mig.ninja/mig/database"
)
// Context contains all configuration variables as well as handlers for
@ -317,6 +318,7 @@ func initChannels(orig_ctx Context) (ctx Context, err error) {
ctx.Channels.CommandDone = make(chan mig.Command)
ctx.Channels.DetectDupAgents = make(chan string)
ctx.Channels.Log = make(chan mig.Log, 100000)
ctx.Channels.Terminate = make(chan error)
ctx.Channels.Log <- mig.Log{Desc: "leaving initChannels()"}.Debug()
return
}

Просмотреть файл

@ -7,9 +7,10 @@ package main
import (
"fmt"
"time"
"github.com/streadway/amqp"
"mig.ninja/mig"
"time"
)
// sendEvent publishes a message to the miginternal rabbitmq exchange

Просмотреть файл

@ -8,9 +8,10 @@ package main
import (
"errors"
"fmt"
"mig.ninja/mig"
"os"
"time"
"mig.ninja/mig"
)
func startRoutines(ctx Context) {
@ -279,9 +280,26 @@ func startRoutines(ctx Context) {
ctx.Channels.Log <- mig.Log{Desc: "killDupAgents() routine started"}
}
// launch the routine that heartbeats the relays and terminates if connection is lost
go func() {
hostname, _ := os.Hostname()
hbmsg := fmt.Sprintf("host='%s' pid='%d'", hostname, os.Getpid())
for {
ctx.OpID = mig.GenID()
err = sendEvent(mig.Ev_Q_Sched_Hb, []byte(hbmsg+time.Now().UTC().String()), ctx)
if err != nil {
err = fmt.Errorf("relay heartbeating failed with error '%v'", err)
ctx.Channels.Terminate <- err
}
time.Sleep(60 * time.Second)
}
}()
ctx.Channels.Log <- mig.Log{Desc: "relay heartbeating routine started"}
// block here until a terminate message is received
exitReason := <-ctx.Channels.Terminate
fmt.Fprintf(os.Stderr, "Scheduler is shutting down. Reason: %s", exitReason)
time.Sleep(time.Second)
fmt.Fprintf(os.Stderr, "Scheduler is shutting down. Reason: %s\n", exitReason)
Destroy(ctx)
return
}