[major] remove old upgrade module code from scheduler

Aaron Meihm 2016-04-13 20:40:18 -05:00
Parent 5aac4769b0
Commit ce85886a8e
3 changed files with 17 additions and 95 deletions

View file

@@ -9,81 +9,18 @@ import (
     "encoding/json"
     "fmt"
     "mig.ninja/mig"
-    "reflect"
     "time"
 )
-// Check the action that was processed, and if it's related to upgrading agents
-// extract the command results, grab the PID of the agents that was upgraded,
-// and mark the agent registration in the database as 'upgraded'
-func markUpgradedAgents(cmd mig.Command, ctx Context) {
-    var err error
-    defer func() {
-        if e := recover(); e != nil {
-            ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: cmd.Action.ID, CommandID: cmd.ID, Desc: fmt.Sprintf("markUpgradedAgents() failed with error '%v'", e)}.Err()
-        }
-        ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: cmd.Action.ID, CommandID: cmd.ID, Desc: "leaving markUpgradedAgents()"}.Debug()
-    }()
-    for _, operation := range cmd.Action.Operations {
-        if operation.Module == "upgrade" {
-            for _, result := range cmd.Results {
-                reflection := reflect.ValueOf(result)
-                resultMap := reflection.Interface().(map[string]interface{})
-                _, ok := resultMap["success"]
-                if !ok {
-                    ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: cmd.Action.ID, CommandID: cmd.ID,
-                        Desc: "Invalid operation results format. Missing 'success' key."}.Err()
-                    panic(err)
-                }
-                success := reflect.ValueOf(resultMap["success"])
-                if !success.Bool() {
-                    ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: cmd.Action.ID, CommandID: cmd.ID,
-                        Desc: "Upgrade operation failed. Agent not marked."}.Err()
-                    panic(err)
-                }
-                _, ok = resultMap["oldpid"]
-                if !ok {
-                    ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: cmd.Action.ID, CommandID: cmd.ID,
-                        Desc: "Invalid operation results format. Missing 'oldpid' key."}.Err()
-                    panic(err)
-                }
-                oldpid := reflect.ValueOf(resultMap["oldpid"])
-                if oldpid.Float() < 2 || oldpid.Float() > 65535 {
-                    desc := fmt.Sprintf("Successfully found upgraded action on agent '%s', but with PID '%s'. That's not right...",
-                        cmd.Agent.Name, oldpid)
-                    ctx.Channels.Log <- mig.Log{Desc: desc}.Err()
-                    panic(desc)
-                }
-                // update the agent's registration to mark it as upgraded
-                agent, err := ctx.DB.AgentByQueueAndPID(cmd.Agent.QueueLoc, int(oldpid.Float()))
-                if err != nil {
-                    panic(err)
-                }
-                err = ctx.DB.MarkAgentUpgraded(agent)
-                if err != nil {
-                    panic(err)
-                }
-                ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, ActionID: cmd.Action.ID, CommandID: cmd.ID,
-                    Desc: fmt.Sprintf("Agent '%s' marked as upgraded", cmd.Agent.Name)}.Info()
-            }
-        }
-    }
-    return
-}
-// killDupAgents takes a number of actions when several agents are found
-// to be running on the same endpoint. If killdupagents is set in the
-// scheduler configuration, it will kill multiple agents running on the
-// same endpoint. If killdupagents is not set, it will only kill agents as
-// part of the upgrade protocol, when a given agent has been marked as upgraded
-// in the database.
+// Given an agent queue location queueLoc, send kill actions for duplicate
+// agents. Where multiple agents exist on the same host, we will attempt
+// to kill the older agents.
 func killDupAgents(queueLoc string, ctx Context) (err error) {
     defer func() {
         if e := recover(); e != nil {
-            err = fmt.Errorf("inspectMultiAgents() -> %v", e)
+            err = fmt.Errorf("killDupAgents() -> %v", e)
         }
-        ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving inspectMultiAgents()"}.Debug()
+        ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving killDupAgents()"}.Debug()
     }()
     hbfreq, err := time.ParseDuration(ctx.Agent.HeartbeatFreq)
     if err != nil {
@@ -98,17 +35,7 @@ func killDupAgents(queueLoc string, ctx Context) (err error) {
     destroyedAgents := 0
     leftAloneAgents := 0
     for _, agent := range agents {
-        switch agent.Status {
-        case "upgraded":
-            // upgraded agents must die
-            err = issueKillAction(agent, ctx)
-            if err != nil {
-                panic(err)
-            }
-            destroyedAgents++
-            desc := fmt.Sprintf("Agent '%s' with PID '%d' has been upgraded and will be destroyed.", agent.Name, agent.PID)
-            ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: desc}.Debug()
-        case "destroyed":
+        if agent.Status == "destroyed" {
             // if the agent has already been marked as destroyed, check if
             // that was done longer than 3 heartbeats ago. If it did, the
             // destruction failed, and we need to reissue a destruction order
@@ -123,7 +50,8 @@ func killDupAgents(queueLoc string, ctx Context) (err error) {
                     panic(err)
                 }
                 destroyedAgents++
-                desc := fmt.Sprintf("Re-issuing destruction action for agent '%s' with PID '%d'.", agent.Name, agent.PID)
+                desc := fmt.Sprintf("Re-issuing destruction action for "+
+                    "agent '%s' with PID '%d'.", agent.Name, agent.PID)
                 ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: desc}.Debug()
             } else {
                 leftAloneAgents++
@@ -144,7 +72,8 @@ func killDupAgents(queueLoc string, ctx Context) (err error) {
                 oldest = agent
             }
         }
-        desc := fmt.Sprintf("Issuing destruction action for agent '%s' with PID '%d'.", oldest.Name, oldest.PID)
+        desc := fmt.Sprintf("Issuing destruction action for agent '%s' "+
+            "with PID '%d'.", oldest.Name, oldest.PID)
         ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: desc}
         err = issueKillAction(oldest, ctx)
         if err != nil {
@@ -153,14 +82,15 @@ func killDupAgents(queueLoc string, ctx Context) (err error) {
             // throttling to prevent issuing too many kill orders at the same time
             time.Sleep(5 * time.Second)
         } else {
-            desc := fmt.Sprintf("found '%d' agents running on '%s'. Require manual inspection.", remainingAgents, queueLoc)
+            desc := fmt.Sprintf("found '%d' agents running on '%s'. Require "+
+                "manual inspection.", remainingAgents, queueLoc)
             ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: desc}.Warning()
         }
     }
     return
 }
-// issueKillAction issues an `agentdestroy` action targetted to a specific agent
+// issueKillAction issues an `agentdestroy` action targeted to a specific agent
 // and updates the status of the agent in the database
 func issueKillAction(agent mig.Agent, ctx Context) (err error) {
     defer func() {
@@ -218,6 +148,7 @@ func issueKillAction(agent mig.Agent, ctx Context) (err error) {
     if err != nil {
         panic(err)
     }
-    ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("issued kill action for agent '%s' with PID '%d'", agent.Name, agent.PID)}.Warning()
+    ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("issued kill action for agent '%s' "+
+        "with PID '%d'", agent.Name, agent.PID)}.Warning()
     return
 }
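
With the upgrade protocol removed, duplication itself is the only remaining trigger for automatic kills, and the rule the new doc comment describes is simple: among the agents heartbeating on the same queue location, destroy the one with the oldest heartbeat and leave the newest alive. A minimal, self-contained sketch of that selection rule, assuming mig.Agent carries a HeartBeatTS time.Time field as it does elsewhere in the scheduler (pickOldest itself is hypothetical, not part of this commit):

package main

import (
    "fmt"
    "time"

    "mig.ninja/mig"
)

// pickOldest illustrates the selection rule in killDupAgents: among
// duplicate agents on one queue location, the agent with the earliest
// heartbeat timestamp is the kill candidate.
func pickOldest(agents []mig.Agent) (oldest mig.Agent) {
    oldest = agents[0]
    for _, agent := range agents[1:] {
        // an earlier HeartBeatTS means an older agent
        if agent.HeartBeatTS.Before(oldest.HeartBeatTS) {
            oldest = agent
        }
    }
    return
}

func main() {
    now := time.Now()
    agents := []mig.Agent{
        {Name: "host1-a", PID: 1234, HeartBeatTS: now.Add(-10 * time.Minute)},
        {Name: "host1-b", PID: 5678, HeartBeatTS: now},
    }
    victim := pickOldest(agents)
    fmt.Printf("would kill '%s' (PID %d)\n", victim.Name, victim.PID)
}

In the real function the candidate then goes to issueKillAction, and the loop sleeps 5 seconds between kills to avoid issuing too many orders at once.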

View file

@@ -133,9 +133,9 @@ func getHeartbeats(msg amqp.Delivery, ctx Context) (err error) {
         }
     } else {
         // the agent exists in database. reuse the existing ID, and keep the status if it was
-        // previously set to destroyed or upgraded. otherwise set status to online
+        // previously set to destroyed, otherwise set status to online
         agt.ID = agent.ID
-        if agent.Status == mig.AgtStatusDestroyed || agent.Status == mig.AgtStatusUpgraded {
+        if agent.Status == mig.AgtStatusDestroyed {
             agt.Status = agent.Status
         } else {
             agt.Status = mig.AgtStatusOnline
View file

@@ -306,15 +306,6 @@ func updateAction(cmds []mig.Command, ctx Context) (err error) {
         // store action in the map
         actions[a.ID] = a
-        // slightly unrelated to updating the action:
-        // in case the action is about upgrading agents, do some magical stuff
-        // to continue the upgrade protocol
-        if cmd.Status == mig.StatusSuccess && len(a.Operations) > 0 {
-            if a.Operations[0].Module == "upgrade" {
-                go markUpgradedAgents(cmd, ctx)
-            }
-        }
     }
     for _, a := range actions {
         a.Counters, err = ctx.DB.GetActionCounters(a.ID)