[minor] improve error handling in module stream io

This commit is contained in:
Aaron Meihm 2016-10-14 13:15:00 -05:00
Родитель f758ee2594
Коммит f6117e782e
2 изменённых файлов: 54 добавлений и 17 удалений

Просмотреть файл

@ -43,6 +43,7 @@ func buildResults(e elements, r *modules.Result) (buf []byte, err error) {
}
var logChan chan string
var handlerErrChan chan error
func runSomeTasks() {
for {
@ -84,17 +85,23 @@ func requestHandler(p interface{}) (ret string) {
func (r *run) RunPersist(in io.ReadCloser, out io.WriteCloser) {
// Create a string channel, used to send log messages up to the agent
// from the module tasks
// from the module tasks. Functions in the persistent module can
// log messages through the agent by writing to this channel.
logChan = make(chan string, 64)
// Start up an example background task
// Create an error channel we will pass to the handlers. Writing an
// error to this channel will cause DefaultPersistHandlers() to return
// and the module to exit.
handlerErrChan = make(chan error, 64)
// Start up an example background task we want our module to run
// continuously.
go runSomeTasks()
_ = os.Remove(modules.PersistSockPath("examplepersist"))
l, err := net.Listen("unix", modules.PersistSockPath("examplepersist"))
if err != nil {
panic(err)
}
go modules.HandlePersistRequest(l, requestHandler)
modules.DefaultPersistHandlers(in, out, logChan)
go modules.HandlePersistRequest(l, requestHandler, handlerErrChan)
modules.DefaultPersistHandlers(in, out, logChan, handlerErrChan)
}
func (r *run) Run(in io.Reader) (resStr string) {

Просмотреть файл

@ -20,7 +20,9 @@ import (
"io"
"io/ioutil"
"net"
"os"
"path"
"time"
)
var ModuleRunDir string
@ -124,16 +126,12 @@ func MakeMessage(class MessageClass, params interface{}, comp bool) (rawMsg []by
}
func MakeMessageLog(f string, args ...interface{}) (rawMsg []byte, err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("Failed to make module log message: %v", e)
}
}()
param := LogParams{Message: fmt.Sprintf(f, args...)}
msg := Message{Class: MsgClassLog, Parameters: param}
rawMsg, err = json.Marshal(&msg)
if err != nil {
panic(err)
err = fmt.Errorf("Failed to make module log message: %v", err)
return
}
return
}
@ -236,7 +234,8 @@ func WatchForStop(r io.Reader, stopChan *chan bool) error {
}
}
func DefaultPersistHandlers(in io.ReadCloser, out io.WriteCloser, logch chan string) {
func DefaultPersistHandlers(in io.ReadCloser, out io.WriteCloser, logch chan string,
errch chan error) {
inChan := make(chan Message, 0)
go func() {
for {
@ -252,6 +251,21 @@ func DefaultPersistHandlers(in io.ReadCloser, out io.WriteCloser, logch chan str
failed := false
select {
case em := <-errch:
failed = true
// An error occurred somewhere in the persistent module and
// we want to exit. Try to write the log message, and also
// schedule a hard exit to ensure we do not run into a blocking
// scenario during the write.
go func() {
time.Sleep(time.Second * 5)
os.Exit(1)
}()
logmsg, err := MakeMessageLog("%v", em)
if err == nil {
WriteOutput(logmsg, out)
}
os.Exit(1)
case s := <-logch:
logmsg, err := MakeMessageLog("%v", s)
if err != nil {
@ -289,29 +303,44 @@ func DefaultPersistHandlers(in io.ReadCloser, out io.WriteCloser, logch chan str
}
}
func HandlePersistRequest(l net.Listener, f func(interface{}) string) {
func HandlePersistRequest(l net.Listener, f func(interface{}) string, errch chan error) {
for {
conn, err := l.Accept()
if err != nil {
panic(err)
errch <- err
return
}
go func() {
var p interface{}
err = ReadInputParameters(conn, &p)
if err != nil {
panic(err)
errch <- err
return
}
resp := f(p)
WriteOutput([]byte(resp), conn)
err = conn.Close()
if err != nil {
panic(err)
errch <- err
return
}
}()
}
}
func SendPersistRequest(p interface{}, modname string) string {
func SendPersistRequest(p interface{}, modname string) (res string) {
defer func() {
// If something goes wrong here we will want to format and
// return the result an a Result type, with the error
// message set.
if e := recover(); e != nil {
var r Result
r.Errors = append(r.Errors, fmt.Sprintf("%v", e))
r.Success = false
resbuf, _ := json.Marshal(&r)
res = string(resbuf)
}
}()
conn, err := net.Dial("unix", PersistSockPath(modname))
if err != nil {
panic(err)
@ -328,7 +357,8 @@ func SendPersistRequest(p interface{}, modname string) string {
if err != nil {
panic(err)
}
return string(rb)
res = string(rb)
return
}
func PersistSockPath(modname string) string {