From 98d4eac1c5b5d5b9618c28d6ea2b8e121935735b Mon Sep 17 00:00:00 2001
From: Zack Mullaly
Date: Mon, 19 Nov 2018 15:54:41 -0500
Subject: [PATCH] Only insert a new agent when we find out about one.
 Otherwise update agents in place when we deal with a heartbeat

---
Review notes (text between the "---" line and the diff is not part of the
commit message):

- The agent written to the database is now built from the heartbeat with
  heartbeat.ToMigAgent(). Before, the value returned by the failed
  AgentByQueueAndPID lookup was inserted, and on a hit the stored row was
  written back with only its status changed.
- The refresh check now compares the heartbeat's RefreshTS with the stored
  RefreshTS plus 15 seconds, as the removed scheduler code did. Comparing
  the stored value with itself plus 15 seconds can never be true, so
  ReplaceRefreshedAgent was unreachable.
- "time" is now imported; the function uses time.Date, time.Now and
  time.Second.
- Database errors are wrapped with fmt.Errorf, which also keeps the "fmt"
  import in use now that the debug Printf is gone.
- Whitespace was reconstructed from a flattened copy of this patch: code is
  indented gofmt-style and blank context lines were placed so that the
  original hunk sizes (-9,6 and -23,74) are reproduced. The index line still
  carries the blob ids of the original commit.

 mig-api/agents/persist_heartbeat_postgres.go | 108 ++++++++-------------
 1 file changed, 42 insertions(+), 66 deletions(-)

diff --git a/mig-api/agents/persist_heartbeat_postgres.go b/mig-api/agents/persist_heartbeat_postgres.go
index ff135e3a..7c51231c 100644
--- a/mig-api/agents/persist_heartbeat_postgres.go
+++ b/mig-api/agents/persist_heartbeat_postgres.go
@@ -9,6 +9,8 @@ package agents
 import (
 	"fmt"
+	"time"
 
+	"github.com/mozilla/mig"
 	migdb "github.com/mozilla/mig/database"
 )
 
@@ -23,74 +25,48 @@ func NewPersistHeartbeatPostgres(db *migdb.DB) PersistHeartbeatPostgres {
 }
 
 func (persist PersistHeartbeatPostgres) PersistHeartbeat(heartbeat Heartbeat) error {
-	fmt.Printf("POST /heartbeat got heartbeat %v\n", heartbeat)
+	// The heartbeat describes the agent's current state, so the agent built
+	// from it is what gets written; the stored row only decides how.
+	agent := heartbeat.ToMigAgent()
 
-	agent := heartbeat.ToMigAgent()
-	//err := persist.db.InsertAgent(agent, nil)
-	agent, err := persist.db.AgentByQueueAndPID(agent.QueueLoc, agent.PID)
+	existing, err := persist.db.AgentByQueueAndPID(
+		heartbeat.QueueLoc,
+		int(heartbeat.PID))
+
+	// If the agent doesn't exist, we want to record it as a new row.
+	// This is basically the case where an agent reports itself as operating
+	// for the first time.
+	//
+	// NOTE(review): every lookup error is treated as "agent not found", as the
+	// old scheduler code did. Confirm whether a database failure can be told
+	// apart from a missing row before relying on this.
 	if err != nil {
-		return err
+		agent.DestructionTime = time.Date(9998, time.January, 11, 11, 11, 11, 11, time.UTC)
+		agent.Status = mig.AgtStatusOnline
+		agent.StartTime = time.Now()
+		if err = persist.db.InsertAgent(agent, nil); err != nil {
+			return fmt.Errorf("inserting agent %q from heartbeat: %v", agent.Name, err)
+		}
+		return nil
+	}
+
+	// The agent is already known: reuse its row ID and mark it online.
+	agent.ID = existing.ID
+	agent.Status = mig.AgtStatusOnline
+
+	// Replace the row, which keeps history, only when the heartbeat reports a
+	// refresh time more than 15 seconds newer than the stored one. Agents that
+	// send no refresh time (zero value) are updated in place instead.
+	cutoff := existing.RefreshTS.Add(15 * time.Second)
+	if !agent.RefreshTS.IsZero() && agent.RefreshTS.After(cutoff) {
+		if err = persist.db.ReplaceRefreshedAgent(agent); err != nil {
+			return fmt.Errorf("replacing refreshed agent %q: %v", agent.Name, err)
+		}
+		return nil
 	}
 
-	return persist.db.UpdateAgentHeartbeat(agent)
+	if err = persist.db.UpdateAgentHeartbeat(agent); err != nil {
+		return fmt.Errorf("updating heartbeat for agent %q: %v", agent.Name, err)
+	}
+	return nil
 }
-
-// _dontrun invokes a goroutine that updates the agent table when a heartbeat
-// message would have been handled by the scheduler. For now we're holding onto
-// the code as a reference
-func _dontrun() {
-	go func() {
-		// if an agent already exists in database, we update it, otherwise we insert it
-		agent, err := ctx.DB.AgentByQueueAndPID(agt.QueueLoc, agt.PID)
-		if err != nil {
-			agt.DestructionTime = time.Date(9998, time.January, 11, 11, 11, 11, 11, time.UTC)
-			agt.Status = mig.AgtStatusOnline
-			// create a new agent, set starttime to now
-			agt.StartTime = time.Now()
-			err = ctx.DB.InsertAgent(agt, nil)
-			if err != nil {
-				ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Heartbeat DB insertion failed with error '%v' for agent '%s'", err, agt.Name)}.Err()
-			}
-		} else {
-			// the agent exists in database. reuse the existing ID, and keep the status if it was
-			// previously set to destroyed, otherwise set status to online
-			agt.ID = agent.ID
-			if agt.Status == mig.AgtStatusDestroyed {
-				agt.Status = agent.Status
-			} else {
-				agt.Status = mig.AgtStatusOnline
-			}
-			// If the refresh time is newer than what we know for the agent, replace
-			// the agent in the database with the newer information. We want to keep
-			// history here, so don't want to just update the information in the
-			// existing row.
-			//
-			// Note: with older agents which might not send a refresh time, the refresh
-			// time will be interpreted as the zero value, and the agents should just
-			// update using UpdateAgentHeartbeat()
-			if agt.RefreshTS.IsZero() {
-				ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("agent '%v' not sending refresh time, perhaps an older version?", agt.Name)}.Warning()
-			}
-			cutoff := agent.RefreshTS.Add(15 * time.Second)
-			if !agt.RefreshTS.IsZero() && agt.RefreshTS.After(cutoff) {
-				ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("replacing refreshed agent for agent '%v'", agt.Name)}.Info()
-				err = ctx.DB.ReplaceRefreshedAgent(agt)
-				if err != nil {
-					ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Heartbeat DB update failed (refresh) with error '%v' for agent '%s'", err, agt.Name)}.Err()
-				}
-			} else {
-				err = ctx.DB.UpdateAgentHeartbeat(agt)
-				if err != nil {
-					ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Heartbeat DB update failed with error '%v' for agent '%s'", err, agt.Name)}.Err()
-				}
-			}
-			// if the agent that exists in the database has a status of 'destroyed'
-			// we should not be received a heartbeat from it. so, if detectmultiagents
-			// is set in the scheduler configuration, we pass the agent queue over to the
-			// routine than handles the destruction of agents
-			if agent.Status == mig.AgtStatusDestroyed && ctx.Agent.DetectMultiAgents {
-				ctx.Channels.DetectDupAgents <- agent.QueueLoc
-			}
-		}
-	}()
-}