From 3fb3e9745d276053406768a2ae3ed7aee41c9945 Mon Sep 17 00:00:00 2001 From: Aaron Meihm Date: Tue, 28 Feb 2017 11:12:30 -0600 Subject: [PATCH] add audit and dispatch modules for Linux This adds two new modules, audit and dispatch with additional modifications to the agent. The audit module can be used to read and parse the kernel audit trail on Linux systems. The agent can then log the audit messages, or write them to the dispatch module. The dispatch module is a general module used for alert forwarding from the agent. If the dispatch module is running, any alerts the agent receives from persistent modules will be forwarded to the active dispatch module, where the dispatch module can write the message to a remote endpoint. --- mig-agent/agent.go | 3 + mig-agent/alert.go | 45 +++++++ mig-agent/context.go | 2 + mig-agent/persist.go | 85 ++++++++++-- modules/audit/audit.go | 188 ++++++++++++++++++++++++++ modules/audit/audit_test.go | 16 +++ modules/audit/doc.rst | 46 +++++++ modules/audit/osaudit_darwin.go | 19 +++ modules/audit/osaudit_linux.go | 94 +++++++++++++ modules/audit/osaudit_windows.go | 19 +++ modules/audit/paramscreator.go | 22 +++ modules/dispatch/dispatch.go | 217 ++++++++++++++++++++++++++++++ modules/dispatch/dispatch_test.go | 16 +++ modules/dispatch/doc.rst | 39 ++++++ modules/dispatch/paramscreator.go | 23 ++++ modules/modules.go | 58 +++++++- 16 files changed, 882 insertions(+), 10 deletions(-) create mode 100644 mig-agent/alert.go create mode 100644 modules/audit/audit.go create mode 100644 modules/audit/audit_test.go create mode 100644 modules/audit/doc.rst create mode 100644 modules/audit/osaudit_darwin.go create mode 100644 modules/audit/osaudit_linux.go create mode 100644 modules/audit/osaudit_windows.go create mode 100644 modules/audit/paramscreator.go create mode 100644 modules/dispatch/dispatch.go create mode 100644 modules/dispatch/dispatch_test.go create mode 100644 modules/dispatch/doc.rst create mode 100644 modules/dispatch/paramscreator.go diff --git a/mig-agent/agent.go b/mig-agent/agent.go index 36cc7bbc..40ac59a0 100644 --- a/mig-agent/agent.go +++ b/mig-agent/agent.go @@ -508,6 +508,9 @@ func startRoutines(ctx *Context) (err error) { // Start up the HTTP stats socket go initSocket(ctx) + // GoRoutine that processes incoming alert messages on the alert channel + go alertProcessor(ctx) + // GoRoutine that parses and validates incoming commands go func() { for msg := range ctx.Channels.NewCommand { diff --git a/mig-agent/alert.go b/mig-agent/alert.go new file mode 100644 index 00000000..2cf8318e --- /dev/null +++ b/mig-agent/alert.go @@ -0,0 +1,45 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Contributor: +// - Aaron Meihm ameihm@mozilla.com [:alm] +package main + +import ( + "fmt" + + "mig.ninja/mig" +) + +// alertHandler is called inside the agent when it recieves an alert message from a +// persistent module. +func alertHandler(ctx *Context, alert string) { + ctx.Channels.Alert <- alert +} + +// alertProcessor processes incoming alert messages from the alert channel and handles +// them. This results in either the alert being written to the dispatch module (if the +// dispatch module is active) or the alert being written to the agent log. +func alertProcessor(ctx *Context) { + ctx.Channels.Log <- mig.Log{Desc: "starting alert processor"}.Info() + + var alert string + for { + alert = <-ctx.Channels.Alert + + // If the dispatch module is active, write the message to the dispatch + // channel + dispatchChanLock.Lock() + if dispatchChan != nil { + dispatchChan <- alert + dispatchChanLock.Unlock() + continue + } + dispatchChanLock.Unlock() + + // Lastly, if no dispatch module is available, just write the alert to the + // agent log. + ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("alert: %v", alert)}.Info() + } +} diff --git a/mig-agent/context.go b/mig-agent/context.go index 6ef16ec2..9f67e49e 100644 --- a/mig-agent/context.go +++ b/mig-agent/context.go @@ -65,6 +65,7 @@ type Context struct { NewCommand chan []byte RunAgentCommand, RunExternalCommand chan moduleOp Results chan mig.Command + Alert chan string } MQ struct { // configuration @@ -290,6 +291,7 @@ func initChannels(orig_ctx Context) (ctx Context, err error) { ctx.Channels.RunAgentCommand = make(chan moduleOp, 5) ctx.Channels.RunExternalCommand = make(chan moduleOp, 5) ctx.Channels.Results = make(chan mig.Command, 5) + ctx.Channels.Alert = make(chan string, 128) ctx.Channels.Log = make(chan mig.Log, 97) ctx.Channels.Log <- mig.Log{Desc: "leaving initChannels()"}.Debug() return diff --git a/mig-agent/persist.go b/mig-agent/persist.go index 33f4ef0d..edcd97a2 100644 --- a/mig-agent/persist.go +++ b/mig-agent/persist.go @@ -22,6 +22,17 @@ import ( "gopkg.in/gcfg.v1" ) +// dispatchChan is a special channel used to route messages to the dispatch module. +// +// If non-nil, the alert processor will write messages to this channel, which will then +// be sent to the dispatch module. The dispatch channel will only be non-nil if the +// dispatch module is active. +// +// The channel is protected by a mutex, to ensure deallocation of the channel if the +// dispatch module fails is exclusive and writes cannot occur while this happens. +var dispatchChan chan string +var dispatchChanLock sync.Mutex + // persistModuleRegister maintains a map of the running persistent modules, and // any socket specification registered for that module. // @@ -113,15 +124,16 @@ func startPersistModule(ctx *Context, name string) (err error) { // the agent is running, this function will execute in a go-routine. func managePersistModule(ctx *Context, name string) { var ( - cmd *exec.Cmd - isRunning bool - pipeout modules.ModuleWriter - pipein modules.ModuleReader - err error - failDelay bool - killModule bool - inChan chan modules.Message - lastPing time.Time + cmd *exec.Cmd + isRunning bool + pipeout modules.ModuleWriter + pipein modules.ModuleReader + err error + failDelay bool + killModule bool + inChan chan modules.Message + lastPing time.Time + localDispatch chan string ) logfunc := func(f string, a ...interface{}) { @@ -130,6 +142,18 @@ func managePersistModule(ctx *Context, name string) { ctx.Channels.Log <- mig.Log{Desc: buf}.Info() } + // dispatchDealloc is used to deallocate the dispatch channel, if this module + // isn't the dispatch module, localDispatch will always be nil + dispatchDealloc := func() { + if localDispatch != nil { + dispatchChanLock.Lock() + close(dispatchChan) + dispatchChan = nil + localDispatch = nil + dispatchChanLock.Unlock() + } + } + pingtick := time.Tick(time.Second * 10) for { @@ -201,11 +225,24 @@ func managePersistModule(ctx *Context, name string) { continue } } + + // If we are the dispatch module, initialize the dispatch channel + if name == "dispatch" { + if dispatchChan == nil { + logfunc("initializing dispatch channel") + dispatchChanLock.Lock() + dispatchChan = make(chan string, 128) + localDispatch = dispatchChan + dispatchChanLock.Unlock() + } + } + select { case msg, ok := <-inChan: if !ok { err = cmd.Wait() logfunc("module is down, %v", err) + dispatchDealloc() isRunning = false persistModRegister.remove(name) failDelay = true @@ -227,6 +264,19 @@ func managePersistModule(ctx *Context, name string) { break } logfunc("(module log) %v", lp.Message) + case modules.MsgClassAlert: + var ap modules.AlertParams + buf, err := json.Marshal(msg.Parameters) + if err != nil { + logfunc("%v", err) + break + } + err = json.Unmarshal(buf, &ap) + if err != nil { + logfunc("%v", err) + break + } + alertHandler(ctx, ap.Message) case modules.MsgClassRegister: var rp modules.RegParams buf, err := json.Marshal(msg.Parameters) @@ -246,6 +296,21 @@ func managePersistModule(ctx *Context, name string) { killModule = true break } + case alertmsg := <-localDispatch: + am, err := modules.MakeMessageAlert(alertmsg) + if err != nil { + logfunc("failed to create alert, %v", err) + break + } + err = modules.WriteOutput(am, pipeout) + if err != nil { + logfunc("dispatch alert failed, %v", err) + dispatchDealloc() + isRunning = false + persistModRegister.remove(name) + failDelay = true + break + } case _ = <-pingtick: // If we haven't received a reply in the past 3 cycles we will // kill the module @@ -265,6 +330,7 @@ func managePersistModule(ctx *Context, name string) { err = modules.WriteOutput(pm, pipeout) if err != nil { logfunc("ping failed, %v", err) + dispatchDealloc() isRunning = false persistModRegister.remove(name) failDelay = true @@ -282,6 +348,7 @@ func managePersistModule(ctx *Context, name string) { return } _ = cmd.Wait() + dispatchDealloc() isRunning = false persistModRegister.remove(name) failDelay = true diff --git a/modules/audit/audit.go b/modules/audit/audit.go new file mode 100644 index 00000000..1bd3d54c --- /dev/null +++ b/modules/audit/audit.go @@ -0,0 +1,188 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Contributor: Aaron Meihm ameihm@mozilla.com [:alm] + +// Package audit implements a persistent module which can read and parse the operating +// system audit trail. The module turns each audit event read from the kernel into a +// module alert, and writes it to the master agent process where the agent process can either +// write the audit event to it's log or send it to the dispatch module. +// +// The audit module is currently only supported on Linux. +package audit /* import "mig.ninja/mig/modules/audit" */ + +import ( + "encoding/json" + "fmt" + "runtime" + + "mig.ninja/mig/modules" +) + +type module struct { +} + +// NewRun returns a new instance of a modules.Runner for this module. +func (m *module) NewRun() modules.Runner { + return new(run) +} + +func init() { + modules.Register("audit", new(module)) +} + +type run struct { + Parameters Parameters + Results modules.Result +} + +func buildResults(e elements, r *modules.Result) (buf []byte, err error) { + r.Success = true + r.Elements = e + r.FoundAnything = true + buf, err = json.Marshal(r) + return +} + +var logChan chan string +var alertChan chan string +var handlerErrChan chan error +var configChan chan []byte + +func moduleMain() { + var cfg config + + incfg := <-configChan + err := json.Unmarshal(incfg, &cfg) + if err != nil { + handlerErrChan <- err + return + } + logChan <- "module received configuration" + + err = initializeAudit(cfg) + if err != nil { + handlerErrChan <- err + return + } + err = runAudit() + if err != nil { + handlerErrChan <- err + return + } +} + +func requestHandler(p interface{}) (ret string) { + var results modules.Result + defer func() { + if e := recover(); e != nil { + results.Errors = append(results.Errors, fmt.Sprintf("%v", e)) + results.Success = false + err, _ := json.Marshal(results) + ret = string(err) + return + } + }() + e := elements{Ok: true} + resp, err := buildResults(e, &results) + if err != nil { + panic(err) + } + return string(resp) +} + +type config struct { + Audit struct { + RulesPath string `json:"rulespath"` + RateLimit int `json:"ratelimit"` + BacklogLimit int `json:"backloglimit"` + } `json:"audit"` +} + +// PersistModConfig returns a new configuration structure for this module. +func (r *run) PersistModConfig() interface{} { + return &config{} +} + +// RunPersist is the entry point for persistent execution of this module. +func (r *run) RunPersist(in modules.ModuleReader, out modules.ModuleWriter) { + alertChan = make(chan string, 64) + logChan = make(chan string, 64) + regChan := make(chan string, 64) + handlerErrChan = make(chan error, 64) + configChan = make(chan []byte, 1) + + go moduleMain() + l, spec, err := modules.GetPersistListener("audit") + if err != nil { + handlerErrChan <- err + } else { + regChan <- spec + } + go modules.HandlePersistRequest(l, requestHandler, handlerErrChan) + modules.DefaultPersistHandlers(in, out, logChan, handlerErrChan, regChan, + alertChan, configChan) +} + +// Run is the entry point for standard (e.g., query) based invocation of this module. +func (r *run) Run(in modules.ModuleReader) (resStr string) { + defer func() { + if e := recover(); e != nil { + // return error in json + r.Results.Errors = append(r.Results.Errors, fmt.Sprintf("%v", e)) + r.Results.Success = false + err, _ := json.Marshal(r.Results) + resStr = string(err) + return + } + }() + runtime.GOMAXPROCS(1) + sockspec, err := modules.ReadPersistInputParameters(in, &r.Parameters) + if err != nil { + panic(err) + } + err = r.ValidateParameters() + if err != nil { + panic(err) + } + resStr = modules.SendPersistRequest(r.Parameters, sockspec) + return +} + +// ValidateParameters validates the parameters set in the runner for the module. +func (r *run) ValidateParameters() (err error) { + return +} + +// PrintResults returns the results of a query of this module in human readable form. +func (r *run) PrintResults(result modules.Result, foundOnly bool) (prints []string, err error) { + var ( + elem elements + ) + + err = result.GetElements(&elem) + if err != nil { + panic(err) + } + resStr := fmt.Sprintf("ok:%v", elem.Ok) + prints = append(prints, resStr) + if !foundOnly { + for _, we := range result.Errors { + prints = append(prints, we) + } + } + return +} + +type elements struct { + Ok bool `json:"ok"` +} + +// Parameters defines any query parameters used in this module. +type Parameters struct { +} + +func newParameters() *Parameters { + return &Parameters{} +} diff --git a/modules/audit/audit_test.go b/modules/audit/audit_test.go new file mode 100644 index 00000000..1d718cf4 --- /dev/null +++ b/modules/audit/audit_test.go @@ -0,0 +1,16 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Contributor: Aaron Meihm ameihm@mozilla.com [:alm] + +package audit /* import "mig.ninja/mig/modules/audit" */ + +import ( + "mig.ninja/mig/testutil" + "testing" +) + +func TestRegistration(t *testing.T) { + testutil.CheckModuleRegistration(t, "audit") +} diff --git a/modules/audit/doc.rst b/modules/audit/doc.rst new file mode 100644 index 00000000..2f766e95 --- /dev/null +++ b/modules/audit/doc.rst @@ -0,0 +1,46 @@ +==================================== +Mozilla InvestiGator: audit module +==================================== +:Author: Aaron Meihm + +.. sectnum:: +.. contents:: Table of Contents + +The audit module can be used to read kernel audit messages (e.g., Linux audit) and either +write them to the agent's log file, or more typically dispatch the audit trail off the system +using the dispatch module. + +Currently only Linux is supported by the audit module. + +Auditing support for Linux is implemented using `libaudit-go `_. + +Usage +----- + +The audit module is a persistent module. When the agent is configured with the audit module, it +will spawn the audit module as a subprocess and initialize auditing on the platform. The audit +module will enable audit, load the audit rules indicated in the configuration into the kernel, and +begin collecting/parsing audit data from the kernel. + +If the dispatch module is also loaded with the agent, any audit messages will be sent to the +dispatch module where they can be transmitted to an event collection system. Otherwise, the agent +will simply log the JSON formatted audit messages in the agent log. + +Configuration +------------- + +The audit module is configured using ``audit.cfg`` in the agent configuration directory, ``/etc/mig``. + +.. code:: + + [audit] + rulespath = /etc/mig/audit.rules.json + ratelimit = 500 + backloglimit = 16384 + +``rulespath`` indicates the path to load audit rules into the kernel from. Note that this is not a +standard audit configuration, but a JSON based rule set as is used in +`libaudit-go `_. + +``ratelimit`` and ``backloglimit`` can be used to configure the Linux auditing rate and back log +limits. If respective defaults of 500 and 16384 will be used. diff --git a/modules/audit/osaudit_darwin.go b/modules/audit/osaudit_darwin.go new file mode 100644 index 00000000..87129fa0 --- /dev/null +++ b/modules/audit/osaudit_darwin.go @@ -0,0 +1,19 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Contributor: Aaron Meihm ameihm@mozilla.com [:alm] + +package audit /* import "mig.ninja/mig/modules/audit" */ + +import ( + "fmt" +) + +func initializeAudit(cfg config) error { + return fmt.Errorf("audit module not supported on darwin") +} + +func runAudit() error { + return fmt.Errorf("audit module not supported on darwin") +} diff --git a/modules/audit/osaudit_linux.go b/modules/audit/osaudit_linux.go new file mode 100644 index 00000000..dd06781e --- /dev/null +++ b/modules/audit/osaudit_linux.go @@ -0,0 +1,94 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Contributor: Aaron Meihm ameihm@mozilla.com [:alm] + +package audit /* import "mig.ninja/mig/modules/audit" */ + +import ( + "encoding/json" + "fmt" + + libaudit "github.com/mozilla/libaudit-go" + "io/ioutil" + "syscall" +) + +var auditsock *libaudit.NetlinkConnection + +// initializeAudit initializes the auditing subsystem . It will load rules +// from the rule path in the configuration, and enable auditing with the specified +// parameters. +func initializeAudit(cfg config) error { + var err error + auditsock, err = libaudit.NewNetlinkConnection() + if err != nil { + return fmt.Errorf("NewNetlinkConnection: %v", err) + } + err = libaudit.AuditSetEnabled(auditsock, true) + if err != nil { + return fmt.Errorf("AuditSetEnabled: %v", err) + } + status, err := libaudit.AuditIsEnabled(auditsock) + if err != nil { + return fmt.Errorf("AuditIsEnabled: %v", err) + } + if !status { + return fmt.Errorf("was not possible to enable audit") + } + logChan <- "auditing enabled" + // Configure audit as required + if cfg.Audit.RateLimit == 0 { // Set a default rate limit if needed + cfg.Audit.RateLimit = 500 + } + err = libaudit.AuditSetRateLimit(auditsock, cfg.Audit.RateLimit) + if err != nil { + return fmt.Errorf("AuditSetRateLimit: %v", err) + } + if cfg.Audit.BacklogLimit == 0 { // Set default backlog if needed + cfg.Audit.BacklogLimit = 16384 + } + err = libaudit.AuditSetBacklogLimit(auditsock, cfg.Audit.BacklogLimit) + if err != nil { + return fmt.Errorf("AuditSetBacklogLimit: %v", err) + } + err = libaudit.AuditSetPID(auditsock, syscall.Getpid()) + if err != nil { + return fmt.Errorf("AuditSetPID: %v", err) + } + err = libaudit.DeleteAllRules(auditsock) + if err != nil { + return fmt.Errorf("DeleteAllRules: %v", err) + } + rulebuf, err := ioutil.ReadFile(cfg.Audit.RulesPath) + if err != nil { + return err + } + warnings, err := libaudit.SetRules(auditsock, rulebuf) + if err != nil { + return fmt.Errorf("SetRules: %v", err) + } + for _, x := range warnings { + logChan <- fmt.Sprintf("ruleset warning: %v", x) + } + logChan <- "auditing configured" + return nil +} + +func runAudit() error { + doneChan := make(chan bool, 0) + logChan <- "listening for audit events" + libaudit.GetAuditMessages(auditsock, callback, &doneChan) + return nil +} + +func callback(msg *libaudit.AuditEvent, callerr error) { + // In our callback, we want to simply marshal the audit event and write it to the + // modules alert channel + buf, err := json.Marshal(msg) + if err != nil { + return + } + alertChan <- string(buf) +} diff --git a/modules/audit/osaudit_windows.go b/modules/audit/osaudit_windows.go new file mode 100644 index 00000000..356f57d1 --- /dev/null +++ b/modules/audit/osaudit_windows.go @@ -0,0 +1,19 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Contributor: Aaron Meihm ameihm@mozilla.com [:alm] + +package audit /* import "mig.ninja/mig/modules/audit" */ + +import ( + "fmt" +) + +func initializeAudit(cfg config) error { + return fmt.Errorf("audit module not supported on windows") +} + +func runAudit() error { + return fmt.Errorf("audit module not supported on windows") +} diff --git a/modules/audit/paramscreator.go b/modules/audit/paramscreator.go new file mode 100644 index 00000000..013f97de --- /dev/null +++ b/modules/audit/paramscreator.go @@ -0,0 +1,22 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Contributor: Aaron Meihm ameihm@mozilla.com [:alm] + +package audit /* import "mig.ninja/mig/modules/audit" */ + +import ( + "fmt" +) + +func printHelp(isCmd bool) { + fmt.Printf(`Query parameters +---------------- +This module has no parameters. +`) +} + +func (r *run) ParamsParser(args []string) (interface{}, error) { + return r.Parameters, r.ValidateParameters() +} diff --git a/modules/dispatch/dispatch.go b/modules/dispatch/dispatch.go new file mode 100644 index 00000000..191eb7a1 --- /dev/null +++ b/modules/dispatch/dispatch.go @@ -0,0 +1,217 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Contributor: Aaron Meihm ameihm@mozilla.com [:alm] + +// Package dispatch implements alert dispatching for the agent as a module. +// Persistent modules which generate alerts will have these alerts forwarded +// to this module if the dispatch module is active. The dispatch module can then +// forward the alerts on based on it's configuration. +package dispatch /* import "mig.ninja/mig/modules/dispatch" */ + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "runtime" + + "mig.ninja/mig/modules" +) + +type module struct { +} + +// NewRun returns a new instance of a modules.Runner for this module. +func (m *module) NewRun() modules.Runner { + return new(run) +} + +func init() { + modules.Register("dispatch", new(module)) +} + +type run struct { + Parameters Parameters + Results modules.Result +} + +func buildResults(e elements, r *modules.Result) (buf []byte, err error) { + r.Success = true + r.Elements = e + r.FoundAnything = true + buf, err = json.Marshal(r) + return +} + +var logChan chan string +var alertChan chan string +var handlerErrChan chan error +var configChan chan []byte + +// messageBuf is a queue used to store incoming messages, and is drained by +// runDispatch +var messageBuf chan string + +func moduleMain() { + var cfg config + + incfg := <-configChan + err := json.Unmarshal(incfg, &cfg) + if err != nil { + handlerErrChan <- err + return + } + logChan <- "module received configuration" + + messageBuf = make(chan string, 1024) + + // Register the dispatch function, which will be called when the module + // recieves an alert message from the agent. + modules.RegisterDispatchFunction(dispatchIn) + + // Start the dispatch dequeue function. + err = runDispatch(cfg) + if err != nil { + handlerErrChan <- err + return + } +} + +func dispatchIn(msg string) { + select { + case messageBuf <- msg: + default: + // If we can't queue the message it is just dropped + } +} + +func runDispatch(cfg config) error { + var httpClient http.Client + + for { + msg := <-messageBuf + b := bytes.NewBufferString(msg) + // We make an assumption the alert content is always a JSON blob here. + resp, err := httpClient.Post(cfg.Dispatch.HTTPURL, "application/json", b) + if err != nil { + logChan <- fmt.Sprintf("http post: %v", err) + continue + } + resp.Body.Close() + } + return nil +} + +func requestHandler(p interface{}) (ret string) { + var results modules.Result + defer func() { + if e := recover(); e != nil { + results.Errors = append(results.Errors, fmt.Sprintf("%v", e)) + results.Success = false + err, _ := json.Marshal(results) + ret = string(err) + return + } + }() + e := elements{Ok: true} + resp, err := buildResults(e, &results) + if err != nil { + panic(err) + } + return string(resp) +} + +type config struct { + Dispatch struct { + HTTPURL string `json:"httpurl"` + } `json:"dispatch"` +} + +// PersistModConfig returns a new configuration structure for this module. +func (r *run) PersistModConfig() interface{} { + return &config{} +} + +// RunPersist is the entry point for persistent execution of the module. +func (r *run) RunPersist(in modules.ModuleReader, out modules.ModuleWriter) { + alertChan = make(chan string, 64) + logChan = make(chan string, 64) + regChan := make(chan string, 64) + handlerErrChan = make(chan error, 64) + configChan = make(chan []byte, 1) + + go moduleMain() + l, spec, err := modules.GetPersistListener("dispatch") + if err != nil { + handlerErrChan <- err + } else { + regChan <- spec + } + go modules.HandlePersistRequest(l, requestHandler, handlerErrChan) + modules.DefaultPersistHandlers(in, out, logChan, handlerErrChan, regChan, + alertChan, configChan) +} + +// Run is the entry point for a standard (e.g., query) based invocation of the module. +func (r *run) Run(in modules.ModuleReader) (resStr string) { + defer func() { + if e := recover(); e != nil { + // return error in json + r.Results.Errors = append(r.Results.Errors, fmt.Sprintf("%v", e)) + r.Results.Success = false + err, _ := json.Marshal(r.Results) + resStr = string(err) + return + } + }() + runtime.GOMAXPROCS(1) + sockspec, err := modules.ReadPersistInputParameters(in, &r.Parameters) + if err != nil { + panic(err) + } + err = r.ValidateParameters() + if err != nil { + panic(err) + } + resStr = modules.SendPersistRequest(r.Parameters, sockspec) + return +} + +// ValidateParameters validates the parameters set in the runner for the module. +func (r *run) ValidateParameters() (err error) { + return +} + +// PrintResults returns the results of a query of this module in human readable form. +func (r *run) PrintResults(result modules.Result, foundOnly bool) (prints []string, err error) { + var ( + elem elements + ) + + err = result.GetElements(&elem) + if err != nil { + panic(err) + } + resStr := fmt.Sprintf("ok:%v", elem.Ok) + prints = append(prints, resStr) + if !foundOnly { + for _, we := range result.Errors { + prints = append(prints, we) + } + } + return +} + +type elements struct { + Ok bool `json:"ok"` +} + +// Parameters defines any query parameters used in this module. +type Parameters struct { +} + +func newParameters() *Parameters { + return &Parameters{} +} diff --git a/modules/dispatch/dispatch_test.go b/modules/dispatch/dispatch_test.go new file mode 100644 index 00000000..7477dfc4 --- /dev/null +++ b/modules/dispatch/dispatch_test.go @@ -0,0 +1,16 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Contributor: Aaron Meihm ameihm@mozilla.com [:alm] + +package dispatch /* import "mig.ninja/mig/modules/dispatch" */ + +import ( + "mig.ninja/mig/testutil" + "testing" +) + +func TestRegistration(t *testing.T) { + testutil.CheckModuleRegistration(t, "dispatch") +} diff --git a/modules/dispatch/doc.rst b/modules/dispatch/doc.rst new file mode 100644 index 00000000..94442a24 --- /dev/null +++ b/modules/dispatch/doc.rst @@ -0,0 +1,39 @@ +===================================== +Mozilla InvestiGator: dispatch module +===================================== +:Author: Aaron Meihm + +.. sectnum:: +.. contents:: Table of Contents + +The dispatch module is used to dispatch alerts to a remote system (e.g., an event +collector). + +Usage +----- + +Persistent modules have the ability to generate alerts. These alerts are sent from +the persistent module to the master agent process. + +When the dispatch module is not running, the agent will simply write the alert in the +standard agent log. + +If the dispatch module is active, the agent will instead forward the alert to the dispatch +module, and the module can manage buffering the alert and forwarding it on to the event +collection system configured in the module configuration. + +Configuration +------------- + +The dispatch module is configured using ``dispatch.cfg`` in the agent configuration directory, +``/etc/mig`` or ``c:\mig``. + +.. code:: + + [dispatch] + httpurl = "https://api.to.post.to/event + +Currently the dispatch module only supports HTTP POST of alert messages from modules to a +configured HTTP endpoint. + +``httpurl`` can be used to configure a URL all alerts will be posted to. diff --git a/modules/dispatch/paramscreator.go b/modules/dispatch/paramscreator.go new file mode 100644 index 00000000..4cbc52e0 --- /dev/null +++ b/modules/dispatch/paramscreator.go @@ -0,0 +1,23 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Contributor: Aaron Meihm ameihm@mozilla.com [:alm] + +package dispatch /* import "mig.ninja/mig/modules/dispatch" */ + +import ( + "fmt" +) + +func printHelp(isCmd bool) { + fmt.Printf(`Query parameters +---------------- +This module has no parameters. +`) +} + +// ParamsParser parses any parameters used in queries for this module. +func (r *run) ParamsParser(args []string) (interface{}, error) { + return r.Parameters, r.ValidateParameters() +} diff --git a/modules/modules.go b/modules/modules.go index 4659fadc..6033414b 100644 --- a/modules/modules.go +++ b/modules/modules.go @@ -50,6 +50,7 @@ const ( MsgClassLog MessageClass = "log" MsgClassRegister MessageClass = "register" MsgClassConfig MessageClass = "config" + MsgClassAlert MessageClass = "alert" ) // Parameter format expected for a log message @@ -66,6 +67,11 @@ type ConfigParams struct { Config interface{} `json:"config"` } +// AlertParams describes the parameters used in an alert message +type AlertParams struct { + Message string `json:"message"` +} + // Result implement the base type for results returned by modules. // All modules must return this type of result. The fields are: // @@ -211,6 +217,18 @@ func MakeMessageConfig(cfgdata interface{}) (rawMsg []byte, err error) { return } +// MakeMessageAlert creates a new message of class alert +func MakeMessageAlert(f string, args ...interface{}) (rawMsg []byte, err error) { + param := AlertParams{Message: fmt.Sprintf(f, args...)} + msg := Message{Class: MsgClassAlert, Parameters: param} + rawMsg, err = json.Marshal(&msg) + if err != nil { + err = fmt.Errorf("Failed to make module alert message: %v", err) + return + } + return +} + // Keep reading until we get a full line or an error, and return func readInputLine(rdr *bufio.Reader) ([]byte, error) { var ret []byte @@ -343,11 +361,23 @@ func WatchForStop(r ModuleReader, stopChan *chan bool) error { } } +// dispatchFunc can be set by a module using RegisterDispatchFunction, and is called +// inside DefaultPersistHandlers if the module recieves an alert message from the master +// agent process. +var dispatchFunc func(string) + +// RegisterDispatchFunction can be called by a module to register a function that will be +// used to process incoming alert messages from the master agent process. It is primarily +// used by the dispatch module. +func RegisterDispatchFunction(f func(string)) { + dispatchFunc = f +} + // A general management function that can be called by persistent modules from the // RunPersist function. Looks after replying to ping messages, writing logs, and other // communication between the agent and the running persistent module. func DefaultPersistHandlers(in ModuleReader, out ModuleWriter, logch chan string, - errch chan error, regch chan string, confch chan []byte) { + errch chan error, regch chan string, alertch chan string, confch chan []byte) { inChan := make(chan Message, 0) go func() { for { @@ -378,6 +408,17 @@ func DefaultPersistHandlers(in ModuleReader, out ModuleWriter, logch chan string WriteOutput(logmsg, out) } os.Exit(1) + case s := <-alertch: + almsg, err := MakeMessageAlert("%v", s) + if err != nil { + failed = true + break + } + err = WriteOutput(almsg, out) + if err != nil { + failed = true + break + } case s := <-logch: logmsg, err := MakeMessageLog("%v", s) if err != nil { @@ -435,6 +476,21 @@ func DefaultPersistHandlers(in ModuleReader, out ModuleWriter, logch chan string break } confch <- buf + case "alert": + if dispatchFunc != nil { + var alparam AlertParams + buf, err := json.Marshal(msg.Parameters) + if err != nil { + failed = true + break + } + err = json.Unmarshal(buf, &alparam) + if err != nil { + failed = true + break + } + dispatchFunc(alparam.Message) + } } }