update: move notifiers to subdir

This commit is contained in:
jsvisa 2016-05-13 09:34:46 +08:00 коммит произвёл Delweng Zheng
Родитель 4715c73856
Коммит e6e7da076e
9 изменённых файлов: 519 добавлений и 466 удалений

Просмотреть файл

@ -1,139 +0,0 @@
/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package main
import (
"bytes"
"fmt"
log "github.com/cihub/seelog"
"net/smtp"
"text/template"
)
type EmailNotifier struct {
template *template.Template
auth smtp.Auth
server string
port int
interval int64
threshold int
username string
password string
from string
to string
groups []string
groupResults map[string]Message
}
func (emailer *EmailNotifier) NotifierName() string {
return "email-notify"
}
func (emailer *EmailNotifier) Notify(msg Message) error {
switch msg.(type) {
case *ConsumerGroupStatus:
result, _ := msg.(*ConsumerGroupStatus)
for _, group := range emailer.groups {
clusterGroup := fmt.Sprintf("%s,%s", result.Cluster, result.Group)
if clusterGroup == group {
emailer.groupResults[clusterGroup] = msg
}
}
if len(emailer.groups) == len(emailer.groupResults) {
return emailer.sendConsumerGroupStatusNotify()
}
}
return nil
}
func (emailer *EmailNotifier) Ignore(msg Message) bool {
switch msg.(type) {
case *ConsumerGroupStatus:
result, _ := msg.(*ConsumerGroupStatus)
return int(result.Status) < emailer.threshold
}
return false
}
func NewEmailNotifier(app *ApplicationContext) ([]*EmailNotifier, error) {
log.Info("Start email notify")
template, err := template.ParseFiles(app.Config.Smtp.Template)
if err != nil {
log.Critical("Cannot parse email template: %v", err)
return nil, err
}
var auth smtp.Auth
switch app.Config.Smtp.AuthType {
case "plain":
auth = smtp.PlainAuth("", app.Config.Smtp.Username, app.Config.Smtp.Password, app.Config.Smtp.Server)
case "crammd5":
auth = smtp.CRAMMD5Auth(app.Config.Smtp.Username, app.Config.Smtp.Password)
}
emailers := []*EmailNotifier{}
for to, cfg := range app.Config.Emailnotifier {
if cfg.Enable {
emailer := &EmailNotifier{
threshold: cfg.Threshold,
template: template,
server: app.Config.Smtp.Server,
port: app.Config.Smtp.Port,
username: app.Config.Smtp.Username,
password: app.Config.Smtp.Password,
auth: auth,
interval: cfg.Interval,
from: app.Config.Smtp.From,
to: to,
groups: cfg.Groups,
groupResults: make(map[string]Message),
}
emailers = append(emailers, emailer)
}
}
return emailers, nil
}
func (emailer *EmailNotifier) sendConsumerGroupStatusNotify() error {
var bytesToSend bytes.Buffer
log.Debug("send email")
results := make([]*ConsumerGroupStatus, len(emailer.groups))
i := 0
for group, result := range emailer.groupResults {
results[i] = result.(*ConsumerGroupStatus)
delete(emailer.groupResults, group)
i++
}
err := emailer.template.Execute(&bytesToSend, struct {
From string
To string
Results []*ConsumerGroupStatus
}{
From: emailer.from,
To: emailer.to,
Results: results,
})
if err != nil {
log.Error("Failed to assemble email:", err)
return err
}
err = smtp.SendMail(fmt.Sprintf("%s:%v", emailer.server, emailer.port),
emailer.auth, emailer.from, []string{emailer.to}, bytesToSend.Bytes())
if err != nil {
log.Error("Failed to send email message:", err)
return err
}
return nil
}

Просмотреть файл

