2015-06-02 16:03:10 +03:00
/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package main
import (
"container/ring"
"encoding/json"
2016-01-30 01:04:00 +03:00
"fmt"
2015-07-30 20:40:03 +03:00
log "github.com/cihub/seelog"
2015-06-02 16:03:10 +03:00
"regexp"
"sync"
"time"
)
// PartitionOffset is a single offset observation for one partition of a topic.
// The storage goroutine routes it by Group: an empty Group is stored as a
// broker offset, any other value as a commit from that consumer group.
type PartitionOffset struct {
	// Cluster selects the per-cluster store; observations for clusters that
	// are not in the store are ignored.
	Cluster string
	// Topic and Partition identify the partition the offset belongs to.
	Topic     string
	Partition int32
	// Offset is the observed offset value.
	Offset int64
	// Timestamp is compared against time.Now().Unix()*1000 elsewhere in this
	// file, i.e. it is handled as milliseconds since the epoch.
	Timestamp int64
	// Group is the consumer group name, or "" for a broker offset.
	Group string
	// TopicPartitionCount is used to size the per-topic broker offset slice.
	TopicPartitionCount int
}
// BrokerOffset is the most recently stored broker-side offset for one
// partition, together with the timestamp that accompanied it.
type BrokerOffset struct {
	Offset    int64
	Timestamp int64
}
// ConsumerOffset is one entry in a partition's ring of consumer commits.
// Only the exported fields are serialized to JSON.
type ConsumerOffset struct {
	// Offset is the committed consumer offset.
	Offset int64 ` json:"offset" `
	// Timestamp is the commit time; this file compares it against
	// time.Now().Unix()*1000, i.e. milliseconds since the epoch.
	Timestamp int64 ` json:"timestamp" `
	// Lag is the broker offset minus Offset at the time the entry was
	// stored, floored at zero.
	Lag int64 ` json:"lag" `
	// artificial marks entries synthesized by evaluateGroup (consumer fully
	// caught up) rather than received as a real commit.
	artificial bool
}
// ClusterOffsets holds everything stored for one cluster: the latest broker
// offsets and the history of consumer commits, each guarded by its own lock.
type ClusterOffsets struct {
	// broker maps topic -> per-partition latest broker offset. A nil entry
	// means no offset has been stored for that partition yet.
	broker map[string][]*BrokerOffset
	// consumer maps group -> topic -> per-partition ring of *ConsumerOffset.
	// A nil ring means no commit has been stored for that partition yet.
	consumer map[string]map[string][]*ring.Ring
	// brokerLock guards broker.
	brokerLock *sync.RWMutex
	// consumerLock guards consumer, including the rings it points to.
	consumerLock *sync.RWMutex
}
// OffsetStorage is the in-memory store of broker and consumer offsets for all
// configured clusters. Offsets and requests are fed to it through channels
// that a goroutine started by NewOffsetStorage drains.
type OffsetStorage struct {
	// app is the application context; its Config is read for the blacklist,
	// the cluster list and the lag-check settings.
	app *ApplicationContext
	// quit is closed by Stop to end the dispatch goroutine.
	quit chan struct{}
	// offsetChannel carries incoming broker and consumer offsets.
	offsetChannel chan *PartitionOffset
	// requestChannel carries Request* values; unknown types are dropped.
	requestChannel chan interface{}
	// offsets maps cluster name -> stored offsets. It is populated once in
	// NewOffsetStorage.
	offsets map[string]*ClusterOffsets
	// groupBlacklist, when non-nil, matches consumer group names whose
	// commits are discarded.
	groupBlacklist *regexp.Regexp
}
// StatusConstant is the evaluated state of a consumer group, or of a single
// partition within a group.
type StatusConstant int

// The numeric values index directly into StatusStrings, so the two
// declarations must be kept in the same order.
const (
	StatusNotFound StatusConstant = 0
	StatusOK       StatusConstant = 1
	StatusWarning  StatusConstant = 2
	StatusError    StatusConstant = 3
	StatusStop     StatusConstant = 4
	StatusStall    StatusConstant = 5
	StatusRewind   StatusConstant = 6
)

// StatusStrings holds the text form of each StatusConstant, indexed by value.
var StatusStrings = [...]string{"NOTFOUND", "OK", "WARN", "ERR", "STOP", "STALL", "REWIND"}

// String returns the text form of c, or "UNKNOWN" when c is outside the range
// covered by StatusStrings.
func (c StatusConstant) String() string {
	if c < 0 || int(c) >= len(StatusStrings) {
		return "UNKNOWN"
	}
	return StatusStrings[c]
}

// MarshalText encodes c as its String form. It never returns an error.
func (c StatusConstant) MarshalText() ([]byte, error) {
	return []byte(c.String()), nil
}

