зеркало из https://github.com/microsoft/Burrow.git
Add a template helper for categorizing topics by status code. Move helpers to a separate file
This commit is contained in:
Родитель
f8e86e0c7c
Коммит
176731a734
|
@ -1 +1 @@
|
|||
{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"events":[{"id":"{{.Id}}","event":{"severity":"{{if eq .Result.Status 2}}WARN{{else}}ERR{{end}}","tier":"{{index .Extras "tier"}}","group":"{{.Result.Group}}","complete":{{.Result.Complete}},"partitions":{{call .JsonEncode .Result.Partitions}}}}]}
|
||||
{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"events":[{"id":"{{.Id}}","event":{"severity":"{{if eq .Result.Status 2}}WARN{{else}}ERR{{end}}","tier":"{{index .Extras "tier"}}","group":"{{.Result.Group}}","complete":{{.Result.Complete}},"partitions":{{.Result.Partitions | jsonencoder}}}}]}
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// Helper function for the templates to encode an object into a JSON string
|
||||
func templateJsonEncoder(encodeMe interface{}) string {
|
||||
jsonStr, _ := json.Marshal(encodeMe)
|
||||
return string(jsonStr)
|
||||
}
|
||||
|
||||
// Helper - recategorize partitions as a map of lists
|
||||
// map[string][]string => status short name -> list of topics
|
||||
func classifyTopicsByStatus(partitions []*PartitionStatus) map[string][]string {
|
||||
tmp_map := make(map[string]map[string]bool)
|
||||
for _, partition := range partitions {
|
||||
if _, ok := tmp_map[partition.Status.String()]; !ok {
|
||||
tmp_map[partition.Status.String()] = make(map[string]bool)
|
||||
}
|
||||
tmp_map[partition.Status.String()][partition.Topic] = true
|
||||
}
|
||||
|
||||
rv := make(map[string][]string)
|
||||
for status, topicMap := range tmp_map {
|
||||
rv[status] = make([]string, 0, len(topicMap))
|
||||
for topic, _ := range topicMap {
|
||||
rv[status] = append(rv[status], topic)
|
||||
}
|
||||
}
|
||||
|
||||
return rv
|
||||
}
|
|
@ -12,7 +12,6 @@ package main
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/pborman/uuid"
|
||||
"net/http"
|
||||
|
@ -39,17 +38,26 @@ type Event struct {
|
|||
}
|
||||
|
||||
func NewHttpNotifier(app *ApplicationContext) (*HttpNotifier, error) {
|
||||
// Helper functions for templates
|
||||
fmap := template.FuncMap{
|
||||
"jsonencoder": templateJsonEncoder,
|
||||
"topicsbystatus": classifyTopicsByStatus,
|
||||
}
|
||||
|
||||
// Compile the templates
|
||||
templatePost, err := template.ParseFiles(app.Config.Httpnotifier.TemplatePost)
|
||||
templatePost, err := template.New("post").Funcs(fmap).ParseFiles(app.Config.Httpnotifier.TemplatePost)
|
||||
if err != nil {
|
||||
log.Criticalf("Cannot parse HTTP notifier POST template: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
templateDelete, err := template.ParseFiles(app.Config.Httpnotifier.TemplateDelete)
|
||||
templatePost = templatePost.Templates()[0]
|
||||
|
||||
templateDelete, err := template.New("delete").Funcs(fmap).ParseFiles(app.Config.Httpnotifier.TemplateDelete)
|
||||
if err != nil {
|
||||
log.Criticalf("Cannot parse HTTP notifier DELETE template: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
templateDelete = templateDelete.Templates()[0]
|
||||
|
||||
// Parse the extra parameters for the templates
|
||||
extras := make(map[string]string)
|
||||
|
@ -69,12 +77,6 @@ func NewHttpNotifier(app *ApplicationContext) (*HttpNotifier, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
// Helper function for the templates to encode an object into a JSON string
|
||||
func templateJsonEncoder(encodeMe interface{}) string {
|
||||
jsonStr, _ := json.Marshal(encodeMe)
|
||||
return string(jsonStr)
|
||||
}
|
||||
|
||||
func (notifier *HttpNotifier) sendEvaluationRequests() {
|
||||
for cluster, _ := range notifier.app.Config.Kafka {
|
||||
// Get a current list of consumer groups
|
||||
|
@ -102,6 +104,7 @@ func (notifier *HttpNotifier) handleEvaluationResponse(result *ConsumerGroupStat
|
|||
notifier.groupIds[result.Cluster][result.Group] = eventId.String()
|
||||
}
|
||||
|
||||
// NOTE - I'm leaving the JsonEncode item in here so as not to break compatibility. New helpers go in the FuncMap above
|
||||
bytesToSend := new(bytes.Buffer)
|
||||
err := notifier.templatePost.Execute(bytesToSend, struct {
|
||||
Cluster string
|
||||
|
|
|
@ -272,7 +272,7 @@ func (storage *OffsetStorage) addConsumerOffset(offset *PartitionOffset) {
|
|||
} else {
|
||||
// The minimum time as configured since the last offset commit has gone by
|
||||
previousTimestamp := storage.offsets[offset.Cluster].consumer[offset.Group][offset.Topic][offset.Partition].Prev().Value.(*ConsumerOffset).Timestamp
|
||||
if offset.Timestamp - previousTimestamp < (storage.app.Config.Lagcheck.MinDistance * 1000) {
|
||||
if offset.Timestamp-previousTimestamp < (storage.app.Config.Lagcheck.MinDistance * 1000) {
|
||||
storage.offsets[offset.Cluster].consumerLock.Unlock()
|
||||
return
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче