add rebalance finished event handler ZookeeperConsumerConnector
This commit is contained in:
Родитель
77a8133acd
Коммит
8dba0a6356
|
@ -21,6 +21,7 @@ namespace Kafka.Client.Consumers
|
|||
using Kafka.Client.Serialization;
|
||||
using Kafka.Client.Utils;
|
||||
using Kafka.Client.ZooKeeperIntegration;
|
||||
using Kafka.Client.ZooKeeperIntegration.Events;
|
||||
using Kafka.Client.ZooKeeperIntegration.Listeners;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
|
@ -33,6 +34,7 @@ namespace Kafka.Client.Consumers
|
|||
/// </summary>
|
||||
public class ZookeeperConsumerConnector : KafkaClientBase, IZookeeperConsumerConnector
|
||||
{
|
||||
private readonly EventHandler<ConsumerRebalanceEventArgs> consumerRebalanceFinishedHandler;
|
||||
public static log4net.ILog Logger = log4net.LogManager.GetLogger(typeof(ZookeeperConsumerConnector));
|
||||
public static readonly int MaxNRetries = 4;
|
||||
public static bool UseSharedStaticZookeeperClient = true;
|
||||
|
@ -74,7 +76,8 @@ namespace Kafka.Client.Consumers
|
|||
bool enableFetcher,
|
||||
EventHandler rebalanceHandler = null,
|
||||
EventHandler zkDisconnectedHandler = null,
|
||||
EventHandler zkExpiredHandler = null)
|
||||
EventHandler zkExpiredHandler = null,
|
||||
EventHandler<ConsumerRebalanceEventArgs> rebalanceFinishedHandler = null)
|
||||
{
|
||||
if (string.IsNullOrEmpty(config.GroupId))
|
||||
{
|
||||
|
@ -94,6 +97,7 @@ namespace Kafka.Client.Consumers
|
|||
this.consumerRebalanceHandler = rebalanceHandler;
|
||||
this.zkSessionDisconnectedHandler = zkDisconnectedHandler;
|
||||
this.zkSessionExpiredHandler = zkExpiredHandler;
|
||||
this.consumerRebalanceFinishedHandler = rebalanceFinishedHandler;
|
||||
|
||||
if (this.config.AutoCommit)
|
||||
{
|
||||
|
@ -596,6 +600,10 @@ namespace Kafka.Client.Consumers
|
|||
{
|
||||
loadBalancerListener.ConsumerRebalance += this.consumerRebalanceHandler;
|
||||
}
|
||||
if (this.consumerRebalanceFinishedHandler != null)
|
||||
{
|
||||
loadBalancerListener.ConsumerRebalanceFinished += this.consumerRebalanceFinishedHandler;
|
||||
}
|
||||
|
||||
stopAsyncRebalancing.Add(loadBalancerListener.StopRebalance);
|
||||
this.RegisterConsumerInZk(dirs, consumerIdString, topicCount);
|
||||
|
|
Загрузка…
Ссылка в новой задаче