/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */

package main

import (
	"container/ring"
	"encoding/json"
	log "github.com/cihub/seelog"
	"regexp"
	"sync"
	"time"
)
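
// PartitionOffset is a single offset report flowing into the storage module.
// Broker (log-end) offsets arrive with an empty Group; consumer commits carry
// the group name. TopicPartitionCount is set for broker offsets and is used to
// size the per-topic partition slice.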
type PartitionOffset struct {
	Cluster             string
	Topic               string
	Partition           int32
	Offset              int64
	Timestamp           int64
	Group               string
	TopicPartitionCount int
}

type BrokerOffset struct {
	Offset    int64
	Timestamp int64
}

type ConsumerOffset struct {
	Offset    int64 `json:"offset"`
	Timestamp int64 `json:"timestamp"`
	Lag       int64 `json:"lag"`
}
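
// ClusterOffsets holds all stored offsets for a single Kafka cluster: the
// latest broker offset for each topic partition, and a ring of recent commits
// for each consumer group, topic, and partition. Each map is guarded by its
// own lock.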
type ClusterOffsets struct {
	broker       map[string][]*BrokerOffset
	consumer     map[string]map[string][]*ring.Ring
	brokerLock   *sync.RWMutex
	consumerLock *sync.RWMutex
}
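
// OffsetStorage is the in-memory offset store. It consumes PartitionOffset
// messages from offsetChannel, answers queries sent over requestChannel, and
// keeps per-cluster state in offsets.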
type OffsetStorage struct {
	app            *ApplicationContext
	quit           chan struct{}
	offsetChannel  chan *PartitionOffset
	requestChannel chan interface{}
	offsets        map[string]*ClusterOffsets
	groupBlacklist *regexp.Regexp
}
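
// StatusConstant is the evaluation result for a partition, or for a consumer
// group as a whole. The zero value is StatusNotFound.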
type StatusConstant int

const (
	StatusNotFound StatusConstant = 0
	StatusOK       StatusConstant = 1
	StatusWarning  StatusConstant = 2
	StatusError    StatusConstant = 3
	StatusStop     StatusConstant = 4
	StatusStall    StatusConstant = 5
	StatusRewind   StatusConstant = 6
)

var StatusStrings = [...]string{"NOTFOUND", "OK", "WARN", "ERR", "STOP", "STALL", "REWIND"}

func (c StatusConstant) String() string {
	if (c >= 0) && (c < StatusConstant(len(StatusStrings))) {
		return StatusStrings[c]
	} else {
		return "UNKNOWN"
	}
}

func (c StatusConstant) MarshalText() ([]byte, error) {
	return []byte(c.String()), nil
}

func (c StatusConstant) MarshalJSON() ([]byte, error) {
	return json.Marshal(c.String())
}
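
// PartitionStatus is the evaluated state of a single partition for a group,
// including the first and last offsets in the evaluation window.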
type PartitionStatus struct {
	Topic     string         `json:"topic"`
	Partition int32          `json:"partition"`
	Status    StatusConstant `json:"status"`
	Start     ConsumerOffset `json:"start"`
	End       ConsumerOffset `json:"end"`
}
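
// ConsumerGroupStatus is the full evaluation result for a consumer group. It
// lists every partition that is not OK (or all partitions when the caller
// requests them) along with the partition showing the most lag.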
type ConsumerGroupStatus struct {
	Cluster    string             `json:"cluster"`
	Group      string             `json:"group"`
	Status     StatusConstant     `json:"status"`
	Complete   bool               `json:"complete"`
	Partitions []*PartitionStatus `json:"partitions"`
	Maxlag     *PartitionStatus   `json:"maxlag"`
}
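
// Request and response types passed over the storage module's request
// channel. Each request carries a Result channel on which the reply is sent.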
type ResponseTopicList struct {
	TopicList []string
	Error     bool
}

type ResponseOffsets struct {
	OffsetList []int64
	ErrorGroup bool
	ErrorTopic bool
}

type RequestClusterList struct {
	Result chan []string
}

type RequestConsumerList struct {
	Result  chan []string
	Cluster string
}

type RequestTopicList struct {
	Result  chan *ResponseTopicList
	Cluster string
	Group   string
}

type RequestOffsets struct {
	Result  chan *ResponseOffsets
	Cluster string
	Topic   string
	Group   string
}

type RequestConsumerStatus struct {
	Result  chan *ConsumerGroupStatus
	Cluster string
	Group   string
	Showall bool
}

type RequestConsumerDrop struct {
	Result  chan StatusConstant
	Cluster string
	Group   string
}
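
// NewOffsetStorage creates the offset store for every configured cluster,
// compiles the group blacklist (if any), and starts a goroutine that
// dispatches incoming offsets and requests until Stop is called.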
func NewOffsetStorage(app *ApplicationContext) (*OffsetStorage, error) {
	storage := &OffsetStorage{
		app:            app,
		quit:           make(chan struct{}),
		offsetChannel:  make(chan *PartitionOffset, 10000),
		requestChannel: make(chan interface{}),
		offsets:        make(map[string]*ClusterOffsets),
	}

	if app.Config.General.GroupBlacklist != "" {
		re, err := regexp.Compile(app.Config.General.GroupBlacklist)
		if err != nil {
			return nil, err
		}
		storage.groupBlacklist = re
	}

	for cluster := range app.Config.Kafka {
		storage.offsets[cluster] = &ClusterOffsets{
			broker:       make(map[string][]*BrokerOffset),
			consumer:     make(map[string]map[string][]*ring.Ring),
			brokerLock:   &sync.RWMutex{},
			consumerLock: &sync.RWMutex{},
		}
	}

	go func() {
		for {
			select {
			case o := <-storage.offsetChannel:
				if o.Group == "" {
					go storage.addBrokerOffset(o)
				} else {
					go storage.addConsumerOffset(o)
				}
			case r := <-storage.requestChannel:
				switch request := r.(type) {
				case *RequestConsumerList:
					go storage.requestConsumerList(request)
				case *RequestTopicList:
					go storage.requestTopicList(request)
				case *RequestOffsets:
					go storage.requestOffsets(request)
				case *RequestConsumerStatus:
					go storage.evaluateGroup(request.Cluster, request.Group, request.Result, request.Showall)
				case *RequestConsumerDrop:
					go storage.dropGroup(request.Cluster, request.Group, request.Result)
				default:
					// Silently drop unknown requests
				}
			case <-storage.quit:
				return
			}
		}
	}()

	return storage, nil
}
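
// addBrokerOffset stores the latest broker (log-end) offset for a single
// topic partition, growing the partition slice if the topic has been expanded.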
func (storage *OffsetStorage) addBrokerOffset(offset *PartitionOffset) {
	clusterMap, ok := storage.offsets[offset.Cluster]
	if !ok {
		// Ignore offsets for clusters that we don't know about - should never happen anyways
		return
	}

	clusterMap.brokerLock.Lock()
	topicList, ok := clusterMap.broker[offset.Topic]
	if !ok {
		clusterMap.broker[offset.Topic] = make([]*BrokerOffset, offset.TopicPartitionCount)
		topicList = clusterMap.broker[offset.Topic]
	}
	if offset.TopicPartitionCount >= len(topicList) {
		// The partition count has increased. Append enough extra partitions to our slice,
		// and store the extended slice back so the new entries are not lost if append
		// reallocated the backing array.
		for i := len(topicList); i < offset.TopicPartitionCount; i++ {
			topicList = append(topicList, nil)
		}
		clusterMap.broker[offset.Topic] = topicList
	}

	partitionEntry := topicList[offset.Partition]
	if partitionEntry == nil {
		topicList[offset.Partition] = &BrokerOffset{
			Offset:    offset.Offset,
			Timestamp: offset.Timestamp,
		}
		partitionEntry = topicList[offset.Partition]
	} else {
		partitionEntry.Offset = offset.Offset
		partitionEntry.Timestamp = offset.Timestamp
	}
	clusterMap.brokerLock.Unlock()
}
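
// addConsumerOffset records a committed offset for a consumer group in the
// ring buffer for that partition, computing the lag against the stored broker
// offset. Commits are dropped if the group is blacklisted, if the broker
// offset for the partition is not yet known, or if the commit arrived sooner
// than the configured minimum distance after the previous one.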
func (storage *OffsetStorage) addConsumerOffset(offset *PartitionOffset) {
	// Ignore offsets for clusters that we don't know about - should never happen anyways
	clusterOffsets, ok := storage.offsets[offset.Cluster]
	if !ok {
		return
	}

	// Ignore groups that match our blacklist
	if (storage.groupBlacklist != nil) && storage.groupBlacklist.MatchString(offset.Group) {
		return
	}

	// Get broker partition count and offset for this topic and partition first
	clusterOffsets.brokerLock.RLock()
	topicPartitionList, ok := clusterOffsets.broker[offset.Topic]
	if !ok {
		// We don't know about this topic from the brokers yet - skip consumer offsets for now
		clusterOffsets.brokerLock.RUnlock()
		return
	}
	if offset.Partition < 0 {
		// This should never happen, but if it does, log a warning with the offset information for review
		log.Warnf("Got a negative partition ID: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v",
			offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset)
		clusterOffsets.brokerLock.RUnlock()
		return
	}
	if offset.Partition >= int32(len(topicPartitionList)) {
		// We know about the topic, but partitions have been expanded and we haven't seen that from the broker yet
		clusterOffsets.brokerLock.RUnlock()
		return
	}
	if topicPartitionList[offset.Partition] == nil {
		// We know about the topic and partition, but we haven't actually gotten the broker offset yet
		clusterOffsets.brokerLock.RUnlock()
		return
	}
	brokerOffset := topicPartitionList[offset.Partition].Offset
	partitionCount := len(topicPartitionList)
	clusterOffsets.brokerLock.RUnlock()

	clusterOffsets.consumerLock.Lock()
	consumerMap, ok := clusterOffsets.consumer[offset.Group]
	if !ok {
		clusterOffsets.consumer[offset.Group] = make(map[string][]*ring.Ring)
		consumerMap = clusterOffsets.consumer[offset.Group]
	}
	consumerTopicMap, ok := consumerMap[offset.Topic]
	if !ok {
		consumerMap[offset.Topic] = make([]*ring.Ring, partitionCount)
		consumerTopicMap = consumerMap[offset.Topic]
	}
	if int(offset.Partition) >= len(consumerTopicMap) {
		// The partition count must have increased. Append enough extra partitions to our slice
		// and store the extended slice back into the topic map.
		for i := len(consumerTopicMap); i < partitionCount; i++ {
			consumerTopicMap = append(consumerTopicMap, nil)
		}
		consumerMap[offset.Topic] = consumerTopicMap
	}

	consumerPartitionRing := consumerTopicMap[offset.Partition]
	if consumerPartitionRing == nil {
		consumerTopicMap[offset.Partition] = ring.New(storage.app.Config.Lagcheck.Intervals)
		consumerPartitionRing = consumerTopicMap[offset.Partition]
	} else {
		// Prevent old offset commits, and new commits that are too fast (less than the min-distance config)
		previousTimestamp := consumerPartitionRing.Prev().Value.(*ConsumerOffset).Timestamp
		if offset.Timestamp-previousTimestamp < (storage.app.Config.Lagcheck.MinDistance * 1000) {
			clusterOffsets.consumerLock.Unlock()
			return
		}
	}

	// Calculate the lag against the brokerOffset
	partitionLag := brokerOffset - offset.Offset
	if partitionLag < 0 {
		// Little bit of a hack - because we only get broker offsets periodically, it's possible the consumer offset could be ahead of where we think the broker
		// is. In this case, just mark it as zero lag.
		partitionLag = 0
	}

	// Update or create the ring value at the current pointer
	if consumerPartitionRing.Value == nil {
		consumerPartitionRing.Value = &ConsumerOffset{
			Offset:    offset.Offset,
			Timestamp: offset.Timestamp,
			Lag:       partitionLag,
		}
	} else {
		ringval, _ := consumerPartitionRing.Value.(*ConsumerOffset)
		ringval.Offset = offset.Offset
		ringval.Timestamp = offset.Timestamp
		ringval.Lag = partitionLag
	}

	// Advance the ring pointer
	consumerTopicMap[offset.Partition] = consumerTopicMap[offset.Partition].Next()

	clusterOffsets.consumerLock.Unlock()
}
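
// Stop shuts down the storage module's main loop.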
func (storage *OffsetStorage) Stop() {
	close(storage.quit)
}
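
// dropGroup removes all stored offsets for a consumer group on request,
// replying with StatusOK if the group existed and StatusNotFound otherwise.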
func (storage *OffsetStorage) dropGroup(cluster string, group string, resultChannel chan StatusConstant) {
	storage.offsets[cluster].consumerLock.Lock()
	if _, ok := storage.offsets[cluster].consumer[group]; ok {
		log.Infof("Removing group %s from cluster %s by request", group, cluster)
		delete(storage.offsets[cluster].consumer, group)
		resultChannel <- StatusOK
	} else {
		resultChannel <- StatusNotFound
	}
	storage.offsets[cluster].consumerLock.Unlock()
}

// Evaluate a consumer group based on specific rules about lag
// Rule 1: If over the stored period, the lag is ever zero for the partition, the period is OK
// Rule 2: If the consumer offset does not change, and the lag is non-zero, it's an error (partition is stalled)
// Rule 3: If the consumer offsets are moving, but the lag is consistently increasing, it's a warning (consumer is slow)
// Rule 4: If the difference between now and the last offset timestamp is greater than the difference between the last and first offset timestamps, the
// consumer has stopped committing offsets for that partition (error), unless
// Rule 4a: If the last consumer offset matches the broker offset, the consumer is OK whether it's stopped or not (ZK slow topic case)
// Rule 5: If the lag is -1, this is a special value that means there is no broker offset yet. Consider it good (will get caught in the next refresh of topics)
// Rule 6: If the consumer offset decreases from one interval to the next the partition is marked as a rewind (error)
func (storage *OffsetStorage) evaluateGroup(cluster string, group string, resultChannel chan *ConsumerGroupStatus, showall bool) {
	status := &ConsumerGroupStatus{
		Cluster:    cluster,
		Group:      group,
		Status:     StatusNotFound,
		Complete:   true,
		Partitions: make([]*PartitionStatus, 0),
		Maxlag:     nil,
	}

	// Make sure the cluster exists
	clusterMap, ok := storage.offsets[cluster]
	if !ok {
		resultChannel <- status
		return
	}

	// Make sure the group even exists
	clusterMap.consumerLock.RLock()
	consumerMap, ok := clusterMap.consumer[group]
	if !ok {
		clusterMap.consumerLock.RUnlock()
		resultChannel <- status
		return
	}

	// Scan the offsets table once and store all the offsets for the group locally
	status.Status = StatusOK
	offsetList := make(map[string][][]ConsumerOffset, len(consumerMap))
	var youngestOffset int64
	for topic, partitions := range consumerMap {
		offsetList[topic] = make([][]ConsumerOffset, len(partitions))
		for partition, offsetRing := range partitions {
			// If we don't have our ring full yet, make sure we let the caller know
			if (offsetRing == nil) || (offsetRing.Value == nil) {
				status.Complete = false
				continue
			}

			// Pull out the offsets once so we can unlock the map
			offsetList[topic][partition] = make([]ConsumerOffset, storage.app.Config.Lagcheck.Intervals)
			partitionMap := offsetList[topic][partition]
			idx := -1
			offsetRing.Do(func(val interface{}) {
				idx += 1
				ptr, _ := val.(*ConsumerOffset)
				partitionMap[idx] = *ptr

				// Track the youngest offset we have found to check expiration
				if partitionMap[idx].Timestamp > youngestOffset {
					youngestOffset = partitionMap[idx].Timestamp
				}
			})
		}
	}
	clusterMap.consumerLock.RUnlock()

	// If the youngest offset is earlier than our expiration window, flush the group
	if (youngestOffset > 0) && (youngestOffset < ((time.Now().Unix() - storage.app.Config.Lagcheck.ExpireGroup) * 1000)) {
		clusterMap.consumerLock.Lock()
		log.Infof("Removing expired group %s from cluster %s", group, cluster)
		delete(clusterMap.consumer, group)
		clusterMap.consumerLock.Unlock()

		// Return the group as a 404
		status.Status = StatusNotFound
		resultChannel <- status
		return
	}

	var maxlag int64
	for topic, partitions := range offsetList {
		for partition, offsets := range partitions {
			// Skip partitions we're missing offsets for
			if len(offsets) == 0 {
				continue
			}
			maxidx := len(offsets) - 1
			firstOffset := offsets[0]
			lastOffset := offsets[maxidx]

			// Rule 5 - we're missing broker offsets so we're not complete yet
			if firstOffset.Lag == -1 {
				status.Complete = false
				continue
			}

			// We may always add this partition, so create it once
			thispart := &PartitionStatus{
				Topic:     topic,
				Partition: int32(partition),
				Status:    StatusOK,
				Start:     firstOffset,
				End:       lastOffset,
			}

			// Check if this partition is the one with the most lag currently
			if lastOffset.Lag > maxlag {
				maxlag = lastOffset.Lag
				status.Maxlag = thispart
			}

			// Rule 4 - Offsets haven't been committed in a while
			if ((time.Now().Unix() * 1000) - lastOffset.Timestamp) > (lastOffset.Timestamp - firstOffset.Timestamp) {
				// Rule 4a - Is the consumer caught up anyways?
				if lastOffset.Offset < clusterMap.broker[topic][partition].Offset {
					// No, so the consumer is actually stopped
					status.Status = StatusError
					thispart.Status = StatusStop
					status.Partitions = append(status.Partitions, thispart)
					continue
				}
			}

			// Rule 6 - Did the consumer offsets rewind at any point?
			// We check this first because we always want to know about a rewind - it's bad behavior
			for i := 1; i <= maxidx; i++ {
				if offsets[i].Offset < offsets[i-1].Offset {
					status.Status = StatusError
					thispart.Status = StatusRewind
					status.Partitions = append(status.Partitions, thispart)
					continue
				}
			}

			// Rule 1
			if lastOffset.Lag == 0 {
				if showall {
					status.Partitions = append(status.Partitions, thispart)
				}
				continue
			}
			if lastOffset.Offset == firstOffset.Offset {
				// Rule 1
				if firstOffset.Lag == 0 {
					if showall {
						status.Partitions = append(status.Partitions, thispart)
					}
					continue
				}

				// Rule 2
				status.Status = StatusError
				thispart.Status = StatusStall
			} else {
				// Rule 1 passes, or shortcut a full check on Rule 3 if we can
				if (firstOffset.Lag == 0) || (lastOffset.Lag <= firstOffset.Lag) {
					if showall {
						status.Partitions = append(status.Partitions, thispart)
					}
					continue
				}
				lagDropped := false
				for i := 0; i <= maxidx; i++ {
					// Rule 1 passes or Rule 3 is shortcut (lag dropped somewhere in the period)
					if (offsets[i].Lag == 0) || ((i > 0) && (offsets[i].Lag < offsets[i-1].Lag)) {
						lagDropped = true
						break
					}
				}
				if !lagDropped {
					// Rule 3
					if status.Status == StatusOK {
						status.Status = StatusWarning
					}
					thispart.Status = StatusWarning
				}
			}

			// Always add the partition if it's not OK
			if (thispart.Status != StatusOK) || showall {
				status.Partitions = append(status.Partitions, thispart)
			}
		}
	}

	resultChannel <- status
}
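
// requestClusterList replies with the names of all configured clusters.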
func (storage *OffsetStorage) requestClusterList(request *RequestClusterList) {
	clusterList := make([]string, len(storage.offsets))
	i := 0
	for cluster := range storage.offsets {
		clusterList[i] = cluster
		i += 1
	}
	request.Result <- clusterList
}
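
// requestConsumerList replies with the names of all consumer groups currently
// stored for a cluster, or an empty list if the cluster is unknown.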
func (storage *OffsetStorage) requestConsumerList(request *RequestConsumerList) {
	if _, ok := storage.offsets[request.Cluster]; !ok {
		request.Result <- make([]string, 0)
		return
	}

	storage.offsets[request.Cluster].consumerLock.RLock()
	consumerList := make([]string, len(storage.offsets[request.Cluster].consumer))
	i := 0
	for group := range storage.offsets[request.Cluster].consumer {
		consumerList[i] = group
		i += 1
	}
	storage.offsets[request.Cluster].consumerLock.RUnlock()

	request.Result <- consumerList
}
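
// requestTopicList replies with the topics known for a cluster. If a group is
// given, only the topics that group has committed offsets for are returned,
// with Error set when the group is unknown.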
func (storage *OffsetStorage) requestTopicList(request *RequestTopicList) {
	if _, ok := storage.offsets[request.Cluster]; !ok {
		request.Result <- &ResponseTopicList{Error: true}
		return
	}

	response := &ResponseTopicList{Error: false}
	if request.Group == "" {
		storage.offsets[request.Cluster].brokerLock.RLock()
		response.TopicList = make([]string, len(storage.offsets[request.Cluster].broker))
		i := 0
		for topic := range storage.offsets[request.Cluster].broker {
			response.TopicList[i] = topic
			i += 1
		}
		storage.offsets[request.Cluster].brokerLock.RUnlock()
	} else {
		storage.offsets[request.Cluster].consumerLock.RLock()
		if _, ok := storage.offsets[request.Cluster].consumer[request.Group]; ok {
			response.TopicList = make([]string, len(storage.offsets[request.Cluster].consumer[request.Group]))
			i := 0
			for topic := range storage.offsets[request.Cluster].consumer[request.Group] {
				response.TopicList[i] = topic
				i += 1
			}
		} else {
			response.Error = true
		}
		storage.offsets[request.Cluster].consumerLock.RUnlock()
	}
	request.Result <- response
}
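
// requestOffsets replies with the current offset for every partition of a
// topic: broker offsets when no group is given, otherwise the most recent
// committed offset for the group. Partitions with no stored offset report -1.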
func (storage *OffsetStorage) requestOffsets(request *RequestOffsets) {
	if _, ok := storage.offsets[request.Cluster]; !ok {
		request.Result <- &ResponseOffsets{ErrorTopic: true, ErrorGroup: true}
		return
	}

	response := &ResponseOffsets{ErrorGroup: false, ErrorTopic: false}
	if request.Group == "" {
		storage.offsets[request.Cluster].brokerLock.RLock()
		if _, ok := storage.offsets[request.Cluster].broker[request.Topic]; ok {
			response.OffsetList = make([]int64, len(storage.offsets[request.Cluster].broker[request.Topic]))
			for partition, offset := range storage.offsets[request.Cluster].broker[request.Topic] {
				if offset == nil {
					response.OffsetList[partition] = -1
				} else {
					response.OffsetList[partition] = offset.Offset
				}
			}
		} else {
			response.ErrorTopic = true
		}
		storage.offsets[request.Cluster].brokerLock.RUnlock()
	} else {
		storage.offsets[request.Cluster].consumerLock.RLock()
		if _, ok := storage.offsets[request.Cluster].consumer[request.Group]; ok {
			if _, ok := storage.offsets[request.Cluster].consumer[request.Group][request.Topic]; ok {
				response.OffsetList = make([]int64, len(storage.offsets[request.Cluster].consumer[request.Group][request.Topic]))
				for partition, oring := range storage.offsets[request.Cluster].consumer[request.Group][request.Topic] {
					if oring == nil {
						response.OffsetList[partition] = -1
					} else {
						offset, _ := oring.Prev().Value.(*ConsumerOffset)
						if offset == nil {
							response.OffsetList[partition] = -1
						} else {
							response.OffsetList[partition] = offset.Offset
						}
					}
				}
			} else {
				response.ErrorTopic = true
			}
		} else {
			response.ErrorGroup = true
		}
		storage.offsets[request.Cluster].consumerLock.RUnlock()
	}
	request.Result <- response
}