[medium] improve automated deletion of unused amqp queues in scheduler

This commit is contained in:
Julien Vehent 2015-02-04 11:25:33 -05:00
Родитель 76e3911a11
Коммит 04e822d20a
5 изменённых файлов: 116 добавлений и 66 удалений

Просмотреть файл

@ -30,11 +30,17 @@
; the periodic runs less often than
; the collector and does cleanup and DB updates
[periodic]
; frequency at which the periodic jobs run
freq = "87s"
; delete finished actions, commands and invalids after
; this period has passed
deleteafter = "72h"
; run a rabbitmq unused queues cleanup job at this frequency
; this is DB & amqp intensive so don't run it too often
queuescleanupfreq = "24h"
[directories]
spool = "/var/cache/mig/"
tmp = "/var/tmp/"

Просмотреть файл

@ -45,7 +45,7 @@ type Context struct {
Freq string
}
Periodic struct {
Freq, DeleteAfter string
Freq, DeleteAfter, QueuesCleanupFreq string
}
Directories struct {
// configuration
@ -294,7 +294,7 @@ func initRelay(orig_ctx Context) (ctx Context, err error) {
panic(err)
}
ctx.Channels.Log <- mig.Log{Sev: "info", Desc: "AMQP connection opened"}
ctx.Channels.Log <- mig.Log{Desc: "AMQP connection opened"}
return
}

Просмотреть файл

@ -14,14 +14,16 @@ import (
// periodic runs tasks at regular interval
func periodic(ctx Context) (err error) {
start := time.Now()
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("periodic() -> %v", e)
}
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving periodic()"}.Debug()
d := time.Since(start)
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("periodic run done in %v", d)}
}()
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "initiating periodic run"}
start := time.Now()
err = cleanDir(ctx, ctx.Directories.Action.Done)
if err != nil {
panic(err)
@ -38,16 +40,10 @@ func periodic(ctx Context) (err error) {
if err != nil {
panic(err)
}
err = cleanQueueDisappearedEndpoints(ctx)
if err != nil {
panic(err)
}
err = computeAgentsStats(ctx)
if err != nil {
panic(err)
}
d := time.Since(start)
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("periodic run done in %v", d)}
return
}
@ -124,60 +120,6 @@ func markIdleAgents(ctx Context) (err error) {
return
}
// cleanQueueDisappearedEndpoints removes the rabbitmq queues ("mig.agt.<name>")
// of the endpoints returned by ctx.DB.GetDisappearedEndpoints, using a cutoff
// of twice the configured agent timeout in the past.
//
// A queue is deleted only when its inspection reports zero consumers and zero
// pending messages. Queues that cannot be inspected are skipped without any
// log entry. Any panic raised along the way (bad timeout value, database
// error, channel creation or queue deletion failure) is recovered and handed
// back to the caller as err.
func cleanQueueDisappearedEndpoints(ctx Context) (err error) {
	began := time.Now()
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("cleanQueueDisappearedEndpoints() -> %v", r)
		}
		ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving cleanQueueDisappearedEndpoints()"}.Debug()
	}()

	timeout, err := time.ParseDuration(ctx.Agent.TimeOut)
	if err != nil {
		panic(err)
	}
	cutoff := time.Now().Add(-2 * timeout)
	endpoints, err := ctx.DB.GetDisappearedEndpoints(cutoff)
	if err != nil {
		panic(err)
	}
	ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("cleanQueueDisappearedEndpoints(): inspecting %d queues from disappeared endpoints", len(endpoints))}.Debug()

	// The maintenance work gets its own throwaway channel: sharing the channel
	// stored in the context was observed to slow heartbeat consumption down to
	// a few messages per second.
	maintChan, err := ctx.MQ.conn.Channel()
	if err != nil {
		panic(err)
	}
	defer maintChan.Close()

	for _, endpoint := range endpoints {
		qname := "mig.agt." + endpoint
		// inspecting a queue that does not exist returns an error; skip it quietly
		stats, inspectErr := maintChan.QueueInspect(qname)
		if inspectErr != nil {
			continue
		}
		if stats.Consumers == 0 && stats.Messages == 0 {
			if _, delErr := maintChan.QueueDelete(qname, true, true, true); delErr != nil {
				panic(delErr)
			}
			ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("removed endpoint queue %s", endpoint)}
			continue
		}
		// the queue is still in use: report it and leave it alone
		ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("skipped deletion of agent queue %s, it has %d consumers and %d pending messages",
			endpoint, stats.Consumers, stats.Messages)}
	}

	elapsed := time.Since(began)
	ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("cleanQueueDisappearedEndpoints(): done in %v", elapsed)}.Debug()
	return
}
// computeAgentsStats computes and stores statistics about agents and endpoints
// computeAgentsStats computes and stores statistics about agents and endpoints
func computeAgentsStats(ctx Context) (err error) {
defer func() {

Просмотреть файл

@ -0,0 +1,85 @@
package main
import (
"fmt"
"github.com/streadway/amqp"
"mig"
"time"
)
// QueuesCleanup deletes the rabbitmq queues ("mig.agt.<name>") of the
// endpoints returned by ctx.DB.GetDisappearedEndpoints for the window that
// starts QueuesCleanupFreq plus two hours ago.
//
// Each candidate queue is inspected first; a queue that cannot be inspected
// (typically because it does not exist) is skipped. A queue that can be
// inspected is deleted with ifUnused=false and ifEmpty=false, so remaining
// consumers or pending messages do not prevent the deletion.
// NOTE(review): an earlier description of this job said only queues with 0
// consumers and 0 pending messages are deleted; the code does not enforce
// that. Confirm which behavior is intended.
//
// All amqp operations run on a dedicated connection, opened lazily via
// initRelay and closed before returning, so that a failure here cannot break
// the scheduler's main connection.
//
// Failures on individual queues are logged and do not abort the run. The
// returned error is non-nil only when QueuesCleanupFreq cannot be parsed, the
// database query fails, or a panic is recovered.
func QueuesCleanup(ctx Context) (err error) {
	// temporary context for amqp operations. It only ever holds a copy of the
	// original context or the result of a successful initRelay() call.
	tmpctx := ctx
	connected := false
	start := time.Now()
	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("QueuesCleanup() -> %v", e)
		}
		ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving QueuesCleanup()"}.Debug()
		if connected {
			tmpctx.MQ.conn.Close()
		}
	}()

	// cleanup runs every QueuesCleanupFreq, so we run a query that lists all offline agents
	// between now and up to "QueuesCleanupFreq" ago. For a typical value of QueuesCleanupFreq
	// set to 24 hours, each iteration of this job will list all agents that went offline over
	// the last 24 hours, plus two hours for good measure...
	pointInTime, err := time.ParseDuration(ctx.Periodic.QueuesCleanupFreq)
	if err != nil {
		panic(err)
	}
	oldest := time.Now().Add(-(pointInTime + 2*time.Hour))
	queues, err := ctx.DB.GetDisappearedEndpoints(oldest)
	if err != nil {
		panic(err)
	}
	ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("QueuesCleanup(): found %d offline endpoints between %s and now", len(queues), oldest.String())}.Debug()

	makeamqpchan := true
	for _, queue := range queues {
		if makeamqpchan {
			// a previous connection that went bad must be closed before it is
			// replaced, otherwise every reconnection leaks one connection
			if connected {
				tmpctx.MQ.conn.Close()
				connected = false
			}
			// keep the existing context, but create a new rabbitmq connection to prevent breaking
			// the main one if something goes wrong. The result is only adopted on success, so a
			// failed attempt never clobbers the context used for the next attempt.
			newctx, relayErr := initRelay(tmpctx)
			if relayErr != nil {
				ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("QueuesCleanup(): %v. Continuing!", relayErr)}.Err()
				continue
			}
			tmpctx = newctx
			makeamqpchan = false
			connected = true
		}

		qname := "mig.agt." + queue
		// the call to inspect will fail if the queue doesn't exist, so we fail silently and continue
		_, inspectErr := tmpctx.MQ.Chan.QueueInspect(qname)
		if inspectErr != nil {
			// If a queue by this name does not exist, an error will be returned and the channel will be closed.
			// Reopen the channel and continue. The type assertion is checked so that an error of
			// another type is handled as a plain failure instead of panicking.
			amqpErr, isAMQPErr := inspectErr.(*amqp.Error)
			if isAMQPErr && (amqpErr == amqp.ErrClosed || amqpErr.Recover) {
				newchan, chanErr := tmpctx.MQ.conn.Channel()
				if chanErr != nil {
					ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("QueuesCleanup(): reopening amqp channel failed with error '%v'. Continuing.", chanErr)}.Warning()
					makeamqpchan = true
				} else {
					tmpctx.MQ.Chan = newchan
				}
			} else {
				ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("QueuesCleanup(): QueueInspect failed with error '%v'. Continuing.", inspectErr)}.Warning()
				makeamqpchan = true
			}
			continue
		}

		_, delErr := tmpctx.MQ.Chan.QueueDelete(qname, false, false, false)
		if delErr != nil {
			desc := fmt.Sprintf("error while deleting queue mig.agt.%s: %v", queue, delErr)
			ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: desc}.Err()
			makeamqpchan = true
			continue
		}
		ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("removed endpoint queue %s", queue)}
		// throttling. looks like iterating too fast on queuedelete eventually locks the connection
		time.Sleep(100 * time.Millisecond)
	}

	d := time.Since(start)
	ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("QueuesCleanup(): done in %v", d)}
	// per-queue failures were logged above and are deliberately not returned
	return nil
}

