Revert scheduler back to normal operations so we can do a full deploy

This commit is contained in:
Zack Mullaly 2018-11-26 15:44:08 -05:00
Родитель 5b386c1a29
Коммит deb4aba083
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 1486642516ED3535
5 изменённых файлов: 208 добавлений и 0 удалений

Просмотреть файл

@ -6,13 +6,167 @@
package main
import (
"encoding/json"
"fmt"
"time"
"github.com/mozilla/mig"
"github.com/streadway/amqp"
)
// startHeartbeatsListener prepares the agent heartbeat queue and starts
// consuming from it.
//
// It declares the mig.QueueAgentHeartbeat queue, binds it to the
// mig.ExchangeToSchedulers exchange, applies the channel QoS settings and
// registers a consumer. The returned channel carries the heartbeat
// deliveries. A failure in any step panics; the deferred recover converts
// the panic into an error prefixed with "startHeartbeatsListener() -> ".
func startHeartbeatsListener(ctx Context) (heartbeatChan <-chan amqp.Delivery, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("startHeartbeatsListener() -> %v", r)
		}
		ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving startHeartbeatsListener()"}.Debug()
	}()

	mq := ctx.MQ.Chan
	queue := mig.QueueAgentHeartbeat

	// NOTE(review): the positional flags below follow the streadway/amqp
	// Channel API (durable, autoDelete, exclusive, noWait, ...) assuming
	// ctx.MQ.Chan is an *amqp.Channel -- confirm against the Context type.
	if _, err = mq.QueueDeclare(queue, true, false, false, false, nil); err != nil {
		panic(err)
	}
	// the queue name doubles as the binding key
	if err = mq.QueueBind(queue, queue, mig.ExchangeToSchedulers, false, nil); err != nil {
		panic(err)
	}
	if err = mq.Qos(0, 0, false); err != nil {
		panic(err)
	}
	if heartbeatChan, err = mq.Consume(queue, "", true, false, false, false, nil); err != nil {
		panic(err)
	}

	ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "agents heartbeats listener initialized"}
	return heartbeatChan, nil
}
// getHeartbeats processes one heartbeat message sent by an agent.
//
// The message body is size-checked, decoded into a mig.Agent, compared
// against the ctx.Agent.TimeOut expiration window, sanity-checked and passed
// to isAgentAuthorized. A heartbeat that survives all of that is written to
// the database by persistHeartbeat in a separate goroutine, so the caller is
// not blocked by the database.
//
// Expired heartbeats and heartbeats from unauthorized agents are logged and
// dropped with a nil error. Every other failure is raised as a panic and
// converted by the deferred recover into an error prefixed with
// "getHeartbeats() -> ".
func getHeartbeats(msg amqp.Delivery, ctx Context) (err error) {
	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("getHeartbeats() -> %v", e)
		}
		if ctx.Debug.Heartbeats {
			ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving getHeartbeats()"}.Debug()
		}
	}()

	// a normal heartbeat should be between 500 and 5000 characters, some may be larger if
	// large environments are collected, so we fix the upper limit to 100kB
	if len(msg.Body) > 102400 {
		panic("discarded heartbeat larger than 100kB")
	}
	var agt mig.Agent
	err = json.Unmarshal(msg.Body, &agt)
	if err != nil {
		panic(err)
	}
	if ctx.Debug.Heartbeats {
		desc := fmt.Sprintf("Received heartbeat for Agent '%s' QueueLoc '%s'", agt.Name, agt.QueueLoc)
		ctx.Channels.Log <- mig.Log{Desc: desc}.Debug()
	}

	// discard expired heartbeats: anything stamped earlier than now minus the
	// configured agent timeout
	agtTimeOut, err := time.ParseDuration(ctx.Agent.TimeOut)
	if err != nil {
		panic(err)
	}
	expirationDate := time.Now().Add(-agtTimeOut)
	if agt.HeartBeatTS.Before(expirationDate) {
		desc := fmt.Sprintf("Expired heartbeat received from Agent '%s'", agt.Name)
		ctx.Channels.Log <- mig.Log{Desc: desc}.Notice()
		return
	}
	// replace the heartbeat with current time
	agt.HeartBeatTS = time.Now()

	// do some sanity checking
	if agt.Mode != "" && agt.Mode != "daemon" && agt.Mode != "checkin" {
		panic(fmt.Sprintf("invalid mode '%s' received from agent '%s'", agt.Mode, agt.QueueLoc))
	}
	if len(agt.Name) > 1024 {
		panic(fmt.Sprintf("agent name longer than 1024 characters: name '%s' from '%s'", agt.Name, agt.QueueLoc))
	}
	if len(agt.Version) > 128 {
		panic(fmt.Sprintf("agent version longer than 128 characters: version '%s' from '%s'", agt.Version, agt.QueueLoc))
	}

	// if agent is not authorized, skip the registration.
	// nothing is returned to the agent. it's simply ignored.
	ok, err := isAgentAuthorized(agt.QueueLoc, ctx)
	if err != nil {
		panic(err)
	}
	if !ok {
		desc := fmt.Sprintf("getHeartbeats(): Agent '%s' is not authorized", agt.QueueLoc)
		ctx.Channels.Log <- mig.Log{Desc: desc}.Warning()
		// agent authorization failed so we drop this heartbeat and return
		return
	}

	// write to database in a goroutine to avoid blocking
	go persistHeartbeat(agt, ctx)
	return
}