// MarshalJSON encodes c as a JSON string holding its String form.
func (c StatusConstant) MarshalJSON() ([]byte, error) {
	return json.Marshal(c.String())
}
// PartitionStatus is the evaluation result for one partition of a consumer
// group, as produced by evaluateGroup.
type PartitionStatus struct {
	// Topic and Partition identify the evaluated partition.
	Topic     string ` json:"topic" `
	Partition int32  ` json:"partition" `
	// Status is the state assigned to this partition by the lag rules.
	Status StatusConstant ` json:"status" `
	// Start and End are copies of the first and last offsets in the
	// evaluated window for this partition.
	Start ConsumerOffset ` json:"start" `
	End   ConsumerOffset ` json:"end" `
}
type ConsumerGroupStatus struct {
Cluster string ` json:"cluster" `
Group string ` json:"group" `
Status StatusConstant ` json:"status" `
Complete bool ` json:"complete" `
Partitions [ ] * PartitionStatus ` json:"partitions" `
2015-10-08 00:21:37 +03:00
Maxlag * PartitionStatus ` json:"maxlag" `
2015-06-02 16:03:10 +03:00
}
// ResponseTopicList is the answer to a RequestTopicList.
type ResponseTopicList struct {
	// TopicList holds the topic names found; it is left unset when Error is true.
	TopicList []string
	// Error is true when the requested cluster or consumer group is unknown.
	Error bool
}
// ResponseOffsets is the answer to a RequestOffsets.
type ResponseOffsets struct {
	// OffsetList holds one offset per partition, indexed by partition
	// number; partitions with no stored offset are reported as -1.
	OffsetList []int64
	// ErrorGroup is true when the consumer group is unknown.
	ErrorGroup bool
	// ErrorTopic is true when the topic is unknown. Both flags are set when
	// the cluster itself is unknown.
	ErrorTopic bool
}
// RequestClusterList asks for the names of all clusters held by the storage.
// requestClusterList sends the answer on Result.
type RequestClusterList struct {
	Result chan []string
}
// RequestConsumerList asks for the names of all consumer groups stored for
// Cluster. The answer sent on Result is an empty slice when the cluster is
// unknown.
type RequestConsumerList struct {
	Result  chan []string
	Cluster string
}
// RequestTopicList asks for a list of topics in Cluster. With an empty Group
// the topics known from broker offsets are returned; otherwise the topics the
// named consumer group has stored offsets for.
type RequestTopicList struct {
	Result  chan *ResponseTopicList
	Cluster string
	Group   string
}
// RequestOffsets asks for the per-partition offsets of Topic in Cluster. With
// an empty Group the broker offsets are returned; otherwise the most recent
// offsets stored for the named consumer group.
type RequestOffsets struct {
	Result  chan *ResponseOffsets
	Cluster string
	Topic   string
	Group   string
}
type RequestConsumerStatus struct {
Result chan * ConsumerGroupStatus
Cluster string
Group string
2015-10-08 00:46:33 +03:00
Showall bool
2015-06-02 16:03:10 +03:00
}
2015-07-31 01:12:33 +03:00
// RequestConsumerDrop asks for all stored offsets of Group in Cluster to be
// removed. Result receives StatusOK when the group was removed and
// StatusNotFound when it was not present.
type RequestConsumerDrop struct {
	Result  chan StatusConstant
	Cluster string
	Group   string
}
2015-06-02 16:03:10 +03:00
func NewOffsetStorage ( app * ApplicationContext ) ( * OffsetStorage , error ) {
storage := & OffsetStorage {
app : app ,
quit : make ( chan struct { } ) ,
offsetChannel : make ( chan * PartitionOffset , 10000 ) ,
requestChannel : make ( chan interface { } ) ,
offsets : make ( map [ string ] * ClusterOffsets ) ,
}
if app . Config . General . GroupBlacklist != "" {
re , err := regexp . Compile ( app . Config . General . GroupBlacklist )
if err != nil {
return nil , err
}
storage . groupBlacklist = re
}
for cluster , _ := range app . Config . Kafka {
storage . offsets [ cluster ] = & ClusterOffsets {
broker : make ( map [ string ] [ ] * BrokerOffset ) ,
consumer : make ( map [ string ] map [ string ] [ ] * ring . Ring ) ,
brokerLock : & sync . RWMutex { } ,
consumerLock : & sync . RWMutex { } ,
}
}
go func ( ) {
for {
select {
case o := <- storage . offsetChannel :
if o . Group == "" {
go storage . addBrokerOffset ( o )
} else {
go storage . addConsumerOffset ( o )
}
case r := <- storage . requestChannel :
switch r . ( type ) {
case * RequestConsumerList :
request , _ := r . ( * RequestConsumerList )
go storage . requestConsumerList ( request )
case * RequestTopicList :
request , _ := r . ( * RequestTopicList )
go storage . requestTopicList ( request )
case * RequestOffsets :
request , _ := r . ( * RequestOffsets )
go storage . requestOffsets ( request )
case * RequestConsumerStatus :
request , _ := r . ( * RequestConsumerStatus )
2015-10-08 00:46:33 +03:00
go storage . evaluateGroup ( request . Cluster , request . Group , request . Result , request . Showall )
2015-07-31 01:12:33 +03:00
case * RequestConsumerDrop :
request , _ := r . ( * RequestConsumerDrop )
go storage . dropGroup ( request . Cluster , request . Group , request . Result )
2015-06-02 16:03:10 +03:00
default :
// Silently drop unknown requests
}
case <- storage . quit :
return
}
}
} ( )
return storage , nil
}
func ( storage * OffsetStorage ) addBrokerOffset ( offset * PartitionOffset ) {
2015-12-20 19:42:31 +03:00
clusterMap , ok := storage . offsets [ offset . Cluster ]
if ! ok {
2015-06-02 16:03:10 +03:00
// Ignore offsets for clusters that we don't know about - should never happen anyways
return
}
2015-12-20 19:42:31 +03:00
clusterMap . brokerLock . Lock ( )
topicList , ok := clusterMap . broker [ offset . Topic ]
if ! ok {
clusterMap . broker [ offset . Topic ] = make ( [ ] * BrokerOffset , offset . TopicPartitionCount )
topicList = clusterMap . broker [ offset . Topic ]
2015-06-02 16:03:10 +03:00
}
2015-12-20 19:42:31 +03:00
if offset . TopicPartitionCount >= len ( topicList ) {
2015-06-02 16:03:10 +03:00
// The partition count has increased. Append enough extra partitions to our slice
2015-12-20 19:42:31 +03:00
for i := len ( topicList ) ; i < offset . TopicPartitionCount ; i ++ {
topicList = append ( topicList , nil )
2015-06-02 16:03:10 +03:00
}
}
2015-12-20 19:42:31 +03:00
partitionEntry := topicList [ offset . Partition ]
if partitionEntry == nil {
2015-12-20 23:21:51 +03:00
topicList [ offset . Partition ] = & BrokerOffset {
2015-06-02 16:03:10 +03:00
Offset : offset . Offset ,
Timestamp : offset . Timestamp ,
}
2015-12-20 23:21:51 +03:00
partitionEntry = topicList [ offset . Partition ]
2015-06-02 16:03:10 +03:00
} else {
2015-12-20 19:42:31 +03:00
partitionEntry . Offset = offset . Offset
partitionEntry . Timestamp = offset . Timestamp
2015-06-02 16:03:10 +03:00
}
2015-12-20 19:42:31 +03:00
clusterMap . brokerLock . Unlock ( )
2015-06-02 16:03:10 +03:00
}
func ( storage * OffsetStorage ) addConsumerOffset ( offset * PartitionOffset ) {
// Ignore offsets for clusters that we don't know about - should never happen anyways
2015-12-20 19:00:28 +03:00
clusterOffsets , ok := storage . offsets [ offset . Cluster ]
if ! ok {
2015-06-02 16:03:10 +03:00
return
}
// Ignore groups that match our blacklist
if ( storage . groupBlacklist != nil ) && storage . groupBlacklist . MatchString ( offset . Group ) {
2016-01-30 01:04:00 +03:00
log . Debugf ( "Dropped offset (blacklist): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v" ,
offset . Cluster , offset . Topic , offset . Partition , offset . Group , offset . Timestamp , offset . Offset )
2015-06-02 16:03:10 +03:00
return
}
// Get broker partition count and offset for this topic and partition first
2015-12-20 19:00:28 +03:00
clusterOffsets . brokerLock . RLock ( )
topicPartitionList , ok := clusterOffsets . broker [ offset . Topic ]
if ! ok {
// We don't know about this topic from the brokers yet - skip consumer offsets for now
clusterOffsets . brokerLock . RUnlock ( )
2016-01-30 01:04:00 +03:00
log . Debugf ( "Dropped offset (no topic): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v" ,
offset . Cluster , offset . Topic , offset . Partition , offset . Group , offset . Timestamp , offset . Offset )
2015-12-20 19:00:28 +03:00
return
}
if offset . Partition < 0 {
// This should never happen, but if it does, log an warning with the offset information for review
log . Warnf ( "Got a negative partition ID: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v" ,
offset . Cluster , offset . Topic , offset . Partition , offset . Group , offset . Timestamp , offset . Offset )
clusterOffsets . brokerLock . RUnlock ( )
return
}
if offset . Partition >= int32 ( len ( topicPartitionList ) ) {
// We know about the topic, but partitions have been expanded and we haven't seen that from the broker yet
clusterOffsets . brokerLock . RUnlock ( )
2016-01-30 01:04:00 +03:00
log . Debugf ( "Dropped offset (expanded): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v" ,
offset . Cluster , offset . Topic , offset . Partition , offset . Group , offset . Timestamp , offset . Offset )
2015-12-20 19:00:28 +03:00
return
}
if topicPartitionList [ offset . Partition ] == nil {
// We know about the topic and partition, but we haven't actually gotten the broker offset yet
clusterOffsets . brokerLock . RUnlock ( )
2016-01-30 01:04:00 +03:00
log . Debugf ( "Dropped offset (broker offset): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v" ,
offset . Cluster , offset . Topic , offset . Partition , offset . Group , offset . Timestamp , offset . Offset )
2015-06-02 16:03:10 +03:00
return
}
2015-12-20 19:00:28 +03:00
brokerOffset := topicPartitionList [ offset . Partition ] . Offset
partitionCount := len ( topicPartitionList )
clusterOffsets . brokerLock . RUnlock ( )
2015-06-02 16:03:10 +03:00
2015-12-20 19:00:28 +03:00
clusterOffsets . consumerLock . Lock ( )
consumerMap , ok := clusterOffsets . consumer [ offset . Group ]
if ! ok {
2015-12-20 19:42:31 +03:00
clusterOffsets . consumer [ offset . Group ] = make ( map [ string ] [ ] * ring . Ring )
consumerMap = clusterOffsets . consumer [ offset . Group ]
2015-06-02 16:03:10 +03:00
}
2015-12-20 19:00:28 +03:00
consumerTopicMap , ok := consumerMap [ offset . Topic ]
if ! ok {
2015-12-20 19:42:31 +03:00
consumerMap [ offset . Topic ] = make ( [ ] * ring . Ring , partitionCount )
consumerTopicMap = consumerMap [ offset . Topic ]
2015-06-02 16:03:10 +03:00
}
2015-12-20 19:00:28 +03:00
if int ( offset . Partition ) >= len ( consumerTopicMap ) {
2015-06-02 16:03:10 +03:00
// The partition count must have increased. Append enough extra partitions to our slice
2015-12-20 19:00:28 +03:00
for i := len ( consumerTopicMap ) ; i < partitionCount ; i ++ {
consumerTopicMap = append ( consumerTopicMap , nil )
2015-06-02 16:03:10 +03:00
}
}
2015-12-20 19:00:28 +03:00
consumerPartitionRing := consumerTopicMap [ offset . Partition ]
if consumerPartitionRing == nil {
2015-12-20 23:21:51 +03:00
consumerTopicMap [ offset . Partition ] = ring . New ( storage . app . Config . Lagcheck . Intervals )
consumerPartitionRing = consumerTopicMap [ offset . Partition ]
2015-09-22 20:36:12 +03:00
} else {
2016-01-30 01:04:00 +03:00
lastOffset := consumerPartitionRing . Prev ( ) . Value . ( * ConsumerOffset )
timestampDifference := offset . Timestamp - lastOffset . Timestamp
// Prevent old offset commits, but only if the offsets don't advance (because of artifical commits below)
if ( timestampDifference <= 0 ) && ( offset . Offset <= lastOffset . Offset ) {
clusterOffsets . consumerLock . Unlock ( )
log . Debugf ( "Dropped offset (noadvance): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v tsdiff=%v lag=%v" ,
offset . Cluster , offset . Topic , offset . Partition , offset . Group , offset . Timestamp , offset . Offset ,
timestampDifference , brokerOffset - offset . Offset )
return
}
// Prevent new commits that are too fast (less than the min-distance config) if the last offset was not artificial
if ( ! lastOffset . artificial ) && ( timestampDifference >= 0 ) && ( timestampDifference < ( storage . app . Config . Lagcheck . MinDistance * 1000 ) ) {
2015-12-20 19:00:28 +03:00
clusterOffsets . consumerLock . Unlock ( )
2016-01-30 01:04:00 +03:00
log . Debugf ( "Dropped offset (mindistance): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v tsdiff=%v lag=%v" ,
offset . Cluster , offset . Topic , offset . Partition , offset . Group , offset . Timestamp , offset . Offset ,
timestampDifference , brokerOffset - offset . Offset )
2015-09-22 20:36:12 +03:00
return
}
2015-06-02 16:03:10 +03:00
}
// Calculate the lag against the brokerOffset
partitionLag := brokerOffset - offset . Offset
if partitionLag < 0 {
// Little bit of a hack - because we only get broker offsets periodically, it's possible the consumer offset could be ahead of where we think the broker
// is. In this case, just mark it as zero lag.
partitionLag = 0
}
// Update or create the ring value at the current pointer
2015-12-20 19:00:28 +03:00
if consumerPartitionRing . Value == nil {
consumerPartitionRing . Value = & ConsumerOffset {
2016-01-30 01:04:00 +03:00
Offset : offset . Offset ,
Timestamp : offset . Timestamp ,
Lag : partitionLag ,
artificial : false ,
2015-06-02 16:03:10 +03:00
}
} else {
2015-12-20 19:00:28 +03:00
ringval , _ := consumerPartitionRing . Value . ( * ConsumerOffset )
2015-06-02 16:03:10 +03:00
ringval . Offset = offset . Offset
ringval . Timestamp = offset . Timestamp
ringval . Lag = partitionLag
2016-01-30 01:04:00 +03:00
ringval . artificial = false
2015-06-02 16:03:10 +03:00
}
2016-01-30 01:04:00 +03:00
log . Tracef ( "Commit offset: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v lag=%v" ,
offset . Cluster , offset . Topic , offset . Partition , offset . Group , offset . Timestamp , offset . Offset ,
partitionLag )
2015-06-02 16:03:10 +03:00
// Advance the ring pointer
2015-12-20 23:21:51 +03:00
consumerTopicMap [ offset . Partition ] = consumerTopicMap [ offset . Partition ] . Next ( )
2015-12-20 19:00:28 +03:00
clusterOffsets . consumerLock . Unlock ( )
2015-06-02 16:03:10 +03:00
}
// Stop shuts down the dispatch goroutine started by NewOffsetStorage by
// closing the quit channel. Because it closes the channel unconditionally, it
// must be called at most once; a second call panics.
func (storage *OffsetStorage) Stop() {
	close(storage.quit)
}
2015-07-31 01:12:33 +03:00
// dropGroup removes every stored offset of group in cluster and reports the
// outcome on resultChannel: StatusOK when the group was removed,
// StatusNotFound when the cluster or the group is unknown.
//
// The result is sent only after the consumer lock has been released, so a
// slow receiver cannot stall offset storage for the whole cluster.
func (storage *OffsetStorage) dropGroup(cluster string, group string, resultChannel chan StatusConstant) {
	clusterMap, ok := storage.offsets[cluster]
	if !ok {
		// Unknown cluster: there is no lock to take and nothing to delete.
		resultChannel <- StatusNotFound
		return
	}

	result := StatusNotFound
	clusterMap.consumerLock.Lock()
	if _, ok := clusterMap.consumer[group]; ok {
		log.Infof("Removing group %s from cluster %s by request", group, cluster)
		delete(clusterMap.consumer, group)
		result = StatusOK
	}
	clusterMap.consumerLock.Unlock()

	resultChannel <- result
}
2015-06-02 16:03:10 +03:00
// Evaluate a consumer group based on specific rules about lag
2015-12-21 22:37:31 +03:00
// Rule 1: If over the stored period, the lag is ever zero for the partition, the period is OK
// Rule 2: If the consumer offset does not change, and the lag is non-zero, it's an error (partition is stalled)
// Rule 3: If the consumer offsets are moving, but the lag is consistently increasing, it's a warning (consumer is slow)
// Rule 4: If the difference between now and the last offset timestamp is greater than the difference between the last and first offset timestamps, the
// consumer has stopped committing offsets for that partition (error), unless
// Rule 5: If the lag is -1, this is a special value that means there is no broker offset yet. Consider it good (will get caught in the next refresh of topics)
// Rule 6: If the consumer offset decreases from one interval to the next the partition is marked as a rewind (error)
2015-10-08 00:46:33 +03:00
func ( storage * OffsetStorage ) evaluateGroup ( cluster string , group string , resultChannel chan * ConsumerGroupStatus , showall bool ) {
2015-06-02 16:03:10 +03:00
status := & ConsumerGroupStatus {
Cluster : cluster ,
Group : group ,
Status : StatusNotFound ,
Complete : true ,
Partitions : make ( [ ] * PartitionStatus , 0 ) ,
2015-10-08 00:21:37 +03:00
Maxlag : nil ,
2015-06-02 16:03:10 +03:00
}
2015-11-03 23:44:59 +03:00
// Make sure the cluster exists
2015-12-20 19:42:31 +03:00
clusterMap , ok := storage . offsets [ cluster ]
if ! ok {
2015-11-03 23:44:59 +03:00
resultChannel <- status
return
}
2015-06-02 16:03:10 +03:00
// Make sure the group even exists
2016-01-30 01:04:00 +03:00
clusterMap . consumerLock . Lock ( )
2015-12-20 19:42:31 +03:00
consumerMap , ok := clusterMap . consumer [ group ]
if ! ok {
2016-01-30 01:04:00 +03:00
clusterMap . consumerLock . Unlock ( )
2015-06-02 16:03:10 +03:00
resultChannel <- status
return
}
// Scan the offsets table once and store all the offsets for the group locally
status . Status = StatusOK
2015-12-20 19:42:31 +03:00
offsetList := make ( map [ string ] [ ] [ ] ConsumerOffset , len ( consumerMap ) )
2015-07-30 20:40:03 +03:00
var youngestOffset int64
2015-12-20 19:42:31 +03:00
for topic , partitions := range consumerMap {
2015-06-02 16:03:10 +03:00
offsetList [ topic ] = make ( [ ] [ ] ConsumerOffset , len ( partitions ) )
for partition , offsetRing := range partitions {
// If we don't have our ring full yet, make sure we let the caller know
2015-10-22 02:10:13 +03:00
if ( offsetRing == nil ) || ( offsetRing . Value == nil ) {
2015-06-02 16:03:10 +03:00
status . Complete = false
continue
}
2016-03-08 02:23:43 +03:00
// Add an artificial offset commit if the consumer has no lag against the current broker offset
2016-01-30 01:04:00 +03:00
lastOffset := offsetRing . Prev ( ) . Value . ( * ConsumerOffset )
2016-03-08 02:23:43 +03:00
if lastOffset . Offset >= clusterMap . broker [ topic ] [ partition ] . Offset {
2016-01-30 01:04:00 +03:00
ringval , _ := offsetRing . Value . ( * ConsumerOffset )
ringval . Offset = lastOffset . Offset
ringval . Timestamp = time . Now ( ) . Unix ( ) * 1000
ringval . Lag = 0
ringval . artificial = true
partitions [ partition ] = partitions [ partition ] . Next ( )
log . Tracef ( "Artificial offset: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v lag=0" ,
cluster , topic , partition , group , ringval . Timestamp , lastOffset . Offset )
}
2015-06-02 16:03:10 +03:00
// Pull out the offsets once so we can unlock the map
2015-06-18 21:33:17 +03:00
offsetList [ topic ] [ partition ] = make ( [ ] ConsumerOffset , storage . app . Config . Lagcheck . Intervals )
2015-12-20 19:42:31 +03:00
partitionMap := offsetList [ topic ] [ partition ]
2015-06-02 16:03:10 +03:00
idx := - 1
2016-01-30 01:04:00 +03:00
partitions [ partition ] . Do ( func ( val interface { } ) {
2015-06-02 16:03:10 +03:00
idx += 1
ptr , _ := val . ( * ConsumerOffset )
2015-12-20 19:42:31 +03:00
partitionMap [ idx ] = * ptr
2015-06-02 16:03:10 +03:00
2015-07-30 20:40:03 +03:00
// Track the youngest offset we have found to check expiration
2015-12-20 19:42:31 +03:00
if partitionMap [ idx ] . Timestamp > youngestOffset {
youngestOffset = partitionMap [ idx ] . Timestamp
2015-07-30 20:40:03 +03:00
}
2015-06-18 21:33:17 +03:00
} )
2015-06-02 16:03:10 +03:00
}
}
2015-06-18 21:33:17 +03:00
2015-10-22 02:10:13 +03:00
// If the youngest offset is earlier than our expiration window, flush the group
2015-07-30 20:40:03 +03:00
if ( youngestOffset > 0 ) && ( youngestOffset < ( ( time . Now ( ) . Unix ( ) - storage . app . Config . Lagcheck . ExpireGroup ) * 1000 ) ) {
log . Infof ( "Removing expired group %s from cluster %s" , group , cluster )
2015-12-20 19:42:31 +03:00
delete ( clusterMap . consumer , group )
clusterMap . consumerLock . Unlock ( )
2015-06-02 16:03:10 +03:00
2015-07-30 20:40:03 +03:00
// Return the group as a 404
status . Status = StatusNotFound
2015-06-02 16:03:10 +03:00
resultChannel <- status
return
2015-07-30 20:40:03 +03:00
}
2016-01-30 01:04:00 +03:00
clusterMap . consumerLock . Unlock ( )
2015-06-02 16:03:10 +03:00
2015-10-08 00:46:33 +03:00
var maxlag int64
2015-06-02 16:03:10 +03:00
for topic , partitions := range offsetList {
for partition , offsets := range partitions {
// Skip partitions we're missing offsets for
if len ( offsets ) == 0 {
continue
}
maxidx := len ( offsets ) - 1
2015-12-20 19:42:31 +03:00
firstOffset := offsets [ 0 ]
lastOffset := offsets [ maxidx ]
2015-06-02 16:03:10 +03:00
2015-07-30 20:40:03 +03:00
// Rule 5 - we're missing broker offsets so we're not complete yet
2015-12-20 19:42:31 +03:00
if firstOffset . Lag == - 1 {
2015-06-02 16:03:10 +03:00
status . Complete = false
continue
}
2015-10-08 00:46:33 +03:00
// We may always add this partition, so create it once
thispart := & PartitionStatus {
Topic : topic ,
Partition : int32 ( partition ) ,
Status : StatusOK ,
2015-12-20 19:42:31 +03:00
Start : firstOffset ,
End : lastOffset ,
2015-10-08 00:46:33 +03:00
}
2015-10-08 00:21:37 +03:00
// Check if this partition is the one with the most lag currently
2015-12-20 19:42:31 +03:00
if lastOffset . Lag > maxlag {
2015-10-08 00:46:33 +03:00
status . Maxlag = thispart
2015-10-08 00:21:37 +03:00
}
2015-07-30 20:40:03 +03:00
// Rule 4 - Offsets haven't been committed in a while
2015-12-20 19:42:31 +03:00
if ( ( time . Now ( ) . Unix ( ) * 1000 ) - lastOffset . Timestamp ) > ( lastOffset . Timestamp - firstOffset . Timestamp ) {
2016-01-30 01:04:00 +03:00
status . Status = StatusError
thispart . Status = StatusStop
status . Partitions = append ( status . Partitions , thispart )
continue
2015-06-02 16:03:10 +03:00
}
2015-07-30 20:40:03 +03:00
// Rule 6 - Did the consumer offsets rewind at any point?
// We check this first because we always want to know about a rewind - it's bad behavior
for i := 1 ; i <= maxidx ; i ++ {
if offsets [ i ] . Offset < offsets [ i - 1 ] . Offset {
status . Status = StatusError
2015-10-08 00:46:33 +03:00
thispart . Status = StatusRewind
status . Partitions = append ( status . Partitions , thispart )
2015-07-30 20:40:03 +03:00
continue
}
}
2015-06-02 16:03:10 +03:00
// Rule 1
2015-12-20 19:42:31 +03:00
if lastOffset . Lag == 0 {
2015-10-08 00:46:33 +03:00
if showall {
status . Partitions = append ( status . Partitions , thispart )
}
2015-06-02 16:03:10 +03:00
continue
}
2015-12-20 19:42:31 +03:00
if lastOffset . Offset == firstOffset . Offset {
2015-06-02 16:03:10 +03:00
// Rule 1
2015-12-20 19:42:31 +03:00
if firstOffset . Lag == 0 {
2015-10-08 00:46:33 +03:00
if showall {
status . Partitions = append ( status . Partitions , thispart )
}
2015-06-02 16:03:10 +03:00
continue
}
// Rule 2
status . Status = StatusError
2015-10-08 00:46:33 +03:00
thispart . Status = StatusStall
2015-06-02 16:03:10 +03:00
} else {
// Rule 1 passes, or shortcut a full check on Rule 3 if we can
2015-12-20 19:42:31 +03:00
if ( firstOffset . Lag == 0 ) || ( lastOffset . Lag <= firstOffset . Lag ) {
2015-10-08 00:46:33 +03:00
if showall {
status . Partitions = append ( status . Partitions , thispart )
}
2015-06-02 16:03:10 +03:00
continue
}
lagDropped := false
for i := 0 ; i <= maxidx ; i ++ {
// Rule 1 passes or Rule 3 is shortcut (lag dropped somewhere in the period)
if ( offsets [ i ] . Lag == 0 ) || ( ( i > 0 ) && ( offsets [ i ] . Lag < offsets [ i - 1 ] . Lag ) ) {
lagDropped = true
break
}
}
if ! lagDropped {
// Rule 3
if status . Status == StatusOK {
status . Status = StatusWarning
}
2015-10-08 00:46:33 +03:00
thispart . Status = StatusWarning
2015-06-02 16:03:10 +03:00
}
}
2015-10-08 00:46:33 +03:00
// Always add the partition if it's not OK
if ( thispart . Status != StatusOK ) || showall {
status . Partitions = append ( status . Partitions , thispart )
}
2015-06-02 16:03:10 +03:00
}
}
2015-10-08 00:46:33 +03:00
resultChannel <- status
2015-06-02 16:03:10 +03:00
}
// requestClusterList answers a RequestClusterList with the names of all
// clusters held in the storage, in map iteration (i.e. unspecified) order.
func (storage *OffsetStorage) requestClusterList(request *RequestClusterList) {
	names := make([]string, 0, len(storage.offsets))
	for name := range storage.offsets {
		names = append(names, name)
	}
	request.Result <- names
}
// requestConsumerList answers a RequestConsumerList with the names of all
// consumer groups stored for the requested cluster. An unknown cluster yields
// an empty list rather than an error.
func (storage *OffsetStorage) requestConsumerList(request *RequestConsumerList) {
	clusterMap, known := storage.offsets[request.Cluster]
	if !known {
		request.Result <- make([]string, 0)
		return
	}

	clusterMap.consumerLock.RLock()
	groups := make([]string, 0, len(clusterMap.consumer))
	for name := range clusterMap.consumer {
		groups = append(groups, name)
	}
	clusterMap.consumerLock.RUnlock()

	// Reply only after the lock is released.
	request.Result <- groups
}
// requestTopicList answers a RequestTopicList. With an empty group it lists
// the topics known from broker offsets; otherwise the topics the named group
// has stored offsets for. Error is set when the cluster or the group is
// unknown, and TopicList is left unset in that case.
func (storage *OffsetStorage) requestTopicList(request *RequestTopicList) {
	clusterMap, known := storage.offsets[request.Cluster]
	if !known {
		request.Result <- &ResponseTopicList{Error: true}
		return
	}

	response := &ResponseTopicList{Error: false}
	if request.Group != "" {
		// Topics this consumer group has committed offsets for.
		clusterMap.consumerLock.RLock()
		if groupTopics, found := clusterMap.consumer[request.Group]; found {
			response.TopicList = make([]string, 0, len(groupTopics))
			for name := range groupTopics {
				response.TopicList = append(response.TopicList, name)
			}
		} else {
			response.Error = true
		}
		clusterMap.consumerLock.RUnlock()
	} else {
		// Topics known from the brokers.
		clusterMap.brokerLock.RLock()
		response.TopicList = make([]string, 0, len(clusterMap.broker))
		for name := range clusterMap.broker {
			response.TopicList = append(response.TopicList, name)
		}
		clusterMap.brokerLock.RUnlock()
	}

	request.Result <- response
}
func ( storage * OffsetStorage ) requestOffsets ( request * RequestOffsets ) {
if _ , ok := storage . offsets [ request . Cluster ] ; ! ok {
request . Result <- & ResponseOffsets { ErrorTopic : true , ErrorGroup : true }
return
}
response := & ResponseOffsets { ErrorGroup : false , ErrorTopic : false }
if request . Group == "" {
storage . offsets [ request . Cluster ] . brokerLock . RLock ( )
if _ , ok := storage . offsets [ request . Cluster ] . broker [ request . Topic ] ; ok {
response . OffsetList = make ( [ ] int64 , len ( storage . offsets [ request . Cluster ] . broker [ request . Topic ] ) )
for partition , offset := range storage . offsets [ request . Cluster ] . broker [ request . Topic ] {
if offset == nil {
response . OffsetList [ partition ] = - 1
} else {
response . OffsetList [ partition ] = offset . Offset
}
}
} else {
response . ErrorTopic = true
}
storage . offsets [ request . Cluster ] . brokerLock . RUnlock ( )
} else {
storage . offsets [ request . Cluster ] . consumerLock . RLock ( )
if _ , ok := storage . offsets [ request . Cluster ] . consumer [ request . Group ] ; ok {
if _ , ok := storage . offsets [ request . Cluster ] . consumer [ request . Group ] [ request . Topic ] ; ok {
response . OffsetList = make ( [ ] int64 , len ( storage . offsets [ request . Cluster ] . consumer [ request . Group ] [ request . Topic ] ) )
for partition , oring := range storage . offsets [ request . Cluster ] . consumer [ request . Group ] [ request . Topic ] {
2015-06-08 23:52:13 +03:00
if oring == nil {
2015-06-02 16:03:10 +03:00
response . OffsetList [ partition ] = - 1
} else {
2015-06-08 23:52:13 +03:00
offset , _ := oring . Prev ( ) . Value . ( * ConsumerOffset )
if offset == nil {
response . OffsetList [ partition ] = - 1
} else {
response . OffsetList [ partition ] = offset . Offset
}
2015-06-02 16:03:10 +03:00
}
}
} else {
response . ErrorTopic = true
}
} else {
response . ErrorGroup = true
}
storage . offsets [ request . Cluster ] . consumerLock . RUnlock ( )
}
request . Result <- response
}
2016-01-30 01:04:00 +03:00
// debugPrintGroup writes the full offset ring of every partition of group in
// cluster to the debug log, one line per partition. Each ring entry is
// printed as (timestamp,offset,lag,artificial), starting at the current ring
// position; unset entries are printed as "(),". A missing cluster, group or
// ring is reported with its own log line.
func (storage *OffsetStorage) debugPrintGroup(cluster string, group string) {
	// Make sure the cluster exists
	clusterMap, known := storage.offsets[cluster]
	if !known {
		log.Debugf("Detail cluster=%s,group=%s: No Cluster", cluster, group)
		return
	}

	// Make sure the group even exists
	clusterMap.consumerLock.RLock()
	groupTopics, found := clusterMap.consumer[group]
	if !found {
		clusterMap.consumerLock.RUnlock()
		log.Debugf("Detail cluster=%s,group=%s: No Group", cluster, group)
		return
	}

	// Walk every topic and partition of the group while holding the read lock
	for topic, rings := range groupTopics {
		for partition, current := range rings {
			if (current == nil) || (current.Value == nil) {
				log.Debugf("Detail cluster=%s,group=%s,topic=%s,partition=%v: No Ring", cluster, group, topic, partition)
				continue
			}

			// Render the whole ring into a single string for one log line
			rendered := ""
			current.Do(func(val interface{}) {
				if val == nil {
					rendered += "(),"
					return
				}
				entry, _ := val.(*ConsumerOffset)
				rendered += fmt.Sprintf("(%v,%v,%v,%v)", entry.Timestamp, entry.Offset, entry.Lag, entry.artificial)
			})
			log.Debugf("Detail cluster=%s,group=%s,topic=%s,partition=%v: %s", cluster, group, topic, partition, rendered)
		}
	}
	clusterMap.consumerLock.RUnlock()
}