Add a total lag calculation for the consumer

This commit is contained in:
Todd Palino 2016-05-01 20:57:26 -07:00
Родитель c517af1a1f
Коммит 0dfeddb5c1
3 изменённых файлов: 19 добавлений и 7 удалений

Просмотреть файл

@ -86,3 +86,11 @@ func templateMultiply(a, b int) int {
func templateDivide(a, b int) int {
return a / b
}
func maxLagHelper(a *PartitionStatus) int64 {
if a == nil {
return 0
} else {
return a.End.Lag
}
}

Просмотреть файл

@ -48,13 +48,14 @@ type Event struct {
func NewHttpNotifier(app *ApplicationContext) (*HttpNotifier, error) {
// Helper functions for templates
fmap := template.FuncMap{
"jsonencoder": templateJsonEncoder,
"topicsbystatus": classifyTopicsByStatus,
"partitioncounts": templateCountPartitions,
"add": templateAdd,
"minus": templateMinus,
"multiply": templateMultiply,
"divide": templateDivide,
"jsonencoder": templateJsonEncoder,
"topicsbystatus": classifyTopicsByStatus,
"partitioncounts": templateCountPartitions,
"add": templateAdd,
"minus": templateMinus,
"multiply": templateMultiply,
"divide": templateDivide,
"maxlag": maxLagHelper,
}
// Compile the templates

Просмотреть файл

@ -101,6 +101,7 @@ type ConsumerGroupStatus struct {
Partitions []*PartitionStatus `json:"partitions"`
TotalPartitions int `json:"partition_count"`
Maxlag *PartitionStatus `json:"maxlag"`
TotalLag uint64 `json:"totallag"`
}
type ResponseTopicList struct {
@ -402,6 +403,7 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
Complete: true,
Partitions: make([]*PartitionStatus, 0),
Maxlag: nil,
TotalLag: 0,
}
// Make sure the cluster exists
@ -509,6 +511,7 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
if lastOffset.Lag > maxlag {
status.Maxlag = thispart
}
status.TotalLag += uint64(lastOffset.Lag)
// Rule 4 - Offsets haven't been committed in a while
if ((time.Now().Unix() * 1000) - lastOffset.Timestamp) > (lastOffset.Timestamp - firstOffset.Timestamp) {