зеркало из https://github.com/microsoft/Burrow.git
update: with protocol enable
This commit is contained in:
Родитель
656f9fc446
Коммит
a91376a745
15
debug.go
15
debug.go
|
@ -12,15 +12,16 @@ package main
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/linkedin/Burrow/protocol"
|
||||
)
|
||||
|
||||
func printConsumerGroupStatus(status *ConsumerGroupStatus) {
|
||||
func printConsumerGroupStatus(status *protocol.ConsumerGroupStatus) {
|
||||
fmt.Println("-------------------------------------------------")
|
||||
fmt.Println("Group: ", status.Group)
|
||||
if status.Status == StatusOK {
|
||||
if status.Status == protocol.StatusOK {
|
||||
fmt.Printf("Status: OK (complete = %t)\n", status.Complete)
|
||||
} else {
|
||||
if status.Status == StatusWarning {
|
||||
if status.Status == protocol.StatusWarning {
|
||||
fmt.Printf("Status: WARNING (complete = %t)\n", status.Complete)
|
||||
} else {
|
||||
fmt.Printf("Status: ERROR (complete = %t)\n", status.Complete)
|
||||
|
@ -29,13 +30,13 @@ func printConsumerGroupStatus(status *ConsumerGroupStatus) {
|
|||
for _, partition := range status.Partitions {
|
||||
prefix := " OK"
|
||||
switch {
|
||||
case partition.Status == StatusWarning:
|
||||
case partition.Status == protocol.StatusWarning:
|
||||
prefix = " WARN"
|
||||
case partition.Status == StatusStop:
|
||||
case partition.Status == protocol.StatusStop:
|
||||
prefix = " STOP"
|
||||
case partition.Status == StatusError:
|
||||
case partition.Status == protocol.StatusError:
|
||||
prefix = " ERR"
|
||||
case partition.Status == StatusStall:
|
||||
case partition.Status == protocol.StatusStall:
|
||||
prefix = " STALL"
|
||||
default:
|
||||
prefix = " STOP"
|
||||
|
|
|
@ -13,6 +13,7 @@ package main
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/linkedin/Burrow/protocol"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
|
@ -136,10 +137,10 @@ type HTTPResponseConsumerList struct {
|
|||
Request HTTPResponseRequestInfo `json:"request"`
|
||||
}
|
||||
type HTTPResponseConsumerStatus struct {
|
||||
Error bool `json:"error"`
|
||||
Message string `json:"message"`
|
||||
Status ConsumerGroupStatus `json:"status"`
|
||||
Request HTTPResponseRequestInfo `json:"request"`
|
||||
Error bool `json:"error"`
|
||||
Message string `json:"message"`
|
||||
Status protocol.ConsumerGroupStatus `json:"status"`
|
||||
Request HTTPResponseRequestInfo `json:"request"`
|
||||
}
|
||||
|
||||
func makeRequestInfo(r *http.Request) HTTPResponseRequestInfo {
|
||||
|
@ -360,10 +361,10 @@ func handleConsumerTopicDetail(app *ApplicationContext, w http.ResponseWriter, r
|
|||
}
|
||||
|
||||
func handleConsumerStatus(app *ApplicationContext, w http.ResponseWriter, r *http.Request, cluster string, group string, showall bool) (int, string) {
|
||||
storageRequest := &RequestConsumerStatus{Result: make(chan *ConsumerGroupStatus), Cluster: cluster, Group: group, Showall: showall}
|
||||
storageRequest := &RequestConsumerStatus{Result: make(chan *protocol.ConsumerGroupStatus), Cluster: cluster, Group: group, Showall: showall}
|
||||
app.Storage.requestChannel <- storageRequest
|
||||
result := <-storageRequest.Result
|
||||
if result.Status == StatusNotFound {
|
||||
if result.Status == protocol.StatusNotFound {
|
||||
return makeErrorResponse(http.StatusNotFound, "consumer group not found", w, r)
|
||||
}
|
||||
|
||||
|
@ -384,10 +385,10 @@ func handleConsumerStatus(app *ApplicationContext, w http.ResponseWriter, r *htt
|
|||
}
|
||||
|
||||
func handleConsumerDrop(app *ApplicationContext, w http.ResponseWriter, r *http.Request, cluster string, group string) (int, string) {
|
||||
storageRequest := &RequestConsumerDrop{Result: make(chan StatusConstant), Cluster: cluster, Group: group}
|
||||
storageRequest := &RequestConsumerDrop{Result: make(chan protocol.StatusConstant), Cluster: cluster, Group: group}
|
||||
app.Storage.requestChannel <- storageRequest
|
||||
result := <-storageRequest.Result
|
||||
if result == StatusNotFound {
|
||||
if result == protocol.StatusNotFound {
|
||||
return makeErrorResponse(http.StatusNotFound, "consumer group not found", w, r)
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"errors"
|
||||
"github.com/Shopify/sarama"
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/linkedin/Burrow/protocol"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
@ -160,7 +161,7 @@ func (client *KafkaClient) Stop() {
|
|||
}
|
||||
|
||||
// Send the offset on the specified channel, but wait no more than maxTime seconds to do so
|
||||
func timeoutSendOffset(offsetChannel chan *PartitionOffset, offset *PartitionOffset, maxTime int) {
|
||||
func timeoutSendOffset(offsetChannel chan *protocol.PartitionOffset, offset *protocol.PartitionOffset, maxTime int) {
|
||||
timeout := time.After(time.Duration(maxTime) * time.Second)
|
||||
select {
|
||||
case offsetChannel <- offset:
|
||||
|
@ -215,7 +216,7 @@ func (client *KafkaClient) getOffsets() error {
|
|||
log.Warnf("Error in OffsetResponse for %s:%v from broker %v: %s", topic, partition, brokerID, offsetResponse.Err.Error())
|
||||
continue
|
||||
}
|
||||
offset := &PartitionOffset{
|
||||
offset := &protocol.PartitionOffset{
|
||||
Cluster: client.cluster,
|
||||
Topic: topic,
|
||||
Partition: partition,
|
||||
|
@ -328,7 +329,7 @@ func (client *KafkaClient) processConsumerOffsetsMessage(msg *sarama.ConsumerMes
|
|||
}
|
||||
|
||||
// fmt.Printf("[%s,%s,%v]::OffsetAndMetadata[%v,%s,%v]\n", group, topic, partition, offset, metadata, timestamp)
|
||||
partitionOffset := &PartitionOffset{
|
||||
partitionOffset := &protocol.PartitionOffset{
|
||||
Cluster: client.cluster,
|
||||
Topic: topic,
|
||||
Partition: int32(partition),
|
||||
|
|
|
@ -12,6 +12,7 @@ package notifier
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/linkedin/Burrow/protocol"
|
||||
)
|
||||
|
||||
// Helper function for the templates to encode an object into a JSON string
|
||||
|
@ -22,7 +23,7 @@ func templateJsonEncoder(encodeMe interface{}) string {
|
|||
|
||||
// Helper - recategorize partitions as a map of lists
|
||||
// map[string][]string => status short name -> list of topics
|
||||
func classifyTopicsByStatus(partitions []*PartitionStatus) map[string][]string {
|
||||
func classifyTopicsByStatus(partitions []*protocol.PartitionStatus) map[string][]string {
|
||||
tmp_map := make(map[string]map[string]bool)
|
||||
for _, partition := range partitions {
|
||||
if _, ok := tmp_map[partition.Status.String()]; !ok {
|
||||
|
@ -44,7 +45,7 @@ func classifyTopicsByStatus(partitions []*PartitionStatus) map[string][]string {
|
|||
|
||||
// Template Helper - Return a map of partition counts
|
||||
// keys are warn, stop, stall, rewind, unknown
|
||||
func templateCountPartitions(partitions []*PartitionStatus) map[string]int {
|
||||
func templateCountPartitions(partitions []*protocol.PartitionStatus) map[string]int {
|
||||
rv := map[string]int{
|
||||
"warn": 0,
|
||||
"stop": 0,
|
||||
|
@ -55,15 +56,15 @@ func templateCountPartitions(partitions []*PartitionStatus) map[string]int {
|
|||
|
||||
for _, partition := range partitions {
|
||||
switch partition.Status {
|
||||
case StatusOK:
|
||||
case protocol.StatusOK:
|
||||
break
|
||||
case StatusWarning:
|
||||
case protocol.StatusWarning:
|
||||
rv["warn"]++
|
||||
case StatusStop:
|
||||
case protocol.StatusStop:
|
||||
rv["stop"]++
|
||||
case StatusStall:
|
||||
case protocol.StatusStall:
|
||||
rv["stall"]++
|
||||
case StatusRewind:
|
||||
case protocol.StatusRewind:
|
||||
rv["rewind"]++
|
||||
default:
|
||||
rv["unknown"]++
|
||||
|
@ -87,7 +88,7 @@ func templateDivide(a, b int) int {
|
|||
return a / b
|
||||
}
|
||||
|
||||
func maxLagHelper(a *PartitionStatus) int64 {
|
||||
func maxLagHelper(a *protocol.PartitionStatus) int64 {
|
||||
if a == nil {
|
||||
return 0
|
||||
} else {
|
||||
|
|
|
@ -13,6 +13,7 @@ package notifier
|
|||
import (
|
||||
"bytes"
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/linkedin/Burrow/protocol"
|
||||
"github.com/pborman/uuid"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
@ -152,7 +153,7 @@ func (notifier *HttpNotifier) sendConsumerGroupStatusNotify(msg Message) error {
|
|||
msg.Cluster, msg.Status, idStr, resp.Status)
|
||||
}
|
||||
|
||||
if notifier.SendDelete && (msg.Status == StatusOK) {
|
||||
if notifier.SendDelete && (msg.Status == protocol.StatusOK) {
|
||||
if _, ok := notifier.groupIds[msg.Cluster][msg.Group]; ok {
|
||||
// Send DELETE to HTTP endpoint
|
||||
bytesToSend := new(bytes.Buffer)
|
||||
|
|
|
@ -11,62 +11,10 @@
|
|||
package notifier
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/linkedin/Burrow/protocol"
|
||||
)
|
||||
|
||||
type StatusConstant int
|
||||
|
||||
const (
|
||||
StatusNotFound StatusConstant = 0
|
||||
StatusOK StatusConstant = 1
|
||||
StatusWarning StatusConstant = 2
|
||||
StatusError StatusConstant = 3
|
||||
StatusStop StatusConstant = 4
|
||||
StatusStall StatusConstant = 5
|
||||
StatusRewind StatusConstant = 6
|
||||
)
|
||||
|
||||
var StatusStrings = [...]string{"NOTFOUND", "OK", "WARN", "ERR", "STOP", "STALL", "REWIND"}
|
||||
|
||||
func (c StatusConstant) String() string {
|
||||
if (c >= 0) && (c < StatusConstant(len(StatusStrings))) {
|
||||
return StatusStrings[c]
|
||||
} else {
|
||||
return "UNKNOWN"
|
||||
}
|
||||
}
|
||||
func (c StatusConstant) MarshalText() ([]byte, error) {
|
||||
return []byte(c.String()), nil
|
||||
}
|
||||
func (c StatusConstant) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(c.String())
|
||||
}
|
||||
|
||||
type ConsumerOffset struct {
|
||||
Offset int64
|
||||
Timestamp int64
|
||||
Lag int64
|
||||
artificial bool
|
||||
}
|
||||
|
||||
type PartitionStatus struct {
|
||||
Topic string
|
||||
Partition int32
|
||||
Status StatusConstant
|
||||
Start ConsumerOffset
|
||||
End ConsumerOffset
|
||||
}
|
||||
|
||||
type Message struct {
|
||||
Cluster string
|
||||
Group string
|
||||
Status StatusConstant
|
||||
Complete bool
|
||||
Partitions []*PartitionStatus
|
||||
TotalPartitions int
|
||||
Maxlag *PartitionStatus
|
||||
TotalLag uint64
|
||||
}
|
||||
type Message protocol.ConsumerGroupStatus
|
||||
|
||||
type Notifier interface {
|
||||
Notify(msg Message) error
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/linkedin/Burrow/protocol"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
@ -84,10 +85,10 @@ func (slack *SlackNotifier) sendConsumerGroupStatusNotify() error {
|
|||
|
||||
var emoji, color string
|
||||
switch msg.Status {
|
||||
case StatusOK:
|
||||
case protocol.StatusOK:
|
||||
emoji = ":white_check_mark:"
|
||||
color = "good"
|
||||
case StatusNotFound, StatusWarning:
|
||||
case protocol.StatusNotFound, protocol.StatusWarning:
|
||||
emoji = ":question:"
|
||||
color = "warning"
|
||||
default:
|
||||
|
|
|
@ -13,6 +13,7 @@ package main
|
|||
import (
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/linkedin/Burrow/notifier"
|
||||
"github.com/linkedin/Burrow/protocol"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
|
@ -30,7 +31,7 @@ type NotifyCenter struct {
|
|||
quitChan chan struct{}
|
||||
groupList map[string]map[string]bool
|
||||
groupLock sync.RWMutex
|
||||
resultsChannel chan *ConsumerGroupStatus
|
||||
resultsChannel chan *protocol.ConsumerGroupStatus
|
||||
}
|
||||
|
||||
func LoadNotifiers(app *ApplicationContext) error {
|
||||
|
@ -60,7 +61,7 @@ func LoadNotifiers(app *ApplicationContext) error {
|
|||
quitChan: make(chan struct{}),
|
||||
groupList: make(map[string]map[string]bool),
|
||||
groupLock: sync.RWMutex{},
|
||||
resultsChannel: make(chan *ConsumerGroupStatus),
|
||||
resultsChannel: make(chan *protocol.ConsumerGroupStatus),
|
||||
}
|
||||
|
||||
app.NotifyCenter = nc
|
||||
|
@ -110,9 +111,8 @@ func StopNotifiers(app *ApplicationContext) {
|
|||
// TODO stop all ncs
|
||||
}
|
||||
|
||||
func (nc *NotifyCenter) handleEvaluationResponse(result *ConsumerGroupStatus) {
|
||||
// TODO convert result to Message
|
||||
msg := notifier.Message{}
|
||||
func (nc *NotifyCenter) handleEvaluationResponse(result *protocol.ConsumerGroupStatus) {
|
||||
msg := notifier.Message(*result)
|
||||
for _, notifier := range nc.notifiers {
|
||||
if !notifier.Ignore(msg) {
|
||||
notifier.Notify(msg)
|
||||
|
|
163
offsets_store.go
163
offsets_store.go
|
@ -12,22 +12,21 @@ package main
|
|||
|
||||
import (
|
||||
"container/ring"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/linkedin/Burrow/protocol"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type PartitionOffset struct {
|
||||
Cluster string
|
||||
Topic string
|
||||
Partition int32
|
||||
Offset int64
|
||||
Timestamp int64
|
||||
Group string
|
||||
TopicPartitionCount int
|
||||
type OffsetStorage struct {
|
||||
app *ApplicationContext
|
||||
quit chan struct{}
|
||||
offsetChannel chan *protocol.PartitionOffset
|
||||
requestChannel chan interface{}
|
||||
offsets map[string]*ClusterOffsets
|
||||
groupBlacklist *regexp.Regexp
|
||||
}
|
||||
|
||||
type BrokerOffset struct {
|
||||
|
@ -35,74 +34,12 @@ type BrokerOffset struct {
|
|||
Timestamp int64
|
||||
}
|
||||
|
||||
type ConsumerOffset struct {
|
||||
Offset int64 `json:"offset"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Lag int64 `json:"lag"`
|
||||
artificial bool
|
||||
}
|
||||
|
||||
type ClusterOffsets struct {
|
||||
broker map[string][]*BrokerOffset
|
||||
consumer map[string]map[string][]*ring.Ring
|
||||
brokerLock *sync.RWMutex
|
||||
consumerLock *sync.RWMutex
|
||||
}
|
||||
type OffsetStorage struct {
|
||||
app *ApplicationContext
|
||||
quit chan struct{}
|
||||
offsetChannel chan *PartitionOffset
|
||||
requestChannel chan interface{}
|
||||
offsets map[string]*ClusterOffsets
|
||||
groupBlacklist *regexp.Regexp
|
||||
}
|
||||
|
||||
type StatusConstant int
|
||||
|
||||
const (
|
||||
StatusNotFound StatusConstant = 0
|
||||
StatusOK StatusConstant = 1
|
||||
StatusWarning StatusConstant = 2
|
||||
StatusError StatusConstant = 3
|
||||
StatusStop StatusConstant = 4
|
||||
StatusStall StatusConstant = 5
|
||||
StatusRewind StatusConstant = 6
|
||||
)
|
||||
|
||||
var StatusStrings = [...]string{"NOTFOUND", "OK", "WARN", "ERR", "STOP", "STALL", "REWIND"}
|
||||
|
||||
func (c StatusConstant) String() string {
|
||||
if (c >= 0) && (c < StatusConstant(len(StatusStrings))) {
|
||||
return StatusStrings[c]
|
||||
} else {
|
||||
return "UNKNOWN"
|
||||
}
|
||||
}
|
||||
func (c StatusConstant) MarshalText() ([]byte, error) {
|
||||
return []byte(c.String()), nil
|
||||
}
|
||||
func (c StatusConstant) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(c.String())
|
||||
}
|
||||
|
||||
type PartitionStatus struct {
|
||||
Topic string `json:"topic"`
|
||||
Partition int32 `json:"partition"`
|
||||
Status StatusConstant `json:"status"`
|
||||
Start ConsumerOffset `json:"start"`
|
||||
End ConsumerOffset `json:"end"`
|
||||
}
|
||||
|
||||
type ConsumerGroupStatus struct {
|
||||
Cluster string `json:"cluster"`
|
||||
Group string `json:"group"`
|
||||
Status StatusConstant `json:"status"`
|
||||
Complete bool `json:"complete"`
|
||||
Partitions []*PartitionStatus `json:"partitions"`
|
||||
TotalPartitions int `json:"partition_count"`
|
||||
Maxlag *PartitionStatus `json:"maxlag"`
|
||||
TotalLag uint64 `json:"totallag"`
|
||||
}
|
||||
|
||||
type ResponseTopicList struct {
|
||||
TopicList []string
|
||||
|
@ -132,13 +69,13 @@ type RequestOffsets struct {
|
|||
Group string
|
||||
}
|
||||
type RequestConsumerStatus struct {
|
||||
Result chan *ConsumerGroupStatus
|
||||
Result chan *protocol.ConsumerGroupStatus
|
||||
Cluster string
|
||||
Group string
|
||||
Showall bool
|
||||
}
|
||||
type RequestConsumerDrop struct {
|
||||
Result chan StatusConstant
|
||||
Result chan protocol.StatusConstant
|
||||
Cluster string
|
||||
Group string
|
||||
}
|
||||
|
@ -147,7 +84,7 @@ func NewOffsetStorage(app *ApplicationContext) (*OffsetStorage, error) {
|
|||
storage := &OffsetStorage{
|
||||
app: app,
|
||||
quit: make(chan struct{}),
|
||||
offsetChannel: make(chan *PartitionOffset, 10000),
|
||||
offsetChannel: make(chan *protocol.PartitionOffset, 10000),
|
||||
requestChannel: make(chan interface{}),
|
||||
offsets: make(map[string]*ClusterOffsets),
|
||||
}
|
||||
|
@ -207,7 +144,7 @@ func NewOffsetStorage(app *ApplicationContext) (*OffsetStorage, error) {
|
|||
return storage, nil
|
||||
}
|
||||
|
||||
func (storage *OffsetStorage) addBrokerOffset(offset *PartitionOffset) {
|
||||
func (storage *OffsetStorage) addBrokerOffset(offset *protocol.PartitionOffset) {
|
||||
clusterMap, ok := storage.offsets[offset.Cluster]
|
||||
if !ok {
|
||||
// Ignore offsets for clusters that we don't know about - should never happen anyways
|
||||
|
@ -242,7 +179,7 @@ func (storage *OffsetStorage) addBrokerOffset(offset *PartitionOffset) {
|
|||
clusterMap.brokerLock.Unlock()
|
||||
}
|
||||
|
||||
func (storage *OffsetStorage) addConsumerOffset(offset *PartitionOffset) {
|
||||
func (storage *OffsetStorage) addConsumerOffset(offset *protocol.PartitionOffset) {
|
||||
// Ignore offsets for clusters that we don't know about - should never happen anyways
|
||||
clusterOffsets, ok := storage.offsets[offset.Cluster]
|
||||
if !ok {
|
||||
|
@ -314,7 +251,7 @@ func (storage *OffsetStorage) addConsumerOffset(offset *PartitionOffset) {
|
|||
consumerTopicMap[offset.Partition] = ring.New(storage.app.Config.Lagcheck.Intervals)
|
||||
consumerPartitionRing = consumerTopicMap[offset.Partition]
|
||||
} else {
|
||||
lastOffset := consumerPartitionRing.Prev().Value.(*ConsumerOffset)
|
||||
lastOffset := consumerPartitionRing.Prev().Value.(*protocol.ConsumerOffset)
|
||||
timestampDifference := offset.Timestamp - lastOffset.Timestamp
|
||||
|
||||
// Prevent old offset commits, but only if the offsets don't advance (because of artifical commits below)
|
||||
|
@ -327,7 +264,7 @@ func (storage *OffsetStorage) addConsumerOffset(offset *PartitionOffset) {
|
|||
}
|
||||
|
||||
// Prevent new commits that are too fast (less than the min-distance config) if the last offset was not artificial
|
||||
if (!lastOffset.artificial) && (timestampDifference >= 0) && (timestampDifference < (storage.app.Config.Lagcheck.MinDistance * 1000)) {
|
||||
if (!lastOffset.Artificial) && (timestampDifference >= 0) && (timestampDifference < (storage.app.Config.Lagcheck.MinDistance * 1000)) {
|
||||
clusterOffsets.consumerLock.Unlock()
|
||||
log.Debugf("Dropped offset (mindistance): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v tsdiff=%v lag=%v",
|
||||
offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset,
|
||||
|
@ -346,18 +283,18 @@ func (storage *OffsetStorage) addConsumerOffset(offset *PartitionOffset) {
|
|||
|
||||
// Update or create the ring value at the current pointer
|
||||
if consumerPartitionRing.Value == nil {
|
||||
consumerPartitionRing.Value = &ConsumerOffset{
|
||||
consumerPartitionRing.Value = &protocol.ConsumerOffset{
|
||||
Offset: offset.Offset,
|
||||
Timestamp: offset.Timestamp,
|
||||
Lag: partitionLag,
|
||||
artificial: false,
|
||||
Artificial: false,
|
||||
}
|
||||
} else {
|
||||
ringval, _ := consumerPartitionRing.Value.(*ConsumerOffset)
|
||||
ringval, _ := consumerPartitionRing.Value.(*protocol.ConsumerOffset)
|
||||
ringval.Offset = offset.Offset
|
||||
ringval.Timestamp = offset.Timestamp
|
||||
ringval.Lag = partitionLag
|
||||
ringval.artificial = false
|
||||
ringval.Artificial = false
|
||||
}
|
||||
|
||||
log.Tracef("Commit offset: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v lag=%v",
|
||||
|
@ -373,15 +310,15 @@ func (storage *OffsetStorage) Stop() {
|
|||
close(storage.quit)
|
||||
}
|
||||
|
||||
func (storage *OffsetStorage) dropGroup(cluster string, group string, resultChannel chan StatusConstant) {
|
||||
func (storage *OffsetStorage) dropGroup(cluster string, group string, resultChannel chan protocol.StatusConstant) {
|
||||
storage.offsets[cluster].consumerLock.Lock()
|
||||
|
||||
if _, ok := storage.offsets[cluster].consumer[group]; ok {
|
||||
log.Infof("Removing group %s from cluster %s by request", group, cluster)
|
||||
delete(storage.offsets[cluster].consumer, group)
|
||||
resultChannel <- StatusOK
|
||||
resultChannel <- protocol.StatusOK
|
||||
} else {
|
||||
resultChannel <- StatusNotFound
|
||||
resultChannel <- protocol.StatusNotFound
|
||||
}
|
||||
|
||||
storage.offsets[cluster].consumerLock.Unlock()
|
||||
|
@ -395,13 +332,13 @@ func (storage *OffsetStorage) dropGroup(cluster string, group string, resultChan
|
|||
// consumer has stopped committing offsets for that partition (error), unless
|
||||
// Rule 5: If the lag is -1, this is a special value that means there is no broker offset yet. Consider it good (will get caught in the next refresh of topics)
|
||||
// Rule 6: If the consumer offset decreases from one interval to the next the partition is marked as a rewind (error)
|
||||
func (storage *OffsetStorage) evaluateGroup(cluster string, group string, resultChannel chan *ConsumerGroupStatus, showall bool) {
|
||||
status := &ConsumerGroupStatus{
|
||||
func (storage *OffsetStorage) evaluateGroup(cluster string, group string, resultChannel chan *protocol.ConsumerGroupStatus, showall bool) {
|
||||
status := &protocol.ConsumerGroupStatus{
|
||||
Cluster: cluster,
|
||||
Group: group,
|
||||
Status: StatusNotFound,
|
||||
Status: protocol.StatusNotFound,
|
||||
Complete: true,
|
||||
Partitions: make([]*PartitionStatus, 0),
|
||||
Partitions: make([]*protocol.PartitionStatus, 0),
|
||||
Maxlag: nil,
|
||||
TotalLag: 0,
|
||||
}
|
||||
|
@ -423,11 +360,11 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
|
|||
}
|
||||
|
||||
// Scan the offsets table once and store all the offsets for the group locally
|
||||
status.Status = StatusOK
|
||||
offsetList := make(map[string][][]ConsumerOffset, len(consumerMap))
|
||||
status.Status = protocol.StatusOK
|
||||
offsetList := make(map[string][][]protocol.ConsumerOffset, len(consumerMap))
|
||||
var youngestOffset int64
|
||||
for topic, partitions := range consumerMap {
|
||||
offsetList[topic] = make([][]ConsumerOffset, len(partitions))
|
||||
offsetList[topic] = make([][]protocol.ConsumerOffset, len(partitions))
|
||||
for partition, offsetRing := range partitions {
|
||||
status.TotalPartitions += 1
|
||||
|
||||
|
@ -438,13 +375,13 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
|
|||
}
|
||||
|
||||
// Add an artificial offset commit if the consumer has no lag against the current broker offset
|
||||
lastOffset := offsetRing.Prev().Value.(*ConsumerOffset)
|
||||
lastOffset := offsetRing.Prev().Value.(*protocol.ConsumerOffset)
|
||||
if lastOffset.Offset >= clusterMap.broker[topic][partition].Offset {
|
||||
ringval, _ := offsetRing.Value.(*ConsumerOffset)
|
||||
ringval, _ := offsetRing.Value.(*protocol.ConsumerOffset)
|
||||
ringval.Offset = lastOffset.Offset
|
||||
ringval.Timestamp = time.Now().Unix() * 1000
|
||||
ringval.Lag = 0
|
||||
ringval.artificial = true
|
||||
ringval.Artificial = true
|
||||
partitions[partition] = partitions[partition].Next()
|
||||
|
||||
log.Tracef("Artificial offset: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v lag=0",
|
||||
|
@ -452,12 +389,12 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
|
|||
}
|
||||
|
||||
// Pull out the offsets once so we can unlock the map
|
||||
offsetList[topic][partition] = make([]ConsumerOffset, storage.app.Config.Lagcheck.Intervals)
|
||||
offsetList[topic][partition] = make([]protocol.ConsumerOffset, storage.app.Config.Lagcheck.Intervals)
|
||||
partitionMap := offsetList[topic][partition]
|
||||
idx := -1
|
||||
partitions[partition].Do(func(val interface{}) {
|
||||
idx += 1
|
||||
ptr, _ := val.(*ConsumerOffset)
|
||||
ptr, _ := val.(*protocol.ConsumerOffset)
|
||||
partitionMap[idx] = *ptr
|
||||
|
||||
// Track the youngest offset we have found to check expiration
|
||||
|
@ -475,7 +412,7 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
|
|||
clusterMap.consumerLock.Unlock()
|
||||
|
||||
// Return the group as a 404
|
||||
status.Status = StatusNotFound
|
||||
status.Status = protocol.StatusNotFound
|
||||
resultChannel <- status
|
||||
return
|
||||
}
|
||||
|
@ -499,10 +436,10 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
|
|||
}
|
||||
|
||||
// We may always add this partition, so create it once
|
||||
thispart := &PartitionStatus{
|
||||
thispart := &protocol.PartitionStatus{
|
||||
Topic: topic,
|
||||
Partition: int32(partition),
|
||||
Status: StatusOK,
|
||||
Status: protocol.StatusOK,
|
||||
Start: firstOffset,
|
||||
End: lastOffset,
|
||||
}
|
||||
|
@ -516,8 +453,8 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
|
|||
|
||||
// Rule 4 - Offsets haven't been committed in a while
|
||||
if ((time.Now().Unix() * 1000) - lastOffset.Timestamp) > (lastOffset.Timestamp - firstOffset.Timestamp) {
|
||||
status.Status = StatusError
|
||||
thispart.Status = StatusStop
|
||||
status.Status = protocol.StatusError
|
||||
thispart.Status = protocol.StatusStop
|
||||
status.Partitions = append(status.Partitions, thispart)
|
||||
continue
|
||||
}
|
||||
|
@ -526,8 +463,8 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
|
|||
// We check this first because we always want to know about a rewind - it's bad behavior
|
||||
for i := 1; i <= maxidx; i++ {
|
||||
if offsets[i].Offset < offsets[i-1].Offset {
|
||||
status.Status = StatusError
|
||||
thispart.Status = StatusRewind
|
||||
status.Status = protocol.StatusError
|
||||
thispart.Status = protocol.StatusRewind
|
||||
status.Partitions = append(status.Partitions, thispart)
|
||||
continue
|
||||
}
|
||||
|
@ -550,8 +487,8 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
|
|||
}
|
||||
|
||||
// Rule 2
|
||||
status.Status = StatusError
|
||||
thispart.Status = StatusStall
|
||||
status.Status = protocol.StatusError
|
||||
thispart.Status = protocol.StatusStall
|
||||
} else {
|
||||
// Rule 1 passes, or shortcut a full check on Rule 3 if we can
|
||||
if (firstOffset.Lag == 0) || (lastOffset.Lag <= firstOffset.Lag) {
|
||||
|
@ -572,15 +509,15 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
|
|||
|
||||
if !lagDropped {
|
||||
// Rule 3
|
||||
if status.Status == StatusOK {
|
||||
status.Status = StatusWarning
|
||||
if status.Status == protocol.StatusOK {
|
||||
status.Status = protocol.StatusWarning
|
||||
}
|
||||
thispart.Status = StatusWarning
|
||||
thispart.Status = protocol.StatusWarning
|
||||
}
|
||||
}
|
||||
|
||||
// Always add the partition if it's not OK
|
||||
if (thispart.Status != StatusOK) || showall {
|
||||
if (thispart.Status != protocol.StatusOK) || showall {
|
||||
status.Partitions = append(status.Partitions, thispart)
|
||||
}
|
||||
}
|
||||
|
@ -681,7 +618,7 @@ func (storage *OffsetStorage) requestOffsets(request *RequestOffsets) {
|
|||
if oring == nil {
|
||||
response.OffsetList[partition] = -1
|
||||
} else {
|
||||
offset, _ := oring.Prev().Value.(*ConsumerOffset)
|
||||
offset, _ := oring.Prev().Value.(*protocol.ConsumerOffset)
|
||||
if offset == nil {
|
||||
response.OffsetList[partition] = -1
|
||||
} else {
|
||||
|
@ -731,8 +668,8 @@ func (storage *OffsetStorage) debugPrintGroup(cluster string, group string) {
|
|||
if val == nil {
|
||||
ringStr += "(),"
|
||||
} else {
|
||||
ptr, _ := val.(*ConsumerOffset)
|
||||
ringStr += fmt.Sprintf("(%v,%v,%v,%v)", ptr.Timestamp, ptr.Offset, ptr.Lag, ptr.artificial)
|
||||
ptr, _ := val.(*protocol.ConsumerOffset)
|
||||
ringStr += fmt.Sprintf("(%v,%v,%v,%v)", ptr.Timestamp, ptr.Offset, ptr.Lag, ptr.Artificial)
|
||||
}
|
||||
})
|
||||
log.Debugf("Detail cluster=%s,group=%s,topic=%s,partition=%v: %s", cluster, group, topic, partition, ringStr)
|
||||
|
|
|
@ -1,3 +1,13 @@
|
|||
/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
*/
|
||||
|
||||
package protocol
|
||||
|
||||
import (
|
||||
|
|
3
storm.go
3
storm.go
|
@ -14,6 +14,7 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/linkedin/Burrow/protocol"
|
||||
"github.com/samuel/go-zookeeper/zk"
|
||||
"math/rand"
|
||||
"regexp"
|
||||
|
@ -147,7 +148,7 @@ func (stormClient *StormClient) getOffsetsForPartition(consumerGroup string, par
|
|||
switch {
|
||||
case errConversion == nil:
|
||||
log.Debugf("About to sync Storm offset: [%s,%s,%v]::[%v,%v]\n", consumerGroup, topic, partition, offset, zkNodeStat.Mtime)
|
||||
partitionOffset := &PartitionOffset{
|
||||
partitionOffset := &protocol.PartitionOffset{
|
||||
Cluster: stormClient.cluster,
|
||||
Topic: topic,
|
||||
Partition: int32(partition),
|
||||
|
|
|
@ -12,6 +12,7 @@ package main
|
|||
|
||||
import (
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/linkedin/Burrow/protocol"
|
||||
"github.com/samuel/go-zookeeper/zk"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
|
@ -181,7 +182,7 @@ func (zkClient *ZookeeperClient) getOffsetForPartition(consumerGroup string, top
|
|||
return
|
||||
}
|
||||
|
||||
partitionOffset := &PartitionOffset{
|
||||
partitionOffset := &protocol.PartitionOffset{
|
||||
Cluster: zkClient.cluster,
|
||||
Topic: topic,
|
||||
Partition: int32(partitionNum),
|
||||
|
|
Загрузка…
Ссылка в новой задаче