Merge pull request #46 from Microsoft/soumyajitsahu_kafkanetfix

1) Fix log loss scenario during rebalance, 2) use api version 0 inste…
This commit is contained in:
Razvan Iamandii 2016-04-19 11:52:51 -07:00
Родитель a606a4555e 186b9cd2c3
Коммит bdb85152dd
5 изменённых файлов: 14 добавлений и 55 удалений

Просмотреть файл

@ -247,7 +247,7 @@ namespace Kafka.Client.Consumers
currentTopicInfo = currentDataChunk.TopicInfo;
Logger.DebugFormat("CurrentTopicInfo: ConsumedOffset({0}), FetchOffset({1})",
currentTopicInfo.ConsumeOffset, currentTopicInfo.FetchOffset);
if (currentTopicInfo.ConsumeOffset != currentDataChunk.FetchOffset)
if (currentTopicInfo.FetchOffset < currentDataChunk.FetchOffset)
{
Logger.ErrorFormat("consumed offset: {0} doesn't match fetch offset: {1} for {2}; consumer may lose data",
currentTopicInfo.ConsumeOffset,

Просмотреть файл

@ -232,8 +232,8 @@ namespace Kafka.Client.Consumers
Logger.InfoFormat("{2} : Updating fetch offset = {0} with value = {1}", this.fetchedOffset, offset, this.PartitionId);
this.chunkQueue.Add(new FetchedDataChunk(messages, this, this.fetchedOffset));
long newOffset = Interlocked.Exchange(ref this.fetchedOffset, offset);
Logger.Debug("Updated fetch offset of " + this + " to " + newOffset);
Interlocked.Exchange(ref this.fetchedOffset, offset);
Logger.Debug("Updated fetch offset of " + this + " to " + offset);
}
return size;

Просмотреть файл

@ -316,7 +316,7 @@ namespace Kafka.Client.Messages
return AllDone();
}
Message newMessage = this.Messages.ToList()[topIterPosition];
Message newMessage = this.Messages.ElementAt(topIterPosition);
lastMessageSize = newMessage.Size;
topIterPosition++;
switch (newMessage.CompressionCodec)

Просмотреть файл

@ -56,7 +56,7 @@ namespace Kafka.Client.Requests
public const byte DefaultMinBytesSize = 4;
public const byte DefaultOffsetInfoSizeSize = 4;
public const short CurrentVersion = 1;
public const short CurrentVersion = 0;
public FetchRequest(int correlationId, string clientId, int maxWait, int minBytes, Dictionary<string, List<PartitionFetchInfo>> fetchInfos)
{

Просмотреть файл

@ -496,34 +496,19 @@ namespace Kafka.Client.ZooKeeperIntegration.Listeners
}
var leader = leaderOpt.Value;
var znode = topicDirs.ConsumerOffsetDir + "/" + partition;
var offsetString = this.zkClient.ReadData<string>(znode, true);
var offsetCommitedString = this.zkClient.ReadData<string>(znode, true);
//if first time starting a consumer, set the initial offset based on the config
long offset = 0;
long offsetCommited = 0;
if (offsetString == null)
long offset = -1;
long offsetCommited = -1;
if (offsetCommitedString != null)
{
switch (config.AutoOffsetReset)
{
case OffsetRequest.SmallestTime:
offset = this.EarliestOrLatestOffset(topic, leader, partitionId, OffsetRequest.EarliestTime);
break;
case OffsetRequest.LargestTime:
offset = this.EarliestOrLatestOffset(topic, leader, partitionId, OffsetRequest.LatestTime);
break;
default:
throw new ConfigurationErrorsException("Wrong value in autoOffsetReset in ConsumerConfig");
}
offsetCommited = Math.Max(offset - 1, 0);
}
else
{
offsetCommited = long.Parse(offsetString);
long latestOffset = this.EarliestOrLatestOffset(topic, leader, partitionId, OffsetRequest.LatestTime);
offset = Math.Min(offsetCommited + 1, latestOffset);
Logger.InfoFormat("Final offset {0} for topic {1} partition {2} OffsetCommited {3} latestOffset {4}"
, offset, topic, partition, offsetCommited, latestOffset);
offsetCommited = long.Parse(offsetCommitedString);
offset = offsetCommited + 1;
}
Logger.InfoFormat("Final offset {0} for topic {1} partition {2} OffsetCommited {3}"
, offset, topic, partition, offsetCommited);
var queue = this.queues[new Tuple<string, string>(topic, consumerThreadId)];
var partTopicInfo = new PartitionTopicInfo(
@ -540,32 +525,6 @@ namespace Kafka.Client.ZooKeeperIntegration.Listeners
Logger.InfoFormat("{0} selected new offset {1}", partTopicInfo, offset);
}
private long EarliestOrLatestOffset(string topic, int brokerId, int partitionId, long earliestIoLatest)
{
Consumer consumer = null;
long producedOffset = -1;
try
{
var cluster = new Cluster(this.zkClient);
var broker = cluster.GetBroker(brokerId);
if (broker == null)
{
throw new IllegalStateException(string.Format("Broker {0} is unavailable. Cannot issue GetOffsetsBefore request", brokerId));
}
consumer = new Consumer(this.config, broker.Host, broker.Port);
var requestInfos = new Dictionary<string, List<PartitionOffsetRequestInfo>>();
requestInfos[topic] = new List<PartitionOffsetRequestInfo>() { new PartitionOffsetRequestInfo(partitionId, earliestIoLatest, 1) };
var offsets = consumer.GetOffsetsBefore(new OffsetRequest(requestInfos));
producedOffset = offsets.ResponseMap[topic].First().Offsets[0];
}
catch (Exception ex)
{
Logger.ErrorFormat("error in EarliestOrLatestOffset() : {0}", ex.FormatException());
}
return producedOffset;
}
private void ReleasePartitionOwnership(IDictionary<string, IList<string>> topicThreadIdsMap)
{