зеркало из https://github.com/mozilla/mig.git
[medium] move multi agents detection to periodic job in scheduler
This commit is contained in:
Родитель
04e822d20a
Коммит
dafa769e79
|
@ -137,6 +137,31 @@ func (db *DB) InsertOrUpdateAgent(agt mig.Agent) (err error) {
|
|||
}
|
||||
}
|
||||
|
||||
// ListMultiAgentsQueues retrieves an array of queues that have more than one active agent
|
||||
func (db *DB) ListMultiAgentsQueues(pointInTime time.Time) (queues []string, err error) {
|
||||
rows, err := db.c.Query(`SELECT queueloc FROM agents
|
||||
WHERE heartbeattime > $1 AND mode != 'checkin'
|
||||
GROUP BY queueloc HAVING COUNT(queueloc) > 1`, pointInTime)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Error while listing multi agents queues: '%v'", err)
|
||||
return
|
||||
}
|
||||
for rows.Next() {
|
||||
var q string
|
||||
err = rows.Scan(&q)
|
||||
if err != nil {
|
||||
rows.Close()
|
||||
err = fmt.Errorf("Failed to retrieve agent queue: '%v'", err)
|
||||
return
|
||||
}
|
||||
queues = append(queues, q)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
err = fmt.Errorf("Failed to complete database query: '%v'", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ActiveAgentsByQueue retrieves an array of agents identified by their QueueLoc value
|
||||
func (db *DB) ActiveAgentsByQueue(queueloc string, pointInTime time.Time) (agents []mig.Agent, err error) {
|
||||
rows, err := db.c.Query(`SELECT id, name, queueloc, mode, version, pid, starttime, heartbeattime, status
|
||||
|
|
|
@ -72,18 +72,26 @@ func markUpgradedAgents(cmd mig.Command, ctx Context) {
|
|||
return
|
||||
}
|
||||
|
||||
// inspectMultiAgents takes a number of actions when several agents are found
|
||||
// to be listening on the same queue. It will trigger an agentdestroy action
|
||||
// for agents that are flagged as upgraded, and log alerts for agents that
|
||||
// are not, such that an investigator can look at them.
|
||||
func inspectMultiAgents(queueLoc string, ctx Context) (err error) {
|
||||
// killDupAgents takes a number of actions when several agents are found
|
||||
// to be running on the same endpoint. If killdupagents is set in the
|
||||
// scheduler configuration, it will kill multiple agents running on the
|
||||
// same endpoint. If killdupagents is not set, it will only kill agents as
|
||||
// part of the upgrade protocol, when a given agent has been marked as upgraded
|
||||
// in the database.
|
||||
func killDupAgents(queueLoc string, ctx Context) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("inspectMultiAgents() -> %v", e)
|
||||
}
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving inspectMultiAgents()"}.Debug()
|
||||
}()
|
||||
agentsCount, agents, err := findDupAgents(queueLoc, ctx)
|
||||
hbfreq, err := time.ParseDuration(ctx.Agent.HeartbeatFreq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pointInTime := time.Now().Add(-hbfreq)
|
||||
agents, err := ctx.DB.ActiveAgentsByQueue(queueLoc, pointInTime)
|
||||
agentsCount := len(agents)
|
||||
if agentsCount < 2 {
|
||||
return
|
||||
}
|
||||
|
@ -93,7 +101,7 @@ func inspectMultiAgents(queueLoc string, ctx Context) (err error) {
|
|||
switch agent.Status {
|
||||
case "upgraded":
|
||||
// upgraded agents must die
|
||||
err = destroyAgent(agent, ctx)
|
||||
err = issueKillAction(agent, ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -110,7 +118,7 @@ func inspectMultiAgents(queueLoc string, ctx Context) (err error) {
|
|||
}
|
||||
pointInTime := time.Now().Add(-hbFreq * 3)
|
||||
if agent.DestructionTime.Before(pointInTime) {
|
||||
err = destroyAgent(agent, ctx)
|
||||
err = issueKillAction(agent, ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -138,33 +146,33 @@ func inspectMultiAgents(queueLoc string, ctx Context) (err error) {
|
|||
}
|
||||
desc := fmt.Sprintf("Issuing destruction action for agent '%s' with PID '%d'.", oldest.Name, oldest.PID)
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: desc}
|
||||
err = destroyAgent(oldest, ctx)
|
||||
err = issueKillAction(oldest, ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// throttling to prevent issuing too many kill orders at the same time
|
||||
time.Sleep(5 * time.Second)
|
||||
} else {
|
||||
desc := fmt.Sprintf("Found '%d' agents running on '%s'. Require manual inspection.", remainingAgents, queueLoc)
|
||||
desc := fmt.Sprintf("found '%d' agents running on '%s'. Require manual inspection.", remainingAgents, queueLoc)
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: desc}.Warning()
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// destroyAgent issues an `agentdestroy` action targeted to a specific agent
|
||||
// issueKillAction issues an `agentdestroy` action targeted to a specific agent
|
||||
// and updates the status of the agent in the database
|
||||
func destroyAgent(agent mig.Agent, ctx Context) (err error) {
|
||||
func issueKillAction(agent mig.Agent, ctx Context) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("destroyAgent() -> %v", e)
|
||||
err = fmt.Errorf("issueKillAction() -> %v", e)
|
||||
}
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving destroyAgent()"}.Debug()
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving issueKillAction()"}.Debug()
|
||||
}()
|
||||
// generate an `agentdestroy` action for this agent
|
||||
killAction := mig.Action{
|
||||
ID: mig.GenID(),
|
||||
Name: fmt.Sprintf("Destroy agent %s", agent.Name),
|
||||
Name: fmt.Sprintf("Kill agent %s", agent.Name),
|
||||
Target: fmt.Sprintf("queueloc='%s'", agent.QueueLoc),
|
||||
ValidFrom: time.Now().Add(-60 * time.Second).UTC(),
|
||||
ExpireAfter: time.Now().Add(30 * time.Minute).UTC(),
|
||||
|
@ -210,6 +218,6 @@ func destroyAgent(agent mig.Agent, ctx Context) (err error) {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Requested destruction of agent '%s' with PID '%d'", agent.Name, agent.PID)}.Info()
|
||||
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("issued kill action for agent '%s' with PID '%d'", agent.Name, agent.PID)}.Warning()
|
||||
return
|
||||
}
|
|
@ -98,46 +98,9 @@ func getHeartbeats(msg amqp.Delivery, ctx Context) (err error) {
|
|||
}
|
||||
}()
|
||||
|
||||
// If multiple agents are active at the same time, alert the cleanup routine
|
||||
if ctx.Agent.DetectMultiAgents {
|
||||
go func() {
|
||||
agtCnt, _, err := findDupAgents(agt.QueueLoc, ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if agtCnt > 1 {
|
||||
ctx.Channels.DetectDupAgents <- agt.QueueLoc
|
||||
}
|
||||
}()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// findDupAgents counts agents that are listening on a given queue and
|
||||
// have sent a heartbeat in recent times, to detect systems that are running
|
||||
// two or more agents
|
||||
func findDupAgents(queueloc string, ctx Context) (count int, agents []mig.Agent, err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("findDupAgents() -> %v", e)
|
||||
}
|
||||
if ctx.Debug.Heartbeats {
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving findDupAgents()"}.Debug()
|
||||
}
|
||||
}()
|
||||
// retrieve agents that have sent in heartbeat in twice their heartbeat time
|
||||
timeOutPeriod, err := time.ParseDuration(ctx.Agent.HeartbeatFreq)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pointInTime := time.Now().Add(-timeOutPeriod * 2)
|
||||
agents, err = ctx.DB.ActiveAgentsByQueue(queueloc, pointInTime)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return len(agents), agents, err
|
||||
}
|
||||
|
||||
// startResultsListener initializes the routine that receives heartbeats from agents
|
||||
func startResultsListener(ctx Context) (resultsChan <-chan amqp.Delivery, err error) {
|
||||
defer func() {
|
||||
|
|
|
@ -44,6 +44,10 @@ func periodic(ctx Context) (err error) {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = detectMultiAgents(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -233,3 +237,33 @@ func computeAgentsStats(ctx Context) (err error) {
|
|||
}
|
||||
return
|
||||
}
|
||||
|
||||
// detectMultiAgents lists endpoint queues that are running more than one agent, and sends
|
||||
// the queues names to a channel were destruction orders can be emitted to shut down
|
||||
// duplication agents
|
||||
func detectMultiAgents(ctx Context) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("detectMultiAgents() -> %v", e)
|
||||
}
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving detectMultiAgents()"}.Debug()
|
||||
}()
|
||||
// if detectmultiagents is not set in the scheduler configuration, do nothing
|
||||
if !ctx.Agent.DetectMultiAgents {
|
||||
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "detectMultiAgents is not activated. skipping"}.Debug()
|
||||
return
|
||||
}
|
||||
hbfreq, err := time.ParseDuration(ctx.Agent.HeartbeatFreq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pointInTime := time.Now().Add(-hbfreq)
|
||||
queues, err := ctx.DB.ListMultiAgentsQueues(pointInTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, q := range queues {
|
||||
ctx.Channels.DetectDupAgents <- q
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -255,16 +255,18 @@ func startRoutines(ctx Context) {
|
|||
ctx.Channels.Log <- mig.Log{Desc: "queue cleanup routine started"}
|
||||
|
||||
// launch the routine that handles multi agents on same queue
|
||||
go func() {
|
||||
for queueLoc := range ctx.Channels.DetectDupAgents {
|
||||
ctx.OpID = mig.GenID()
|
||||
err = inspectMultiAgents(queueLoc, ctx)
|
||||
if err != nil {
|
||||
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%v", err)}.Err()
|
||||
if ctx.Agent.KillDupAgents {
|
||||
go func() {
|
||||
for queueLoc := range ctx.Channels.DetectDupAgents {
|
||||
ctx.OpID = mig.GenID()
|
||||
err = killDupAgents(queueLoc, ctx)
|
||||
if err != nil {
|
||||
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%v", err)}.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
ctx.Channels.Log <- mig.Log{Desc: "inspectMultiAgents() routine started"}
|
||||
}()
|
||||
ctx.Channels.Log <- mig.Log{Desc: "killDupAgents() routine started"}
|
||||
}
|
||||
|
||||
// block here until a terminate message is received
|
||||
exitReason := <-ctx.Channels.Terminate
|
||||
|
|
Загрузка…
Ссылка в новой задаче