From 186b9cd2c39d7e3b93404542259c337abe2ac71e Mon Sep 17 00:00:00 2001 From: Som Sahu Date: Mon, 4 Apr 2016 11:22:21 -0700 Subject: [PATCH] 1) Fix log loss scenario during rebalance, 2) use api version 0 instead of 1, 3) Some minor performance improvement and bug fixes --- .../Consumers/ConsumerIterator.cs | 2 +- .../Consumers/PartitionTopicInfo.cs | 4 +- .../Messages/BufferedMessageSet.cs | 2 +- src/KafkaNET.Library/Requests/FetchRequest.cs | 2 +- .../Listeners/ZKRebalancerListener.cs | 59 +++---------------- 5 files changed, 14 insertions(+), 55 deletions(-) diff --git a/src/KafkaNET.Library/Consumers/ConsumerIterator.cs b/src/KafkaNET.Library/Consumers/ConsumerIterator.cs index 28e08c4..99c0345 100644 --- a/src/KafkaNET.Library/Consumers/ConsumerIterator.cs +++ b/src/KafkaNET.Library/Consumers/ConsumerIterator.cs @@ -247,7 +247,7 @@ namespace Kafka.Client.Consumers currentTopicInfo = currentDataChunk.TopicInfo; Logger.DebugFormat("CurrentTopicInfo: ConsumedOffset({0}), FetchOffset({1})", currentTopicInfo.ConsumeOffset, currentTopicInfo.FetchOffset); - if (currentTopicInfo.ConsumeOffset != currentDataChunk.FetchOffset) + if (currentTopicInfo.FetchOffset < currentDataChunk.FetchOffset) { Logger.ErrorFormat("consumed offset: {0} doesn't match fetch offset: {1} for {2}; consumer may lose data", currentTopicInfo.ConsumeOffset, diff --git a/src/KafkaNET.Library/Consumers/PartitionTopicInfo.cs b/src/KafkaNET.Library/Consumers/PartitionTopicInfo.cs index 4ea28c1..1690ac0 100644 --- a/src/KafkaNET.Library/Consumers/PartitionTopicInfo.cs +++ b/src/KafkaNET.Library/Consumers/PartitionTopicInfo.cs @@ -232,8 +232,8 @@ namespace Kafka.Client.Consumers Logger.InfoFormat("{2} : Updating fetch offset = {0} with value = {1}", this.fetchedOffset, offset, this.PartitionId); this.chunkQueue.Add(new FetchedDataChunk(messages, this, this.fetchedOffset)); - long newOffset = Interlocked.Exchange(ref this.fetchedOffset, offset); - Logger.Debug("Updated fetch offset of " + this + " to " + newOffset); + Interlocked.Exchange(ref this.fetchedOffset, offset); + Logger.Debug("Updated fetch offset of " + this + " to " + offset); } return size; diff --git a/src/KafkaNET.Library/Messages/BufferedMessageSet.cs b/src/KafkaNET.Library/Messages/BufferedMessageSet.cs index 725125a..878bb84 100644 --- a/src/KafkaNET.Library/Messages/BufferedMessageSet.cs +++ b/src/KafkaNET.Library/Messages/BufferedMessageSet.cs @@ -316,7 +316,7 @@ namespace Kafka.Client.Messages return AllDone(); } - Message newMessage = this.Messages.ToList()[topIterPosition]; + Message newMessage = this.Messages.ElementAt(topIterPosition); lastMessageSize = newMessage.Size; topIterPosition++; switch (newMessage.CompressionCodec) diff --git a/src/KafkaNET.Library/Requests/FetchRequest.cs b/src/KafkaNET.Library/Requests/FetchRequest.cs index 93c7878..ded1ed9 100644 --- a/src/KafkaNET.Library/Requests/FetchRequest.cs +++ b/src/KafkaNET.Library/Requests/FetchRequest.cs @@ -56,7 +56,7 @@ namespace Kafka.Client.Requests public const byte DefaultMinBytesSize = 4; public const byte DefaultOffsetInfoSizeSize = 4; - public const short CurrentVersion = 1; + public const short CurrentVersion = 0; public FetchRequest(int correlationId, string clientId, int maxWait, int minBytes, Dictionary> fetchInfos) { diff --git a/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs b/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs index f1407bf..64c2210 100644 --- a/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs +++ b/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs @@ -496,34 +496,19 @@ namespace Kafka.Client.ZooKeeperIntegration.Listeners } var leader = leaderOpt.Value; var znode = topicDirs.ConsumerOffsetDir + "/" + partition; - var offsetString = this.zkClient.ReadData(znode, true); + + var offsetCommitedString = this.zkClient.ReadData(znode, true); //if first time starting a consumer, set the initial offset based on the config - long offset = 0; - long offsetCommited = 0; - if (offsetString == null) + long offset = -1; + long offsetCommited = -1; + if (offsetCommitedString != null) { - switch (config.AutoOffsetReset) - { - case OffsetRequest.SmallestTime: - offset = this.EarliestOrLatestOffset(topic, leader, partitionId, OffsetRequest.EarliestTime); - break; - case OffsetRequest.LargestTime: - offset = this.EarliestOrLatestOffset(topic, leader, partitionId, OffsetRequest.LatestTime); - break; - default: - throw new ConfigurationErrorsException("Wrong value in autoOffsetReset in ConsumerConfig"); - } - offsetCommited = Math.Max(offset - 1, 0); - } - else - { - offsetCommited = long.Parse(offsetString); - long latestOffset = this.EarliestOrLatestOffset(topic, leader, partitionId, OffsetRequest.LatestTime); - offset = Math.Min(offsetCommited + 1, latestOffset); - Logger.InfoFormat("Final offset {0} for topic {1} partition {2} OffsetCommited {3} latestOffset {4}" - , offset, topic, partition, offsetCommited, latestOffset); + offsetCommited = long.Parse(offsetCommitedString); + offset = offsetCommited + 1; } + Logger.InfoFormat("Final offset {0} for topic {1} partition {2} OffsetCommited {3}" + , offset, topic, partition, offsetCommited); var queue = this.queues[new Tuple(topic, consumerThreadId)]; var partTopicInfo = new PartitionTopicInfo( @@ -540,32 +525,6 @@ namespace Kafka.Client.ZooKeeperIntegration.Listeners Logger.InfoFormat("{0} selected new offset {1}", partTopicInfo, offset); } - private long EarliestOrLatestOffset(string topic, int brokerId, int partitionId, long earliestIoLatest) - { - Consumer consumer = null; - long producedOffset = -1; - try - { - var cluster = new Cluster(this.zkClient); - var broker = cluster.GetBroker(brokerId); - if (broker == null) - { - throw new IllegalStateException(string.Format("Broker {0} is unavailable. Cannot issue GetOffsetsBefore request", brokerId)); - } - consumer = new Consumer(this.config, broker.Host, broker.Port); - var requestInfos = new Dictionary>(); - requestInfos[topic] = new List() { new PartitionOffsetRequestInfo(partitionId, earliestIoLatest, 1) }; - var offsets = consumer.GetOffsetsBefore(new OffsetRequest(requestInfos)); - - producedOffset = offsets.ResponseMap[topic].First().Offsets[0]; - } - catch (Exception ex) - { - Logger.ErrorFormat("error in EarliestOrLatestOffset() : {0}", ex.FormatException()); - } - return producedOffset; - } - private void ReleasePartitionOwnership(IDictionary> topicThreadIdsMap) {