// persistHeartbeat stores the agent described by a validated heartbeat.
//
// When no agent is found for the heartbeat's queue and PID, a new one is
// inserted. Otherwise the existing record is either replaced (the heartbeat
// carries a refresh time more than 15 seconds newer than the stored one) or
// has its heartbeat updated. Database errors are logged, not returned: this
// function is meant to run in its own goroutine.
func persistHeartbeat(agt mig.Agent, ctx Context) {
	// This runs outside the goroutine that owns getHeartbeats' recover, so
	// without its own recover a panic here would terminate the scheduler.
	defer func() {
		if e := recover(); e != nil {
			ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Heartbeat DB routine failed with error '%v' for agent '%s'", e, agt.Name)}.Err()
		}
	}()

	// if an agent already exists in database, we update it, otherwise we insert it
	agent, err := ctx.DB.AgentByQueueAndPID(agt.QueueLoc, agt.PID)
	if err != nil {
		// NOTE(review): any lookup error is treated as "agent not found" --
		// confirm AgentByQueueAndPID cannot fail for other reasons.
		agt.DestructionTime = time.Date(9998, time.January, 11, 11, 11, 11, 11, time.UTC)
		agt.Status = mig.AgtStatusOnline
		// create a new agent, set starttime to now
		agt.StartTime = time.Now()
		err = ctx.DB.InsertAgent(agt, nil)
		if err != nil {
			ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Heartbeat DB insertion failed with error '%v' for agent '%s'", err, agt.Name)}.Err()
		}
		return
	}

	// the agent exists in database. reuse the existing ID, and keep the status if it was
	// previously set to destroyed, otherwise set status to online.
	// The test is on the database record (agent), not on the status decoded
	// from the heartbeat (agt), which is supplied by the remote agent.
	agt.ID = agent.ID
	if agent.Status == mig.AgtStatusDestroyed {
		agt.Status = agent.Status
	} else {
		agt.Status = mig.AgtStatusOnline
	}

	// If the refresh time is newer than what we know for the agent, replace
	// the agent in the database with the newer information. We want to keep
	// history here, so don't want to just update the information in the
	// existing row.
	//
	// Note: with older agents which might not send a refresh time, the refresh
	// time will be interpreted as the zero value, and the agents should just
	// update using UpdateAgentHeartbeat()
	if agt.RefreshTS.IsZero() {
		ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("agent '%v' not sending refresh time, perhaps an older version?", agt.Name)}.Warning()
	}
	cutoff := agent.RefreshTS.Add(15 * time.Second)
	if !agt.RefreshTS.IsZero() && agt.RefreshTS.After(cutoff) {
		ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("replacing refreshed agent for agent '%v'", agt.Name)}.Info()
		err = ctx.DB.ReplaceRefreshedAgent(agt)
		if err != nil {
			ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Heartbeat DB update failed (refresh) with error '%v' for agent '%s'", err, agt.Name)}.Err()
		}
	} else {
		err = ctx.DB.UpdateAgentHeartbeat(agt)
		if err != nil {
			ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Heartbeat DB update failed with error '%v' for agent '%s'", err, agt.Name)}.Err()
		}
	}

	// if the agent that exists in the database has a status of 'destroyed'
	// we should not be receiving a heartbeat from it. so, if detectmultiagents
	// is set in the scheduler configuration, we pass the agent queue over to the
	// routine that handles the destruction of agents
	if agent.Status == mig.AgtStatusDestroyed && ctx.Agent.DetectMultiAgents {
		ctx.Channels.DetectDupAgents <- agent.QueueLoc
	}
}
// startResultsListener initializes the routine that receives results from agents
func startResultsListener(ctx Context) (resultsChan <-chan amqp.Delivery, err error) {
defer func() {

Просмотреть файл

@ -8,6 +8,7 @@ package main
import (
"bufio"
"fmt"
"github.com/mozilla/mig"
"os"
"regexp"
)
@ -18,10 +19,16 @@ func isAgentAuthorized(agentQueueLoc string, ctx Context) (ok bool, err error) {
if e := recover(); e != nil {
err = fmt.Errorf("isAgentAuthorized() -> %v", e)
}
if ctx.Debug.Heartbeats {
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving isAgentAuthorized()"}.Debug()
}
}()
var re *regexp.Regexp
// bypass mode if there's no whitelist in the conf
if ctx.Agent.Whitelist == "" {
if ctx.Debug.Heartbeats {
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "Agent authorization checking is disabled, all agents are authorized"}.Debug()
}
ok = true
return
}
@ -43,11 +50,17 @@ func isAgentAuthorized(agentQueueLoc string, ctx Context) (ok bool, err error) {
panic(err)
}
if re.MatchString(agentQueueLoc) {
if ctx.Debug.Heartbeats {
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("Agent '%s' is authorized", agentQueueLoc)}.Debug()
}
ok = true
return
}
} else {
if scanner.Text() == agentQueueLoc {
if ctx.Debug.Heartbeats {
ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("Agent '%s' is authorized", agentQueueLoc)}.Debug()
}
ok = true
return
}