@ -1,238 +0,0 @@
/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package main
import (
"bytes"
log "github.com/cihub/seelog"
"github.com/pborman/uuid"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"strings"
"text/template"
"time"
)
type HttpNotifier struct {
url string
templatePost *template.Template
templateDelete *template.Template
threshold int
sendDelete bool
extras map[string]string
groupIds map[string]map[string]Event
httpClient *http.Client
}
type Event struct {
Id string
Start time.Time
}
func (notify *HttpNotifier) NotifierName() string {
return "http-notify"
}
func (notifier *HttpNotifier) Notify(msg Message) error {
switch msg.(type) {
case *ConsumerGroupStatus:
result := msg.(*ConsumerGroupStatus)
return notifier.sendConsumerGroupStatusNotify(result)
default:
return nil
}
return nil
}
func (notifier *HttpNotifier) Ignore(msg Message) bool {
switch msg.(type) {
case *ConsumerGroupStatus:
result, _ := msg.(*ConsumerGroupStatus)
return int(result.Status) < notifier.threshold
}
return true
}
func NewHttpNotifier(app *ApplicationContext) (*HttpNotifier, error) {
httpConfig := app.Config.Httpnotifier
// Helper functions for templates
fmap := template.FuncMap{
"jsonencoder": templateJsonEncoder,
"topicsbystatus": classifyTopicsByStatus,
"partitioncounts": templateCountPartitions,
"add": templateAdd,
"minus": templateMinus,
"multiply": templateMultiply,
"divide": templateDivide,
"maxlag": maxLagHelper,
}
// Compile the templates
templatePost, err := template.New("post").Funcs(fmap).ParseFiles(httpConfig.TemplatePost)
if err != nil {
log.Criticalf("Cannot parse HTTP notifier POST template: %v", err)
os.Exit(1)
}
templatePost = templatePost.Templates()[0]
templateDelete, err := template.New("delete").Funcs(fmap).ParseFiles(httpConfig.TemplateDelete)
if err != nil {
log.Criticalf("Cannot parse HTTP notifier DELETE template: %v", err)
os.Exit(1)
}
templateDelete = templateDelete.Templates()[0]
// Parse the extra parameters for the templates
extras := make(map[string]string)
for _, extra := range httpConfig.Extras {
parts := strings.Split(extra, "=")
extras[parts[0]] = parts[1]
}
return &HttpNotifier{
url: httpConfig.Url,
threshold: httpConfig.PostThreshold,
sendDelete: httpConfig.SendDelete,
templatePost: templatePost,
templateDelete: templateDelete,
extras: extras,
groupIds: make(map[string]map[string]Event),
httpClient: &http.Client{
Timeout: time.Duration(httpConfig.Timeout) * time.Second,
Transport: &http.Transport{
Dial: (&net.Dialer{
KeepAlive: time.Duration(httpConfig.Keepalive) * time.Second,
}).Dial,
Proxy: http.ProxyFromEnvironment,
},
},
}, nil
}
func (notifier *HttpNotifier) sendConsumerGroupStatusNotify(result *ConsumerGroupStatus) error {
// We only use IDs if we are sending deletes
idStr := ""
startTime := time.Now()
if notifier.sendDelete {
if _, ok := notifier.groupIds[result.Cluster]; !ok {
// Create the cluster map
notifier.groupIds[result.Cluster] = make(map[string]Event)
}
if _, ok := notifier.groupIds[result.Cluster][result.Group]; !ok {
// Create Event and Id
eventId := uuid.NewRandom()
idStr = eventId.String()
notifier.groupIds[result.Cluster][result.Group] = Event{
Id: idStr,
Start: startTime,
}
} else {
idStr = notifier.groupIds[result.Cluster][result.Group].Id
startTime = notifier.groupIds[result.Cluster][result.Group].Start
}
}
// NOTE - I'm leaving the JsonEncode item in here so as not to break compatibility. New helpers go in the FuncMap above
bytesToSend := new(bytes.Buffer)
err := notifier.templatePost.Execute(bytesToSend, struct {
Cluster string
Group string
Id string
Start time.Time
Extras map[string]string
Result *ConsumerGroupStatus
JsonEncode func(interface{}) string
}{
Cluster: result.Cluster,
Group: result.Group,
Id: idStr,
Start: startTime,
Extras: notifier.extras,
Result: result,
JsonEncode: templateJsonEncoder,
})
if err != nil {
log.Errorf("Failed to assemble POST: %v", err)
return err
}
// Send POST to HTTP endpoint
req, err := http.NewRequest("POST", notifier.url, bytesToSend)
req.Header.Set("Content-Type", "application/json")
resp, err := notifier.httpClient.Do(req)
if err != nil {
log.Errorf("Failed to send POST for group %s in cluster %s at severity %v (Id %s): %v", result.Group, result.Cluster, result.Status, idStr, err)
return err
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
if (resp.StatusCode >= 200) && (resp.StatusCode <= 299) {
log.Debugf("Sent POST for group %s in cluster %s at severity %v (Id %s)", result.Group, result.Cluster, result.Status, idStr)
} else {
log.Errorf("Failed to send POST for group %s in cluster %s at severity %v (Id %s): %s", result.Group,
result.Cluster, result.Status, idStr, resp.Status)
}
if notifier.sendDelete && (result.Status == StatusOK) {
if _, ok := notifier.groupIds[result.Cluster][result.Group]; ok {
// Send DELETE to HTTP endpoint
bytesToSend := new(bytes.Buffer)
err := notifier.templateDelete.Execute(bytesToSend, struct {
Cluster string
Group string
Id string
Start time.Time
Extras map[string]string
}{
Cluster: result.Cluster,
Group: result.Group,
Id: notifier.groupIds[result.Cluster][result.Group].Id,
Start: notifier.groupIds[result.Cluster][result.Group].Start,
Extras: notifier.extras,
})
if err != nil {
log.Errorf("Failed to assemble DELETE for group %s in cluster %s (Id %s): %v", result.Group,
result.Cluster, notifier.groupIds[result.Cluster][result.Group].Id, err)
return err
}
req, err := http.NewRequest("DELETE", notifier.url, bytesToSend)
req.Header.Set("Content-Type", "application/json")
resp, err := notifier.httpClient.Do(req)
if err != nil {
log.Errorf("Failed to send DELETE for group %s in cluster %s (Id %s): %v", result.Group,
result.Cluster, notifier.groupIds[result.Cluster][result.Group].Id, err)
return err
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
if (resp.StatusCode >= 200) && (resp.StatusCode <= 299) {
log.Debugf("Sent DELETE for group %s in cluster %s (Id %s)", result.Group, result.Cluster,
notifier.groupIds[result.Cluster][result.Group].Id)
} else {
log.Errorf("Failed to send DELETE for group %s in cluster %s (Id %s): %s", result.Group,
result.Cluster, notifier.groupIds[result.Cluster][result.Group].Id, resp.Status)
}
// Remove ID for group that is now clear
delete(notifier.groupIds[result.Cluster], result.Group)
}
}
return nil
}

Просмотреть файл

@ -1,9 +0,0 @@
package main
type Message interface{}
type Notifier interface {
Notify(msg Message) error
NotifierName() string
Ignore(msg Message) bool
}

114
notifier/email_notifier.go Normal file
Просмотреть файл

@ -0,0 +1,114 @@
/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package notifier
import (
"bytes"
"fmt"
log "github.com/cihub/seelog"
"net/smtp"
"text/template"
)
type EmailNotifier struct {
TemplateFile string
Server string
Port int
Interval int64
Threshold int
Username string
Password string
AuthType string
From string
To string
Groups []string
auth smtp.Auth
template *template.Template
groupMsgs map[string]Message
}
func (emailer *EmailNotifier) NotifierName() string {
return "email-notify"
}
func (emailer *EmailNotifier) Notify(msg Message) error {
if emailer.auth == nil {
switch emailer.AuthType {
case "plain":
emailer.auth = smtp.PlainAuth("", emailer.Username, emailer.Password, emailer.Server)
case "crammd5":
emailer.auth = smtp.CRAMMD5Auth(emailer.Username, emailer.Password)
}
}
if emailer.template == nil {
template, err := template.ParseFiles(emailer.TemplateFile)
if err != nil {
log.Critical("Cannot parse email template: %v", err)
return err
}
emailer.template = template
}
if emailer.groupMsgs == nil {
emailer.groupMsgs = make(map[string]Message)
}
for _, group := range emailer.Groups {
clusterGroup := fmt.Sprintf("%s,%s", msg.Cluster, msg.Group)
if clusterGroup == group {
emailer.groupMsgs[clusterGroup] = msg
}
}
if len(emailer.Groups) == len(emailer.groupMsgs) {
return emailer.sendConsumerGroupStatusNotify()
}
return nil
}
func (emailer *EmailNotifier) Ignore(msg Message) bool {
return int(msg.Status) < emailer.Threshold
}
func (emailer *EmailNotifier) sendConsumerGroupStatusNotify() error {
var bytesToSend bytes.Buffer
log.Debug("send email")
msgs := make([]Message, len(emailer.Groups))
i := 0
for group, msg := range emailer.groupMsgs {
msgs[i] = msg
delete(emailer.groupMsgs, group)
i++
}
err := emailer.template.Execute(&bytesToSend, struct {
From string
To string
Results []Message
}{
From: emailer.From,
To: emailer.To,
Results: msgs,
})
if err != nil {
log.Error("Failed to assemble email:", err)
return err
}
err = smtp.SendMail(fmt.Sprintf("%s:%v", emailer.Server, emailer.Port),
emailer.auth, emailer.From, []string{emailer.To}, bytesToSend.Bytes())
if err != nil {
log.Error("Failed to send email message:", err)
return err
}
return nil
}

Просмотреть файл

@ -8,7 +8,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package main
package notifier
import (
"encoding/json"

203
notifier/http_notifier.go Normal file
Просмотреть файл

@ -0,0 +1,203 @@
/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package notifier
import (
"bytes"
log "github.com/cihub/seelog"
"github.com/pborman/uuid"
"io"
"io/ioutil"
"net/http"
"text/template"
"time"
)
type HttpNotifier struct {
Url string
TemplatePostFile string
TemplateDeleteFile string
Threshold int
SendDelete bool
Extras map[string]string
HttpClient *http.Client
templatePost *template.Template
templateDelete *template.Template
groupIds map[string]map[string]Event
}
type Event struct {
Id string
Start time.Time
}
func (notify *HttpNotifier) NotifierName() string {
return "http-notify"
}
func (notifier *HttpNotifier) Notify(msg Message) error {
// Helper functions for templates
fmap := template.FuncMap{
"jsonencoder": templateJsonEncoder,
"topicsbystatus": classifyTopicsByStatus,
"partitioncounts": templateCountPartitions,
"add": templateAdd,
"minus": templateMinus,
"multiply": templateMultiply,
"divide": templateDivide,
"maxlag": maxLagHelper,
}
if notifier.templatePost == nil {
// Compile the templates
templatePost, err := template.New("post").Funcs(fmap).ParseFiles(notifier.TemplatePostFile)
if err != nil {
log.Criticalf("Cannot parse HTTP notifier POST template: %v", err)
return err
}
notifier.templatePost = templatePost.Templates()[0]
}
if notifier.templateDelete == nil {
templateDelete, err := template.New("delete").Funcs(fmap).ParseFiles(notifier.TemplateDeleteFile)
if err != nil {
log.Criticalf("Cannot parse HTTP notifier DELETE template: %v", err)
return err
}
notifier.templateDelete = templateDelete.Templates()[0]
}
if notifier.groupIds == nil {
notifier.groupIds = make(map[string]map[string]Event)
}
return notifier.sendConsumerGroupStatusNotify(msg)
}
func (notifier *HttpNotifier) Ignore(msg Message) bool {
return int(msg.Status) < notifier.Threshold
}
func (notifier *HttpNotifier) sendConsumerGroupStatusNotify(msg Message) error {
// We only use IDs if we are sending deletes
idStr := ""
startTime := time.Now()
if notifier.SendDelete {
if _, ok := notifier.groupIds[msg.Cluster]; !ok {
// Create the cluster map
notifier.groupIds[msg.Cluster] = make(map[string]Event)
}
if _, ok := notifier.groupIds[msg.Cluster][msg.Group]; !ok {
// Create Event and Id
eventId := uuid.NewRandom()
idStr = eventId.String()
notifier.groupIds[msg.Cluster][msg.Group] = Event{
Id: idStr,
Start: startTime,
}
} else {
idStr = notifier.groupIds[msg.Cluster][msg.Group].Id
startTime = notifier.groupIds[msg.Cluster][msg.Group].Start
}
}
// NOTE - I'm leaving the JsonEncode item in here so as not to break compatibility. New helpers go in the FuncMap above
bytesToSend := new(bytes.Buffer)
err := notifier.templatePost.Execute(bytesToSend, struct {
Cluster string
Group string
Id string
Start time.Time
Extras map[string]string
Result Message
JsonEncode func(interface{}) string
}{
Cluster: msg.Cluster,
Group: msg.Group,
Id: idStr,
Start: startTime,
Extras: notifier.Extras,
Result: msg,
JsonEncode: templateJsonEncoder,
})
if err != nil {
log.Errorf("Failed to assemble POST: %v", err)
return err
}
// Send POST to HTTP endpoint
req, err := http.NewRequest("POST", notifier.Url, bytesToSend)
req.Header.Set("Content-Type", "application/json")
resp, err := notifier.HttpClient.Do(req)
if err != nil {
log.Errorf("Failed to send POST for group %s in cluster %s at severity %v (Id %s): %v", msg.Group, msg.Cluster, msg.Status, idStr, err)
return err
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
if (resp.StatusCode >= 200) && (resp.StatusCode <= 299) {
log.Debugf("Sent POST for group %s in cluster %s at severity %v (Id %s)", msg.Group, msg.Cluster, msg.Status, idStr)
} else {
log.Errorf("Failed to send POST for group %s in cluster %s at severity %v (Id %s): %s", msg.Group,
msg.Cluster, msg.Status, idStr, resp.Status)
}
if notifier.SendDelete && (msg.Status == StatusOK) {
if _, ok := notifier.groupIds[msg.Cluster][msg.Group]; ok {
// Send DELETE to HTTP endpoint
bytesToSend := new(bytes.Buffer)
err := notifier.templateDelete.Execute(bytesToSend, struct {
Cluster string
Group string
Id string
Start time.Time
Extras map[string]string
}{
Cluster: msg.Cluster,
Group: msg.Group,
Id: notifier.groupIds[msg.Cluster][msg.Group].Id,
Start: notifier.groupIds[msg.Cluster][msg.Group].Start,
Extras: notifier.Extras,
})
if err != nil {
log.Errorf("Failed to assemble DELETE for group %s in cluster %s (Id %s): %v", msg.Group,
msg.Cluster, notifier.groupIds[msg.Cluster][msg.Group].Id, err)
return err
}
req, err := http.NewRequest("DELETE", notifier.Url, bytesToSend)
req.Header.Set("Content-Type", "application/json")
resp, err := notifier.HttpClient.Do(req)
if err != nil {
log.Errorf("Failed to send DELETE for group %s in cluster %s (Id %s): %v", msg.Group,
msg.Cluster, notifier.groupIds[msg.Cluster][msg.Group].Id, err)
return err
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
if (resp.StatusCode >= 200) && (resp.StatusCode <= 299) {
log.Debugf("Sent DELETE for group %s in cluster %s (Id %s)", msg.Group, msg.Cluster,
notifier.groupIds[msg.Cluster][msg.Group].Id)
} else {
log.Errorf("Failed to send DELETE for group %s in cluster %s (Id %s): %s", msg.Group,
msg.Cluster, notifier.groupIds[msg.Cluster][msg.Group].Id, resp.Status)
}
// Remove ID for group that is now clear
delete(notifier.groupIds[msg.Cluster], msg.Group)
}
}
return nil
}

75
notifier/notifier.go Normal file
Просмотреть файл

@ -0,0 +1,75 @@
/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package notifier
import (
"encoding/json"
)
type StatusConstant int
const (
StatusNotFound StatusConstant = 0
StatusOK StatusConstant = 1
StatusWarning StatusConstant = 2
StatusError StatusConstant = 3
StatusStop StatusConstant = 4
StatusStall StatusConstant = 5
StatusRewind StatusConstant = 6
)
var StatusStrings = [...]string{"NOTFOUND", "OK", "WARN", "ERR", "STOP", "STALL", "REWIND"}
func (c StatusConstant) String() string {
if (c >= 0) && (c < StatusConstant(len(StatusStrings))) {
return StatusStrings[c]
} else {
return "UNKNOWN"
}
}
func (c StatusConstant) MarshalText() ([]byte, error) {
return []byte(c.String()), nil
}
func (c StatusConstant) MarshalJSON() ([]byte, error) {
return json.Marshal(c.String())
}
type ConsumerOffset struct {
Offset int64
Timestamp int64
Lag int64
artificial bool
}
type PartitionStatus struct {
Topic string
Partition int32
Status StatusConstant
Start ConsumerOffset
End ConsumerOffset
}
type Message struct {
Cluster string
Group string
Status StatusConstant
Complete bool
Partitions []*PartitionStatus
TotalPartitions int
Maxlag *PartitionStatus
TotalLag uint64
}
type Notifier interface {
Notify(msg Message) error
NotifierName() string
Ignore(msg Message) bool
}

Просмотреть файл

@ -8,7 +8,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package main
package notifier
import (
"bytes"
@ -17,22 +17,21 @@ import (
"fmt"
log "github.com/cihub/seelog"
"io/ioutil"
"net"
"net/http"
"time"
)
type SlackNotifier struct {
refreshTicker *time.Ticker
url string
channel string
username string
iconUrl string
iconEmoji string
threshold int
httpClient *http.Client
groups []string
groupResults map[string]Message
RefreshTicker *time.Ticker
Url string
Channel string
Username string
IconUrl string
IconEmoji string
Threshold int
HttpClient *http.Client
Groups []string
groupMsgs map[string]Message
}
type SlackMessage struct {
@ -58,68 +57,33 @@ func (slack *SlackNotifier) NotifierName() string {
}
func (slack *SlackNotifier) Ignore(msg Message) bool {
switch msg.(type) {
case *ConsumerGroupStatus:
result, _ := msg.(*ConsumerGroupStatus)
return int(result.Status) < slack.threshold
}
return true
return int(msg.Status) < slack.Threshold
}
func (slack *SlackNotifier) Notify(msg Message) error {
switch msg.(type) {
case *ConsumerGroupStatus:
result, _ := msg.(*ConsumerGroupStatus)
for _, group := range slack.groups {
clusterGroup := fmt.Sprintf("%s,%s", result.Cluster, result.Group)
if clusterGroup == group {
slack.groupResults[clusterGroup] = msg
}
}
if len(slack.groups) == len(slack.groupResults) {
return slack.sendConsumerGroupStatusNotify()
}
if slack.groupMsgs == nil {
slack.groupMsgs = make(map[string]Message)
}
default:
log.Infof("msg type is not ConsumerGroupStatus")
for _, group := range slack.Groups {
clusterGroup := fmt.Sprintf("%s,%s", msg.Cluster, msg.Group)
if clusterGroup == group {
slack.groupMsgs[clusterGroup] = msg
}
}
if len(slack.Groups) == len(slack.groupMsgs) {
return slack.sendConsumerGroupStatusNotify()
}
return nil
}
func NewSlackNotifier(app *ApplicationContext) (*SlackNotifier, error) {
log.Info("Start Slack Notify")
return &SlackNotifier{
url: app.Config.Slacknotifier.Url,
groups: app.Config.Slacknotifier.Groups,
groupResults: make(map[string]Message),
threshold: app.Config.Slacknotifier.Threshold,
channel: app.Config.Slacknotifier.Channel,
username: app.Config.Slacknotifier.Username,
iconUrl: app.Config.Slacknotifier.IconUrl,
iconEmoji: app.Config.Slacknotifier.IconEmoji,
httpClient: &http.Client{
Timeout: time.Duration(app.Config.Slacknotifier.Timeout) * time.Second,
Transport: &http.Transport{
Dial: (&net.Dialer{
KeepAlive: time.Duration(app.Config.Slacknotifier.Keepalive) * time.Second,
}).Dial,
Proxy: http.ProxyFromEnvironment,
},
},
}, nil
}
func (slack *SlackNotifier) sendConsumerGroupStatusNotify() error {
results := make([]attachment, len(slack.groups))
msgs := make([]attachment, len(slack.Groups))
i := 0
for _, groupResult := range slack.groupResults {
result, _ := groupResult.(*ConsumerGroupStatus)
for _, msg := range slack.groupMsgs {
var emoji, color string
switch result.Status {
switch msg.Status {
case StatusOK:
emoji = ":white_check_mark:"
color = "good"
@ -132,18 +96,18 @@ func (slack *SlackNotifier) sendConsumerGroupStatusNotify() error {
}
title := "Burrow monitoring report"
fallback := fmt.Sprintf("%s is %s", result.Group, result.Status)
pretext := fmt.Sprintf("%s Group `%s` in Cluster `%s` is *%s*", emoji, result.Group, result.Cluster, result.Status)
fallback := fmt.Sprintf("%s is %s", msg.Group, msg.Status)
pretext := fmt.Sprintf("%s Group `%s` in Cluster `%s` is *%s*", emoji, msg.Group, msg.Cluster, msg.Status)
detailedBody := fmt.Sprintf("*Detail:* Total Partition = `%d` Fail Partition = `%d`\n",
result.TotalPartitions, len(result.Partitions))
msg.TotalPartitions, len(msg.Partitions))
for _, p := range result.Partitions {
for _, p := range msg.Partitions {
detailedBody += fmt.Sprintf("*%s* *[%s:%d]* (%d, %d) -> (%d, %d)\n",
p.Status.String(), p.Topic, p.Partition, p.Start.Offset, p.Start.Lag, p.End.Offset, p.End.Lag)
}
results[i] = attachment{
msgs[i] = attachment{
Color: color,
Title: title,
Fallback: fallback,
@ -154,14 +118,14 @@ func (slack *SlackNotifier) sendConsumerGroupStatusNotify() error {
i++
}
slack.groupResults = make(map[string]Message)
slack.groupMsgs = make(map[string]Message)
slackMessage := &SlackMessage{
Channel: slack.channel,
Username: slack.username,
IconUrl: slack.iconUrl,
IconEmoji: slack.iconEmoji,
Attachments: results,
Channel: slack.Channel,
Username: slack.Username,
IconUrl: slack.IconUrl,
IconEmoji: slack.IconEmoji,
Attachments: msgs,
}
return slack.postToSlack(slackMessage)
@ -176,10 +140,10 @@ func (slack *SlackNotifier) postToSlack(slackMessage *SlackMessage) error {
log.Debugf("struct = %+v, json = %s", slackMessage, string(data))
b := bytes.NewBuffer(data)
req, err := http.NewRequest("POST", slack.url, b)
req, err := http.NewRequest("POST", slack.Url, b)
req.Header.Set("Content-Type", "application/json")
if res, err := slack.httpClient.Do(req); err != nil {
if res, err := slack.HttpClient.Do(req); err != nil {
log.Errorf("Unable to send data to slack:%+v", err)
return err
} else {

Просмотреть файл

@ -12,8 +12,12 @@ package main
import (
log "github.com/cihub/seelog"
"github.com/linkedin/Burrow/notifier"
"math/rand"
"net"
"net/http"
"os"
"strings"
"sync"
"time"
)
@ -21,7 +25,7 @@ import (
type NotifyCenter struct {
app *ApplicationContext
interval int64
notifiers []Notifier
notifiers []notifier.Notifier
refreshTicker *time.Ticker
quitChan chan struct{}
groupList map[string]map[string]bool
@ -30,7 +34,7 @@ type NotifyCenter struct {
}
func LoadNotifiers(app *ApplicationContext) error {
notifiers := []Notifier{}
notifiers := []notifier.Notifier{}
if app.Config.Httpnotifier.Enable {
if httpNotifier, err := NewHttpNotifier(app); err == nil {
notifiers = append(notifiers, httpNotifier)
@ -107,9 +111,11 @@ func StopNotifiers(app *ApplicationContext) {
}
func (nc *NotifyCenter) handleEvaluationResponse(result *ConsumerGroupStatus) {
// TODO convert result to Message
msg := notifier.Message{}
for _, notifier := range nc.notifiers {
if !notifier.Ignore(result) {
notifier.Notify(result)
if !notifier.Ignore(msg) {
notifier.Notify(msg)
}
}
}
@ -177,3 +183,80 @@ func (nc *NotifyCenter) startConsumerGroupEvaluator(group string, cluster string
time.Sleep(time.Duration(nc.interval) * time.Second)
}
}
func NewEmailNotifier(app *ApplicationContext) ([]*notifier.EmailNotifier, error) {
log.Info("Start email notify")
emailers := []*notifier.EmailNotifier{}
for to, cfg := range app.Config.Emailnotifier {
if cfg.Enable {
emailer := &notifier.EmailNotifier{
Threshold: cfg.Threshold,
TemplateFile: app.Config.Smtp.Template,
Server: app.Config.Smtp.Server,
Port: app.Config.Smtp.Port,
Username: app.Config.Smtp.Username,
Password: app.Config.Smtp.Password,
AuthType: app.Config.Smtp.AuthType,
Interval: cfg.Interval,
From: app.Config.Smtp.From,
To: to,
Groups: cfg.Groups,
}
emailers = append(emailers, emailer)
}
}
return emailers, nil
}
func NewHttpNotifier(app *ApplicationContext) (*notifier.HttpNotifier, error) {
httpConfig := app.Config.Httpnotifier
// Parse the extra parameters for the templates
extras := make(map[string]string)
for _, extra := range httpConfig.Extras {
parts := strings.Split(extra, "=")
extras[parts[0]] = parts[1]
}
return &notifier.HttpNotifier{
Url: httpConfig.Url,
Threshold: httpConfig.PostThreshold,
SendDelete: httpConfig.SendDelete,
TemplatePostFile: httpConfig.TemplatePost,
TemplateDeleteFile: httpConfig.TemplateDelete,
Extras: extras,
HttpClient: &http.Client{
Timeout: time.Duration(httpConfig.Timeout) * time.Second,
Transport: &http.Transport{
Dial: (&net.Dialer{
KeepAlive: time.Duration(httpConfig.Keepalive) * time.Second,
}).Dial,
Proxy: http.ProxyFromEnvironment,
},
},
}, nil
}
func NewSlackNotifier(app *ApplicationContext) (*notifier.SlackNotifier, error) {
log.Info("Start Slack Notify")
return &notifier.SlackNotifier{
Url: app.Config.Slacknotifier.Url,
Groups: app.Config.Slacknotifier.Groups,
Threshold: app.Config.Slacknotifier.Threshold,
Channel: app.Config.Slacknotifier.Channel,
Username: app.Config.Slacknotifier.Username,
IconUrl: app.Config.Slacknotifier.IconUrl,
IconEmoji: app.Config.Slacknotifier.IconEmoji,
HttpClient: &http.Client{
Timeout: time.Duration(app.Config.Slacknotifier.Timeout) * time.Second,
Transport: &http.Transport{
Dial: (&net.Dialer{
KeepAlive: time.Duration(app.Config.Slacknotifier.Keepalive) * time.Second,
}).Dial,
Proxy: http.ProxyFromEnvironment,
},
},
}, nil
}