[medium] introduce periodic agent environment refresh

It's possible over the course of the process lifetime on a workstation
that address or other environment information can change. This adds
support for periodic refreshes of current environment information from
the system.

This commit also requires modifying the way we pass the Context around
the agent, from a copy to using a pointer so the changes made in the
refresh routine are visible.

Closes #205
This commit is contained in:
Aaron Meihm 2016-04-08 15:33:50 -05:00
Родитель 46a23de6b6
Коммит a84474ad9e
12 изменённых файлов: 279 добавлений и 62 удалений

Просмотреть файл

@ -28,6 +28,7 @@ type Agent struct {
StartTime time.Time `json:"starttime,omitempty"`
DestructionTime time.Time `json:"destructiontime,omitempty"`
HeartBeatTS time.Time `json:"heartbeatts,omitempty"`
RefreshTS time.Time `json:"refreshts,omitempty"`
Status string `json:"status,omitempty"`
Authorized bool `json:"authorized,omitempty"`
Env AgentEnv `json:"environment,omitempty"`

Просмотреть файл

@ -34,6 +34,10 @@ var DISCOVERAWSMETA = true
// and exits. this mode is used to run the agent as a cron job, not a daemon.
var CHECKIN = false
// how often the agent will refresh its environment. if 0 agent
// will only update environment at initialization.
var REFRESHENV time.Duration = 0
var LOGGINGCONF = mig.Logging{
Mode: "stdout", // stdout | file | syslog
Level: "debug", // debug | info | ...

Просмотреть файл

@ -31,6 +31,13 @@
; and exits. this mode is used to run the agent as a cron job, not a daemon.
checkin = off
; how often should the agent refresh its own environment (for example, to update
; the environment if the system changes ip addresses. leave unset or "" to disable
; periodic environment refresh. on workstations or other devices that may be
; obtaining dhcp leases and moving around, you might want to start with something
; like "5m"
refreshenv = ""
[certs]
ca = "/path/to/ca/cert"
cert= "/path/to/client/cert"

Просмотреть файл

@ -20,10 +20,10 @@ import (
// AgentByQueueAndPID returns a single agent that is located at a given queueloc and has a given PID
func (db *DB) AgentByQueueAndPID(queueloc string, pid int) (agent mig.Agent, err error) {
err = db.c.QueryRow(`SELECT id, name, queueloc, mode, version, pid, starttime, heartbeattime,
status FROM agents WHERE queueloc=$1 AND pid=$2 AND status!=$3`,
refreshtime, status FROM agents WHERE queueloc=$1 AND pid=$2 AND status!=$3`,
queueloc, pid, mig.AgtStatusOffline).Scan(
&agent.ID, &agent.Name, &agent.QueueLoc, &agent.Mode, &agent.Version, &agent.PID,
&agent.StartTime, &agent.HeartBeatTS, &agent.Status)
&agent.StartTime, &agent.HeartBeatTS, &agent.RefreshTS, &agent.Status)
if err != nil {
err = fmt.Errorf("Error while retrieving agent: '%v'", err)
return
@ -38,9 +38,10 @@ func (db *DB) AgentByQueueAndPID(queueloc string, pid int) (agent mig.Agent, err
func (db *DB) AgentByID(id float64) (agent mig.Agent, err error) {
var jTags, jEnv []byte
err = db.c.QueryRow(`SELECT id, name, queueloc, mode, version, pid, starttime, heartbeattime,
status, tags, environment FROM agents WHERE id=$1`, id).Scan(
refreshtime, status, tags, environment FROM agents WHERE id=$1`, id).Scan(
&agent.ID, &agent.Name, &agent.QueueLoc, &agent.Mode, &agent.Version, &agent.PID,
&agent.StartTime, &agent.HeartBeatTS, &agent.Status, &jTags, &jEnv)
&agent.StartTime, &agent.HeartBeatTS, &agent.RefreshTS, &agent.Status,
&jTags, &jEnv)
if err != nil {
err = fmt.Errorf("Error while retrieving agent: '%v'", err)
return
@ -90,7 +91,10 @@ func (db *DB) AgentsActiveSince(pointInTime time.Time) (agents []mig.Agent, err
}
// InsertAgent creates a new agent in the database
func (db *DB) InsertAgent(agt mig.Agent) (err error) {
//
// If useTx is not nil, the transaction will be used instead of the standard
// connection
func (db *DB) InsertAgent(agt mig.Agent, useTx *sql.Tx) (err error) {
jEnv, err := json.Marshal(agt.Env)
if err != nil {
err = fmt.Errorf("Failed to marshal agent environment: '%v'", err)
@ -102,12 +106,23 @@ func (db *DB) InsertAgent(agt mig.Agent) (err error) {
return
}
agtid := mig.GenID()
if useTx != nil {
_, err = useTx.Exec(`INSERT INTO agents
(id, name, queueloc, mode, version, pid, starttime, destructiontime,
heartbeattime, refreshtime, status, environment, tags)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`,
agtid, agt.Name, agt.QueueLoc, agt.Mode, agt.Version, agt.PID,
agt.StartTime, agt.DestructionTime, agt.HeartBeatTS, agt.RefreshTS,
agt.Status, jEnv, jTags)
} else {
_, err = db.c.Exec(`INSERT INTO agents
(id, name, queueloc, mode, version, pid, starttime, destructiontime,
heartbeattime, status, environment, tags)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
heartbeattime, refreshtime, status, environment, tags)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`,
agtid, agt.Name, agt.QueueLoc, agt.Mode, agt.Version, agt.PID,
agt.StartTime, agt.DestructionTime, agt.HeartBeatTS, agt.Status, jEnv, jTags)
agt.StartTime, agt.DestructionTime, agt.HeartBeatTS, agt.RefreshTS,
agt.Status, jEnv, jTags)
}
if err != nil {
return fmt.Errorf("Failed to insert agent in database: '%v'", err)
}
@ -125,6 +140,35 @@ func (db *DB) UpdateAgentHeartbeat(agt mig.Agent) (err error) {
return
}
// Replace an existing agent in the database with newer environment information. This
// should be called when we receive a heartbeat for an active agent, but the refresh
// time indicates newer environment information exists.
func (db *DB) ReplaceRefreshedAgent(agt mig.Agent) (err error) {
// Do this in a transaction to ensure other parts of the scheduler don't
// pick up invalid information
tx, err := db.c.Begin()
if err != nil {
return
}
_, err = tx.Exec(`UPDATE agents SET status=$1 WHERE id=$2`,
mig.AgtStatusOffline, agt.ID)
if err != nil {
_ = tx.Rollback()
return
}
err = db.InsertAgent(agt, tx)
if err != nil {
_ = tx.Rollback()
return
}
err = tx.Commit()
if err != nil {
_ = tx.Rollback()
return
}
return
}
// ListMultiAgentsQueues retrieves an array of queues that have more than one active agent
func (db *DB) ListMultiAgentsQueues(pointInTime time.Time) (queues []string, err error) {
rows, err := db.c.Query(`SELECT queueloc FROM agents
@ -154,8 +198,10 @@ func (db *DB) ListMultiAgentsQueues(pointInTime time.Time) (queues []string, err
// ActiveAgentsByQueue retrieves an array of agents identified by their QueueLoc value
func (db *DB) ActiveAgentsByQueue(queueloc string, pointInTime time.Time) (agents []mig.Agent, err error) {
rows, err := db.c.Query(`SELECT id, name, queueloc, mode, version, pid, starttime, heartbeattime, status
FROM agents WHERE agents.heartbeattime > $1 AND agents.queueloc=$2`, pointInTime, queueloc)
rows, err := db.c.Query(`SELECT id, name, queueloc, mode, version, pid, starttime,
heartbeattime, refreshtime, status
FROM agents WHERE agents.heartbeattime > $1 AND agents.queueloc=$2`,
pointInTime, queueloc)
if rows != nil {
defer rows.Close()
}
@ -166,7 +212,8 @@ func (db *DB) ActiveAgentsByQueue(queueloc string, pointInTime time.Time) (agent
for rows.Next() {
var agent mig.Agent
err = rows.Scan(&agent.ID, &agent.Name, &agent.QueueLoc, &agent.Mode, &agent.Version,
&agent.PID, &agent.StartTime, &agent.HeartBeatTS, &agent.Status)
&agent.PID, &agent.StartTime, &agent.HeartBeatTS,
&agent.RefreshTS, &agent.Status)
if err != nil {
err = fmt.Errorf("Failed to retrieve agent data: '%v'", err)
return
@ -199,7 +246,8 @@ func (db *DB) ActiveAgentsByTarget(target string) (agents []mig.Agent, err error
return
}
rows, err := txn.Query(fmt.Sprintf(`SELECT DISTINCT ON (queueloc) id, name, queueloc,
version, pid, starttime, destructiontime, heartbeattime, status, mode, environment, tags
version, pid, starttime, destructiontime, heartbeattime, refreshtime, status,
mode, environment, tags
FROM agents WHERE agents.status IN ('%s', '%s') AND (%s)
ORDER BY agents.queueloc ASC`, mig.AgtStatusOnline, mig.AgtStatusIdle, target))
if rows != nil {
@ -214,7 +262,7 @@ func (db *DB) ActiveAgentsByTarget(target string) (agents []mig.Agent, err error
var agent mig.Agent
err = rows.Scan(&agent.ID, &agent.Name, &agent.QueueLoc, &agent.Version,
&agent.PID, &agent.StartTime, &agent.DestructionTime, &agent.HeartBeatTS,
&agent.Status, &agent.Mode, &jEnv, &jTags)
&agent.RefreshTS, &agent.Status, &agent.Mode, &jEnv, &jTags)
if err != nil {
err = fmt.Errorf("Failed to retrieve agent data: '%v'", err)
return

Просмотреть файл

@ -28,6 +28,7 @@ CREATE TABLE agents (
starttime timestamp with time zone NOT NULL,
destructiontime timestamp with time zone,
heartbeattime timestamp with time zone NOT NULL,
refreshtime timestamp with time zone NOT NULL,
status character varying(255),
environment json,
tags json

Просмотреть файл

@ -16,7 +16,7 @@ import (
// checkActionAuthorization verifies the PGP signatures of a given action
// against the Access Control List of the agent.
func checkActionAuthorization(a mig.Action, ctx Context) (err error) {
func checkActionAuthorization(a mig.Action, ctx *Context) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("checkActionAuthorization() -> %v", e)

Просмотреть файл

@ -272,12 +272,14 @@ func runAgentCheckin(runOpt runtimeOptions) (err error) {
ctx.Agent.Mode = "checkin"
err = startRoutines(ctx)
err = startRoutines(&ctx)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to start agent routines: '%v'", err)
os.Exit(0)
}
ctx.Agent.Lock()
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Mozilla InvestiGator version %s: started agent %s in checkin mode", mig.Version, ctx.Agent.Hostname)}
ctx.Agent.Unlock()
// The loop below retrieves messages from the relay. If no message is available,
// it will timeout and break out of the loop after 10 seconds, causing the agent to exit
@ -350,14 +352,16 @@ func runAgent(runOpt runtimeOptions) (err error) {
ctx.Agent.Mode = "daemon"
// Goroutine that receives messages from AMQP
go getCommands(ctx)
go getCommands(&ctx)
err = startRoutines(ctx)
err = startRoutines(&ctx)
if err != nil {
panic(err)
}
ctx.Agent.Lock()
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Mozilla InvestiGator version %s: started agent %s", mig.Version, ctx.Agent.Hostname)}
ctx.Agent.Unlock()
// The agent blocks here until a termination order is received
// The order is then evaluated to decide if a new agent must be respawned, or the agent
@ -396,8 +400,9 @@ func runAgent(runOpt runtimeOptions) (err error) {
return
}
// startRoutines starts the goroutines that process commands and heartbeats
func startRoutines(ctx Context) (err error) {
// startRoutines starts the goroutines that process commands, heartbeats, and look after
// refreshing the agent environment.
func startRoutines(ctx *Context) (err error) {
// GoRoutine that parses and validates incoming commands
go func() {
for msg := range ctx.Channels.NewCommand {
@ -438,11 +443,19 @@ func startRoutines(ctx Context) (err error) {
// GoRoutine that sends heartbeat messages to scheduler
go heartbeat(ctx)
// GoRoutine that updates the agent environment
if REFRESHENV != 0 {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("environment will refresh every %v", REFRESHENV)}
go refreshAgentEnvironment(ctx)
} else {
ctx.Channels.Log <- mig.Log{Desc: "periodic environment refresh is disabled"}
}
return
}
// getCommands receives AMQP messages, and feed them to the action chan
func getCommands(ctx Context) (err error) {
func getCommands(ctx *Context) (err error) {
for m := range ctx.MQ.Bind.Chan {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("received message '%s'", m.Body)}.Debug()
@ -468,7 +481,7 @@ func getCommands(ctx Context) (err error) {
// parseCommands transforms a message into a MIG Command struct, performs validation
// and run the command
func parseCommands(ctx Context, msg []byte) (err error) {
func parseCommands(ctx *Context, msg []byte) (err error) {
var cmd mig.Command
cmd.ID = 0 // safety net
defer func() {
@ -549,7 +562,7 @@ func parseCommands(ctx Context, msg []byte) (err error) {
// execution and kills the module if needed. On success, it stores the output from
// the module in a moduleResult struct and passes it along to the function that aggregates
// all results
func runModule(ctx Context, op moduleOp) (err error) {
func runModule(ctx *Context, op moduleOp) (err error) {
var result moduleResult
result.id = op.id
result.position = op.position
@ -659,7 +672,7 @@ func runModule(ctx Context, op moduleOp) (err error) {
// receiveResult listens on a temporary channels for results coming from modules. It aggregates them, and
// when all are received, it builds a response that is passed to the Result channel
func receiveModuleResults(ctx Context, cmd mig.Command, resultChan chan moduleResult, opsCounter int) (err error) {
func receiveModuleResults(ctx *Context, cmd mig.Command, resultChan chan moduleResult, opsCounter int) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("receiveModuleResults() -> %v", e)
@ -722,7 +735,7 @@ finish:
}
// sendResults builds a message body and send the command results back to the scheduler
func sendResults(ctx Context, result mig.Command) (err error) {
func sendResults(ctx *Context, result mig.Command) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("sendResults() -> %v", e)
@ -746,7 +759,10 @@ func sendResults(ctx Context, result mig.Command) (err error) {
// hearbeat will send heartbeats messages to the scheduler at regular intervals
// and also store that heartbeat on disc
func heartbeat(ctx Context) (err error) {
func heartbeat(ctx *Context) (err error) {
// loop forever
for {
ctx.Agent.Lock()
// declare an Agent registration message
HeartBeat := mig.Agent{
Name: ctx.Agent.Hostname,
@ -757,10 +773,9 @@ func heartbeat(ctx Context) (err error) {
StartTime: time.Now(),
Env: ctx.Agent.Env,
Tags: ctx.Agent.Tags,
RefreshTS: ctx.Agent.RefreshTS,
}
// loop forever
for {
// make a heartbeat
HeartBeat.HeartBeatTS = time.Now()
body, err := json.Marshal(HeartBeat)
@ -777,13 +792,14 @@ func heartbeat(ctx Context) (err error) {
ctx.Channels.Log <- mig.Log{Desc: "Failed to write mig-agent.ok to disk"}.Err()
}
os.Chmod(ctx.Agent.RunDir+"mig-agent.ok", 0644)
ctx.Agent.Unlock()
time.Sleep(ctx.Sleeper)
}
return
}
// publish is a generic function that sends messages to an AMQP exchange
func publish(ctx Context, exchange, routingKey string, body []byte) (err error) {
func publish(ctx *Context, exchange, routingKey string, body []byte) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("publish() -> %v", e)

Просмотреть файл

@ -35,6 +35,47 @@ type AgentContext struct {
AWS AWSContext // AWS specific information
}
func (ctx *AgentContext) IsZero() bool {
// If we don't have an OS treat it as unset
if ctx.OS == "" {
return true
}
return false
}
// Check of any values in the AgentContext differ from those in comp
func (ctx *AgentContext) Differs(comp AgentContext) bool {
if ctx.Hostname != comp.Hostname ||
ctx.BinPath != comp.BinPath ||
ctx.RunDir != comp.RunDir ||
ctx.OS != comp.OS ||
ctx.OSIdent != comp.OSIdent ||
ctx.Init != comp.Init ||
ctx.Architecture != comp.Architecture ||
ctx.PublicIP != comp.PublicIP ||
ctx.AWS.InstanceID != comp.AWS.InstanceID ||
ctx.AWS.LocalIPV4 != comp.AWS.LocalIPV4 ||
ctx.AWS.AMIID != comp.AWS.AMIID ||
ctx.AWS.InstanceType != comp.AWS.InstanceType {
return true
}
if ctx.Addresses == nil && comp.Addresses == nil {
return false
}
if ctx.Addresses == nil || comp.Addresses == nil {
return true
}
if len(ctx.Addresses) != len(comp.Addresses) {
return true
}
for i := range ctx.Addresses {
if ctx.Addresses[i] != comp.Addresses[i] {
return true
}
}
return false
}
func (ctx *AgentContext) ToAgent() (ret mig.Agent) {
ret.Name = ctx.Hostname
ret.PID = os.Getpid()

Просмотреть файл

@ -28,6 +28,7 @@ type config struct {
HeartbeatFreq string
ModuleTimeout string
Api string
RefreshEnv string
}
Certs struct {
Ca, Cert, Key string
@ -67,6 +68,13 @@ func configLoad(path string) (err error) {
if err != nil {
panic(err)
}
var refreshenv time.Duration
if config.Agent.RefreshEnv != "" {
refreshenv, err = time.ParseDuration(config.Agent.RefreshEnv)
if err != nil {
panic(err)
}
}
ISIMMORTAL = config.Agent.IsImmortal
MUSTINSTALLSERVICE = config.Agent.InstallService
@ -81,5 +89,6 @@ func configLoad(path string) (err error) {
CACERT = cacert
AGENTCERT = agentcert
AGENTKEY = agentkey
REFRESHENV = refreshenv
return
}

Просмотреть файл

@ -24,6 +24,7 @@ import (
"os"
"os/signal"
"strconv"
"sync"
"time"
)
@ -33,11 +34,27 @@ import (
type Context struct {
ACL mig.ACL
Agent struct {
Hostname, QueueLoc, Mode, UID, BinPath, RunDir string
BinPath, RunDir, QueueLoc, Mode, UID string
Respawn bool
CheckIn bool
// A lock must be obtained before reading these values.
Hostname string
Env mig.AgentEnv
Tags interface{}
RefreshTS time.Time
// Stores a copy of the last agent context generated by
// agentcontext.NewAgentContext, used primarily to determine
// if the context has changed when we do a refresh
lastAgentContext agentcontext.AgentContext
// This mutex is used to protect the Agent struct as some
// elements in the struct could be routinely updated by
// the agent context update go-routine. Care should be taken
// to ensure a lock is obtained before reading or using
// values flagged as volatile.
sync.Mutex
}
Channels struct {
// internal
@ -69,6 +86,36 @@ type Context struct {
Logging mig.Logging
}
// Update volatile/dynamic fields in c.Agent using information stored in
// the AgentContext. A lock should be obtained on c.Agent before this
// function is called.
func (c *Context) updateVolatileFromAgentContext(actx agentcontext.AgentContext) bool {
ts := time.Now()
c.Agent.Hostname = actx.Hostname
c.Agent.Env.OS = actx.OS
c.Agent.Env.Arch = actx.Architecture
c.Agent.Env.Ident = actx.OSIdent
c.Agent.Env.Init = actx.Init
c.Agent.Env.Addresses = actx.Addresses
c.Agent.Env.PublicIP = actx.PublicIP
c.Agent.Env.AWS.InstanceID = actx.AWS.InstanceID
c.Agent.Env.AWS.LocalIPV4 = actx.AWS.LocalIPV4
c.Agent.Env.AWS.AMIID = actx.AWS.AMIID
c.Agent.Env.AWS.InstanceType = actx.AWS.InstanceType
if c.Agent.lastAgentContext.IsZero() {
c.Agent.lastAgentContext = actx
c.Agent.RefreshTS = ts
return false
}
// See if anything changed since the last update
if actx.Differs(c.Agent.lastAgentContext) {
c.Agent.lastAgentContext = actx
c.Agent.RefreshTS = ts
return true
}
return false
}
// Init prepare the AMQP connections to the broker and launches the
// goroutines that will process commands received by the MIG Scheduler
func Init(foreground, upgrade bool) (ctx Context, err error) {
@ -80,6 +127,11 @@ func Init(foreground, upgrade bool) (ctx Context, err error) {
ctx.Channels.Log <- mig.Log{Desc: "leaving initAgent()"}.Debug()
}
}()
// Pick up a lock on Context Agent field as we will be updating or reading it here and in
// various functions called from here such as daemonize().
ctx.Agent.Lock()
defer ctx.Agent.Unlock()
ctx.Agent.Tags = TAGS
ctx.Logging, err = mig.InitLogger(LOGGINGCONF, "mig-agent")
@ -120,20 +172,14 @@ func Init(foreground, upgrade bool) (ctx Context, err error) {
// is controlled by systemd, upstart or launchd
ctx.Agent.Respawn = ISIMMORTAL
// Assign the values based on the context we retrieved
ctx.Agent.BinPath = actx.BinPath
ctx.Agent.Hostname = actx.Hostname
// Do initial assignment of values which could change over the lifetime
// of the agent process
ctx.updateVolatileFromAgentContext(actx)
// Set some other values obtained from the agent context which will not
// change while the process is running.
ctx.Agent.RunDir = actx.RunDir
ctx.Agent.Env.OS = actx.OS
ctx.Agent.Env.Arch = actx.Architecture
ctx.Agent.Env.Ident = actx.OSIdent
ctx.Agent.Env.Init = actx.Init
ctx.Agent.Env.Addresses = actx.Addresses
ctx.Agent.Env.PublicIP = actx.PublicIP
ctx.Agent.Env.AWS.InstanceID = actx.AWS.InstanceID
ctx.Agent.Env.AWS.LocalIPV4 = actx.AWS.LocalIPV4
ctx.Agent.Env.AWS.AMIID = actx.AWS.AMIID
ctx.Agent.Env.AWS.InstanceType = actx.AWS.InstanceType
ctx.Agent.BinPath = actx.BinPath
// get the agent ID
ctx, err = initAgentID(ctx)
@ -561,3 +607,26 @@ func serviceDeploy(orig_ctx Context) (ctx Context, err error) {
ctx.Channels.Log <- mig.Log{Desc: "Started mig-agent service"}.Info()
return
}
func refreshAgentEnvironment(ctx *Context) {
for {
time.Sleep(REFRESHENV)
ctx.Channels.Log <- mig.Log{Desc: "refreshing agent environment"}.Info()
ctx.Agent.Lock()
hints := agentcontext.AgentContextHints{
DiscoverPublicIP: DISCOVERPUBLICIP,
DiscoverAWSMeta: DISCOVERAWSMETA,
APIUrl: APIURL,
Proxies: PROXIES[:],
}
actx, err := agentcontext.NewAgentContext(ctx.Channels.Log, hints)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("error obtaining new agent context: %v", err)}.Err()
} else {
if ctx.updateVolatileFromAgentContext(actx) {
ctx.Channels.Log <- mig.Log{Desc: "agent environment has changed"}.Info()
}
}
ctx.Agent.Unlock()
}
}

Просмотреть файл

@ -122,7 +122,7 @@ func getHeartbeats(msg amqp.Delivery, ctx Context) (err error) {
agt.Status = mig.AgtStatusOnline
// create a new agent, set starttime to now
agt.StartTime = time.Now()
err = ctx.DB.InsertAgent(agt)
err = ctx.DB.InsertAgent(agt, nil)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Heartbeat DB insertion failed with error '%v' for agent '%s'", err, agt.Name)}.Err()
}
@ -140,10 +140,30 @@ func getHeartbeats(msg amqp.Delivery, ctx Context) (err error) {
} else {
agt.Status = mig.AgtStatusOnline
}
// If the refresh time is newer than what we know for the agent, replace
// the agent in the database with the newer information. We want to keep
// history here, so don't want to just update the information in the
// existing row.
//
// Note: with older agents which might not send a refresh time, the refresh
// time will be interpreted as the zero value, and the agents should just
// update using UpdateAgentHeartbeat()
if agt.RefreshTS.IsZero() {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("agent '%v' not sending refresh time, perhaps an older version?", agt.Name)}.Warning()
}
cutoff := agent.RefreshTS.Add(15 * time.Second)
if !agt.RefreshTS.IsZero() && agt.RefreshTS.After(cutoff) {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("replacing refreshed agent for agent '%v'", agt.Name)}.Info()
err = ctx.DB.ReplaceRefreshedAgent(agt)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Heartbeat DB update failed (refresh) with error '%v' for agent '%s'", err, agt.Name)}.Err()
}
} else {
err = ctx.DB.UpdateAgentHeartbeat(agt)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Heartbeat DB update failed with error '%v' for agent '%s'", err, agt.Name)}.Err()
}
}
// if the agent that exists in the database has a status of 'destroyed'
// we should not be received a heartbeat from it. so, if detectmultiagents
// is set in the scheduler configuration, we pass the agent queue over to the

Просмотреть файл

@ -305,6 +305,7 @@ var AMQPBROKER string = "amqp://agent:$mqpass@localhost:5672/mig"
var PROXIES = [...]string{``}
var SOCKET string = "127.0.0.1:51664"
var HEARTBEATFREQ time.Duration = 30 * time.Second
var REFRESHENV time.Duration = 60 * time.Second
var MODULETIMEOUT time.Duration = 300 * time.Second
var AGENTACL = [...]string{
\`{