Просмотреть файл

@ -86,6 +86,7 @@ type Context struct {
}
Logging mig.Logging
Debug struct {
Heartbeats bool
}
}

Просмотреть файл

@ -37,6 +37,10 @@ func periodic(ctx Context) (err error) {
if err != nil {
panic(err)
}
err = markIdleAgents(ctx)
if err != nil {
panic(err)
}
err = computeAgentsStats(ctx)
if err != nil {
panic(err)
@ -107,6 +111,26 @@ func markOfflineAgents(ctx Context) (err error) {
return
}
// markIdleAgents asks the database to mark idle agents, using a cutoff of
// five heartbeat intervals (ctx.Agent.HeartbeatFreq) before the current time.
// Presumably agents whose last heartbeat predates the cutoff are the ones
// marked -- the selection itself lives in ctx.DB.MarkIdleAgents.
//
// A malformed HeartbeatFreq or a database failure panics; the deferred
// recover converts the panic into an error prefixed with
// "markIdleAgents() -> ".
func markIdleAgents(ctx Context) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("markIdleAgents() -> %v", r)
		}
		ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving markIdleAgents()"}.Debug()
	}()

	heartbeatInterval, err := time.ParseDuration(ctx.Agent.HeartbeatFreq)
	if err != nil {
		panic(err)
	}

	// five heartbeat intervals in the past
	cutoff := time.Now().Add(-5 * heartbeatInterval)
	if err = ctx.DB.MarkIdleAgents(cutoff); err != nil {
		panic(err)
	}
	return nil
}
// save time of last hourly run
var countNewEndpointsHourly time.Time

Просмотреть файл

@ -163,6 +163,22 @@ func startRoutines(ctx Context) {
}()
ctx.Channels.Log <- mig.Log{Desc: "updateAction() routine started"}
// start a listening channel to receive heartbeats from agents
heartbeatsChan, err := startHeartbeatsListener(ctx)
if err != nil {
panic(err)
}
go func() {
for msg := range heartbeatsChan {
ctx.OpID = mig.GenID()
err := getHeartbeats(msg, ctx)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("heartbeat routine failed with error '%v'", err)}.Err()
}
}
}()
ctx.Channels.Log <- mig.Log{Desc: "agents heartbeats listener routine started"}
// start a listening channel to results from agents
agtResultsChan, err := startResultsListener(ctx)
if err != nil {