This commit is contained in:
Razvan Iamandii 2016-04-14 13:50:20 -07:00
Parent cc9e1f7908 0398697ffc
Commit bc2085b0e6
14 changed files: 187 additions and 25 deletions

View file

@@ -45,6 +45,12 @@ namespace Kafka.Client.Cfg
//fetch.message.max.bytes
public const int DefaultFetchSize = 11 * 1024 * 1024;
//fetch.min.bytes
public const int DefaultFetchMinBytes = 1;
//fetch.wait.max.ms
public const int DefaultMaxFetchWaitMs = 100;
public const int DefaultMaxFetchFactor = 10;
public const int DefaultBackOffIncrement = 1000;
@@ -54,9 +60,9 @@ namespace Kafka.Client.Cfg
//socket.receive.buffer.bytes
public const int DefaultBufferSize = 11 * 1024 * 1024;
public const int DefaultSendTimeout = 5*1000;
public const int DefaultSendTimeout = 5 * 1000;
public const int DefaultReceiveTimeout = 5*1000;
public const int DefaultReceiveTimeout = 5 * 1000;
public const int DefaultReconnectInterval = 60 * 1000;
@@ -79,6 +85,8 @@ namespace Kafka.Client.Cfg
this.AutoCommit = DefaultAutoCommit;
this.AutoCommitInterval = DefaultAutoCommitInterval;
this.FetchSize = DefaultFetchSize;
this.FetchMinBytes = DefaultFetchMinBytes;
this.MaxFetchWaitMs = DefaultMaxFetchWaitMs;
this.MaxFetchFactor = DefaultMaxFetchFactor;
this.BackOffIncrement = DefaultBackOffIncrement;
this.ConsumerId = GetHostName();
@@ -124,7 +132,7 @@ namespace Kafka.Client.Cfg
this.ShutdownTimeout = config.ShutdownTimeout;
this.MaxFetchBufferLength = config.MaxFetchBufferLength;
this.ConsumeGroupRebalanceRetryIntervalMs = DefaultConsumeGroupRebalanceRetryIntervalMs;
this.ConsumeGroupFindNewLeaderSleepIntervalMs = ConsumeGroupFindNewLeaderSleepIntervalMs;
this.ConsumeGroupFindNewLeaderSleepIntervalMs = DefaultConsumeGroupFindNewLeaderSleepIntervalMs;
if (config.Broker.ElementInformation.IsPresent)
{
this.SetBrokerConfiguration(config.Broker);
@@ -224,6 +232,20 @@ namespace Kafka.Client.Cfg
/// </summary>
public int FetchSize { get; set; }
/// <summary>
/// fetch.min.bytes -
/// The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.
/// Default value: 1
/// </summary>
public int FetchMinBytes { get; set; }
/// <summary>
/// fetch.wait.max.ms -
/// The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes.
/// Default value: 100
/// </summary>
public int MaxFetchWaitMs { get; set; }
/// <summary>
/// Consumer Group API only. Zookeeper
/// </summary>
@@ -280,7 +302,7 @@ namespace Kafka.Client.Cfg
/// Consumer Group API only. The time to sleep when there is no data to fetch, in milliseconds.
/// Default value: 1000
/// </summary>
public int BackOffIncrement { get; set; }
public int BackOffIncrement { get; set; }
/// <summary>
/// Consumer group only.
@@ -296,7 +318,7 @@ namespace Kafka.Client.Cfg
{
//append ticks, so that consumerId is unique, but sequential
//non-unique consumerId may lead to issues when a broker loses its connection and restores it
consumerId = value +"_" + DateTime.UtcNow.Ticks;
consumerId = value + "_" + DateTime.UtcNow.Ticks;
}
}
private string consumerId;
@@ -311,7 +333,7 @@ namespace Kafka.Client.Cfg
/// Consumer group only.
/// Default value: 2000ms
/// </summary>
public int ConsumeGroupFindNewLeaderSleepIntervalMs { get; set; }
public int ConsumeGroupFindNewLeaderSleepIntervalMs { get; set; }
#endregion
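The new FetchMinBytes / MaxFetchWaitMs properties feed straight into the fetch request built by the fetcher thread (see the FetchRequestBuilder hunk below, where the hard-coded MaxWait(0)/MinBytes(0) are replaced). A minimal configuration sketch, not part of the commit; GroupId and the parameterless-constructor/object-initializer usage are assumptions about ConsumerConfiguration members not shown in this diff:

using Kafka.Client.Cfg;

// Sketch only: FetchMinBytes and MaxFetchWaitMs are the properties added in this commit;
// GroupId and ConsumerId are assumed to exist on ConsumerConfiguration.
var config = new ConsumerConfiguration
{
    GroupId = "example-group",          // placeholder
    ConsumerId = "example-consumer",    // placeholder; the setter appends a ticks suffix
    FetchSize = 11 * 1024 * 1024,       // fetch.message.max.bytes (existing default)
    FetchMinBytes = 64 * 1024,          // fetch.min.bytes: answer once 64 KB are available...
    MaxFetchWaitMs = 500                // fetch.wait.max.ms: ...or after 500 ms, whichever comes first
};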

View file

@@ -36,7 +36,7 @@ namespace Kafka.Client.Consumers
/// <remarks>
/// The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
/// </remarks>
public class ConsumerIterator<TData> : IEnumerator<TData>
public class ConsumerIterator<TData> : IConsumerIterator<TData>
{
public static log4net.ILog Logger = log4net.LogManager.GetLogger(typeof(ConsumerIterator<TData>));

View file

@@ -90,8 +90,8 @@ namespace Kafka.Client.Consumers
new FetchRequestBuilder().
CorrelationId(reqId).
ClientId(_config.ConsumerId ?? _name).
MaxWait(0).
MinBytes(0);
MaxWait(_config.MaxFetchWaitMs).
MinBytes(_config.FetchMinBytes);
fetchablePartitionTopicInfos.ForEach(pti => builder.AddFetch(pti.Topic, pti.PartitionId, pti.NextRequestOffset, _config.FetchSize));
FetchRequest fetchRequest = builder.Build();

View file

@@ -57,7 +57,8 @@ namespace Kafka.Client.Consumers
/// <param name="topic">The topic </param>
/// <param name="partition">The partition</param>
/// <param name="offset">The offset</param>
void CommitOffset(string topic, int partition, long offset);
/// <param name="setPosition">Indicates whether to set the fetcher's offset to the value committed. Default = true.</param>
void CommitOffset(string topic, int partition, long offset, bool setPosition = true);
/// <summary>
/// Return offsets of current ConsumerGroup
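The setPosition flag separates persisting a checkpoint from moving the in-memory position: with setPosition = false the offset is written out but the fetcher keeps reading from where it already is. A short usage sketch; `connector` stands for any implementation of this interface and the offsets are placeholder long values:

// Checkpoint an already-processed offset without disturbing the current fetch/consume position:
connector.CommitOffset("my-topic", 0, processedOffset, setPosition: false);

// Default behaviour (unchanged): checkpoint and reposition the fetcher to the committed offset.
connector.CommitOffset("my-topic", 0, replayFromOffset);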

View file

@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.Consumers
{
using System;
using System.Collections.Generic;
public interface IConsumerIterator<TData> : IEnumerator<TData>, IDisposable
{
void ClearIterator();
}
}
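The interface lets callers hold a stream's iterator without downcasting to ConsumerIterator&lt;TData&gt; (the example change further down still casts, but only to reach currentTopicInfo, which is not part of the interface). A sketch of driving it directly; `stream` is assumed to be an IKafkaMessageStream&lt;Message&gt;:

IConsumerIterator<Message> it = stream.iterator;
while (it.MoveNext())               // IEnumerator<TData> members
{
    Message message = it.Current;
    // ... process message ...
}
it.ClearIterator();                 // presumably discards any buffered chunk before the stream is re-used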

View file

@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.Consumers
{
using System.Collections.Generic;
using System.Threading;
public interface IKafkaMessageStream<TData> : IEnumerable<TData>
{
IConsumerIterator<TData> iterator { get; }
int Count { get; }
IKafkaMessageStream<TData> GetCancellable(CancellationToken cancellationToken);
void Clear();
}
}
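IKafkaMessageStream&lt;TData&gt; mirrors the public surface of KafkaMessageStream&lt;TData&gt; (iterator, Count, GetCancellable, Clear), so consuming code can target the abstraction. A sketch of the usual cancellable-enumeration pattern, modelled on the example further down; the 30-second timeout is arbitrary and `stream` is again assumed to be an IKafkaMessageStream&lt;Message&gt;:

using (var cts = new CancellationTokenSource(30 * 1000))
{
    IKafkaMessageStream<Message> cancellable = stream.GetCancellable(cts.Token);
    foreach (Message message in cancellable)
    {
        Console.WriteLine("offset {0}, partition {1}", message.Offset, message.PartitionId);
    }
}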

View file

@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.Consumers
{
using Kafka.Client.Serialization;
using System.Collections.Generic;
public interface IZookeeperConsumerConnector : IConsumerConnector
{
string ConsumerGroup { get; }
void AutoCommit();
string GetConsumerIdString();
IDictionary<string, IList<IKafkaMessageStream<TData>>> CreateMessageStreams<TData>(IDictionary<string, int> topicCountDict, IDecoder<TData> decoder);
IDictionary<string, IDictionary<int, PartitionTopicInfo>> GetCurrentOwnership();
void ReleaseAllPartitionOwnerships();
}
}
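Extracting IZookeeperConsumerConnector makes the high-level consumer mockable and gives callers interface-typed streams via the explicit CreateMessageStreams overload added below. A hedged wiring sketch; `config` is the ConsumerConfiguration from the earlier sketch, and the ZookeeperConsumerConnector constructor arguments and DefaultDecoder are assumptions about parts of the code base not shown in this diff:

// Sketch only: the exact constructor signature is an assumption.
IZookeeperConsumerConnector connector = new ZookeeperConsumerConnector(config, true);
var topicCount = new Dictionary<string, int> { { "my-topic", 1 } };   // one stream for the topic
IDictionary<string, IList<IKafkaMessageStream<Message>>> streams =
    connector.CreateMessageStreams<Message>(topicCount, new DefaultDecoder());
foreach (Message message in streams["my-topic"][0])
{
    // ... process message, then checkpoint with connector.CommitOffset(...) as needed ...
}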

View file

@@ -27,13 +27,13 @@ namespace Kafka.Client.Consumers
/// <summary>
/// This class is a thread-safe IEnumerable of <see cref="Message"/> that can be enumerated to get messages.
/// </summary>
public class KafkaMessageStream<TData> : IEnumerable<TData>
public class KafkaMessageStream<TData> : IKafkaMessageStream<TData>
{
private readonly BlockingCollection<FetchedDataChunk> queue;
private readonly int consumerTimeoutMs;
public ConsumerIterator<TData> iterator { get; private set; }
public IConsumerIterator<TData> iterator { get; private set; }
private string topic;
@@ -57,7 +57,7 @@ namespace Kafka.Client.Consumers
this.iterator = new ConsumerIterator<TData>(topic, queue, consumerTimeoutMs, decoder, token);
}
public IEnumerable<TData> GetCancellable(CancellationToken cancellationToken)
public IKafkaMessageStream<TData> GetCancellable(CancellationToken cancellationToken)
{
return new KafkaMessageStream<TData>(this.topic, this.queue, this.consumerTimeoutMs, this.decoder, cancellationToken);
}

View file

@@ -18,25 +18,20 @@
namespace Kafka.Client.Consumers
{
using Kafka.Client.Cfg;
using Kafka.Client.Cluster;
using Kafka.Client.Serialization;
using Kafka.Client.Utils;
using Kafka.Client.ZooKeeperIntegration;
using Kafka.Client.ZooKeeperIntegration.Listeners;
using log4net;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Net;
using System.Reflection;
using System.Linq;
/// <summary>
/// The high-level consumer API, which hides the details of brokers from the consumer.
/// It also maintains the state of what has been consumed.
/// </summary>
public class ZookeeperConsumerConnector : KafkaClientBase, IConsumerConnector
public class ZookeeperConsumerConnector : KafkaClientBase, IZookeeperConsumerConnector
{
public static log4net.ILog Logger = log4net.LogManager.GetLogger(typeof(ZookeeperConsumerConnector));
public static readonly int MaxNRetries = 4;
@@ -215,7 +210,8 @@ namespace Kafka.Client.Consumers
/// <param name="topic"></param>
/// <param name="partition"></param>
/// <param name="offset"></param>
public void CommitOffset(string topic, int partition, long offset)
/// <param name="setPosition">Indicates whether to set the fetcher's offset to the value committed. Default = true.</param>
public void CommitOffset(string topic, int partition, long offset, bool setPosition = true)
{
this.EnsuresNotDisposed();
if (this.GetZkClient() == null)
@@ -241,8 +237,11 @@ namespace Kafka.Client.Consumers
topicDirs.ConsumerOffsetDir + "/" +
partitionTopicInfo.PartitionId, offset.ToString());
partitionTopicInfo.CommitedOffset = offset;
partitionTopicInfo.ConsumeOffset = offset;
partitionTopicInfo.FetchOffset = offset;
if (setPosition)
{
partitionTopicInfo.ConsumeOffset = offset;
partitionTopicInfo.FetchOffset = offset;
}
}
catch (Exception ex)
{
@@ -331,6 +330,15 @@ namespace Kafka.Client.Consumers
return this.Consume(topicCountDict, decoder);
}
IDictionary<string, IList<IKafkaMessageStream<TData>>> IZookeeperConsumerConnector.CreateMessageStreams<TData>(IDictionary<string, int> topicCountDict, IDecoder<TData> decoder)
{
return CreateMessageStreams(topicCountDict, decoder)
.ToDictionary(
kvp => kvp.Key,
kvp => (IList<IKafkaMessageStream<TData>>)kvp.Value.Cast<IKafkaMessageStream<TData>>().ToList()
);
}
public Dictionary<int, long> GetOffset(string topic)
{
Dictionary<int, long> offsets = new Dictionary<int, long>();
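The explicit IZookeeperConsumerConnector.CreateMessageStreams implementation above copies each stream list because IList&lt;T&gt; is invariant: an IList&lt;KafkaMessageStream&lt;TData&gt;&gt; is not assignable to IList&lt;IKafkaMessageStream&lt;TData&gt;&gt; even though every element implements the interface, hence the per-key Cast(...).ToList(). A minimal illustration of that variance rule (GetConcreteStreams is a hypothetical helper; requires using System.Linq):

IList<KafkaMessageStream<Message>> concrete = GetConcreteStreams();            // hypothetical helper
IEnumerable<IKafkaMessageStream<Message>> ok = concrete;                       // compiles: IEnumerable<out T> is covariant
// IList<IKafkaMessageStream<Message>> bad = concrete;                         // does not compile: IList<T> is invariant
IList<IKafkaMessageStream<Message>> copy = concrete.Cast<IKafkaMessageStream<Message>>().ToList();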

View file

@@ -260,18 +260,20 @@ namespace KafkaNET.Library.Examples
Message lastMessage = null;
int count = 0;
KafkaMessageStream<Message> messagesStream = null;
ConsumerIterator<Message> iterator = null;
using (CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(cgOptions.CancellationTimeoutMs))
{
lastMessage = null;
IEnumerable<Message> messages = topicData[0].GetCancellable(cancellationTokenSource.Token);
messagesStream = (KafkaMessageStream<Message>)messages;
iterator = (ConsumerIterator<Message>)messagesStream.iterator;
foreach (Message message in messages)
{
latestTotalCount = Interlocked.Increment(ref ConsumerGroupHelper.totalCount);
lastMessage = message;
if (latestTotalCount == 1)
{
PartitionTopicInfo p = messagesStream.iterator.currentTopicInfo;
PartitionTopicInfo p = iterator.currentTopicInfo;
Logger.InfoFormat("Read FIRST message, it's offset: {0} PartitionID:{1}", lastMessage.Offset, p == null ? "null" : p.PartitionId.ToString());
}
hitEndAndCommited = false;
@@ -285,7 +287,7 @@ namespace KafkaNET.Library.Examples
{
connector.CommitOffsets();
consumedTotalCount += count;
PartitionTopicInfo p = messagesStream.iterator.currentTopicInfo;
PartitionTopicInfo p = iterator.currentTopicInfo;
Console.WriteLine("\tRead some and commit once, Thread: {8} consumedTotalCount:{9} Target:{10} LATEST message offset: {0}. PartitionID:{1} -- {2} Totally read {3} will commit offset. {4} FetchOffset:{5} ConsumeOffset:{6} CommitedOffset:{7}"
, lastMessage.Offset, lastMessage.PartitionId.Value, p == null ? "null" : p.PartitionId.ToString(), latestTotalCount, DateTime.Now
, p == null ? "null" : p.FetchOffset.ToString()

View file

@@ -170,6 +170,9 @@
<Compile Include="Cfg\ISyncProducerConfigShared.cs" />
<Compile Include="Cfg\KafkaSimpleManagerConfiguration.cs" />
<Compile Include="Cfg\ProducerConfiguration.cs" />
<Compile Include="Consumers\IConsumerIterator.cs" />
<Compile Include="Consumers\IKafkaMessageStream.cs" />
<Compile Include="Consumers\IZookeeperConsumerConnector.cs" />
<Compile Include="ZooKeeperIntegration\Events\ChildChangedEventItem.cs" />
<Compile Include="ZooKeeperIntegration\Events\DataChangedEventItem.cs" />
<Compile Include="ZooKeeperIntegration\Events\ZooKeeperChildChangedEventArgs.cs" />

View file

@@ -0,0 +1,22 @@
<?xml version="1.0"?>
<package >
<metadata>
<id>Microsoft.Kafkanet</id>
<version>1.0.0.0</version>
<authors>Microsoft Corporation</authors>
<owners>Microsoft Corporation</owners>
<licenseUrl>https://github.com/Microsoft/Kafkanet/blob/master/LICENSE.txt</licenseUrl>
<projectUrl>https://github.com/Microsoft/Kafkanet</projectUrl>
<requireLicenseAcceptance>true</requireLicenseAcceptance>
<description>.Net implementation of the Apache Kafka Protocol that provides basic functionality through Producer/Consumer classes. The project also offers a balanced consumer implementation.</description>
<releaseNotes>https://github.com/Microsoft/Kafkanet/releases</releaseNotes>
<copyright>Copyright (c) Microsoft. All rights reserved. Licensed under the Apache License, Version 2.0. See LICENSE file in the project root for full license information.</copyright>
<tags>.Net C# Kafka Kafkanet Client</tags>
<dependencies>
<dependency id="log4net" version="1.2.10" />
<dependency id="ZooKeeper.Net" version="3.4.6.2" />
</dependencies>
</metadata>
<files>
</files>
</package>

View file

@@ -15,6 +15,8 @@
* limitations under the License.
*/
using System.Collections.Generic;
namespace Kafka.Client.Producers
{
using Kafka.Client.Cfg;
@@ -29,6 +31,13 @@ namespace Kafka.Client.Producers
{
ProducerConfiguration Config { get; }
/// <summary>
/// Sends the data to multiple topics, partitioned by key, using either the
/// synchronous or the asynchronous producer.
/// </summary>
/// <param name="data">The producer data objects that encapsulate the topic, key and message data.</param>
void Send(IEnumerable<ProducerData<TKey, TData>> data);
/// <summary>
/// Sends the data to a single topic, partitioned by key, using either the
/// synchronous or the asynchronous producer.
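The new Send(IEnumerable&lt;ProducerData&lt;TKey, TData&gt;&gt;) overload lets a single call carry messages destined for several topics. A hedged sketch; `producer` stands for any IProducer&lt;string, Message&gt;, and the ProducerData/Message constructors used here are assumptions based on the rest of the client (requires using System.Text and System.Collections.Generic):

var batch = new List<ProducerData<string, Message>>
{
    new ProducerData<string, Message>("topic-a", "key-1", new Message(Encoding.UTF8.GetBytes("hello"))),
    new ProducerData<string, Message>("topic-b", "key-2", new Message(Encoding.UTF8.GetBytes("world")))
};
producer.Send(batch);   // one call; the producer fans messages out per topic/partition by key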

View file

@@ -514,6 +514,7 @@ namespace Kafka.Client.ZooKeeperIntegration.Listeners
default:
throw new ConfigurationErrorsException("Wrong value in autoOffsetReset in ConsumerConfig");
}
offsetCommited = Math.Max(offset - 1, 0);
}
else
{