Burrow/zookeeper.go


/* Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package main

import (
	log "github.com/cihub/seelog"
	"github.com/linkedin/Burrow/protocol"
	"github.com/samuel/go-zookeeper/zk"
	"math/rand"
	"strconv"
	"sync"
	"time"
)

// ZookeeperClient watches the consumer groups that commit their offsets to
// Zookeeper for a single Kafka cluster and forwards those offsets to storage.
type ZookeeperClient struct {
	app             *ApplicationContext
	cluster         string
	conn            *zk.Conn
	zkRefreshTicker *time.Ticker
	zkGroupList     map[string]bool
	zkGroupLock     sync.RWMutex
}
// NewZookeeperClient connects to the cluster's Zookeeper ensemble and, if the
// cluster is configured for Zookeeper-committed offsets, starts the periodic
// consumer group refresh.
func NewZookeeperClient(app *ApplicationContext, cluster string) (*ZookeeperClient, error) {
	zkconn, _, err := zk.Connect(app.Config.Kafka[cluster].Zookeepers, time.Duration(app.Config.Zookeeper.Timeout)*time.Second)
	if err != nil {
		return nil, err
	}

	client := &ZookeeperClient{
		app:         app,
		cluster:     cluster,
		conn:        zkconn,
		zkGroupLock: sync.RWMutex{},
		zkGroupList: make(map[string]bool),
	}

	// Check if this cluster is configured to check Zookeeper consumer offsets
	if client.app.Config.Kafka[cluster].ZKOffsets {
		// Get a group list to start with (this will start the offset checkers)
		client.refreshConsumerGroups()

		// Set a ticker to refresh the group list periodically
		client.zkRefreshTicker = time.NewTicker(time.Duration(client.app.Config.Lagcheck.ZKGroupRefresh) * time.Second)
		go func() {
			for range client.zkRefreshTicker.C {
				client.refreshConsumerGroups()
			}
		}()
	}

	return client, nil
}
// Stop halts the group refresh ticker, clears the group list (which causes the
// per-group checkers to exit), and closes the Zookeeper connection.
func (zkClient *ZookeeperClient) Stop() {
	if zkClient.zkRefreshTicker != nil {
		zkClient.zkRefreshTicker.Stop()
		zkClient.zkGroupLock.Lock()
		zkClient.zkGroupList = make(map[string]bool)
		zkClient.zkGroupLock.Unlock()
	}

	zkClient.conn.Close()
}
func (zkClient *ZookeeperClient) refreshConsumerGroups() {
	zkClient.zkGroupLock.Lock()
	defer zkClient.zkGroupLock.Unlock()

	consumerGroups, _, err := zkClient.conn.Children(zkClient.app.Config.Kafka[zkClient.cluster].ZookeeperPath + "/consumers")
	if err != nil {
		// Can't read the consumers path. Bail for now
		log.Errorf("Cannot get consumer group list for cluster %s: %s", zkClient.cluster, err)
		return
	}

	// Mark all existing groups false
	for consumerGroup := range zkClient.zkGroupList {
		zkClient.zkGroupList[consumerGroup] = false
	}

	// Check for new groups, mark existing groups true
	for _, consumerGroup := range consumerGroups {
		// Don't bother adding groups in the blacklist
		if (zkClient.app.Storage.groupBlacklist != nil) && zkClient.app.Storage.groupBlacklist.MatchString(consumerGroup) {
			continue
		}
		if _, ok := zkClient.zkGroupList[consumerGroup]; !ok {
			// Add new consumer group and start it
			log.Debugf("Add ZK consumer group %s to cluster %s", consumerGroup, zkClient.cluster)
			go zkClient.startConsumerGroupChecker(consumerGroup)
		}
		zkClient.zkGroupList[consumerGroup] = true
	}

	// Delete groups that are still false
	for consumerGroup := range zkClient.zkGroupList {
		if !zkClient.zkGroupList[consumerGroup] {
			log.Debugf("Remove ZK consumer group %s from cluster %s", consumerGroup, zkClient.cluster)
			delete(zkClient.zkGroupList, consumerGroup)
		}
	}
}
func (zkClient *ZookeeperClient) startConsumerGroupChecker(consumerGroup string) {
	// Sleep for a random portion of the check interval
	time.Sleep(time.Duration(rand.Int63n(zkClient.app.Config.Lagcheck.ZKCheck*1000)) * time.Millisecond)

	for {
		// Make sure this group still exists
		zkClient.zkGroupLock.RLock()
		if _, ok := zkClient.zkGroupList[consumerGroup]; !ok {
			zkClient.zkGroupLock.RUnlock()
			log.Debugf("Stopping checker for ZK consumer group %s in cluster %s", consumerGroup, zkClient.cluster)
			break
		}
		zkClient.zkGroupLock.RUnlock()

		// Check this group's offsets
		log.Debugf("Get ZK offsets for group %s in cluster %s", consumerGroup, zkClient.cluster)
		go zkClient.getOffsetsForConsumerGroup(consumerGroup)

		// Sleep for the check interval
		time.Sleep(time.Duration(zkClient.app.Config.Lagcheck.ZKCheck) * time.Second)
	}
}
func (zkClient *ZookeeperClient) getOffsetsForConsumerGroup(consumerGroup string) {
	topics, _, err := zkClient.conn.Children(zkClient.app.Config.Kafka[zkClient.cluster].ZookeeperPath + "/consumers/" + consumerGroup + "/offsets")
	switch {
	case err == nil:
		// Spawn a goroutine for each topic. This provides parallelism for multi-topic consumers
		for _, topic := range topics {
			go zkClient.getOffsetsForTopic(consumerGroup, topic)
		}
	case err == zk.ErrNoNode:
		// If the node doesn't exist, it may be because the group is using Kafka-committed offsets. Skip it
		log.Debugf("Skip checking ZK offsets for group %s in cluster %s as the offsets path doesn't exist", consumerGroup, zkClient.cluster)
	default:
		log.Warnf("Cannot read topics for group %s in cluster %s: %s", consumerGroup, zkClient.cluster, err)
	}
}
func (zkClient *ZookeeperClient) getOffsetsForTopic(consumerGroup string, topic string) {
	partitions, _, err := zkClient.conn.Children(zkClient.app.Config.Kafka[zkClient.cluster].ZookeeperPath + "/consumers/" + consumerGroup + "/offsets/" + topic)
	if err != nil {
		log.Warnf("Cannot read partitions for topic %s for group %s in cluster %s: %s", topic, consumerGroup, zkClient.cluster, err)
		return
	}

	// Spawn a goroutine for each partition
	for _, partition := range partitions {
		go zkClient.getOffsetForPartition(consumerGroup, topic, partition)
	}
}
// getOffsetForPartition reads a single partition's offset znode, parses the
// offset and partition number, and forwards the result to the storage channel.
func (zkClient *ZookeeperClient) getOffsetForPartition(consumerGroup string, topic string, partition string) {
	offsetStr, zkNodeStat, err := zkClient.conn.Get(zkClient.app.Config.Kafka[zkClient.cluster].ZookeeperPath + "/consumers/" + consumerGroup + "/offsets/" + topic + "/" + partition)
	if err != nil {
		log.Warnf("Failed to read partition %s:%v for group %s in cluster %s: %s", topic, partition, consumerGroup, zkClient.cluster, err)
		return
	}

	partitionNum, err := strconv.ParseInt(partition, 10, 32)
	if err != nil {
		log.Errorf("Partition (%s) for topic %s for group %s in cluster %s is not an integer", partition, topic, consumerGroup, zkClient.cluster)
		return
	}

	offset, err := strconv.ParseInt(string(offsetStr), 10, 64)
	if err != nil {
		log.Errorf("Offset value (%s) for partition %s:%v for group %s in cluster %s is not an integer", string(offsetStr), topic, partition, consumerGroup, zkClient.cluster)
		return
	}

	// The znode's modification time serves as the offset commit timestamp
	partitionOffset := &protocol.PartitionOffset{
		Cluster:   zkClient.cluster,
		Topic:     topic,
		Partition: int32(partitionNum),
		Group:     consumerGroup,
		Timestamp: zkNodeStat.Mtime,
		Offset:    offset,
	}
	timeoutSendOffset(zkClient.app.Storage.offsetChannel, partitionOffset, 1)
}
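
// The timeoutSendOffset helper used above is defined elsewhere in Burrow. As an
// illustrative sketch only (an assumption about its behavior, not the actual
// implementation), a helper like the one below would attempt the channel send
// but abandon it after maxTime seconds, so a slow or full storage channel
// cannot block the offset checker goroutines indefinitely.
func exampleTimeoutSendOffset(offsetChannel chan *protocol.PartitionOffset, offset *protocol.PartitionOffset, maxTime int) {
	timeout := time.After(time.Duration(maxTime) * time.Second)
	select {
	case offsetChannel <- offset:
		// Sent successfully
	case <-timeout:
		// Receiver was not ready in time; drop the offset
	}
}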
func (zkClient *ZookeeperClient) NewLock(path string) *zk.Lock {
	// Pass through to the connection NewLock, without ACLs
	return zk.NewLock(zkClient.conn, path, make([]zk.ACL, 0))
}
// RecursiveDelete removes the znode at path along with all of its children.
func (zkClient *ZookeeperClient) RecursiveDelete(path string) {
	if path == "/" {
		panic("Do not try and delete the root znode")
	}

	children, _, err := zkClient.conn.Children(path)
	switch {
	case err == nil:
		for _, child := range children {
			zkClient.RecursiveDelete(path + "/" + child)
		}
	case err == zk.ErrNoNode:
		return
	default:
		panic(err)
	}

	// Fetch the current version so the delete fails if the node changes underneath us
	_, stat, err := zkClient.conn.Get(path)
	if err == zk.ErrNoNode {
		// The node is already gone
		return
	}
	if err != nil {
		panic(err)
	}
	err = zkClient.conn.Delete(path, stat.Version)
	if (err != nil) && (err != zk.ErrNoNode) {
		panic(err)
	}
}
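
// Illustrative sketch (not part of the original file): the znode layout this
// client walks is <zookeeper-path>/consumers/<group>/offsets/<topic>/<partition>,
// where each partition znode's data is the committed offset as a decimal string.
// The servers, chroot path, group, topic, and partition below are assumed
// example values, not anything defined by Burrow.
func exampleReadSingleZKOffset() {
	conn, _, err := zk.Connect([]string{"localhost:2181"}, 10*time.Second)
	if err != nil {
		log.Errorf("Cannot connect to Zookeeper: %s", err)
		return
	}
	defer conn.Close()

	// Read one partition's offset znode directly
	data, stat, err := conn.Get("/kafka-cluster/consumers/example-group/offsets/example-topic/0")
	if err != nil {
		log.Errorf("Cannot read offset znode: %s", err)
		return
	}

	// The offset is stored as a decimal string; the znode mtime is the commit time
	offset, err := strconv.ParseInt(string(data), 10, 64)
	if err != nil {
		log.Errorf("Offset value (%s) is not an integer", string(data))
		return
	}
	log.Infof("Offset %d committed at mtime %d", offset, stat.Mtime)
}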