add audit and dispatch modules for Linux

This adds two new modules, audit and dispatch with additional
modifications to the agent.

The audit module can be used to read and parse the kernel audit trail on
Linux systems. The agent can then log the audit messages, or write them
to the dispatch module.

The dispatch module is a general module used for alert forwarding from
the agent. If the dispatch module is running, any alerts the agent
receives from persistent modules will be forwarded to the active
dispatch module, where the dispatch module can write the message to a
remote endpoint.
This commit is contained in:
Aaron Meihm 2017-02-28 11:12:30 -06:00
Родитель 31a6fbf5e1
Коммит 3fb3e9745d
16 изменённых файлов: 882 добавлений и 10 удалений

Просмотреть файл

@ -508,6 +508,9 @@ func startRoutines(ctx *Context) (err error) {
// Start up the HTTP stats socket
go initSocket(ctx)
// GoRoutine that processes incoming alert messages on the alert channel
go alertProcessor(ctx)
// GoRoutine that parses and validates incoming commands
go func() {
for msg := range ctx.Channels.NewCommand {

45
mig-agent/alert.go Normal file
Просмотреть файл

@ -0,0 +1,45 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Contributor:
// - Aaron Meihm ameihm@mozilla.com [:alm]
package main
import (
"fmt"
"mig.ninja/mig"
)
// alertHandler is called inside the agent when it recieves an alert message from a
// persistent module.
func alertHandler(ctx *Context, alert string) {
ctx.Channels.Alert <- alert
}
// alertProcessor processes incoming alert messages from the alert channel and handles
// them. This results in either the alert being written to the dispatch module (if the
// dispatch module is active) or the alert being written to the agent log.
func alertProcessor(ctx *Context) {
ctx.Channels.Log <- mig.Log{Desc: "starting alert processor"}.Info()
var alert string
for {
alert = <-ctx.Channels.Alert
// If the dispatch module is active, write the message to the dispatch
// channel
dispatchChanLock.Lock()
if dispatchChan != nil {
dispatchChan <- alert
dispatchChanLock.Unlock()
continue
}
dispatchChanLock.Unlock()
// Lastly, if no dispatch module is available, just write the alert to the
// agent log.
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("alert: %v", alert)}.Info()
}
}

Просмотреть файл

@ -65,6 +65,7 @@ type Context struct {
NewCommand chan []byte
RunAgentCommand, RunExternalCommand chan moduleOp
Results chan mig.Command
Alert chan string
}
MQ struct {
// configuration
@ -290,6 +291,7 @@ func initChannels(orig_ctx Context) (ctx Context, err error) {
ctx.Channels.RunAgentCommand = make(chan moduleOp, 5)
ctx.Channels.RunExternalCommand = make(chan moduleOp, 5)
ctx.Channels.Results = make(chan mig.Command, 5)
ctx.Channels.Alert = make(chan string, 128)
ctx.Channels.Log = make(chan mig.Log, 97)
ctx.Channels.Log <- mig.Log{Desc: "leaving initChannels()"}.Debug()
return

Просмотреть файл

@ -22,6 +22,17 @@ import (
"gopkg.in/gcfg.v1"
)
// dispatchChan is a special channel used to route messages to the dispatch module.
//
// If non-nil, the alert processor will write messages to this channel, which will then
// be sent to the dispatch module. The dispatch channel will only be non-nil if the
// dispatch module is active.
//
// The channel is protected by a mutex, to ensure deallocation of the channel if the
// dispatch module fails is exclusive and writes cannot occur while this happens.
var dispatchChan chan string
var dispatchChanLock sync.Mutex
// persistModuleRegister maintains a map of the running persistent modules, and
// any socket specification registered for that module.
//
@ -113,15 +124,16 @@ func startPersistModule(ctx *Context, name string) (err error) {
// the agent is running, this function will execute in a go-routine.
func managePersistModule(ctx *Context, name string) {
var (
cmd *exec.Cmd
isRunning bool
pipeout modules.ModuleWriter
pipein modules.ModuleReader
err error
failDelay bool
killModule bool
inChan chan modules.Message
lastPing time.Time
cmd *exec.Cmd
isRunning bool
pipeout modules.ModuleWriter
pipein modules.ModuleReader
err error
failDelay bool
killModule bool
inChan chan modules.Message
lastPing time.Time
localDispatch chan string
)
logfunc := func(f string, a ...interface{}) {
@ -130,6 +142,18 @@ func managePersistModule(ctx *Context, name string) {
ctx.Channels.Log <- mig.Log{Desc: buf}.Info()
}
// dispatchDealloc is used to deallocate the dispatch channel, if this module
// isn't the dispatch module, localDispatch will always be nil
dispatchDealloc := func() {
if localDispatch != nil {
dispatchChanLock.Lock()
close(dispatchChan)
dispatchChan = nil
localDispatch = nil
dispatchChanLock.Unlock()
}
}
pingtick := time.Tick(time.Second * 10)
for {
@ -201,11 +225,24 @@ func managePersistModule(ctx *Context, name string) {
continue
}
}
// If we are the dispatch module, initialize the dispatch channel
if name == "dispatch" {
if dispatchChan == nil {
logfunc("initializing dispatch channel")
dispatchChanLock.Lock()
dispatchChan = make(chan string, 128)
localDispatch = dispatchChan
dispatchChanLock.Unlock()
}
}
select {
case msg, ok := <-inChan:
if !ok {
err = cmd.Wait()
logfunc("module is down, %v", err)
dispatchDealloc()
isRunning = false
persistModRegister.remove(name)
failDelay = true
@ -227,6 +264,19 @@ func managePersistModule(ctx *Context, name string) {
break
}
logfunc("(module log) %v", lp.Message)
case modules.MsgClassAlert:
var ap modules.AlertParams
buf, err := json.Marshal(msg.Parameters)
if err != nil {
logfunc("%v", err)
break
}
err = json.Unmarshal(buf, &ap)
if err != nil {
logfunc("%v", err)
break
}
alertHandler(ctx, ap.Message)
case modules.MsgClassRegister:
var rp modules.RegParams
buf, err := json.Marshal(msg.Parameters)
@ -246,6 +296,21 @@ func managePersistModule(ctx *Context, name string) {
killModule = true
break
}
case alertmsg := <-localDispatch:
am, err := modules.MakeMessageAlert(alertmsg)
if err != nil {
logfunc("failed to create alert, %v", err)
break
}
err = modules.WriteOutput(am, pipeout)
if err != nil {
logfunc("dispatch alert failed, %v", err)
dispatchDealloc()
isRunning = false
persistModRegister.remove(name)
failDelay = true
break
}
case _ = <-pingtick:
// If we haven't received a reply in the past 3 cycles we will
// kill the module
@ -265,6 +330,7 @@ func managePersistModule(ctx *Context, name string) {
err = modules.WriteOutput(pm, pipeout)
if err != nil {
logfunc("ping failed, %v", err)
dispatchDealloc()
isRunning = false
persistModRegister.remove(name)
failDelay = true
@ -282,6 +348,7 @@ func managePersistModule(ctx *Context, name string) {
return
}
_ = cmd.Wait()
dispatchDealloc()
isRunning = false
persistModRegister.remove(name)
failDelay = true

188
modules/audit/audit.go Normal file
Просмотреть файл

@ -0,0 +1,188 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Contributor: Aaron Meihm ameihm@mozilla.com [:alm]
// Package audit implements a persistent module which can read and parse the operating
// system audit trail. The module turns each audit event read from the kernel into a
// module alert, and writes it to the master agent process where the agent process can either
// write the audit event to it's log or send it to the dispatch module.
//
// The audit module is currently only supported on Linux.
package audit /* import "mig.ninja/mig/modules/audit" */
import (
"encoding/json"
"fmt"
"runtime"
"mig.ninja/mig/modules"
)
type module struct {
}
// NewRun returns a new instance of a modules.Runner for this module.
func (m *module) NewRun() modules.Runner {
return new(run)
}
func init() {
modules.Register("audit", new(module))
}
type run struct {
Parameters Parameters
Results modules.Result
}
func buildResults(e elements, r *modules.Result) (buf []byte, err error) {
r.Success = true
r.Elements = e
r.FoundAnything = true
buf, err = json.Marshal(r)
return
}
var logChan chan string
var alertChan chan string
var handlerErrChan chan error
var configChan chan []byte
func moduleMain() {
var cfg config
incfg := <-configChan
err := json.Unmarshal(incfg, &cfg)
if err != nil {
handlerErrChan <- err
return
}
logChan <- "module received configuration"
err = initializeAudit(cfg)
if err != nil {
handlerErrChan <- err
return
}
err = runAudit()
if err != nil {
handlerErrChan <- err
return
}
}
func requestHandler(p interface{}) (ret string) {
var results modules.Result
defer func() {
if e := recover(); e != nil {
results.Errors = append(results.Errors, fmt.Sprintf("%v", e))
results.Success = false
err, _ := json.Marshal(results)
ret = string(err)
return
}
}()
e := elements{Ok: true}
resp, err := buildResults(e, &results)
if err != nil {
panic(err)
}
return string(resp)
}
type config struct {
Audit struct {
RulesPath string `json:"rulespath"`
RateLimit int `json:"ratelimit"`
BacklogLimit int `json:"backloglimit"`
} `json:"audit"`
}
// PersistModConfig returns a new configuration structure for this module.
func (r *run) PersistModConfig() interface{} {
return &config{}
}
// RunPersist is the entry point for persistent execution of this module.
func (r *run) RunPersist(in modules.ModuleReader, out modules.ModuleWriter) {
alertChan = make(chan string, 64)
logChan = make(chan string, 64)
regChan := make(chan string, 64)
handlerErrChan = make(chan error, 64)
configChan = make(chan []byte, 1)
go moduleMain()
l, spec, err := modules.GetPersistListener("audit")
if err != nil {
handlerErrChan <- err
} else {
regChan <- spec
}
go modules.HandlePersistRequest(l, requestHandler, handlerErrChan)
modules.DefaultPersistHandlers(in, out, logChan, handlerErrChan, regChan,
alertChan, configChan)
}
// Run is the entry point for standard (e.g., query) based invocation of this module.
func (r *run) Run(in modules.ModuleReader) (resStr string) {
defer func() {
if e := recover(); e != nil {
// return error in json
r.Results.Errors = append(r.Results.Errors, fmt.Sprintf("%v", e))
r.Results.Success = false
err, _ := json.Marshal(r.Results)
resStr = string(err)
return
}
}()
runtime.GOMAXPROCS(1)
sockspec, err := modules.ReadPersistInputParameters(in, &r.Parameters)
if err != nil {
panic(err)
}
err = r.ValidateParameters()
if err != nil {
panic(err)
}
resStr = modules.SendPersistRequest(r.Parameters, sockspec)
return
}
// ValidateParameters validates the parameters set in the runner for the module.
func (r *run) ValidateParameters() (err error) {
return
}
// PrintResults returns the results of a query of this module in human readable form.
func (r *run) PrintResults(result modules.Result, foundOnly bool) (prints []string, err error) {
var (
elem elements
)
err = result.GetElements(&elem)
if err != nil {
panic(err)
}
resStr := fmt.Sprintf("ok:%v", elem.Ok)
prints = append(prints, resStr)
if !foundOnly {
for _, we := range result.Errors {
prints = append(prints, we)
}
}
return
}
type elements struct {
Ok bool `json:"ok"`
}
// Parameters defines any query parameters used in this module.
type Parameters struct {
}
func newParameters() *Parameters {
return &Parameters{}
}

Просмотреть файл

@ -0,0 +1,16 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Contributor: Aaron Meihm ameihm@mozilla.com [:alm]
package audit /* import "mig.ninja/mig/modules/audit" */
import (
"mig.ninja/mig/testutil"
"testing"
)
func TestRegistration(t *testing.T) {
testutil.CheckModuleRegistration(t, "audit")
}

46
modules/audit/doc.rst Normal file
Просмотреть файл

@ -0,0 +1,46 @@
====================================
Mozilla InvestiGator: audit module
====================================
:Author: Aaron Meihm <ameihm@mozilla.com>
.. sectnum::
.. contents:: Table of Contents
The audit module can be used to read kernel audit messages (e.g., Linux audit) and either
write them to the agent's log file, or more typically dispatch the audit trail off the system
using the dispatch module.
Currently only Linux is supported by the audit module.
Auditing support for Linux is implemented using `libaudit-go <https://github.com/mozilla/libaudit-go>`_.
Usage
-----
The audit module is a persistent module. When the agent is configured with the audit module, it
will spawn the audit module as a subprocess and initialize auditing on the platform. The audit
module will enable audit, load the audit rules indicated in the configuration into the kernel, and
begin collecting/parsing audit data from the kernel.
If the dispatch module is also loaded with the agent, any audit messages will be sent to the
dispatch module where they can be transmitted to an event collection system. Otherwise, the agent
will simply log the JSON formatted audit messages in the agent log.
Configuration
-------------
The audit module is configured using ``audit.cfg`` in the agent configuration directory, ``/etc/mig``.
.. code::
[audit]
rulespath = /etc/mig/audit.rules.json
ratelimit = 500
backloglimit = 16384
``rulespath`` indicates the path to load audit rules into the kernel from. Note that this is not a
standard audit configuration, but a JSON based rule set as is used in
`libaudit-go <https://github.com/mozilla/libaudit-go>`_.
``ratelimit`` and ``backloglimit`` can be used to configure the Linux auditing rate and back log
limits. If respective defaults of 500 and 16384 will be used.

Просмотреть файл

@ -0,0 +1,19 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Contributor: Aaron Meihm ameihm@mozilla.com [:alm]
package audit /* import "mig.ninja/mig/modules/audit" */
import (
"fmt"
)
func initializeAudit(cfg config) error {
return fmt.Errorf("audit module not supported on darwin")
}
func runAudit() error {
return fmt.Errorf("audit module not supported on darwin")
}

Просмотреть файл

@ -0,0 +1,94 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Contributor: Aaron Meihm ameihm@mozilla.com [:alm]
package audit /* import "mig.ninja/mig/modules/audit" */
import (
"encoding/json"
"fmt"
libaudit "github.com/mozilla/libaudit-go"
"io/ioutil"
"syscall"
)
var auditsock *libaudit.NetlinkConnection
// initializeAudit initializes the auditing subsystem . It will load rules
// from the rule path in the configuration, and enable auditing with the specified
// parameters.
func initializeAudit(cfg config) error {
var err error
auditsock, err = libaudit.NewNetlinkConnection()
if err != nil {
return fmt.Errorf("NewNetlinkConnection: %v", err)
}
err = libaudit.AuditSetEnabled(auditsock, true)
if err != nil {
return fmt.Errorf("AuditSetEnabled: %v", err)
}
status, err := libaudit.AuditIsEnabled(auditsock)
if err != nil {
return fmt.Errorf("AuditIsEnabled: %v", err)
}
if !status {
return fmt.Errorf("was not possible to enable audit")
}
logChan <- "auditing enabled"
// Configure audit as required
if cfg.Audit.RateLimit == 0 { // Set a default rate limit if needed
cfg.Audit.RateLimit = 500
}
err = libaudit.AuditSetRateLimit(auditsock, cfg.Audit.RateLimit)
if err != nil {
return fmt.Errorf("AuditSetRateLimit: %v", err)
}
if cfg.Audit.BacklogLimit == 0 { // Set default backlog if needed
cfg.Audit.BacklogLimit = 16384
}
err = libaudit.AuditSetBacklogLimit(auditsock, cfg.Audit.BacklogLimit)
if err != nil {
return fmt.Errorf("AuditSetBacklogLimit: %v", err)
}
err = libaudit.AuditSetPID(auditsock, syscall.Getpid())
if err != nil {
return fmt.Errorf("AuditSetPID: %v", err)
}
err = libaudit.DeleteAllRules(auditsock)
if err != nil {
return fmt.Errorf("DeleteAllRules: %v", err)
}
rulebuf, err := ioutil.ReadFile(cfg.Audit.RulesPath)
if err != nil {
return err
}
warnings, err := libaudit.SetRules(auditsock, rulebuf)
if err != nil {
return fmt.Errorf("SetRules: %v", err)
}
for _, x := range warnings {
logChan <- fmt.Sprintf("ruleset warning: %v", x)
}
logChan <- "auditing configured"
return nil
}
func runAudit() error {
doneChan := make(chan bool, 0)
logChan <- "listening for audit events"
libaudit.GetAuditMessages(auditsock, callback, &doneChan)
return nil
}
func callback(msg *libaudit.AuditEvent, callerr error) {
// In our callback, we want to simply marshal the audit event and write it to the
// modules alert channel
buf, err := json.Marshal(msg)
if err != nil {
return
}
alertChan <- string(buf)
}

Просмотреть файл

@ -0,0 +1,19 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Contributor: Aaron Meihm ameihm@mozilla.com [:alm]
package audit /* import "mig.ninja/mig/modules/audit" */
import (
"fmt"
)
func initializeAudit(cfg config) error {
return fmt.Errorf("audit module not supported on windows")
}
func runAudit() error {
return fmt.Errorf("audit module not supported on windows")
}

Просмотреть файл

@ -0,0 +1,22 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Contributor: Aaron Meihm ameihm@mozilla.com [:alm]
package audit /* import "mig.ninja/mig/modules/audit" */
import (
"fmt"
)
func printHelp(isCmd bool) {
fmt.Printf(`Query parameters
----------------
This module has no parameters.
`)
}
func (r *run) ParamsParser(args []string) (interface{}, error) {
return r.Parameters, r.ValidateParameters()
}

Просмотреть файл

@ -0,0 +1,217 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Contributor: Aaron Meihm ameihm@mozilla.com [:alm]
// Package dispatch implements alert dispatching for the agent as a module.
// Persistent modules which generate alerts will have these alerts forwarded
// to this module if the dispatch module is active. The dispatch module can then
// forward the alerts on based on it's configuration.
package dispatch /* import "mig.ninja/mig/modules/dispatch" */
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"runtime"
"mig.ninja/mig/modules"
)
type module struct {
}
// NewRun returns a new instance of a modules.Runner for this module.
func (m *module) NewRun() modules.Runner {
return new(run)
}
func init() {
modules.Register("dispatch", new(module))
}
type run struct {
Parameters Parameters
Results modules.Result
}
func buildResults(e elements, r *modules.Result) (buf []byte, err error) {
r.Success = true
r.Elements = e
r.FoundAnything = true
buf, err = json.Marshal(r)
return
}
var logChan chan string
var alertChan chan string
var handlerErrChan chan error
var configChan chan []byte
// messageBuf is a queue used to store incoming messages, and is drained by
// runDispatch
var messageBuf chan string
func moduleMain() {
var cfg config
incfg := <-configChan
err := json.Unmarshal(incfg, &cfg)
if err != nil {
handlerErrChan <- err
return
}
logChan <- "module received configuration"
messageBuf = make(chan string, 1024)
// Register the dispatch function, which will be called when the module
// recieves an alert message from the agent.
modules.RegisterDispatchFunction(dispatchIn)
// Start the dispatch dequeue function.
err = runDispatch(cfg)
if err != nil {
handlerErrChan <- err
return
}
}
func dispatchIn(msg string) {
select {
case messageBuf <- msg:
default:
// If we can't queue the message it is just dropped
}
}
func runDispatch(cfg config) error {
var httpClient http.Client
for {
msg := <-messageBuf
b := bytes.NewBufferString(msg)
// We make an assumption the alert content is always a JSON blob here.
resp, err := httpClient.Post(cfg.Dispatch.HTTPURL, "application/json", b)
if err != nil {
logChan <- fmt.Sprintf("http post: %v", err)
continue
}
resp.Body.Close()
}
return nil
}
func requestHandler(p interface{}) (ret string) {
var results modules.Result
defer func() {
if e := recover(); e != nil {
results.Errors = append(results.Errors, fmt.Sprintf("%v", e))
results.Success = false
err, _ := json.Marshal(results)
ret = string(err)
return
}
}()
e := elements{Ok: true}
resp, err := buildResults(e, &results)
if err != nil {
panic(err)
}
return string(resp)
}
type config struct {
Dispatch struct {
HTTPURL string `json:"httpurl"`
} `json:"dispatch"`
}
// PersistModConfig returns a new configuration structure for this module.
func (r *run) PersistModConfig() interface{} {
return &config{}
}
// RunPersist is the entry point for persistent execution of the module.
func (r *run) RunPersist(in modules.ModuleReader, out modules.ModuleWriter) {
alertChan = make(chan string, 64)
logChan = make(chan string, 64)
regChan := make(chan string, 64)
handlerErrChan = make(chan error, 64)
configChan = make(chan []byte, 1)
go moduleMain()
l, spec, err := modules.GetPersistListener("dispatch")
if err != nil {
handlerErrChan <- err
} else {
regChan <- spec
}
go modules.HandlePersistRequest(l, requestHandler, handlerErrChan)
modules.DefaultPersistHandlers(in, out, logChan, handlerErrChan, regChan,
alertChan, configChan)
}
// Run is the entry point for a standard (e.g., query) based invocation of the module.
func (r *run) Run(in modules.ModuleReader) (resStr string) {
defer func() {
if e := recover(); e != nil {
// return error in json
r.Results.Errors = append(r.Results.Errors, fmt.Sprintf("%v", e))
r.Results.Success = false
err, _ := json.Marshal(r.Results)
resStr = string(err)
return
}
}()
runtime.GOMAXPROCS(1)
sockspec, err := modules.ReadPersistInputParameters(in, &r.Parameters)
if err != nil {
panic(err)
}
err = r.ValidateParameters()
if err != nil {
panic(err)
}
resStr = modules.SendPersistRequest(r.Parameters, sockspec)
return
}
// ValidateParameters validates the parameters set in the runner for the module.
func (r *run) ValidateParameters() (err error) {
return
}
// PrintResults returns the results of a query of this module in human readable form.
func (r *run) PrintResults(result modules.Result, foundOnly bool) (prints []string, err error) {
var (
elem elements
)
err = result.GetElements(&elem)
if err != nil {
panic(err)
}
resStr := fmt.Sprintf("ok:%v", elem.Ok)
prints = append(prints, resStr)
if !foundOnly {
for _, we := range result.Errors {
prints = append(prints, we)
}
}
return
}
type elements struct {
Ok bool `json:"ok"`
}
// Parameters defines any query parameters used in this module.
type Parameters struct {
}
func newParameters() *Parameters {
return &Parameters{}
}

Просмотреть файл

@ -0,0 +1,16 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Contributor: Aaron Meihm ameihm@mozilla.com [:alm]
package dispatch /* import "mig.ninja/mig/modules/dispatch" */
import (
"mig.ninja/mig/testutil"
"testing"
)
func TestRegistration(t *testing.T) {
testutil.CheckModuleRegistration(t, "dispatch")
}

39
modules/dispatch/doc.rst Normal file
Просмотреть файл

@ -0,0 +1,39 @@
=====================================
Mozilla InvestiGator: dispatch module
=====================================
:Author: Aaron Meihm <ameihm@mozilla.com>
.. sectnum::
.. contents:: Table of Contents
The dispatch module is used to dispatch alerts to a remote system (e.g., an event
collector).
Usage
-----
Persistent modules have the ability to generate alerts. These alerts are sent from
the persistent module to the master agent process.
When the dispatch module is not running, the agent will simply write the alert in the
standard agent log.
If the dispatch module is active, the agent will instead forward the alert to the dispatch
module, and the module can manage buffering the alert and forwarding it on to the event
collection system configured in the module configuration.
Configuration
-------------
The dispatch module is configured using ``dispatch.cfg`` in the agent configuration directory,
``/etc/mig`` or ``c:\mig``.
.. code::
[dispatch]
httpurl = "https://api.to.post.to/event
Currently the dispatch module only supports HTTP POST of alert messages from modules to a
configured HTTP endpoint.
``httpurl`` can be used to configure a URL all alerts will be posted to.

Просмотреть файл

@ -0,0 +1,23 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Contributor: Aaron Meihm ameihm@mozilla.com [:alm]
package dispatch /* import "mig.ninja/mig/modules/dispatch" */
import (
"fmt"
)
func printHelp(isCmd bool) {
fmt.Printf(`Query parameters
----------------
This module has no parameters.
`)
}
// ParamsParser parses any parameters used in queries for this module.
func (r *run) ParamsParser(args []string) (interface{}, error) {
return r.Parameters, r.ValidateParameters()
}

Просмотреть файл

@ -50,6 +50,7 @@ const (
MsgClassLog MessageClass = "log"
MsgClassRegister MessageClass = "register"
MsgClassConfig MessageClass = "config"
MsgClassAlert MessageClass = "alert"
)
// Parameter format expected for a log message
@ -66,6 +67,11 @@ type ConfigParams struct {
Config interface{} `json:"config"`
}
// AlertParams describes the parameters used in an alert message
type AlertParams struct {
Message string `json:"message"`
}
// Result implement the base type for results returned by modules.
// All modules must return this type of result. The fields are:
//
@ -211,6 +217,18 @@ func MakeMessageConfig(cfgdata interface{}) (rawMsg []byte, err error) {
return
}
// MakeMessageAlert creates a new message of class alert
func MakeMessageAlert(f string, args ...interface{}) (rawMsg []byte, err error) {
param := AlertParams{Message: fmt.Sprintf(f, args...)}
msg := Message{Class: MsgClassAlert, Parameters: param}
rawMsg, err = json.Marshal(&msg)
if err != nil {
err = fmt.Errorf("Failed to make module alert message: %v", err)
return
}
return
}
// Keep reading until we get a full line or an error, and return
func readInputLine(rdr *bufio.Reader) ([]byte, error) {
var ret []byte
@ -343,11 +361,23 @@ func WatchForStop(r ModuleReader, stopChan *chan bool) error {
}
}
// dispatchFunc can be set by a module using RegisterDispatchFunction, and is called
// inside DefaultPersistHandlers if the module recieves an alert message from the master
// agent process.
var dispatchFunc func(string)
// RegisterDispatchFunction can be called by a module to register a function that will be
// used to process incoming alert messages from the master agent process. It is primarily
// used by the dispatch module.
func RegisterDispatchFunction(f func(string)) {
dispatchFunc = f
}
// A general management function that can be called by persistent modules from the
// RunPersist function. Looks after replying to ping messages, writing logs, and other
// communication between the agent and the running persistent module.
func DefaultPersistHandlers(in ModuleReader, out ModuleWriter, logch chan string,
errch chan error, regch chan string, confch chan []byte) {
errch chan error, regch chan string, alertch chan string, confch chan []byte) {
inChan := make(chan Message, 0)
go func() {
for {
@ -378,6 +408,17 @@ func DefaultPersistHandlers(in ModuleReader, out ModuleWriter, logch chan string
WriteOutput(logmsg, out)
}
os.Exit(1)
case s := <-alertch:
almsg, err := MakeMessageAlert("%v", s)
if err != nil {
failed = true
break
}
err = WriteOutput(almsg, out)
if err != nil {
failed = true
break
}
case s := <-logch:
logmsg, err := MakeMessageLog("%v", s)
if err != nil {
@ -435,6 +476,21 @@ func DefaultPersistHandlers(in ModuleReader, out ModuleWriter, logch chan string
break
}
confch <- buf
case "alert":
if dispatchFunc != nil {
var alparam AlertParams
buf, err := json.Marshal(msg.Parameters)
if err != nil {
failed = true
break
}
err = json.Unmarshal(buf, &alparam)
if err != nil {
failed = true
break
}
dispatchFunc(alparam.Message)
}
}
}