specify max batch size as a constructor argument of BatchWorker

This commit is contained in:
Sebastian Burckhardt 2021-01-28 07:36:07 -08:00
Родитель f7cbe45cbc
Коммит 460b38c1aa
8 изменённых файлов: 11 добавлений и 9 удалений

Просмотреть файл

@ -18,7 +18,7 @@ namespace DurableTask.Netherite.Scaling
public static TimeSpan AggregatePublishInterval = TimeSpan.FromSeconds(15);
readonly CancellationTokenSource cancelWait = new CancellationTokenSource();
public LoadPublisher(ILoadMonitorService service, CancellationToken token, ILogger logger) : base(nameof(LoadPublisher), false, token)
public LoadPublisher(ILoadMonitorService service, CancellationToken token, ILogger logger) : base(nameof(LoadPublisher), false, int.MaxValue, token)
{
this.service = service;
this.logger = logger;

Просмотреть файл

@ -24,7 +24,7 @@ namespace DurableTask.Netherite.Faster
readonly IntakeWorker intakeWorker;
public LogWorker(BlobManager blobManager, FasterLog log, Partition partition, StoreWorker storeWorker, FasterTraceHelper traceHelper, CancellationToken cancellationToken)
: base(nameof(LogWorker), true, cancellationToken)
: base(nameof(LogWorker), true, 500, cancellationToken)
{
partition.ErrorHandler.Token.ThrowIfCancellationRequested();
@ -56,7 +56,7 @@ namespace DurableTask.Netherite.Faster
readonly LogWorker logWorker;
readonly List<PartitionUpdateEvent> updateEvents;
public IntakeWorker(CancellationToken token, LogWorker logWorker) : base(nameof(IntakeWorker), true, token)
public IntakeWorker(CancellationToken token, LogWorker logWorker) : base(nameof(IntakeWorker), true, int.MaxValue, token)
{
this.logWorker = logWorker;
this.updateEvents = new List<PartitionUpdateEvent>();

Просмотреть файл

@ -45,7 +45,7 @@ namespace DurableTask.Netherite.Faster
public StoreWorker(TrackedObjectStore store, Partition partition, FasterTraceHelper traceHelper, BlobManager blobManager, CancellationToken cancellationToken)
: base($"{nameof(StoreWorker)}{partition.PartitionId:D2}", true, cancellationToken)
: base($"{nameof(StoreWorker)}{partition.PartitionId:D2}", true, 500, cancellationToken)
{
partition.ErrorHandler.Token.ThrowIfCancellationRequested();

Просмотреть файл

@ -22,7 +22,7 @@ namespace DurableTask.Netherite
readonly ConcurrentDictionary<TrackedObjectKey, TrackedObject> trackedObjects
= new ConcurrentDictionary<TrackedObjectKey, TrackedObject>();
public MemoryStorage(ILogger logger) : base(nameof(MemoryStorage), true, CancellationToken.None)
public MemoryStorage(ILogger logger) : base(nameof(MemoryStorage), true, int.MaxValue, CancellationToken.None)
{
this.logger = logger;
this.GetOrAdd(TrackedObjectKey.Activities);

Просмотреть файл

@ -25,7 +25,7 @@ namespace DurableTask.Netherite.EventHubs
readonly MemoryStream stream = new MemoryStream(); // reused for all packets
public EventHubsSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper)
: base(nameof(EventHubsSender<T>), false, CancellationToken.None)
: base(nameof(EventHubsSender<T>), false, 2000, CancellationToken.None)
{
this.host = host;
this.taskHubGuid = taskHubGuid;

Просмотреть файл

@ -18,7 +18,7 @@ namespace DurableTask.Netherite.Emulated
readonly string name;
readonly ILogger logger;
public MemoryQueue(CancellationToken cancellationToken, string name, ILogger logger) : base(nameof(MemoryQueue<T,B>), true, cancellationToken)
public MemoryQueue(CancellationToken cancellationToken, string name, ILogger logger) : base(nameof(MemoryQueue<T,B>), true, int.MaxValue, cancellationToken)
{
this.name = name;
this.logger = logger;

Просмотреть файл

@ -13,7 +13,7 @@ namespace DurableTask.Netherite.Emulated
Action<IEnumerable<Event>> sendHandler;
public SendWorker(CancellationToken token)
: base(nameof(SendWorker), false, token)
: base(nameof(SendWorker), false, int.MaxValue, token)
{
}

Просмотреть файл

@ -19,6 +19,7 @@ namespace DurableTask.Netherite
/// </summary>
abstract class BatchWorker<T>
{
readonly int maxBatchSize;
readonly Stopwatch stopwatch;
protected string Name { get; }
protected readonly CancellationToken cancellationToken;
@ -36,11 +37,12 @@ namespace DurableTask.Netherite
/// <summary>
/// Constructor including a cancellation token.
/// </summary>
public BatchWorker(string name, bool startSuspended, CancellationToken cancellationToken)
public BatchWorker(string name, bool startSuspended, int maxBatchSize, CancellationToken cancellationToken)
{
this.Name = name;
this.cancellationToken = cancellationToken;
this.state = startSuspended ? SUSPENDED : IDLE;
this.maxBatchSize = maxBatchSize;
this.stopwatch = new Stopwatch();
}