/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */

package main

import (
	log "github.com/cihub/seelog"
	"github.com/linkedin/Burrow/protocol"
	"github.com/samuel/go-zookeeper/zk"
	"math/rand"
	"strconv"
	"sync"
	"time"
)

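// ZookeeperClient tracks the consumer groups that commit offsets to Zookeeper
// for a single Kafka cluster and periodically reads those offsets.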
type ZookeeperClient struct {
	app             *ApplicationContext
	cluster         string
	conn            *zk.Conn
	zkRefreshTicker *time.Ticker
	zkGroupList     map[string]bool
	zkGroupLock     sync.RWMutex
}

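// NewZookeeperClient connects to the Zookeeper ensemble for the given cluster and,
// if the cluster is configured for Zookeeper-committed offsets, starts the periodic
// consumer group refresh.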
func NewZookeeperClient(app *ApplicationContext, cluster string) (*ZookeeperClient, error) {
	zkconn, _, err := zk.Connect(app.Config.Kafka[cluster].Zookeepers, time.Duration(app.Config.Zookeeper.Timeout)*time.Second)
	if err != nil {
		return nil, err
	}

	client := &ZookeeperClient{
		app:         app,
		cluster:     cluster,
		conn:        zkconn,
		zkGroupLock: sync.RWMutex{},
		zkGroupList: make(map[string]bool),
	}

	// Check if this cluster is configured to check Zookeeper consumer offsets
	if client.app.Config.Kafka[cluster].ZKOffsets {
		// Get a group list to start with (this will start the offset checkers)
		client.refreshConsumerGroups()

		// Set a ticker to refresh the group list periodically
		client.zkRefreshTicker = time.NewTicker(time.Duration(client.app.Config.Lagcheck.ZKGroupRefresh) * time.Second)
		go func() {
			for range client.zkRefreshTicker.C {
				client.refreshConsumerGroups()
			}
		}()
	}

	return client, nil
}

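// Stop halts the group refresh ticker, clears the group list (which stops the
// per-group checkers), and closes the Zookeeper connection.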
func (zkClient *ZookeeperClient) Stop() {
	if zkClient.zkRefreshTicker != nil {
		zkClient.zkRefreshTicker.Stop()
		zkClient.zkGroupLock.Lock()
		zkClient.zkGroupList = make(map[string]bool)
		zkClient.zkGroupLock.Unlock()
	}

	zkClient.conn.Close()
}

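// refreshConsumerGroups reads the list of consumer groups under the cluster's
// consumers znode, starts a checker for each new group, and drops groups that
// no longer exist in Zookeeper.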
func (zkClient *ZookeeperClient) refreshConsumerGroups() {
	zkClient.zkGroupLock.Lock()
	defer zkClient.zkGroupLock.Unlock()

	consumerGroups, _, err := zkClient.conn.Children(zkClient.app.Config.Kafka[zkClient.cluster].ZookeeperPath + "/consumers")
	if err != nil {
		// Can't read the consumers path. Bail for now
		log.Errorf("Cannot get consumer group list for cluster %s: %s", zkClient.cluster, err)
		return
	}

	// Mark all existing groups false
	for consumerGroup := range zkClient.zkGroupList {
		zkClient.zkGroupList[consumerGroup] = false
	}

	// Check for new groups, mark existing groups true
	for _, consumerGroup := range consumerGroups {
		// Don't bother adding groups in the blacklist
		if (zkClient.app.Storage.groupBlacklist != nil) && zkClient.app.Storage.groupBlacklist.MatchString(consumerGroup) {
			continue
		}

		if _, ok := zkClient.zkGroupList[consumerGroup]; !ok {
			// Add new consumer group and start it
			log.Debugf("Add ZK consumer group %s to cluster %s", consumerGroup, zkClient.cluster)
			go zkClient.startConsumerGroupChecker(consumerGroup)
		}
		zkClient.zkGroupList[consumerGroup] = true
	}

	// Delete groups that are still false
	for consumerGroup := range zkClient.zkGroupList {
		if !zkClient.zkGroupList[consumerGroup] {
			log.Debugf("Remove ZK consumer group %s from cluster %s", consumerGroup, zkClient.cluster)
			delete(zkClient.zkGroupList, consumerGroup)
		}
	}
}

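// startConsumerGroupChecker loops for as long as the group remains in the group
// list, fetching the group's offsets once per check interval. The initial random
// sleep spreads the checkers out over the interval.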
func (zkClient *ZookeeperClient) startConsumerGroupChecker(consumerGroup string) {
	// Sleep for a random portion of the check interval
	time.Sleep(time.Duration(rand.Int63n(zkClient.app.Config.Lagcheck.ZKCheck*1000)) * time.Millisecond)

	for {
		// Make sure this group still exists
		zkClient.zkGroupLock.RLock()
		if _, ok := zkClient.zkGroupList[consumerGroup]; !ok {
			zkClient.zkGroupLock.RUnlock()
			log.Debugf("Stopping checker for ZK consumer group %s in cluster %s", consumerGroup, zkClient.cluster)
			break
		}
		zkClient.zkGroupLock.RUnlock()

		// Check this group's offsets
		log.Debugf("Get ZK offsets for group %s in cluster %s", consumerGroup, zkClient.cluster)
		go zkClient.getOffsetsForConsumerGroup(consumerGroup)

		// Sleep for the check interval
		time.Sleep(time.Duration(zkClient.app.Config.Lagcheck.ZKCheck) * time.Second)
	}
}

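// getOffsetsForConsumerGroup lists the topics the group has committed offsets for
// and fetches each topic's offsets in its own goroutine.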
func (zkClient *ZookeeperClient) getOffsetsForConsumerGroup(consumerGroup string) {
	topics, _, err := zkClient.conn.Children(zkClient.app.Config.Kafka[zkClient.cluster].ZookeeperPath + "/consumers/" + consumerGroup + "/offsets")
	switch {
	case err == nil:
		// Spawn a goroutine for each topic. This provides parallelism for multi-topic consumers
		for _, topic := range topics {
			go zkClient.getOffsetsForTopic(consumerGroup, topic)
		}
	case err == zk.ErrNoNode:
		// If the node doesn't exist, it may be because the group is using Kafka-committed offsets. Skip it
		log.Debugf("Skip checking ZK offsets for group %s in cluster %s as the offsets path doesn't exist", consumerGroup, zkClient.cluster)
	default:
		log.Warnf("Cannot read topics for group %s in cluster %s: %s", consumerGroup, zkClient.cluster, err)
	}
}

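// getOffsetsForTopic lists the partitions of a topic the group has offsets for and
// fetches each partition's offset in its own goroutine.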
func (zkClient *ZookeeperClient) getOffsetsForTopic(consumerGroup string, topic string) {
	partitions, _, err := zkClient.conn.Children(zkClient.app.Config.Kafka[zkClient.cluster].ZookeeperPath + "/consumers/" + consumerGroup + "/offsets/" + topic)
	if err != nil {
		log.Warnf("Cannot read partitions for topic %s for group %s in cluster %s: %s", topic, consumerGroup, zkClient.cluster, err)
		return
	}

	// Spawn a goroutine for each partition
	for _, partition := range partitions {
		go zkClient.getOffsetForPartition(consumerGroup, topic, partition)
	}
}

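// getOffsetForPartition reads a single partition's committed offset znode, parses
// the partition number and offset value, and forwards the result to the storage
// offset channel. The znode's modification time is used as the offset timestamp.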
func (zkClient *ZookeeperClient) getOffsetForPartition(consumerGroup string, topic string, partition string) {
	offsetStr, zkNodeStat, err := zkClient.conn.Get(zkClient.app.Config.Kafka[zkClient.cluster].ZookeeperPath + "/consumers/" + consumerGroup + "/offsets/" + topic + "/" + partition)
	if err != nil {
		log.Warnf("Failed to read partition %s:%v for group %s in cluster %s: %s", topic, partition, consumerGroup, zkClient.cluster, err)
		return
	}

	partitionNum, err := strconv.ParseInt(partition, 10, 32)
	if err != nil {
		log.Errorf("Partition (%s) for topic %s for group %s in cluster %s is not an integer", partition, topic, consumerGroup, zkClient.cluster)
		return
	}

	offset, err := strconv.ParseInt(string(offsetStr), 10, 64)
	if err != nil {
		log.Errorf("Offset value (%s) for partition %s:%v for group %s in cluster %s is not an integer", string(offsetStr), topic, partition, consumerGroup, zkClient.cluster)
		return
	}

	partitionOffset := &protocol.PartitionOffset{
		Cluster:   zkClient.cluster,
		Topic:     topic,
		Partition: int32(partitionNum),
		Group:     consumerGroup,
		Timestamp: zkNodeStat.Mtime,
		Offset:    offset,
	}
	timeoutSendOffset(zkClient.app.Storage.offsetChannel, partitionOffset, 1)
}

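// NewLock returns a Zookeeper lock on the given path, built on the client's connection.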
func (zkClient *ZookeeperClient) NewLock(path string) *zk.Lock {
	// Pass through to the connection NewLock, without ACLs
	return zk.NewLock(zkClient.conn, path, make([]zk.ACL, 0))
}

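// RecursiveDelete removes a znode and all of its children. It panics on any
// Zookeeper error other than the node not existing, and refuses to delete the root.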
func (zkClient *ZookeeperClient) RecursiveDelete(path string) {
	if path == "/" {
		panic("Do not try and delete the root znode")
	}

	children, _, err := zkClient.conn.Children(path)
	switch {
	case err == nil:
		for _, child := range children {
			zkClient.RecursiveDelete(path + "/" + child)
		}
	case err == zk.ErrNoNode:
		return
	default:
		panic(err)
	}

	_, stat, err := zkClient.conn.Get(path)
	if err == zk.ErrNoNode {
		// The node disappeared between listing and deleting it. Nothing left to do
		return
	}
	if err != nil {
		panic(err)
	}
	err = zkClient.conn.Delete(path, stat.Version)
	if (err != nil) && (err != zk.ErrNoNode) {
		panic(err)
	}
}