Просмотреть файл

@ -166,7 +166,7 @@ func startRoutines(ctx Context) {
ctx.OpID = mig.GenID()
err := getHeartbeats(msg, ctx)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%v", err)}.Err()
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("heartbeat routine failed with error '%v'", err)}.Err()
}
}
}()
@ -213,7 +213,7 @@ func startRoutines(ctx Context) {
ctx.OpID = mig.GenID()
err := collector(ctx)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%v", err)}.Err()
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("collector routined failed with error '%v'", err)}.Err()
}
time.Sleep(collectorSleeper)
}
@ -230,13 +230,30 @@ func startRoutines(ctx Context) {
ctx.OpID = mig.GenID()
err := periodic(ctx)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%v", err)}.Err()
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("period routine failed with error '%v'", err)}.Err()
}
time.Sleep(periodicSleeper)
}
}()
ctx.Channels.Log <- mig.Log{Desc: "periodic routine started"}
// launch the routine that cleans up unused amqp queues
go func() {
sleeper, err := time.ParseDuration(ctx.Periodic.QueuesCleanupFreq)
if err != nil {
panic(err)
}
for {
ctx.OpID = mig.GenID()
err = QueuesCleanup(ctx)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("queues cleanup routine failed with error '%v'", err)}.Err()
}
time.Sleep(sleeper)
}
}()
ctx.Channels.Log <- mig.Log{Desc: "queue cleanup routine started"}
// launch the routine that handles multi agents on same queue
go func() {
for queueLoc := range ctx.Channels.DetectDupAgents {