Move loadNotifiers to notify_center

jsvisa 2016-04-21 14:00:37 +08:00, committed by Delweng Zheng
Parent abb330b8aa
Commit 053606df50
6 changed files with 310 additions and 432 deletions

View file

@@ -74,6 +74,9 @@ type BurrowConfig struct {
Enable bool `gcfg:"server"`
Port int `gcfg:"port"`
}
Notify struct {
Interval int64 `gcfg:"interval"`
}
Smtp struct {
Server string `gcfg:"server"`
Port int `gcfg:"port"`
@@ -83,12 +86,15 @@ type BurrowConfig struct {
From string `gcfg:"from"`
Template string `gcfg:"template"`
}
Email map[string]*struct {
Emailnotifier map[string]*struct {
Enable bool `gcfg:"enable"`
Groups []string `gcfg:"group"`
Interval int `gcfg:"interval"`
Interval int64 `gcfg:"interval"`
Threshold string `gcfg:"threhsold"`
}
Httpnotifier struct {
Enable bool `gcfg:"enable"`
Groups []string `gcfg:"group"`
Url string `gcfg:"url"`
Interval int64 `gcfg:"interval"`
Extras []string `gcfg:"extra"`
@@ -338,7 +344,7 @@ func ValidateConfig(app *ApplicationContext) error {
// Username and password are not validated - they're optional
// Email configs
for email, cfg := range app.Config.Email {
for email, cfg := range app.Config.Emailnotifier {
if !validateEmail(email) {
errs = append(errs, "Email address is invalid")
}
@@ -375,7 +381,7 @@ func ValidateConfig(app *ApplicationContext) error {
}
}
} else {
if len(app.Config.Email) > 0 {
if len(app.Config.Emailnotifier) > 0 {
errs = append(errs, "Email notifications are configured, but SMTP server is not configured")
}
}
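Taken together, the new Notify, Emailnotifier, and Httpnotifier fields correspond to extra sections in Burrow's gcfg configuration file. As a rough, hypothetical fragment (section and key names are derived from the gcfg tags above; the email address and all values are invented for illustration), the relevant part of the config might look like:

[notify]
interval=10

[emailnotifier "bofh@example.com"]
enable=true
group=local,critical-consumer-group
interval=60

[httpnotifier]
enable=true
url=http://notify.example.com:9000/events
interval=60
extra=app=burrow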

email_notifier.go (new file, 120 additions)
View file

@@ -0,0 +1,120 @@
/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package main
import (
"bytes"
"fmt"
log "github.com/cihub/seelog"
"net/smtp"
"text/template"
)
type EmailNotifier struct {
template *template.Template
auth smtp.Auth
server string
port int
interval int64
threshold int
username string
password string
from string
to string
groups []string
}
func (emailer *EmailNotifier) NotifierName() string {
return "email-notify"
}
func (emailer *EmailNotifier) Notify(msg Message) error {
switch msg.(type) {
case *ConsumerGroupStatus:
result, _ := msg.(*ConsumerGroupStatus)
return emailer.sendConsumerGroupStatusNotify(result)
}
return nil
}
func (emailer *EmailNotifier) Ignore(msg Message) bool {
switch msg.(type) {
case *ConsumerGroupStatus:
result, _ := msg.(*ConsumerGroupStatus)
return int(result.Status) < emailer.threshold
}
return false
}
func NewEmailNotifier(app *ApplicationContext) ([]*EmailNotifier, error) {
log.Info("Start email notify")
template, err := template.ParseFiles(app.Config.Smtp.Template)
if err != nil {
log.Critical("Cannot parse email template: %v", err)
return nil, err
}
var auth smtp.Auth
switch app.Config.Smtp.AuthType {
case "plain":
auth = smtp.PlainAuth("", app.Config.Smtp.Username, app.Config.Smtp.Password, app.Config.Smtp.Server)
case "crammd5":
auth = smtp.CRAMMD5Auth(app.Config.Smtp.Username, app.Config.Smtp.Password)
}
emailers := []*EmailNotifier{}
for to, cfg := range app.Config.Emailnotifier {
if cfg.Enable {
emailer := &EmailNotifier{
threshold: cfg.Threshold,
template: template,
server: app.Config.Smtp.Server,
port: app.Config.Smtp.Port,
username: app.Config.Smtp.Username,
password: app.Config.Smtp.Password,
auth: auth,
groups: cfg.Groups,
interval: cfg.Interval,
from: app.Config.Smtp.From,
to: to,
}
emailers = append(emailers, emailer)
}
}
return emailers, nil
}
func (emailer *EmailNotifier) sendConsumerGroupStatusNotify(result *ConsumerGroupStatus) error {
var bytesToSend bytes.Buffer
err := emailer.template.Execute(&bytesToSend, struct {
From string
To string
Results []*ConsumerGroupStatus
}{
From: emailer.from,
To: emailer.to,
Results: []*ConsumerGroupStatus{result},
})
if err != nil {
log.Error("Failed to assemble email:", err)
return err
}
err = smtp.SendMail(fmt.Sprintf("%s:%v", emailer.server, emailer.port),
emailer.auth, emailer.from, []string{emailer.to}, bytesToSend.Bytes())
if err != nil {
log.Error("Failed to send email message:", err)
return err
}
return nil
}
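The Notifier interface itself is not part of this commit. Judging from the methods EmailNotifier implements here (and HttpNotifier implements below), and from the notifiers []Notifier slice in NotifyCenter, it presumably looks roughly like this sketch:

type Notifier interface {
	// NotifierName returns a short identifier such as "email-notify" or "http-notify".
	NotifierName() string
	// Notify delivers a message (currently a *ConsumerGroupStatus) to the backend.
	Notify(msg Message) error
	// Ignore reports whether the message should be skipped, for example because
	// it is below the notifier's configured threshold.
	Ignore(msg Message) bool
}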

View file

@@ -1,132 +0,0 @@
/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package main
import (
"bytes"
"fmt"
log "github.com/cihub/seelog"
"net/smtp"
"os"
"strings"
"text/template"
"time"
)
type Emailer struct {
app *ApplicationContext
template *template.Template
Tickers map[string]*time.Ticker
quitSends chan struct{}
auth smtp.Auth
}
func NewEmailer(app *ApplicationContext) (*Emailer, error) {
template, err := template.ParseFiles(app.Config.Smtp.Template)
if err != nil {
log.Critical("Cannot parse email template: %v", err)
os.Exit(1)
}
var auth smtp.Auth
switch app.Config.Smtp.AuthType {
case "plain":
auth = smtp.PlainAuth("", app.Config.Smtp.Username, app.Config.Smtp.Password, app.Config.Smtp.Server)
case "crammd5":
auth = smtp.CRAMMD5Auth(app.Config.Smtp.Username, app.Config.Smtp.Password)
}
return &Emailer{
app: app,
template: template,
Tickers: make(map[string]*time.Ticker),
quitSends: make(chan struct{}),
auth: auth,
}, nil
}
func (emailer *Emailer) Start() {
for email, cfg := range emailer.app.Config.Email {
emailer.Tickers[email] = time.NewTicker(time.Duration(cfg.Interval) * time.Second)
go emailer.sendEmailNotifications(email, cfg.Threshold, cfg.Groups, emailer.Tickers[email].C)
}
}
func (emailer *Emailer) Stop() {
close(emailer.quitSends)
}
func (emailer *Emailer) sendEmail(to string, results []*ConsumerGroupStatus) {
var bytesToSend bytes.Buffer
err := emailer.template.Execute(&bytesToSend, struct {
From string
To string
Results []*ConsumerGroupStatus
}{
From: emailer.app.Config.Smtp.From,
To: to,
Results: results,
})
if err != nil {
log.Error("Failed to assemble email:", err)
}
err = smtp.SendMail(fmt.Sprintf("%s:%v", emailer.app.Config.Smtp.Server, emailer.app.Config.Smtp.Port),
emailer.auth, emailer.app.Config.Smtp.From, []string{to}, bytesToSend.Bytes())
if err != nil {
log.Error("Failed to send email message:", err)
}
}
func (emailer *Emailer) sendEmailNotifications(email string, threshold string, groups []string, ticker <-chan time.Time) {
// Convert the config threshold string into a value
thresholdVal := StatusWarning
if threshold == "ERROR" {
thresholdVal = StatusError
}
OUTERLOOP:
for {
select {
case <-emailer.quitSends:
for _, ticker := range emailer.Tickers {
ticker.Stop()
}
break OUTERLOOP
case <-ticker:
results := make([]*ConsumerGroupStatus, 0, len(groups))
resultChannel := make(chan *ConsumerGroupStatus)
for _, group := range groups {
groupParts := strings.Split(group, ",")
storageRequest := &RequestConsumerStatus{Result: resultChannel, Cluster: groupParts[0], Group: groupParts[1]}
emailer.app.Storage.requestChannel <- storageRequest
}
for {
result := <-resultChannel
results = append(results, result)
if len(results) == len(groups) {
break
}
}
// Send an email if any of the results breaches the threshold
for _, result := range results {
if result.Status >= thresholdVal {
emailer.sendEmail(email, results)
break
}
}
}
}
}

View file

@@ -16,27 +16,22 @@ import (
"github.com/pborman/uuid"
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
"os"
"strings"
"sync"
"text/template"
"time"
)
type HttpNotifier struct {
app *ApplicationContext
url string
templatePost *template.Template
templateDelete *template.Template
threshold int
sendDelete bool
extras map[string]string
refreshTicker *time.Ticker
quitChan chan struct{}
groupIds map[string]map[string]Event
groupList map[string]map[string]bool
groupLock sync.RWMutex
resultsChannel chan *ConsumerGroupStatus
httpClient *http.Client
}
@@ -45,7 +40,33 @@ type Event struct {
Start time.Time
}
func (notify *HttpNotifier) NotifierName() string {
return "http-notify"
}
func (notifier *HttpNotifier) Notify(msg Message) error {
switch msg.(type) {
case *ConsumerGroupStatus:
result := msg.(*ConsumerGroupStatus)
return notifier.sendConsumerGroupStatusNotify(result)
default:
return nil
}
return nil
}
func (notifier *HttpNotifier) Ignore(msg Message) bool {
switch msg.(type) {
case *ConsumerGroupStatus:
result, _ := msg.(*ConsumerGroupStatus)
return int(result.Status) < notifier.threshold
}
return true
}
func NewHttpNotifier(app *ApplicationContext) (*HttpNotifier, error) {
httpConfig := app.Config.Httpnotifier
// Helper functions for templates
fmap := template.FuncMap{
"jsonencoder": templateJsonEncoder,
@@ -59,14 +80,14 @@ func NewHttpNotifier(app *ApplicationContext) (*HttpNotifier, error) {
}
// Compile the templates
templatePost, err := template.New("post").Funcs(fmap).ParseFiles(app.Config.Httpnotifier.TemplatePost)
templatePost, err := template.New("post").Funcs(fmap).ParseFiles(httpConfig.TemplatePost)
if err != nil {
log.Criticalf("Cannot parse HTTP notifier POST template: %v", err)
os.Exit(1)
}
templatePost = templatePost.Templates()[0]
templateDelete, err := template.New("delete").Funcs(fmap).ParseFiles(app.Config.Httpnotifier.TemplateDelete)
templateDelete, err := template.New("delete").Funcs(fmap).ParseFiles(httpConfig.TemplateDelete)
if err != nil {
log.Criticalf("Cannot parse HTTP notifier DELETE template: %v", err)
os.Exit(1)
@@ -75,26 +96,24 @@ func NewHttpNotifier(app *ApplicationContext) (*HttpNotifier, error) {
// Parse the extra parameters for the templates
extras := make(map[string]string)
for _, extra := range app.Config.Httpnotifier.Extras {
for _, extra := range httpConfig.Extras {
parts := strings.Split(extra, "=")
extras[parts[0]] = parts[1]
}
return &HttpNotifier{
app: app,
url: httpConfig.Url,
threshold: httpConfig.PostThreshold,
sendDelete: httpConfig.SendDelete,
templatePost: templatePost,
templateDelete: templateDelete,
extras: extras,
quitChan: make(chan struct{}),
groupIds: make(map[string]map[string]Event),
groupList: make(map[string]map[string]bool),
groupLock: sync.RWMutex{},
resultsChannel: make(chan *ConsumerGroupStatus),
httpClient: &http.Client{
Timeout: time.Duration(app.Config.Httpnotifier.Timeout) * time.Second,
Timeout: time.Duration(httpConfig.Timeout) * time.Second,
Transport: &http.Transport{
Dial: (&net.Dialer{
KeepAlive: time.Duration(app.Config.Httpnotifier.Keepalive) * time.Second,
KeepAlive: time.Duration(httpConfig.Keepalive) * time.Second,
}).Dial,
Proxy: http.ProxyFromEnvironment,
},
@@ -102,75 +121,73 @@ func NewHttpNotifier(app *ApplicationContext) (*HttpNotifier, error) {
}, nil
}
func (notifier *HttpNotifier) handleEvaluationResponse(result *ConsumerGroupStatus) {
if int(result.Status) >= notifier.app.Config.Httpnotifier.PostThreshold {
// We only use IDs if we are sending deletes
idStr := ""
startTime := time.Now()
if notifier.app.Config.Httpnotifier.SendDelete {
if _, ok := notifier.groupIds[result.Cluster]; !ok {
// Create the cluster map
notifier.groupIds[result.Cluster] = make(map[string]Event)
func (notifier *HttpNotifier) sendConsumerGroupStatusNotify(result *ConsumerGroupStatus) error {
// We only use IDs if we are sending deletes
idStr := ""
startTime := time.Now()
if notifier.sendDelete {
if _, ok := notifier.groupIds[result.Cluster]; !ok {
// Create the cluster map
notifier.groupIds[result.Cluster] = make(map[string]Event)
}
if _, ok := notifier.groupIds[result.Cluster][result.Group]; !ok {
// Create Event and Id
eventId := uuid.NewRandom()
idStr = eventId.String()
notifier.groupIds[result.Cluster][result.Group] = Event{
Id: idStr,
Start: startTime,
}
if _, ok := notifier.groupIds[result.Cluster][result.Group]; !ok {
// Create Event and Id
eventId := uuid.NewRandom()
idStr = eventId.String()
notifier.groupIds[result.Cluster][result.Group] = Event{
Id: idStr,
Start: startTime,
}
} else {
idStr = notifier.groupIds[result.Cluster][result.Group].Id
startTime = notifier.groupIds[result.Cluster][result.Group].Start
}
}
// NOTE - I'm leaving the JsonEncode item in here so as not to break compatibility. New helpers go in the FuncMap above
bytesToSend := new(bytes.Buffer)
err := notifier.templatePost.Execute(bytesToSend, struct {
Cluster string
Group string
Id string
Start time.Time
Extras map[string]string
Result *ConsumerGroupStatus
JsonEncode func(interface{}) string
}{
Cluster: result.Cluster,
Group: result.Group,
Id: idStr,
Start: startTime,
Extras: notifier.extras,
Result: result,
JsonEncode: templateJsonEncoder,
})
if err != nil {
log.Errorf("Failed to assemble POST: %v", err)
return
}
// Send POST to HTTP endpoint
req, err := http.NewRequest("POST", notifier.app.Config.Httpnotifier.Url, bytesToSend)
req.Header.Set("Content-Type", "application/json")
resp, err := notifier.httpClient.Do(req)
if err != nil {
log.Errorf("Failed to send POST for group %s in cluster %s at severity %v (Id %s): %v", result.Group, result.Cluster, result.Status, idStr, err)
return
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
if (resp.StatusCode >= 200) && (resp.StatusCode <= 299) {
log.Debugf("Sent POST for group %s in cluster %s at severity %v (Id %s)", result.Group, result.Cluster, result.Status, idStr)
} else {
log.Errorf("Failed to send POST for group %s in cluster %s at severity %v (Id %s): %s", result.Group,
result.Cluster, result.Status, idStr, resp.Status)
idStr = notifier.groupIds[result.Cluster][result.Group].Id
startTime = notifier.groupIds[result.Cluster][result.Group].Start
}
}
if notifier.app.Config.Httpnotifier.SendDelete && (result.Status == StatusOK) {
// NOTE - I'm leaving the JsonEncode item in here so as not to break compatibility. New helpers go in the FuncMap above
bytesToSend := new(bytes.Buffer)
err := notifier.templatePost.Execute(bytesToSend, struct {
Cluster string
Group string
Id string
Start time.Time
Extras map[string]string
Result *ConsumerGroupStatus
JsonEncode func(interface{}) string
}{
Cluster: result.Cluster,
Group: result.Group,
Id: idStr,
Start: startTime,
Extras: notifier.extras,
Result: result,
JsonEncode: templateJsonEncoder,
})
if err != nil {
log.Errorf("Failed to assemble POST: %v", err)
return err
}
// Send POST to HTTP endpoint
req, err := http.NewRequest("POST", notifier.url, bytesToSend)
req.Header.Set("Content-Type", "application/json")
resp, err := notifier.httpClient.Do(req)
if err != nil {
log.Errorf("Failed to send POST for group %s in cluster %s at severity %v (Id %s): %v", result.Group, result.Cluster, result.Status, idStr, err)
return err
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
if (resp.StatusCode >= 200) && (resp.StatusCode <= 299) {
log.Debugf("Sent POST for group %s in cluster %s at severity %v (Id %s)", result.Group, result.Cluster, result.Status, idStr)
} else {
log.Errorf("Failed to send POST for group %s in cluster %s at severity %v (Id %s): %s", result.Group,
result.Cluster, result.Status, idStr, resp.Status)
}
if notifier.sendDelete && (result.Status == StatusOK) {
if _, ok := notifier.groupIds[result.Cluster][result.Group]; ok {
// Send DELETE to HTTP endpoint
bytesToSend := new(bytes.Buffer)
@@ -190,17 +207,17 @@ func (notifier *HttpNotifier) handleEvaluationResponse(result *ConsumerGroupStat
if err != nil {
log.Errorf("Failed to assemble DELETE for group %s in cluster %s (Id %s): %v", result.Group,
result.Cluster, notifier.groupIds[result.Cluster][result.Group].Id, err)
return
return err
}
req, err := http.NewRequest("DELETE", notifier.app.Config.Httpnotifier.Url, bytesToSend)
req, err := http.NewRequest("DELETE", notifier.url, bytesToSend)
req.Header.Set("Content-Type", "application/json")
resp, err := notifier.httpClient.Do(req)
if err != nil {
log.Errorf("Failed to send DELETE for group %s in cluster %s (Id %s): %v", result.Group,
result.Cluster, notifier.groupIds[result.Cluster][result.Group].Id, err)
return
return err
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
@@ -217,106 +234,5 @@ func (notifier *HttpNotifier) handleEvaluationResponse(result *ConsumerGroupStat
delete(notifier.groupIds[result.Cluster], result.Group)
}
}
}
func (notifier *HttpNotifier) refreshConsumerGroups() {
notifier.groupLock.Lock()
defer notifier.groupLock.Unlock()
for cluster, _ := range notifier.app.Config.Kafka {
clusterGroups, ok := notifier.groupList[cluster]
if !ok {
notifier.groupList[cluster] = make(map[string]bool)
clusterGroups = notifier.groupList[cluster]
}
// Get a current list of consumer groups
storageRequest := &RequestConsumerList{Result: make(chan []string), Cluster: cluster}
notifier.app.Storage.requestChannel <- storageRequest
consumerGroups := <-storageRequest.Result
// Mark all existing groups false
for consumerGroup := range notifier.groupList {
clusterGroups[consumerGroup] = false
}
// Check for new groups, mark existing groups true
for _, consumerGroup := range consumerGroups {
// Don't bother adding groups in the blacklist
if (notifier.app.Storage.groupBlacklist != nil) && notifier.app.Storage.groupBlacklist.MatchString(consumerGroup) {
continue
}
if _, ok := clusterGroups[consumerGroup]; !ok {
// Add new consumer group and start checking it
log.Debugf("Start evaluating consumer group %s in cluster %s", consumerGroup, cluster)
go notifier.startConsumerGroupEvaluator(consumerGroup, cluster)
}
clusterGroups[consumerGroup] = true
}
// Delete groups that are still false
for consumerGroup := range clusterGroups {
if !clusterGroups[consumerGroup] {
log.Debugf("Remove evaluator for consumer group %s in cluster %s", consumerGroup, cluster)
delete(clusterGroups, consumerGroup)
}
}
}
}
func (notifier *HttpNotifier) startConsumerGroupEvaluator(group string, cluster string) {
// Sleep for a random portion of the check interval
time.Sleep(time.Duration(rand.Int63n(notifier.app.Config.Httpnotifier.Interval*1000)) * time.Millisecond)
for {
// Make sure this group still exists
notifier.groupLock.RLock()
if _, ok := notifier.groupList[cluster][group]; !ok {
notifier.groupLock.RUnlock()
log.Debugf("Stopping evaluator for consumer group %s in cluster %s", group, cluster)
break
}
notifier.groupLock.RUnlock()
// Send requests for group status - responses are handled by the main loop (for now)
storageRequest := &RequestConsumerStatus{Result: notifier.resultsChannel, Cluster: cluster, Group: group}
notifier.app.Storage.requestChannel <- storageRequest
// Sleep for the check interval
time.Sleep(time.Duration(notifier.app.Config.Httpnotifier.Interval) * time.Second)
}
}
func (notifier *HttpNotifier) Start() {
// Get a group list to start with (this will start the notifiers)
notifier.refreshConsumerGroups()
// Set a ticker to refresh the group list periodically
notifier.refreshTicker = time.NewTicker(time.Duration(notifier.app.Config.Lagcheck.ZKGroupRefresh) * time.Second)
// Main loop to handle refreshes and evaluation responses
go func() {
OUTERLOOP:
for {
select {
case <-notifier.quitChan:
break OUTERLOOP
case <-notifier.refreshTicker.C:
notifier.refreshConsumerGroups()
case result := <-notifier.resultsChannel:
go notifier.handleEvaluationResponse(result)
}
}
}()
}
func (notifier *HttpNotifier) Stop() {
if notifier.refreshTicker != nil {
notifier.refreshTicker.Stop()
notifier.groupLock.Lock()
notifier.groupList = make(map[string]map[string]bool)
notifier.groupLock.Unlock()
}
close(notifier.quitChan)
return nil
}

main.go (68 changes)
View file

@@ -37,70 +37,10 @@ type ApplicationContext struct {
Clusters map[string]*KafkaCluster
Storms map[string]*StormCluster
Server *HttpServer
Emailer *Emailer
HttpNotifier *HttpNotifier
NotifyCenter *NotifyCenter
NotifierLock *zk.Lock
}
func loadNotifiers(app *ApplicationContext) error {
// Set up the Emailer, if configured
if len(app.Config.Email) > 0 {
log.Info("Configuring Email notifier")
emailer, err := NewEmailer(app)
if err != nil {
log.Criticalf("Cannot configure email notifier: %v", err)
return err
}
app.Emailer = emailer
}
// Set up the HTTP Notifier, if configured
if app.Config.Httpnotifier.Url != "" {
log.Info("Configuring HTTP notifier")
httpnotifier, err := NewHttpNotifier(app)
if err != nil {
log.Criticalf("Cannot configure HTTP notifier: %v", err)
return err
}
app.HttpNotifier = httpnotifier
}
return nil
}
func startNotifiers(app *ApplicationContext) {
// Do not proceed until we get the Zookeeper lock
err := app.NotifierLock.Lock()
if err != nil {
log.Criticalf("Cannot get ZK notifier lock: %v", err)
os.Exit(1)
}
log.Info("Acquired Zookeeper notifier lock")
if app.Emailer != nil {
log.Info("Starting Email notifier")
app.Emailer.Start()
}
if app.HttpNotifier != nil {
log.Info("Starting HTTP notifier")
app.HttpNotifier.Start()
}
}
func stopNotifiers(app *ApplicationContext) {
// Ignore errors on unlock - we're quitting anyways, and it might not be locked
app.NotifierLock.Unlock()
if app.Emailer != nil {
log.Info("Stopping Email notifier")
app.Emailer.Stop()
}
if app.HttpNotifier != nil {
log.Info("Stopping HTTP notifier")
app.HttpNotifier.Stop()
}
}
// Why two mains? Golang doesn't let main() return, which means defers will not run.
// So we do everything in a separate main, that way we can easily exit out with an error code and still run defers
func burrowMain() int {
@@ -196,15 +136,15 @@ func burrowMain() int {
appContext.NotifierLock = zk.NewLock(zkconn, appContext.Config.Zookeeper.LockPath, zk.WorldACL(zk.PermAll))
// Load the notifiers, but do not start them
err = loadNotifiers(appContext)
err = LoadNotifiers(appContext)
if err != nil {
// Error was already logged
return 1
}
// Notifiers are started in a goroutine if we get the ZK lock
go startNotifiers(appContext)
defer stopNotifiers(appContext)
go StartNotifiers(appContext)
defer StopNotifiers(appContext)
// Register signal handlers for exiting
exitChannel := make(chan os.Signal, 1)

View file

@@ -13,6 +13,7 @@ package main
import (
log "github.com/cihub/seelog"
"math/rand"
"os"
"sync"
"time"
)
@@ -20,32 +21,92 @@ import (
type NotifyCenter struct {
app *ApplicationContext
interval int64
notifiers []Notifier
refreshTicker *time.Ticker
quitChan chan struct{}
groupIds map[string]map[string]event
groupList map[string]map[string]bool
groupLock sync.RWMutex
resultsChannel chan *ConsumerGroupStatus
}
type event struct {
Id string
Start time.Time
}
func LoadNotifiers(app *ApplicationContext) error {
notifiers := []Notifier{}
if app.Config.Httpnotifier.Enable {
if httpNotifier, err := NewHttpNotifier(app); err == nil {
notifiers = append(notifiers, httpNotifier)
}
}
if len(app.Config.Emailnotifier) > 0 {
if emailNotifiers, err := NewEmailNotifier(app); err == nil {
for _, emailer := range emailNotifiers {
notifiers = append(notifiers, emailer)
}
}
}
func NewNotifyCenter(app *ApplicationContext) (*NotifyCenter, error) {
return &NotifyCenter{
nc := &NotifyCenter{
app: app,
notifiers: notifiers,
interval: app.Config.Notify.Interval,
quitChan: make(chan struct{}),
groupIds: make(map[string]map[string]event),
groupList: make(map[string]map[string]bool),
groupLock: sync.RWMutex{},
resultsChannel: make(chan *ConsumerGroupStatus),
}, nil
}
app.NotifyCenter = nc
return nil
}
func (notifier *NotifyCenter) handleEvaluationResponse(result *ConsumerGroupStatus) {
// TODO: notify all
func StartNotifiers(app *ApplicationContext) {
nc := app.NotifyCenter
// Do not proceed until we get the Zookeeper lock
err := app.NotifierLock.Lock()
if err != nil {
log.Criticalf("Cannot get ZK nc lock: %v", err)
os.Exit(1)
}
log.Info("Acquired Zookeeper notify lock")
// Get a group list to start with (this will start the ncs)
nc.refreshConsumerGroups()
// Set a ticker to refresh the group list periodically
nc.refreshTicker = time.NewTicker(time.Duration(nc.app.Config.Lagcheck.ZKGroupRefresh) * time.Second)
// Main loop to handle refreshes and evaluation responses
OUTERLOOP:
for {
select {
case <-nc.quitChan:
break OUTERLOOP
case <-nc.refreshTicker.C:
nc.refreshConsumerGroups()
case result := <-nc.resultsChannel:
go nc.handleEvaluationResponse(result)
}
}
}
func StopNotifiers(app *ApplicationContext) {
// Ignore errors on unlock - we're quitting anyways, and it might not be locked
app.NotifierLock.Unlock()
nc := app.NotifyCenter
if nc.refreshTicker != nil {
nc.refreshTicker.Stop()
nc.groupLock.Lock()
nc.groupList = make(map[string]map[string]bool)
nc.groupLock.Unlock()
}
close(nc.quitChan)
// TODO: stop all notifiers
}
func (nc *NotifyCenter) handleEvaluationResponse(result *ConsumerGroupStatus) {
for _, notifier := range nc.notifiers {
if !notifier.Ignore(result) {
notifier.Notify(result)
}
}
}
func (notifier *NotifyCenter) refreshConsumerGroups() {
@@ -73,7 +134,7 @@ func (notifier *NotifyCenter) refreshConsumerGroups() {
if _, ok := clusterGroups[consumerGroup]; !ok {
// Add new consumer group and start checking it
log.Debugf("Start evaluating consumer group %s in cluster %s", consumerGroup, cluster)
log.Infof("Start evaluating consumer group %s in cluster %s", consumerGroup, cluster)
go notifier.startConsumerGroupEvaluator(consumerGroup, cluster)
}
clusterGroups[consumerGroup] = true
@@ -111,36 +172,3 @@ func (notifier *NotifyCenter) startConsumerGroupEvaluator(group string, cluster
time.Sleep(time.Duration(notifier.interval) * time.Second)
}
}
func (notifier *NotifyCenter) Start() {
// Get a group list to start with (this will start the notifiers)
notifier.refreshConsumerGroups()
// Set a ticker to refresh the group list periodically
notifier.refreshTicker = time.NewTicker(time.Duration(notifier.app.Config.Lagcheck.ZKGroupRefresh) * time.Second)
// Main loop to handle refreshes and evaluation responses
go func() {
OUTERLOOP:
for {
select {
case <-notifier.quitChan:
break OUTERLOOP
case <-notifier.refreshTicker.C:
notifier.refreshConsumerGroups()
case result := <-notifier.resultsChannel:
go notifier.handleEvaluationResponse(result)
}
}
}()
}
func (notifier *NotifyCenter) Stop() {
if notifier.refreshTicker != nil {
notifier.refreshTicker.Stop()
notifier.groupLock.Lock()
notifier.groupList = make(map[string]map[string]bool)
notifier.groupLock.Unlock()
}
close(notifier.quitChan)
}
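With the wiring centralized in NotifyCenter, adding another backend amounts to implementing the same three methods and appending an instance in LoadNotifiers. A purely hypothetical sketch (LogNotifier is not part of this commit and exists only to illustrate the extension point):

type LogNotifier struct {
	threshold int
}

func (l *LogNotifier) NotifierName() string {
	return "log-notify"
}

func (l *LogNotifier) Ignore(msg Message) bool {
	// Skip results below the configured threshold, mirroring the email notifier.
	if result, ok := msg.(*ConsumerGroupStatus); ok {
		return int(result.Status) < l.threshold
	}
	return true
}

func (l *LogNotifier) Notify(msg Message) error {
	if result, ok := msg.(*ConsumerGroupStatus); ok {
		log.Warnf("Group %s in cluster %s is at status %v", result.Group, result.Cluster, result.Status)
	}
	return nil
}

// Registration would happen in LoadNotifiers, alongside the HTTP and email notifiers:
//   notifiers = append(notifiers, &LogNotifier{threshold: int(StatusWarning)})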