Mirror of https://github.com/mozilla/mig.git
[medium] refactor agents stats into its own table & job
The API was previously calculating agent statistics on the fly, which was slow and kept no historical data. Computing these statistics periodically and storing them in an agents_stats table is now the scheduler's responsibility.
This commit is contained in:
Parent: 91141b17da
Commit: 075167433f
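For illustration only, the sketch below (not part of this commit) shows what the scheduler-side periodic task could look like. It reuses the database helpers visible in the hunks that follow; the function name computeAgentsStats, its use of the scheduler Context, and the 7-day and 60-day windows are assumptions.

// computeAgentsStats is a hypothetical sketch of the periodic job described in the
// commit message: it aggregates agent counters through the database layer and
// stores one row in the agents_stats table. Names and time windows are illustrative.
func computeAgentsStats(ctx Context) (err error) {
    var stats mig.AgentsStats
    stats.OnlineAgentsByVersion, err = ctx.DB.SumOnlineAgentsByVersion()
    if err != nil {
        return err
    }
    for _, sum := range stats.OnlineAgentsByVersion {
        stats.OnlineAgents += sum.Count
    }
    stats.IdleAgentsByVersion, err = ctx.DB.SumIdleAgentsByVersion()
    if err != nil {
        return err
    }
    for _, sum := range stats.IdleAgentsByVersion {
        stats.IdleAgents += sum.Count
    }
    stats.OnlineEndpoints, err = ctx.DB.CountOnlineEndpoints()
    if err != nil {
        return err
    }
    stats.IdleEndpoints, err = ctx.DB.CountIdleEndpoints()
    if err != nil {
        return err
    }
    // illustrative windows: endpoints first seen in the last 7 days,
    // measured against the 60 days before that
    stats.NewEndpoints, err = ctx.DB.CountNewEndpoints(
        time.Now().Add(-7*24*time.Hour), time.Now().Add(-60*24*time.Hour))
    if err != nil {
        return err
    }
    stats.MultiAgentsEndpoints, err = ctx.DB.CountDoubleAgents()
    if err != nil {
        return err
    }
    stats.DisappearedEndpoints, err = ctx.DB.CountDisappearedEndpoints(time.Now().Add(-7 * 24 * time.Hour))
    if err != nil {
        return err
    }
    stats.FlappingEndpoints, err = ctx.DB.CountFlappingEndpoints()
    if err != nil {
        return err
    }
    // StoreAgentsStats sets the timestamp itself when inserting the row
    return ctx.DB.StoreAgentsStats(stats)
}

A periodic goroutine could call something like this at the [periodic] freq interval, and the API then simply reads the latest row with GetAgentsStats(1).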
@@ -1,16 +1,38 @@
; Sample MIG configuration file

[agent]
    ; timeout controls the inactivity period after which
    ; agents are marked offline
    timeout = "20m"

    ; heartbeatfreq maps to the agent configuration and helps
    ; the scheduler detect duplicate agents, and some other things
    heartbeatfreq = "5m"

    ; whitelist contains a list of agent queues that are allowed
    ; to send heartbeats and receive commands
    whitelist = "/var/cache/mig/agents_whitelist.txt"

    ; detect endpoints that are running multiple agents
    detectmultiagents = true

    ; issue kill orders to duplicate agents running on the same endpoint
    killdupagents = false

; the collector continuously pulls
; pending messages from the spool
[collector]
    ; frequency at which the job collector runs
    freq = "60s"
    ; period during which done actions and commands, and invalid actions are kept
    ; frequency at which the collector runs,
    ; default is to run every second
    freq = "1s"

; the periodic job runs less often than
; the collector and does cleanup and DB updates
[periodic]
    freq = "87s"
    ; delete finished actions, commands and invalids after
    ; this period has passed
    deleteafter = "72h"

[directories]
@@ -44,3 +44,22 @@ type AgentEnv struct {
    Addresses []string `json:"addresses,omitempty"`
    PublicIP  string   `json:"publicip,omitempty"`
}

type AgentsStats struct {
    Timestamp             time.Time           `json:"timestamp"`
    OnlineAgents          float64             `json:"onlineagents"`
    OnlineAgentsByVersion []AgentsVersionsSum `json:"onlineagentsbyversion"`
    OnlineEndpoints       float64             `json:"onlineendpoints"`
    IdleAgents            float64             `json:"idleagents"`
    IdleAgentsByVersion   []AgentsVersionsSum `json:"idleagentsbyversion"`
    IdleEndpoints         float64             `json:"idleendpoints"`
    NewEndpoints          float64             `json:"newendpoints"`
    MultiAgentsEndpoints  float64             `json:"multiagentsendpoints"`
    DisappearedEndpoints  float64             `json:"disappearedendpoints"`
    FlappingEndpoints     float64             `json:"flappingendpoints"`
}

type AgentsVersionsSum struct {
    Version string  `json:"version"`
    Count   float64 `json:"count"`
}
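For illustration only (not part of this commit): the two ...ByVersion slices are what ends up in the json columns of the agents_stats table defined later in this diff. A minimal sketch, using made-up version strings and counts:

package main

import (
    "encoding/json"
    "fmt"

    "mig"
)

func main() {
    // hypothetical values; only the shape of the data matters here
    byVersion := []mig.AgentsVersionsSum{
        {Version: "20141212+abcdef1.dev", Count: 1200},
        {Version: "20141105+1234567.dev", Count: 34},
    }
    buf, err := json.Marshal(byVersion)
    if err != nil {
        panic(err)
    }
    // this payload is what would be stored in online_agents_by_version
    fmt.Printf("%s\n", buf)
}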
@@ -9,7 +9,6 @@ import (
    "fmt"
    "github.com/jvehent/cljs"
    "mig"
    migdb "mig/database"
    "net/http"
    "strconv"
)
@@ -77,30 +76,19 @@ func agentToItem(agt mig.Agent) (item cljs.Item, err error) {

// agentsSummaryToItem receives an AgentsSum and returns an Item
// in the Collection+JSON format
func agentsSummaryToItem(onlineagtsum, idleagtsum []migdb.AgentsSum,
    countonlineendpoints, countidleendpoints, countnewendpoints, countdoubleagents, countdisappearedendpoints, countflappingendpoints float64,
    ctx Context) (item cljs.Item, err error) {
func agentsSummaryToItem(stats mig.AgentsStats, ctx Context) (item cljs.Item, err error) {
    item.Href = fmt.Sprintf("%s/dashboard", ctx.Server.BaseURL)
    var (
        totalOnlineAgents, totalIdleAgents float64
    )
    for _, asum := range onlineagtsum {
        totalOnlineAgents += asum.Count
    }
    for _, asum := range idleagtsum {
        totalIdleAgents += asum.Count
    }
    item.Data = []cljs.Data{
        {Name: "online agents", Value: totalOnlineAgents},
        {Name: "online endpoints", Value: countonlineendpoints},
        {Name: "idle agents", Value: totalIdleAgents},
        {Name: "idle endpoints", Value: countidleendpoints},
        {Name: "new endpoints", Value: countnewendpoints},
        {Name: "endpoints running 2 or more agents", Value: countdoubleagents},
        {Name: "disappeared endpoints", Value: countdisappearedendpoints},
        {Name: "flapping endpoints", Value: countflappingendpoints},
        {Name: "online agents by version", Value: onlineagtsum},
        {Name: "idle agents by version", Value: idleagtsum},
        {Name: "online agents", Value: stats.OnlineAgents},
        {Name: "online agents by version", Value: stats.OnlineAgentsByVersion},
        {Name: "online endpoints", Value: stats.OnlineEndpoints},
        {Name: "idle agents", Value: stats.IdleAgents},
        {Name: "idle agents by version", Value: stats.IdleAgentsByVersion},
        {Name: "idle endpoints", Value: stats.IdleEndpoints},
        {Name: "new endpoints", Value: stats.NewEndpoints},
        {Name: "endpoints running 2 or more agents", Value: stats.MultiAgentsEndpoints},
        {Name: "disappeared endpoints", Value: stats.DisappearedEndpoints},
        {Name: "flapping endpoints", Value: stats.FlappingEndpoints},
    }
    return
}
@@ -12,11 +12,9 @@ import (
    "github.com/gorilla/mux"
    "github.com/jvehent/cljs"
    "mig"
    migdb "mig/database"
    "net/http"
    "os"
    "runtime"
    "time"
)

var ctx Context
@@ -360,9 +358,8 @@ func getHome(respWriter http.ResponseWriter, request *http.Request) {

func getDashboard(respWriter http.ResponseWriter, request *http.Request) {
    var (
        err error
        onlineagtsum, idleagtsum []migdb.AgentsSum
        onlineEndpts, idleEndpts, newEndpts, doubleAgts, disappearedEndpts, flappingEndpts float64
        err         error
        agentsStats mig.AgentsStats
    )
    opid := getOpID(request)
    loc := fmt.Sprintf("%s%s", ctx.Server.Host, request.URL.String())
@@ -375,98 +372,15 @@ func getDashboard(respWriter http.ResponseWriter, request *http.Request) {
        }
        ctx.Channels.Log <- mig.Log{OpID: opid, Desc: "leaving getDashboard()"}.Debug()
    }()
    done := make(chan bool)
    go func() {
        start := time.Now()
        onlineagtsum, err = ctx.DB.SumOnlineAgentsByVersion()
        if err != nil {
            panic(err)
        }
        done <- true
        d := time.Since(start)
        ctx.Channels.Log <- mig.Log{OpID: opid, Desc: fmt.Sprintf("SumOnlineAgentsByVersion() took %v to run", d)}.Debug()
    }()
    go func() {
        start := time.Now()
        idleagtsum, err = ctx.DB.SumIdleAgentsByVersion()
        if err != nil {
            panic(err)
        }
        done <- true
        d := time.Since(start)
        ctx.Channels.Log <- mig.Log{OpID: opid, Desc: fmt.Sprintf("SumIdleAgentsByVersion() took %v to run", d)}.Debug()
    }()
    go func() {
        start := time.Now()
        onlineEndpts, err = ctx.DB.CountOnlineEndpoints()
        if err != nil {
            panic(err)
        }
        done <- true
        d := time.Since(start)
        ctx.Channels.Log <- mig.Log{OpID: opid, Desc: fmt.Sprintf("CountOnlineEndpoints() took %v to run", d)}.Debug()
    }()
    go func() {
        start := time.Now()
        idleEndpts, err = ctx.DB.CountIdleEndpoints()
        if err != nil {
            panic(err)
        }
        done <- true
        d := time.Since(start)
        ctx.Channels.Log <- mig.Log{OpID: opid, Desc: fmt.Sprintf("CountIdleEndpoints() took %v to run", d)}.Debug()
    }()
    go func() {
        start := time.Now()
        newEndpts, err = ctx.DB.CountNewEndpoints(time.Now().Add(-7 * 24 * time.Hour))
        if err != nil {
            panic(err)
        }
        done <- true
        d := time.Since(start)
        ctx.Channels.Log <- mig.Log{OpID: opid, Desc: fmt.Sprintf("CountNewEndpoints() took %v to run", d)}.Debug()
    }()
    go func() {
        start := time.Now()
        doubleAgts, err = ctx.DB.CountDoubleAgents()
        if err != nil {
            panic(err)
        }
        done <- true
        d := time.Since(start)
        ctx.Channels.Log <- mig.Log{OpID: opid, Desc: fmt.Sprintf("CountDoubleAgents() took %v to run", d)}.Debug()
    }()
    go func() {
        start := time.Now()
        disappearedEndpts, err = ctx.DB.CountDisappearedEndpoints(time.Now().Add(-7 * 24 * time.Hour))
        if err != nil {
            panic(err)
        }
        done <- true
        d := time.Since(start)
        ctx.Channels.Log <- mig.Log{OpID: opid, Desc: fmt.Sprintf("CountDisappearedEndpoints() took %v to run", d)}.Debug()
    }()
    go func() {
        start := time.Now()
        flappingEndpts, err = ctx.DB.CountFlappingEndpoints()
        if err != nil {
            panic(err)
        }
        done <- true
        d := time.Since(start)
        ctx.Channels.Log <- mig.Log{OpID: opid, Desc: fmt.Sprintf("CountFlappingEndpoints() took %v to run", d)}.Debug()
    }()
    // each query is run in parallel and returns a boolean in the done channel,
    // so when we have received 8 messages in the channel, all queries are done
    ctr := 0
    for <-done {
        ctr++
        if ctr == 8 {
            break
        }
    stats, err := ctx.DB.GetAgentsStats(1)
    if err != nil {
        panic(err)
    }
    sumItem, err := agentsSummaryToItem(onlineagtsum, idleagtsum, onlineEndpts,
        idleEndpts, newEndpts, doubleAgts, disappearedEndpts, flappingEndpts, ctx)
    if len(stats) != 1 {
        panic(fmt.Sprintf("expected 1 set of agents stats, got %d", len(stats)))
    }
    agentsStats = stats[0]
    sumItem, err := agentsSummaryToItem(agentsStats, ctx)
    resource.AddItem(sumItem)

    // add the last 10 actions
@@ -14,7 +14,6 @@ import (
    "log"
    "mig"
    "mig/client"
    migdb "mig/database"
    "os"
    "strconv"
    "strings"
@@ -240,7 +239,7 @@ func printStatus(cli client.Client) (err error) {
    if err != nil {
        panic(err)
    }
    var sum []migdb.AgentsSum
    var sum []mig.AgentsVersionsSum
    err = json.Unmarshal(bData, &sum)
    if err != nil {
        panic(err)
@@ -257,7 +256,7 @@ func printStatus(cli client.Client) (err error) {
    if err != nil {
        panic(err)
    }
    var sum []migdb.AgentsSum
    var sum []mig.AgentsVersionsSum
    err = json.Unmarshal(bData, &sum)
    if err != nil {
        panic(err)
@@ -33,10 +33,11 @@ func (db *DB) AgentByQueueAndPID(queueloc string, pid int) (agent mig.Agent, err

// AgentByID returns a single agent identified by its ID
func (db *DB) AgentByID(id float64) (agent mig.Agent, err error) {
    var jTags, jEnv []byte
    err = db.c.QueryRow(`SELECT id, name, queueloc, mode, version, pid, starttime, heartbeattime,
        status FROM agents WHERE id=$1`, id).Scan(
        status, tags, environment FROM agents WHERE id=$1`, id).Scan(
        &agent.ID, &agent.Name, &agent.QueueLoc, &agent.Mode, &agent.Version, &agent.PID,
        &agent.StartTime, &agent.HeartBeatTS, &agent.Status)
        &agent.StartTime, &agent.HeartBeatTS, &agent.Status, &jTags, &jEnv)
    if err != nil {
        err = fmt.Errorf("Error while retrieving agent: '%v'", err)
        return
@@ -44,6 +45,16 @@ func (db *DB) AgentByID(id float64) (agent mig.Agent, err error) {
    if err == sql.ErrNoRows {
        return
    }
    err = json.Unmarshal(jTags, &agent.Tags)
    if err != nil {
        err = fmt.Errorf("failed to unmarshal agent tags")
        return
    }
    err = json.Unmarshal(jEnv, &agent.Env)
    if err != nil {
        err = fmt.Errorf("failed to unmarshal agent environment")
        return
    }
    return
}
@@ -169,10 +180,8 @@ func (db *DB) ActiveAgentsByTarget(target string) (agents []mig.Agent, err error
        _ = txn.Rollback()
        return
    }
    rows, err := txn.Query(fmt.Sprintf(`SELECT DISTINCT ON (queueloc) id, name, queueloc, mode, version, pid,
        starttime, destructiontime, heartbeattime, status
        FROM agents
        WHERE agents.status IN ('%s', '%s') AND (%s)
    rows, err := txn.Query(fmt.Sprintf(`SELECT DISTINCT ON (queueloc) id, name, queueloc, version, pid, status
        FROM agents WHERE agents.status IN ('%s', '%s') AND (%s)
        ORDER BY agents.queueloc, agents.heartbeattime DESC`, mig.AgtStatusOnline, mig.AgtStatusIdle, target))
    if err != nil {
        _ = txn.Rollback()
@@ -181,9 +190,7 @@ func (db *DB) ActiveAgentsByTarget(target string) (agents []mig.Agent, err error
    }
    for rows.Next() {
        var agent mig.Agent
        err = rows.Scan(&agent.ID, &agent.Name, &agent.QueueLoc, &agent.Mode, &agent.Version,
            &agent.PID, &agent.StartTime, &agent.DestructionTime, &agent.HeartBeatTS,
            &agent.Status)
        err = rows.Scan(&agent.ID, &agent.Name, &agent.QueueLoc, &agent.Version, &agent.PID, &agent.Status)
        if err != nil {
            rows.Close()
            err = fmt.Errorf("Failed to retrieve agent data: '%v'", err)
@@ -229,13 +236,76 @@ func (db *DB) MarkAgentDestroyed(agent mig.Agent) (err error) {
    return
}

type AgentsSum struct {
    Version string  `json:"version"`
    Count   float64 `json:"count"`
// GetAgentsStats retrieves the latest agents statistics. limit controls how many rows
// of statistics are returned
func (db *DB) GetAgentsStats(limit int) (stats []mig.AgentsStats, err error) {
    rows, err := db.c.Query(`SELECT timestamp, online_agents, online_agents_by_version,
        online_endpoints, idle_agents, idle_agents_by_version, idle_endpoints, new_endpoints,
        multi_agents_endpoints, disappeared_endpoints, flapping_endpoints
        FROM agents_stats ORDER BY timestamp DESC LIMIT $1`, limit)
    if err != nil {
        err = fmt.Errorf("Error while retrieving agent statistics: '%v'", err)
        return
    }
    for rows.Next() {
        var jOnlAgtVer, jIdlAgtVer []byte
        var s mig.AgentsStats
        err = rows.Scan(&s.Timestamp, &s.OnlineAgents, &jOnlAgtVer, &s.OnlineEndpoints,
            &s.IdleAgents, &jIdlAgtVer, &s.IdleEndpoints, &s.NewEndpoints,
            &s.MultiAgentsEndpoints, &s.DisappearedEndpoints, &s.FlappingEndpoints)
        if err != nil {
            rows.Close()
            err = fmt.Errorf("Failed to retrieve agent statistics data: '%v'", err)
            return
        }
        err = json.Unmarshal(jOnlAgtVer, &s.OnlineAgentsByVersion)
        if err != nil {
            rows.Close()
            err = fmt.Errorf("Failed to unmarshal online agent by version statistics: '%v'", err)
            return
        }
        err = json.Unmarshal(jIdlAgtVer, &s.IdleAgentsByVersion)
        if err != nil {
            rows.Close()
            err = fmt.Errorf("Failed to unmarshal idle agent by version statistics: '%v'", err)
            return
        }
        stats = append(stats, s)
    }
    if err := rows.Err(); err != nil {
        err = fmt.Errorf("Failed to complete database query: '%v'", err)
    }
    return
}

// StoreAgentsStats stores a new row of agents statistics and sets the timestamp to the current time
func (db *DB) StoreAgentsStats(stats mig.AgentsStats) (err error) {
    jOnlAgtVer, err := json.Marshal(stats.OnlineAgentsByVersion)
    if err != nil {
        err = fmt.Errorf("Failed to marshal online agents by version: '%v'", err)
        return
    }
    jIdlAgtVer, err := json.Marshal(stats.IdleAgentsByVersion)
    if err != nil {
        err = fmt.Errorf("Failed to marshal idle agents by version: '%v'", err)
        return
    }
    _, err = db.c.Exec(`INSERT INTO agents_stats
        (timestamp, online_agents, online_agents_by_version, online_endpoints,
        idle_agents, idle_agents_by_version, idle_endpoints, new_endpoints,
        multi_agents_endpoints, disappeared_endpoints, flapping_endpoints)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)`,
        time.Now().UTC(), stats.OnlineAgents, jOnlAgtVer, stats.OnlineEndpoints,
        stats.IdleAgents, jIdlAgtVer, stats.IdleEndpoints, stats.NewEndpoints,
        stats.MultiAgentsEndpoints, stats.DisappearedEndpoints, stats.FlappingEndpoints)
    if err != nil {
        return fmt.Errorf("Failed to insert agent statistics in database: '%v'", err)
    }
    return
}

// SumOnlineAgentsByVersion retrieves a sum of online agents grouped by version
func (db *DB) SumOnlineAgentsByVersion() (sum []AgentsSum, err error) {
func (db *DB) SumOnlineAgentsByVersion() (sum []mig.AgentsVersionsSum, err error) {
    rows, err := db.c.Query(`SELECT COUNT(*), version FROM agents
        WHERE agents.status=$1 GROUP BY version`, mig.AgtStatusOnline)
    if err != nil {
@@ -243,7 +313,7 @@ func (db *DB) SumOnlineAgentsByVersion() (sum []AgentsSum, err error) {
        return
    }
    for rows.Next() {
        var asum AgentsSum
        var asum mig.AgentsVersionsSum
        err = rows.Scan(&asum.Count, &asum.Version)
        if err != nil {
            rows.Close()
@@ -260,7 +330,7 @@ func (db *DB) SumOnlineAgentsByVersion() (sum []AgentsSum, err error) {

// SumIdleAgentsByVersion retrieves a sum of idle agents grouped by version
// and excludes endpoints where an online agent is running
func (db *DB) SumIdleAgentsByVersion() (sum []AgentsSum, err error) {
func (db *DB) SumIdleAgentsByVersion() (sum []mig.AgentsVersionsSum, err error) {
    rows, err := db.c.Query(`SELECT COUNT(*), version FROM agents
        WHERE agents.status=$1 AND agents.queueloc NOT IN (
        SELECT distinct(queueloc) FROM agents
@@ -271,7 +341,7 @@ func (db *DB) SumIdleAgentsByVersion() (sum []AgentsSum, err error) {
        return
    }
    for rows.Next() {
        var asum AgentsSum
        var asum mig.AgentsVersionsSum
        err = rows.Scan(&asum.Count, &asum.Version)
        if err != nil {
            rows.Close()
@@ -323,18 +393,18 @@ func (db *DB) CountIdleEndpoints() (sum float64, err error) {
}

// CountNewEndpoints retrieves a count of new endpoints that started after `pointInTime`
func (db *DB) CountNewEndpoints(pointInTime time.Time) (sum float64, err error) {
func (db *DB) CountNewEndpoints(recent, old time.Time) (sum float64, err error) {
    err = db.c.QueryRow(`SELECT COUNT(*) FROM (
        SELECT queueloc FROM agents
        WHERE queueloc NOT IN (
            SELECT queueloc FROM agents
            WHERE heartbeattime > NOW() - interval '60 days'
            WHERE heartbeattime > $2
            AND heartbeattime < $1
            GROUP BY queueloc
        )
        AND starttime > $1
        GROUP BY queueloc
    )AS newendpoints`, pointInTime).Scan(&sum)
    )AS newendpoints`, recent, old).Scan(&sum)
    if err != nil {
        err = fmt.Errorf("Error while counting new endpoints: '%v'", err)
        return
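For illustration only (not part of this commit): CountNewEndpoints now takes two time bounds instead of one. A hypothetical caller, assuming the 7-day and 60-day windows that appear elsewhere in this changeset and an imported migdb "mig/database" handle:

// countNewEndpointsExample is a hypothetical helper, not in the repository.
func countNewEndpointsExample(db *migdb.DB) (float64, error) {
    recent := time.Now().Add(-7 * 24 * time.Hour) // endpoints that appeared within the last 7 days...
    old := time.Now().Add(-60 * 24 * time.Hour)   // ...and were not heartbeating during the 60 days before
    return db.CountNewEndpoints(recent, old)
}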
@@ -1,36 +1,36 @@
CREATE TABLE actions (
id numeric NOT NULL,
name character varying(2048) NOT NULL,
target character varying(2048) NOT NULL,
description json,
threat json,
operations json,
validfrom timestamp with time zone NOT NULL,
expireafter timestamp with time zone NOT NULL,
starttime timestamp with time zone,
finishtime timestamp with time zone,
lastupdatetime timestamp with time zone,
status character varying(256),
syntaxversion integer,
pgpsignature character varying(4096) NOT NULL
    id numeric NOT NULL,
    name character varying(2048) NOT NULL,
    target character varying(2048) NOT NULL,
    description json,
    threat json,
    operations json,
    validfrom timestamp with time zone NOT NULL,
    expireafter timestamp with time zone NOT NULL,
    starttime timestamp with time zone,
    finishtime timestamp with time zone,
    lastupdatetime timestamp with time zone,
    status character varying(256),
    syntaxversion integer,
    pgpsignature character varying(4096) NOT NULL
);
ALTER TABLE public.actions OWNER TO migadmin;
ALTER TABLE ONLY actions
ADD CONSTRAINT actions_pkey PRIMARY KEY (id);

CREATE TABLE agents (
id numeric NOT NULL,
name character varying(2048) NOT NULL,
queueloc character varying(2048) NOT NULL,
mode character varying(2048) NOT NULL,
version character varying(2048) NOT NULL,
pid integer NOT NULL,
starttime timestamp with time zone NOT NULL,
destructiontime timestamp with time zone,
heartbeattime timestamp with time zone NOT NULL,
status character varying(255),
environment json,
tags json
    id numeric NOT NULL,
    name character varying(2048) NOT NULL,
    queueloc character varying(2048) NOT NULL,
    mode character varying(2048) NOT NULL,
    version character varying(2048) NOT NULL,
    pid integer NOT NULL,
    starttime timestamp with time zone NOT NULL,
    destructiontime timestamp with time zone,
    heartbeattime timestamp with time zone NOT NULL,
    status character varying(255),
    environment json,
    tags json
);
ALTER TABLE public.agents OWNER TO migadmin;
ALTER TABLE ONLY agents
@@ -39,10 +39,24 @@ CREATE INDEX agents_heartbeattime_idx ON agents(heartbeattime DESC);
CREATE INDEX agents_queueloc_pid_idx ON agents(queueloc, pid);
CREATE INDEX agents_status_idx ON agents(status);

CREATE TABLE agents_stats (
    timestamp timestamp with time zone not null,
    online_agents numeric,
    online_agents_by_version json,
    online_endpoints numeric,
    idle_agents numeric,
    idle_agents_by_version json,
    idle_endpoints numeric,
    new_endpoints numeric,
    multi_agents_endpoints numeric,
    disappeared_endpoints numeric,
    flapping_endpoints numeric
);

CREATE TABLE agtmodreq (
moduleid numeric NOT NULL,
agentid numeric NOT NULL,
minimumweight integer NOT NULL
    moduleid numeric NOT NULL,
    agentid numeric NOT NULL,
    minimumweight integer NOT NULL
);
ALTER TABLE public.agtmodreq OWNER TO migadmin;
CREATE UNIQUE INDEX agtmodreq_moduleid_agentid_idx ON agtmodreq USING btree (moduleid, agentid);
@@ -50,13 +64,13 @@ CREATE INDEX agtmodreq_agentid_idx ON agtmodreq USING btree (agentid);
CREATE INDEX agtmodreq_moduleid_idx ON agtmodreq USING btree (moduleid);

CREATE TABLE commands (
id numeric NOT NULL,
actionid numeric NOT NULL,
agentid numeric NOT NULL,
status character varying(255) NOT NULL,
results json,
starttime timestamp with time zone NOT NULL,
finishtime timestamp with time zone
    id numeric NOT NULL,
    actionid numeric NOT NULL,
    agentid numeric NOT NULL,
    status character varying(255) NOT NULL,
    results json,
    starttime timestamp with time zone NOT NULL,
    finishtime timestamp with time zone
);
ALTER TABLE public.commands OWNER TO migadmin;
ALTER TABLE ONLY commands
@@ -65,10 +79,10 @@ CREATE INDEX commands_agentid ON commands(agentid DESC);
CREATE INDEX commands_actionid ON commands(actionid DESC);

CREATE TABLE invagtmodperm (
investigatorid numeric NOT NULL,
agentid numeric NOT NULL,
moduleid numeric NOT NULL,
weight integer NOT NULL
    investigatorid numeric NOT NULL,
    agentid numeric NOT NULL,
    moduleid numeric NOT NULL,
    weight integer NOT NULL
);
ALTER TABLE public.invagtmodperm OWNER TO migadmin;
CREATE UNIQUE INDEX invagtmodperm_investigatorid_agentid_moduleid_idx ON invagtmodperm USING btree (investigatorid, agentid, moduleid);
@@ -78,14 +92,14 @@ CREATE INDEX invagtmodperm_moduleid_idx ON invagtmodperm USING btree (moduleid);

CREATE SEQUENCE investigators_id_seq START 1;
CREATE TABLE investigators (
id numeric NOT NULL DEFAULT nextval('investigators_id_seq'),
name character varying(1024) NOT NULL,
pgpfingerprint character varying(128),
publickey bytea,
privatekey bytea,
status character varying(255) NOT NULL,
createdat timestamp with time zone NOT NULL,
lastmodified timestamp with time zone NOT NULL
    id numeric NOT NULL DEFAULT nextval('investigators_id_seq'),
    name character varying(1024) NOT NULL,
    pgpfingerprint character varying(128),
    publickey bytea,
    privatekey bytea,
    status character varying(255) NOT NULL,
    createdat timestamp with time zone NOT NULL,
    lastmodified timestamp with time zone NOT NULL
);
ALTER TABLE public.investigators OWNER TO migadmin;
ALTER TABLE ONLY investigators
@@ -93,17 +107,17 @@ ALTER TABLE ONLY investigators
CREATE UNIQUE INDEX investigators_pgpfingerprint_idx ON investigators USING btree (pgpfingerprint);

CREATE TABLE modules (
id numeric NOT NULL,
name character varying(256) NOT NULL
    id numeric NOT NULL,
    name character varying(256) NOT NULL
);
ALTER TABLE public.modules OWNER TO migadmin;
ALTER TABLE ONLY modules
ADD CONSTRAINT modules_pkey PRIMARY KEY (id);

CREATE TABLE signatures (
actionid numeric NOT NULL,
investigatorid numeric NOT NULL,
pgpsignature character varying(4096) NOT NULL
    actionid numeric NOT NULL,
    investigatorid numeric NOT NULL,
    pgpsignature character varying(4096) NOT NULL
);
ALTER TABLE public.signatures OWNER TO migadmin;
CREATE UNIQUE INDEX signatures_actionid_investigatorid_idx ON signatures USING btree (actionid, investigatorid);
@@ -136,12 +150,12 @@ ALTER TABLE ONLY signatures

-- Scheduler can read all tables, insert and select private keys in the investigators table, but cannot update investigators
GRANT SELECT ON ALL TABLES IN SCHEMA public TO migscheduler;
GRANT INSERT, UPDATE ON actions, commands, agents, signatures TO migscheduler;
GRANT INSERT, UPDATE ON actions, commands, agents, agents_stats, signatures TO migscheduler;
GRANT INSERT ON investigators TO migscheduler;
GRANT USAGE ON SEQUENCE investigators_id_seq TO migscheduler;

-- API has limited permissions, and cannot list scheduler private keys in the investigators table, but can update their statuses
GRANT SELECT ON actions, agents, agtmodreq, commands, invagtmodperm, modules, signatures TO migapi;
GRANT SELECT ON actions, agents, agents_stats, agtmodreq, commands, invagtmodperm, modules, signatures TO migapi;
GRANT SELECT (id, name, pgpfingerprint, publickey, status, createdat, lastmodified) ON investigators TO migapi;
GRANT INSERT ON actions, signatures TO migapi;
GRANT INSERT (name, pgpfingerprint, publickey, status, createdat, lastmodified) ON investigators TO migapi;
@@ -13,11 +13,11 @@ import (
    "time"
)

// spoolInspection walks through the local directories and performs the following
// collector walks through the local directories and performs the following
// 1. load actions and commands that are sitting in the directories and waiting for processing
// 2. evaluate actions and commands that are inflight (todo)
// 3. remove finished and invalid actions and commands once the DeleteAfter period is passed
func spoolInspection(ctx Context) (err error) {
func collector(ctx Context) (err error) {
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("spoolInspection() -> %v", e)
@@ -42,23 +42,6 @@ func spoolInspection(ctx Context) (err error) {
    if err != nil {
        panic(err)
    }
    err = cleanDir(ctx, ctx.Directories.Action.Done)
    if err != nil {
        panic(err)
    }
    err = cleanDir(ctx, ctx.Directories.Action.Invalid)
    if err != nil {
        panic(err)
    }
    err = markOfflineAgents(ctx)
    if err != nil {
        panic(err)
    }
    err = markIdleAgents(ctx)
    if err != nil {
        panic(err)
    }

    return
}
@@ -225,76 +208,3 @@ func expireCommands(ctx Context) (err error) {
    dir.Close()
    return
}

// cleanDir walks through a directory and delete the files that
// are older than the configured DeleteAfter parameter
func cleanDir(ctx Context, targetDir string) (err error) {
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("cleanDir() -> %v", e)
        }
        ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving cleanDir()"}.Debug()
    }()
    deletionPoint, err := time.ParseDuration(ctx.Collector.DeleteAfter)
    dir, err := os.Open(targetDir)
    dirContent, err := dir.Readdir(-1)
    if err != nil {
        panic(err)
    }
    // loop over the content of the directory
    for _, DirEntry := range dirContent {
        if !DirEntry.Mode().IsRegular() {
            // ignore non file
            continue
        }
        // if the DeleteAfter value is after the time of last modification,
        // the file is due for deletion
        if time.Now().Add(-deletionPoint).After(DirEntry.ModTime()) {
            filename := targetDir + "/" + DirEntry.Name()
            ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("removing '%s'", filename)}
            os.Remove(filename)
        }
    }
    dir.Close()
    return
}

// markOfflineAgents updates the status of idle agents that passed the agent timeout to "offline"
func markOfflineAgents(ctx Context) (err error) {
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("markOfflineAgents() -> %v", e)
        }
        ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving markOfflineAgents()"}.Debug()
    }()
    timeOutPeriod, err := time.ParseDuration(ctx.Agent.TimeOut)
    if err != nil {
        panic(err)
    }
    pointInTime := time.Now().Add(-timeOutPeriod)
    err = ctx.DB.MarkOfflineAgents(pointInTime)
    if err != nil {
        panic(err)
    }
    return
}

// markIdleAgents updates the status of agents that stopped sending heartbeats
func markIdleAgents(ctx Context) (err error) {
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("markIdleAgents() -> %v", e)
        }
        ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: "leaving markIdleAgents()"}.Debug()
    }()
    hbFreq, err := time.ParseDuration(ctx.Agent.HeartbeatFreq)
    if err != nil {
        panic(err)
    }
    pointInTime := time.Now().Add(-hbFreq * 5)
    err = ctx.DB.MarkIdleAgents(pointInTime)
    if err != nil {
        panic(err)
    }
    return
}
@@ -42,6 +42,9 @@ type Context struct {
        DetectDupAgents chan string
    }
    Collector struct {
        Freq string
    }
    Periodic struct {
        Freq, DeleteAfter string
    }
    Directories struct {
@@ -7,18 +7,14 @@ package main

import (
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "github.com/streadway/amqp"
    "mig"
    "mig/pgp"
    "os"
    "runtime"
    "strings"
    "time"

    "github.com/howeyc/fsnotify"
    "github.com/streadway/amqp"
)

// build version
@@ -44,304 +40,9 @@ func main() {
    }
    fmt.Fprintf(os.Stderr, "OK\n")

    // Goroutine that handles events, such as logs and panics,
    // and decides what to do with them
    go func() {
        for event := range ctx.Channels.Log {
            stop, err := mig.ProcessLog(ctx.Logging, event)
            if err != nil {
                panic("Unable to process logs")
            }
            // if ProcessLog says we should stop now, feed the Terminate chan
            if stop {
                ctx.Channels.Terminate <- errors.New(event.Desc)
            }
        }
    }()
    ctx.Channels.Log <- mig.Log{Desc: "mig.ProcessLog() routine started"}

    // Watch the data directories for new files
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        panic(err)
    }

    go watchDirectories(watcher, ctx)
    // TODO: looks like the watchers are lost after a while. the (ugly) loop
    // below reinits the watchers every 137 seconds to prevent that from happening
    go func() {
        for {
            err = initWatchers(watcher, ctx)
            if err != nil {
                panic(err)
            }
            time.Sleep(137 * time.Second)
        }
    }()

    // Goroutine that loads actions dropped into ctx.Directories.Action.New
    go func() {
        for actionPath := range ctx.Channels.NewAction {
            ctx.OpID = mig.GenID()
            err := processNewAction(actionPath, ctx)
            // if something fails in the action processing, move it to the invalid folder
            if err != nil {
                // move action to INVALID folder and log
                dest := fmt.Sprintf("%s/%d.json", ctx.Directories.Action.Invalid, time.Now().UTC().UnixNano())
                os.Rename(actionPath, dest)
                reason := fmt.Sprintf("%v. '%s' moved to '%s'", err, actionPath, dest)
                ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: reason}.Warning()
            }
        }
    }()
    ctx.Channels.Log <- mig.Log{Desc: "processNewAction() routine started"}

    // Goroutine that loads and sends commands dropped in ready state
    // it uses a select and a timeout to load a batch of commands instead of
    // sending them one by one
    go func() {
        ctx.OpID = mig.GenID()
        readyCmd := make(map[float64]mig.Command)
        ctr := 0
        for {
            select {
            case cmd := <-ctx.Channels.CommandReady:
                ctr++
                readyCmd[cmd.ID] = cmd
            case <-time.After(1 * time.Second):
                if ctr > 0 {
                    var cmds []mig.Command
                    for id, cmd := range readyCmd {
                        cmds = append(cmds, cmd)
                        delete(readyCmd, id)
                    }
                    err := sendCommands(cmds, ctx)
                    if err != nil {
                        ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("%v", err)}.Err()
                    }
                }
                // reinit
                ctx.OpID = mig.GenID()
                ctr = 0
            }
        }
    }()
    ctx.Channels.Log <- mig.Log{Desc: "sendCommands() routine started"}

    // Goroutine that loads commands from the ctx.Directories.Command.Returned and marks
    // them as finished or cancelled
    go func() {
        ctx.OpID = mig.GenID()
        returnedCmd := make(map[uint64]string)
        var ctr uint64 = 0
        for {
            select {
            case cmdFile := <-ctx.Channels.CommandReturned:
                ctr++
                returnedCmd[ctr] = cmdFile
            case <-time.After(1 * time.Second):
                if ctr > 0 {
                    var cmdFiles []string
                    for id, cmdFile := range returnedCmd {
                        cmdFiles = append(cmdFiles, cmdFile)
                        delete(returnedCmd, id)
                    }
                    err := returnCommands(cmdFiles, ctx)
                    if err != nil {
                        ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("%v", err)}.Err()
                    }
                }
                // reinit
                ctx.OpID = mig.GenID()
                ctr = 0
            }
        }
    }()
    ctx.Channels.Log <- mig.Log{Desc: "terminateCommand() routine started"}

    // Goroutine that updates an action when a command is done
    go func() {
        ctx.OpID = mig.GenID()
        doneCmd := make(map[float64]mig.Command)
        ctr := 0
        for {
            select {
            case cmd := <-ctx.Channels.CommandDone:
                ctr++
                doneCmd[cmd.ID] = cmd
            case <-time.After(1 * time.Second):
                if ctr > 0 {
                    var cmds []mig.Command
                    for id, cmd := range doneCmd {
                        cmds = append(cmds, cmd)
                        delete(doneCmd, id)
                    }
                    err := updateAction(cmds, ctx)
                    if err != nil {
                        ctx.Channels.Log <- mig.Log{OpID: ctx.OpID, Desc: fmt.Sprintf("%v", err)}.Err()
                    }
                }
                // reinit
                ctx.OpID = mig.GenID()
                ctr = 0
            }
        }

    }()
    ctx.Channels.Log <- mig.Log{Desc: "updateAction() routine started"}

    // start a listening channel to receive heartbeats from agents
    heartbeatsChan, err := startHeartbeatsListener(ctx)
    if err != nil {
        panic(err)
    }
    go func() {
        for msg := range heartbeatsChan {
            ctx.OpID = mig.GenID()
            err := getHeartbeats(msg, ctx)
            if err != nil {
                ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%v", err)}.Err()
            }
        }
    }()
    ctx.Channels.Log <- mig.Log{Desc: "agents heartbeats listener routine started"}

    // start a listening channel to results from agents
    agtResultsChan, err := startResultsListener(ctx)
    if err != nil {
        panic(err)
    }
    go func() {
        for delivery := range agtResultsChan {
            ctx.OpID = mig.GenID()
            // validate the size of the data received, and make sure its first and
            // last bytes are valid json enclosures
            if len(delivery.Body) < 10 || delivery.Body[0] != '{' || delivery.Body[len(delivery.Body)-1] != '}' {
                ctx.Channels.Log <- mig.Log{
                    OpID: ctx.OpID,
                    Desc: fmt.Sprintf("discarding invalid message received in results channel"),
                }.Err()
                break
            }
            // write to disk in Returned directory
            dest := fmt.Sprintf("%s/%.0f", ctx.Directories.Command.Returned, ctx.OpID)
            err = safeWrite(ctx, dest, delivery.Body)
            if err != nil {
                ctx.Channels.Log <- mig.Log{
                    OpID: ctx.OpID,
                    Desc: fmt.Sprintf("failed to write agent results to disk: %v", err),
                }.Err()
                break
            }
        }
    }()
    ctx.Channels.Log <- mig.Log{Desc: "agents results listener routine started"}

    // launch the routine that regularly walks through the local directories
    go func() {
        collectorSleeper, err := time.ParseDuration(ctx.Collector.Freq)
        if err != nil {
            panic(err)
        }
        for {
            ctx.OpID = mig.GenID()
            err := spoolInspection(ctx)
            if err != nil {
                ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%v", err)}.Err()
            }
            time.Sleep(collectorSleeper)
        }
    }()
    ctx.Channels.Log <- mig.Log{Desc: "spoolInspection() routine started"}

    // launch the routine that handles multi agents on same queue
    go func() {
        for queueLoc := range ctx.Channels.DetectDupAgents {
            ctx.OpID = mig.GenID()
            err = inspectMultiAgents(queueLoc, ctx)
            if err != nil {
                ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%v", err)}.Err()
            }
        }
    }()
    ctx.Channels.Log <- mig.Log{Desc: "inspectMultiAgents() routine started"}

    // won't exit until this chan received something
    exitReason := <-ctx.Channels.Terminate
    fmt.Fprintf(os.Stderr, "Scheduler is shutting down. Reason: %s", exitReason)
    Destroy(ctx)
}

// initWatchers initializes the watcher flags for all the monitored directories
func initWatchers(watcher *fsnotify.Watcher, ctx Context) (err error) {
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("initWatchers() -> %v", e)
        }
        ctx.Channels.Log <- mig.Log{Desc: "leaving initWatchers()"}.Debug()
    }()

    err = watcher.WatchFlags(ctx.Directories.Action.New, fsnotify.FSN_CREATE)
    if err != nil {
        e := fmt.Errorf("%v '%s'", err, ctx.Directories.Action.New)
        panic(e)
    }
    ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("watcher.Watch(): %s", ctx.Directories.Action.New)}.Debug()

    err = watcher.WatchFlags(ctx.Directories.Command.InFlight, fsnotify.FSN_CREATE)
    if err != nil {
        e := fmt.Errorf("%v '%s'", err, ctx.Directories.Command.InFlight)
        panic(e)
    }
    ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("watcher.Watch(): %s", ctx.Directories.Command.InFlight)}.Debug()

    err = watcher.WatchFlags(ctx.Directories.Command.Returned, fsnotify.FSN_CREATE)
    if err != nil {
        e := fmt.Errorf("%v '%s'", err, ctx.Directories.Command.Returned)
        panic(e)
    }
    ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("watcher.Watch(): %s", ctx.Directories.Command.Returned)}.Debug()

    err = watcher.WatchFlags(ctx.Directories.Action.Done, fsnotify.FSN_CREATE)
    if err != nil {
        e := fmt.Errorf("%v '%s'", err, ctx.Directories.Action.Done)
        panic(e)
    }
    ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("watcher.Watch(): %s", ctx.Directories.Action.Done)}.Debug()

    return
}

// watchDirectories calls specific function when a file appears in a watched directory
func watchDirectories(watcher *fsnotify.Watcher, ctx Context) {
    for {
        select {
        case ev := <-watcher.Event:
            ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("watchDirectories(): %s", ev.String())}.Debug()

            // New file detected, but the file size might still be zero, because inotify wakes up before
            // the file is fully written. If that's the case, wait a little and hope that's enough to finish writing
            err := waitForFileOrDelete(ev.Name, 5)
            if err != nil {
                ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("error while reading '%s': %v", ev.Name, err)}.Err()
                goto nextfile
            }
            // Use the prefix of the filename to send it to the appropriate channel
            if strings.HasPrefix(ev.Name, ctx.Directories.Action.New) {
                ctx.Channels.NewAction <- ev.Name
            } else if strings.HasPrefix(ev.Name, ctx.Directories.Command.InFlight) {
                ctx.Channels.UpdateCommand <- ev.Name
            } else if strings.HasPrefix(ev.Name, ctx.Directories.Command.Returned) {
                ctx.Channels.CommandReturned <- ev.Name
            } else if strings.HasPrefix(ev.Name, ctx.Directories.Action.Done) {
                ctx.Channels.ActionDone <- ev.Name
            }
        case err := <-watcher.Error:
            // in case of error, raise an emergency
            ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("watchDirectories(): %v", err)}.Emerg()
        }
    nextfile:
    }
    // startroutines is where all the work is done.
    // it blocks until a Terminate message is received
    startRoutines(ctx)
}

// processNewAction is called when a new action is available. It pulls