Changed versionId from 1 to 0, replaced hard coded values with constant fields
This commit is contained in:
Родитель
f07599f176
Коммит
ec34c27af9
|
@ -36,6 +36,10 @@ namespace Kafka.Client.Consumers
|
|||
private static readonly int FailureRetryDelayMs = (int)TimeSpan.FromSeconds(5).TotalMilliseconds;
|
||||
|
||||
private const string clientId = "LeaderFetcher";
|
||||
|
||||
private const short VersionId = 0;
|
||||
|
||||
private const int CorrelationId = 0;
|
||||
|
||||
private readonly Cluster _brokers;
|
||||
|
||||
|
@ -77,7 +81,7 @@ namespace Kafka.Client.Consumers
|
|||
var consumer = new Consumer(_config, broker.Value.Host, broker.Value.Port);
|
||||
try
|
||||
{
|
||||
IEnumerable<TopicMetadata> metaData = consumer.GetMetaData(TopicMetadataRequest.Create(new[] { partition.Topic }, 1, 0, clientId));
|
||||
IEnumerable<TopicMetadata> metaData = consumer.GetMetaData(TopicMetadataRequest.Create(new[] { partition.Topic }, VersionId, CorrelationId, clientId));
|
||||
if (metaData != null && metaData.Any())
|
||||
{
|
||||
PartitionMetadata newPartitionData = metaData.First().PartitionsMetadata.FirstOrDefault(p => p.PartitionId == partition.PartitionId);
|
||||
|
@ -120,4 +124,4 @@ namespace Kafka.Client.Consumers
|
|||
_stop = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче