dispatch: add start of sns output mode

This commit is contained in:
Aaron Meihm 2017-09-20 16:00:48 -05:00
Родитель f26172fea5
Коммит b4949aa1fd
2 изменённых файлов: 70 добавлений и 6 удалений

Просмотреть файл

@ -151,6 +151,13 @@ func runDispatch(cfg config) error {
dr DispatchRecord
)
if cfg.Dispatch.SNSTopic != "" {
err := initSNS(cfg)
if err != nil {
return err
}
}
for {
msg := <-messageBuf
dr.fromString(msg)
@ -159,13 +166,22 @@ func runDispatch(cfg config) error {
logChan <- fmt.Sprintf("create dispatch record: %v", err)
continue
}
b := bytes.NewBuffer(buf)
resp, err := httpClient.Post(cfg.Dispatch.HTTPURL, "application/json", b)
if err != nil {
logChan <- fmt.Sprintf("http post: %v", err)
continue
if cfg.Dispatch.SNSTopic != "" {
err := dispatchSNS(buf)
if err != nil {
logChan <- fmt.Sprintf("sns dispatch: %v", err)
continue
}
} else {
// Default to HTTP POST
b := bytes.NewBuffer(buf)
resp, err := httpClient.Post(cfg.Dispatch.HTTPURL, "application/json", b)
if err != nil {
logChan <- fmt.Sprintf("http post: %v", err)
continue
}
resp.Body.Close()
}
resp.Body.Close()
}
return nil
}
@ -192,6 +208,8 @@ func requestHandler(p interface{}) (ret string) {
type config struct {
Dispatch struct {
HTTPURL string `json:"httpurl"`
SNSTopic string `json:"snstopic"`
Region string `json:"region"`
ChannelSize int `json:"channelsize"`
} `json:"dispatch"`
}

46
modules/dispatch/sns.go Normal file
Просмотреть файл

@ -0,0 +1,46 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Contributor: Aaron Meihm ameihm@mozilla.com [:alm]
package dispatch /* import "mig.ninja/mig/modules/dispatch" */
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
)
var snsARN string
var snsService *sns.SNS
var awsSession *session.Session
func initSNS(cfg config) error {
awsSession = session.Must(session.NewSession())
meta := ec2metadata.New(awsSession)
instancedoc, err := meta.GetInstanceIdentityDocument()
if err != nil {
return err
}
// Build an ARN we will use to publish
snsARN = "arn:aws:sns:" + instancedoc.Region + ":" +
instancedoc.AccountID + ":" + cfg.Dispatch.SNSTopic
snsService = sns.New(awsSession, &aws.Config{
Region: &instancedoc.Region,
})
return nil
}
func dispatchSNS(msg []byte) error {
params := &sns.PublishInput{
Message: aws.String(string(msg)),
TopicArn: aws.String(snsARN),
}
_, err := snsService.Publish(params)
return err
}