/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */

package main

import (
	"container/ring"
	"fmt"
	"regexp"
	"sync"
	"time"

	log "github.com/cihub/seelog"

	"github.com/linkedin/Burrow/protocol"
)

type OffsetStorage struct {
	app            *ApplicationContext
	quit           chan struct{}
	offsetChannel  chan *protocol.PartitionOffset
	requestChannel chan interface{}
	offsets        map[string]*ClusterOffsets
	groupBlacklist *regexp.Regexp
}

type BrokerOffset struct {
	Offset    int64
	Timestamp int64
}

type ClusterOffsets struct {
	broker       map[string][]*BrokerOffset
	consumer     map[string]map[string][]*ring.Ring
	brokerLock   *sync.RWMutex
	consumerLock *sync.RWMutex
}

type ResponseTopicList struct {
	TopicList []string
	Error     bool
}
type ResponseOffsets struct {
	OffsetList []int64
	ErrorGroup bool
	ErrorTopic bool
}
type RequestClusterList struct {
	Result chan []string
}
type RequestConsumerList struct {
	Result  chan []string
	Cluster string
}
type RequestTopicList struct {
	Result  chan *ResponseTopicList
	Cluster string
	Group   string
}
type RequestOffsets struct {
	Result  chan *ResponseOffsets
	Cluster string
	Topic   string
	Group   string
}
type RequestConsumerStatus struct {
	Result  chan *protocol.ConsumerGroupStatus
	Cluster string
	Group   string
	Showall bool
}
type RequestConsumerDrop struct {
	Result  chan protocol.StatusConstant
	Cluster string
	Group   string
}

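// The request types above form a small query protocol over requestChannel: a caller
// builds a request with a fresh Result channel, sends it to the storage goroutine,
// and blocks on the reply. A minimal sketch (the cluster, topic, and group names are
// placeholder values, not anything defined in this file):
//
//	request := &RequestOffsets{
//		Result:  make(chan *ResponseOffsets),
//		Cluster: "local",
//		Topic:   "example-topic",
//		Group:   "example-group",
//	}
//	storage.requestChannel <- request
//	response := <-request.Result
//	if response.ErrorGroup || response.ErrorTopic {
//		// unknown group or topic for this cluster
//	}
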
func NewOffsetStorage(app *ApplicationContext) (*OffsetStorage, error) {
	storage := &OffsetStorage{
		app:            app,
		quit:           make(chan struct{}),
		offsetChannel:  make(chan *protocol.PartitionOffset, 10000),
		requestChannel: make(chan interface{}),
		offsets:        make(map[string]*ClusterOffsets),
	}

	if app.Config.General.GroupBlacklist != "" {
		re, err := regexp.Compile(app.Config.General.GroupBlacklist)
		if err != nil {
			return nil, err
		}
		storage.groupBlacklist = re
	}

	for cluster := range app.Config.Kafka {
		storage.offsets[cluster] = &ClusterOffsets{
			broker:       make(map[string][]*BrokerOffset),
			consumer:     make(map[string]map[string][]*ring.Ring),
			brokerLock:   &sync.RWMutex{},
			consumerLock: &sync.RWMutex{},
		}
	}

	go func() {
		for {
			select {
			case o := <-storage.offsetChannel:
				if o.Group == "" {
					go storage.addBrokerOffset(o)
				} else {
					go storage.addConsumerOffset(o)
				}
			case r := <-storage.requestChannel:
				switch request := r.(type) {
				case *RequestConsumerList:
					go storage.requestConsumerList(request)
				case *RequestTopicList:
					go storage.requestTopicList(request)
				case *RequestOffsets:
					go storage.requestOffsets(request)
				case *RequestConsumerStatus:
					go storage.evaluateGroup(request.Cluster, request.Group, request.Result, request.Showall)
				case *RequestConsumerDrop:
					go storage.dropGroup(request.Cluster, request.Group, request.Result)
				default:
					// Silently drop unknown requests
				}
			case <-storage.quit:
				return
			}
		}
	}()

	return storage, nil
}

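// Offsets flow into the storage module through offsetChannel; an empty Group marks a
// broker (log-end) offset, anything else a consumer commit. A minimal sketch with
// placeholder values (not taken from this file):
//
//	storage.offsetChannel <- &protocol.PartitionOffset{
//		Cluster:             "local",
//		Topic:               "example-topic",
//		Partition:           0,
//		Offset:              1000,
//		Timestamp:           time.Now().Unix() * 1000, // milliseconds
//		TopicPartitionCount: 1,                        // only meaningful for broker offsets
//	}
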
func (storage *OffsetStorage) addBrokerOffset(offset *protocol.PartitionOffset) {
	clusterMap, ok := storage.offsets[offset.Cluster]
	if !ok {
		// Ignore offsets for clusters that we don't know about - should never happen anyway
		return
	}

	clusterMap.brokerLock.Lock()
	topicList, ok := clusterMap.broker[offset.Topic]
	if !ok {
		clusterMap.broker[offset.Topic] = make([]*BrokerOffset, offset.TopicPartitionCount)
		topicList = clusterMap.broker[offset.Topic]
	}
	if offset.TopicPartitionCount >= len(topicList) {
		// The partition count has increased. Append enough extra partitions to our slice,
		// and store the expanded slice back in the map (append may reallocate it)
		for i := len(topicList); i < offset.TopicPartitionCount; i++ {
			topicList = append(topicList, nil)
		}
		clusterMap.broker[offset.Topic] = topicList
	}

	partitionEntry := topicList[offset.Partition]
	if partitionEntry == nil {
		topicList[offset.Partition] = &BrokerOffset{
			Offset:    offset.Offset,
			Timestamp: offset.Timestamp,
		}
		partitionEntry = topicList[offset.Partition]
	} else {
		partitionEntry.Offset = offset.Offset
		partitionEntry.Timestamp = offset.Timestamp
	}

	clusterMap.brokerLock.Unlock()
}

func (storage *OffsetStorage) addConsumerOffset(offset *protocol.PartitionOffset) {
	// Ignore offsets for clusters that we don't know about - should never happen anyway
	clusterOffsets, ok := storage.offsets[offset.Cluster]
	if !ok {
		return
	}

	// Ignore groups that match our blacklist
	if (storage.groupBlacklist != nil) && storage.groupBlacklist.MatchString(offset.Group) {
		log.Debugf("Dropped offset (blacklist): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v",
			offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset)
		return
	}

	// Get the broker partition count and offset for this topic and partition first
	clusterOffsets.brokerLock.RLock()
	topicPartitionList, ok := clusterOffsets.broker[offset.Topic]
	if !ok {
		// We don't know about this topic from the brokers yet - skip consumer offsets for now
		clusterOffsets.brokerLock.RUnlock()
		log.Debugf("Dropped offset (no topic): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v",
			offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset)
		return
	}
	if offset.Partition < 0 {
		// This should never happen, but if it does, log a warning with the offset information for review
		log.Warnf("Got a negative partition ID: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v",
			offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset)
		clusterOffsets.brokerLock.RUnlock()
		return
	}
	if offset.Partition >= int32(len(topicPartitionList)) {
		// We know about the topic, but partitions have been expanded and we haven't seen that from the broker yet
		clusterOffsets.brokerLock.RUnlock()
		log.Debugf("Dropped offset (expanded): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v",
			offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset)
		return
	}
	if topicPartitionList[offset.Partition] == nil {
		// We know about the topic and partition, but we haven't actually gotten the broker offset yet
		clusterOffsets.brokerLock.RUnlock()
		log.Debugf("Dropped offset (broker offset): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v",
			offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset)
		return
	}
	brokerOffset := topicPartitionList[offset.Partition].Offset
	partitionCount := len(topicPartitionList)
	clusterOffsets.brokerLock.RUnlock()

	clusterOffsets.consumerLock.Lock()
	consumerMap, ok := clusterOffsets.consumer[offset.Group]
	if !ok {
		clusterOffsets.consumer[offset.Group] = make(map[string][]*ring.Ring)
		consumerMap = clusterOffsets.consumer[offset.Group]
	}
	consumerTopicMap, ok := consumerMap[offset.Topic]
	if !ok {
		consumerMap[offset.Topic] = make([]*ring.Ring, partitionCount)
		consumerTopicMap = consumerMap[offset.Topic]
	}
	if int(offset.Partition) >= len(consumerTopicMap) {
		// The partition count must have increased. Append enough extra partitions to our slice,
		// and store the expanded slice back in the map (append may reallocate it)
		for i := len(consumerTopicMap); i < partitionCount; i++ {
			consumerTopicMap = append(consumerTopicMap, nil)
		}
		consumerMap[offset.Topic] = consumerTopicMap
	}

	consumerPartitionRing := consumerTopicMap[offset.Partition]
	if consumerPartitionRing == nil {
		consumerTopicMap[offset.Partition] = ring.New(storage.app.Config.Lagcheck.Intervals)
		consumerPartitionRing = consumerTopicMap[offset.Partition]
	} else {
		lastOffset := consumerPartitionRing.Prev().Value.(*protocol.ConsumerOffset)
		timestampDifference := offset.Timestamp - lastOffset.Timestamp

		// Prevent old offset commits, but only if the offsets don't advance (because of artificial commits below)
		if (timestampDifference <= 0) && (offset.Offset <= lastOffset.Offset) {
			clusterOffsets.consumerLock.Unlock()
			log.Debugf("Dropped offset (noadvance): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v tsdiff=%v lag=%v",
				offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset,
				timestampDifference, brokerOffset-offset.Offset)
			return
		}

		// Prevent new commits that are too fast (less than the min-distance config) if the last offset was not artificial
		if (!lastOffset.Artificial) && (timestampDifference >= 0) && (timestampDifference < (storage.app.Config.Lagcheck.MinDistance * 1000)) {
			clusterOffsets.consumerLock.Unlock()
			log.Debugf("Dropped offset (mindistance): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v tsdiff=%v lag=%v",
				offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset,
				timestampDifference, brokerOffset-offset.Offset)
			return
		}
	}

	// Calculate the lag against the brokerOffset
	partitionLag := brokerOffset - offset.Offset
	if partitionLag < 0 {
		// Little bit of a hack - because we only get broker offsets periodically, it's possible the consumer
		// offset could be ahead of where we think the broker is. In this case, just mark it as zero lag.
		partitionLag = 0
	}

	// Update or create the ring value at the current pointer
	if consumerPartitionRing.Value == nil {
		consumerPartitionRing.Value = &protocol.ConsumerOffset{
			Offset:     offset.Offset,
			Timestamp:  offset.Timestamp,
			Lag:        partitionLag,
			Artificial: false,
		}
	} else {
		ringval, _ := consumerPartitionRing.Value.(*protocol.ConsumerOffset)
		ringval.Offset = offset.Offset
		ringval.Timestamp = offset.Timestamp
		ringval.Lag = partitionLag
		ringval.Artificial = false
	}

	log.Tracef("Commit offset: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v lag=%v",
		offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset,
		partitionLag)

	// Advance the ring pointer
	consumerTopicMap[offset.Partition] = consumerTopicMap[offset.Partition].Next()
	clusterOffsets.consumerLock.Unlock()
}

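// Worked example of the commit filtering above, assuming MinDistance=1 (seconds),
// timestamps in milliseconds, and a previously stored commit at timestamp=1000000
// offset=500 (all values are placeholders):
//
//	timestamp=999000  offset=500 -> dropped (noadvance: older timestamp and the offset did not move)
//	timestamp=1000500 offset=510 -> dropped (mindistance: only 0.5s after the stored commit)
//	timestamp=1002000 offset=520 -> stored, lag = brokerOffset - 520 (floored at 0)
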
func (storage *OffsetStorage) Stop() {
	close(storage.quit)
}

func (storage *OffsetStorage) dropGroup(cluster string, group string, resultChannel chan protocol.StatusConstant) {
	storage.offsets[cluster].consumerLock.Lock()

	if _, ok := storage.offsets[cluster].consumer[group]; ok {
		log.Infof("Removing group %s from cluster %s by request", group, cluster)
		delete(storage.offsets[cluster].consumer, group)
		resultChannel <- protocol.StatusOK
	} else {
		resultChannel <- protocol.StatusNotFound
	}

	storage.offsets[cluster].consumerLock.Unlock()
}

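// dropGroup is reached through the same request channel as the other queries. A
// minimal sketch, with placeholder cluster and group names:
//
//	drop := &RequestConsumerDrop{
//		Result:  make(chan protocol.StatusConstant),
//		Cluster: "local",
//		Group:   "example-group",
//	}
//	storage.requestChannel <- drop
//	if <-drop.Result == protocol.StatusNotFound {
//		// the group was not known for that cluster
//	}
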
// Evaluate a consumer group based on specific rules about lag
// Rule 1: If over the stored period, the lag is ever zero for the partition, the period is OK
// Rule 2: If the consumer offset does not change, and the lag is non-zero, it's an error (partition is stalled)
// Rule 3: If the consumer offsets are moving, but the lag is consistently increasing, it's a warning (consumer is slow)
// Rule 4: If the difference between now and the last offset timestamp is greater than the difference between the last and first offset timestamps, the
//         consumer has stopped committing offsets for that partition (error), unless
// Rule 5: If the lag is -1, this is a special value that means there is no broker offset yet. Consider it good (will get caught in the next refresh of topics)
// Rule 6: If the consumer offset decreases from one interval to the next, the partition is marked as a rewind (error)
func (storage *OffsetStorage) evaluateGroup(cluster string, group string, resultChannel chan *protocol.ConsumerGroupStatus, showall bool) {
	status := &protocol.ConsumerGroupStatus{
		Cluster:    cluster,
		Group:      group,
		Status:     protocol.StatusNotFound,
		Complete:   true,
		Partitions: make([]*protocol.PartitionStatus, 0),
		Maxlag:     nil,
		TotalLag:   0,
	}

	// Make sure the cluster exists
	clusterMap, ok := storage.offsets[cluster]
	if !ok {
		resultChannel <- status
		return
	}

	// Make sure the group even exists
	clusterMap.consumerLock.Lock()
	consumerMap, ok := clusterMap.consumer[group]
	if !ok {
		clusterMap.consumerLock.Unlock()
		resultChannel <- status
		return
	}

	// Scan the offsets table once and store all the offsets for the group locally
	status.Status = protocol.StatusOK
	offsetList := make(map[string][][]protocol.ConsumerOffset, len(consumerMap))
	var youngestOffset int64
	for topic, partitions := range consumerMap {
		offsetList[topic] = make([][]protocol.ConsumerOffset, len(partitions))
		for partition, offsetRing := range partitions {
			status.TotalPartitions += 1

			// If we don't have our ring full yet, make sure we let the caller know
			if (offsetRing == nil) || (offsetRing.Value == nil) {
				status.Complete = false
				continue
			}

			// Add an artificial offset commit if the consumer has no lag against the current broker offset
			lastOffset := offsetRing.Prev().Value.(*protocol.ConsumerOffset)
			if lastOffset.Offset >= clusterMap.broker[topic][partition].Offset {
				ringval, _ := offsetRing.Value.(*protocol.ConsumerOffset)
				ringval.Offset = lastOffset.Offset
				ringval.Timestamp = time.Now().Unix() * 1000
				ringval.Lag = 0
				ringval.Artificial = true
				partitions[partition] = partitions[partition].Next()

				log.Tracef("Artificial offset: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v lag=0",
					cluster, topic, partition, group, ringval.Timestamp, lastOffset.Offset)
			}

			// Pull out the offsets once so we can unlock the map
			offsetList[topic][partition] = make([]protocol.ConsumerOffset, storage.app.Config.Lagcheck.Intervals)
			partitionMap := offsetList[topic][partition]
			idx := -1
			partitions[partition].Do(func(val interface{}) {
				idx += 1
				ptr, _ := val.(*protocol.ConsumerOffset)
				partitionMap[idx] = *ptr

				// Track the youngest offset we have found to check expiration
				if partitionMap[idx].Timestamp > youngestOffset {
					youngestOffset = partitionMap[idx].Timestamp
				}
			})
		}
	}

	// If the youngest offset is earlier than our expiration window, flush the group
	if (youngestOffset > 0) && (youngestOffset < ((time.Now().Unix() - storage.app.Config.Lagcheck.ExpireGroup) * 1000)) {
		log.Infof("Removing expired group %s from cluster %s", group, cluster)
		delete(clusterMap.consumer, group)
		clusterMap.consumerLock.Unlock()

		// Return the group as a 404
		status.Status = protocol.StatusNotFound
		resultChannel <- status
		return
	}
	clusterMap.consumerLock.Unlock()

	var maxlag int64
	for topic, partitions := range offsetList {
		for partition, offsets := range partitions {
			// Skip partitions we're missing offsets for
			if len(offsets) == 0 {
				continue
			}
			maxidx := len(offsets) - 1
			firstOffset := offsets[0]
			lastOffset := offsets[maxidx]

			// Rule 5 - we're missing broker offsets so we're not complete yet
			if firstOffset.Lag == -1 {
				status.Complete = false
				continue
			}

			// We may always add this partition, so create it once
			thispart := &protocol.PartitionStatus{
				Topic:     topic,
				Partition: int32(partition),
				Status:    protocol.StatusOK,
				Start:     firstOffset,
				End:       lastOffset,
			}

			// Check if this partition is the one with the most lag currently
			if lastOffset.Lag > maxlag {
				status.Maxlag = thispart
				maxlag = lastOffset.Lag
			}
			status.TotalLag += uint64(lastOffset.Lag)

			// Rule 4 - Offsets haven't been committed in a while
			if ((time.Now().Unix() * 1000) - lastOffset.Timestamp) > (lastOffset.Timestamp - firstOffset.Timestamp) {
				status.Status = protocol.StatusError
				thispart.Status = protocol.StatusStop
				status.Partitions = append(status.Partitions, thispart)
				continue
			}

			// Rule 6 - Did the consumer offsets rewind at any point?
			// We check this first because we always want to know about a rewind - it's bad behavior
			for i := 1; i <= maxidx; i++ {
				if offsets[i].Offset < offsets[i-1].Offset {
					status.Status = protocol.StatusError
					thispart.Status = protocol.StatusRewind
					status.Partitions = append(status.Partitions, thispart)
					continue
				}
			}

			// Rule 1
			if lastOffset.Lag == 0 {
				if showall {
					status.Partitions = append(status.Partitions, thispart)
				}
				continue
			}
			if lastOffset.Offset == firstOffset.Offset {
				// Rule 1
				if firstOffset.Lag == 0 {
					if showall {
						status.Partitions = append(status.Partitions, thispart)
					}
					continue
				}

				// Rule 2
				status.Status = protocol.StatusError
				thispart.Status = protocol.StatusStall
			} else {
				// Rule 1 passes, or shortcut a full check on Rule 3 if we can
				if (firstOffset.Lag == 0) || (lastOffset.Lag <= firstOffset.Lag) {
					if showall {
						status.Partitions = append(status.Partitions, thispart)
					}
					continue
				}

				lagDropped := false
				for i := 0; i <= maxidx; i++ {
					// Rule 1 passes or Rule 3 is shortcut (lag dropped somewhere in the period)
					if (offsets[i].Lag == 0) || ((i > 0) && (offsets[i].Lag < offsets[i-1].Lag)) {
						lagDropped = true
						break
					}
				}

				if !lagDropped {
					// Rule 3
					if status.Status == protocol.StatusOK {
						status.Status = protocol.StatusWarning
					}
					thispart.Status = protocol.StatusWarning
				}
			}

			// Always add the partition if it's not OK
			if (thispart.Status != protocol.StatusOK) || showall {
				status.Partitions = append(status.Partitions, thispart)
			}
		}
	}
	resultChannel <- status
}

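// Worked example of the rules above, for one partition with five stored commits
// (placeholder numbers, oldest to newest, and assuming the commits are recent enough
// that Rule 4 does not fire first):
//
//	offsets: 100 100 100 100 100   lag: 5 5 5 5 5   -> Rule 2: offsets unchanged, lag non-zero   => StatusStall (error)
//	offsets: 100 110 120 130 140   lag: 3 4 5 6 7   -> Rule 3: offsets move, lag only increases  => StatusWarning
//	offsets: 100 110 120 130 140   lag: 3 4 0 6 7   -> Rule 1: lag hit zero inside the window    => StatusOK
//	offsets: 100 110  90 130 150   lag: 5 4 6 2 0   -> Rule 6: an offset decreased in the window => StatusRewind (error)
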
func (storage *OffsetStorage) requestClusterList(request *RequestClusterList) {
	clusterList := make([]string, len(storage.offsets))
	i := 0
	for cluster := range storage.offsets {
		clusterList[i] = cluster
		i += 1
	}

	request.Result <- clusterList
}

func (storage *OffsetStorage) requestConsumerList(request *RequestConsumerList) {
	if _, ok := storage.offsets[request.Cluster]; !ok {
		request.Result <- make([]string, 0)
		return
	}

	storage.offsets[request.Cluster].consumerLock.RLock()
	consumerList := make([]string, len(storage.offsets[request.Cluster].consumer))
	i := 0
	for group := range storage.offsets[request.Cluster].consumer {
		consumerList[i] = group
		i += 1
	}
	storage.offsets[request.Cluster].consumerLock.RUnlock()

	request.Result <- consumerList
}

func (storage *OffsetStorage) requestTopicList(request *RequestTopicList) {
	if _, ok := storage.offsets[request.Cluster]; !ok {
		request.Result <- &ResponseTopicList{Error: true}
		return
	}

	response := &ResponseTopicList{Error: false}
	if request.Group == "" {
		storage.offsets[request.Cluster].brokerLock.RLock()
		response.TopicList = make([]string, len(storage.offsets[request.Cluster].broker))
		i := 0
		for topic := range storage.offsets[request.Cluster].broker {
			response.TopicList[i] = topic
			i += 1
		}
		storage.offsets[request.Cluster].brokerLock.RUnlock()
	} else {
		storage.offsets[request.Cluster].consumerLock.RLock()
		if _, ok := storage.offsets[request.Cluster].consumer[request.Group]; ok {
			response.TopicList = make([]string, len(storage.offsets[request.Cluster].consumer[request.Group]))
			i := 0
			for topic := range storage.offsets[request.Cluster].consumer[request.Group] {
				response.TopicList[i] = topic
				i += 1
			}
		} else {
			response.Error = true
		}
		storage.offsets[request.Cluster].consumerLock.RUnlock()
	}
	request.Result <- response
}

func (storage *OffsetStorage) requestOffsets(request *RequestOffsets) {
	if _, ok := storage.offsets[request.Cluster]; !ok {
		request.Result <- &ResponseOffsets{ErrorTopic: true, ErrorGroup: true}
		return
	}

	response := &ResponseOffsets{ErrorGroup: false, ErrorTopic: false}
	if request.Group == "" {
		storage.offsets[request.Cluster].brokerLock.RLock()
		if _, ok := storage.offsets[request.Cluster].broker[request.Topic]; ok {
			response.OffsetList = make([]int64, len(storage.offsets[request.Cluster].broker[request.Topic]))
			for partition, offset := range storage.offsets[request.Cluster].broker[request.Topic] {
				if offset == nil {
					response.OffsetList[partition] = -1
				} else {
					response.OffsetList[partition] = offset.Offset
				}
			}
		} else {
			response.ErrorTopic = true
		}
		storage.offsets[request.Cluster].brokerLock.RUnlock()
	} else {
		storage.offsets[request.Cluster].consumerLock.RLock()
		if _, ok := storage.offsets[request.Cluster].consumer[request.Group]; ok {
			if _, ok := storage.offsets[request.Cluster].consumer[request.Group][request.Topic]; ok {
				response.OffsetList = make([]int64, len(storage.offsets[request.Cluster].consumer[request.Group][request.Topic]))
				for partition, oring := range storage.offsets[request.Cluster].consumer[request.Group][request.Topic] {
					if oring == nil {
						response.OffsetList[partition] = -1
					} else {
						offset, _ := oring.Prev().Value.(*protocol.ConsumerOffset)
						if offset == nil {
							response.OffsetList[partition] = -1
						} else {
							response.OffsetList[partition] = offset.Offset
						}
					}
				}
			} else {
				response.ErrorTopic = true
			}
		} else {
			response.ErrorGroup = true
		}
		storage.offsets[request.Cluster].consumerLock.RUnlock()
	}
	request.Result <- response
}

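// Entries in ResponseOffsets.OffsetList line up with partition IDs, and -1 is used as
// a placeholder where no offset has been stored yet. A minimal sketch of consuming the
// reply (continuing the RequestOffsets example near the top of this file):
//
//	for partition, offset := range response.OffsetList {
//		if offset == -1 {
//			continue // no broker or consumer offset recorded for this partition yet
//		}
//		fmt.Printf("partition %d at offset %d\n", partition, offset)
//	}
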
func (storage *OffsetStorage) debugPrintGroup(cluster string, group string) {
	// Make sure the cluster exists
	clusterMap, ok := storage.offsets[cluster]
	if !ok {
		log.Debugf("Detail cluster=%s,group=%s: No Cluster", cluster, group)
		return
	}

	// Make sure the group even exists
	clusterMap.consumerLock.RLock()
	consumerMap, ok := clusterMap.consumer[group]
	if !ok {
		clusterMap.consumerLock.RUnlock()
		log.Debugf("Detail cluster=%s,group=%s: No Group", cluster, group)
		return
	}

	// Scan the offsets table and print all partitions (full ring) for the group
	for topic, partitions := range consumerMap {
		for partition, offsetRing := range partitions {
			if (offsetRing == nil) || (offsetRing.Value == nil) {
				log.Debugf("Detail cluster=%s,group=%s,topic=%s,partition=%v: No Ring", cluster, group, topic, partition)
				continue
			}

			// Walk the ring and format each stored offset as (timestamp,offset,lag,artificial)
			ringStr := ""
			offsetRing.Do(func(val interface{}) {
				if val == nil {
					ringStr += "(),"
				} else {
					ptr, _ := val.(*protocol.ConsumerOffset)
					ringStr += fmt.Sprintf("(%v,%v,%v,%v)", ptr.Timestamp, ptr.Offset, ptr.Lag, ptr.Artificial)
				}
			})
			log.Debugf("Detail cluster=%s,group=%s,topic=%s,partition=%v: %s", cluster, group, topic, partition, ringStr)
		}
	}
	clusterMap.consumerLock.RUnlock()
}