* refactor partition management options, remove Scripted partition manager, and add RecoveryTester feature

* prevent creation of fresh taskhub in partition recovery mode
This commit is contained in:
Sebastian Burckhardt 2024-03-15 12:57:01 -07:00 коммит произвёл GitHub
Родитель 230ada9c11
Коммит 37f49adde5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
15 изменённых файлов: 508 добавлений и 606 удалений

Просмотреть файл

@ -91,6 +91,11 @@ namespace DurableTask.Netherite
[JsonConverter(typeof(StringEnumConverter))]
public PartitionManagementOptions PartitionManagement { get; set; } = PartitionManagementOptions.EventProcessorHost;
/// <summary>
/// Additional parameters for the partition management, if necessary
/// </summary>
public string PartitionManagementParameters { get; set; } = null;
/// <summary>
/// Gets or sets the activity scheduler option
/// </summary>

Просмотреть файл

@ -18,13 +18,20 @@ namespace DurableTask.Netherite
ClientOnly,
/// <summary>
/// Use the event processor host implementation provided by the EventHubs client library.
/// Use the event processor host implementation provided by the EventHubs client library. This dynamically
/// balances the partitions across all connected hosts.
/// </summary>
EventProcessorHost,
/// <summary>
/// Follow a predefined partition management script. This is meant to be used for testing and benchmarking scenarios.
/// This was an internal feature and is no longer supported.
/// </summary>
Scripted,
/// <summary>
/// Test the recovery without modifying storage. Useful for diagnosing recovery problems. Not functional as an orchestration service!
/// </summary>
RecoveryTester,
}
}

Просмотреть файл

@ -119,6 +119,7 @@ namespace DurableTask.Netherite.Faster
this.pageBlobDirectory.ToString(),
2000,
true,
failIfReadonly: false,
async (numAttempts) =>
{
var client = this.pageBlobDirectory.Client.WithRetries;
@ -299,6 +300,7 @@ namespace DurableTask.Netherite.Faster
entry.PageBlob.Default.Name,
5000,
true,
failIfReadonly: true,
async (numAttempts) =>
{
var client = (numAttempts > 1) ? entry.PageBlob.Default : entry.PageBlob.Aggressive;
@ -334,6 +336,7 @@ namespace DurableTask.Netherite.Faster
entry.PageBlob.Default.Name,
5000,
false,
failIfReadonly: true,
async (numAttempts) =>
{
var client = (numAttempts > 1) ? entry.PageBlob.Default : entry.PageBlob.Aggressive;
@ -459,6 +462,7 @@ namespace DurableTask.Netherite.Faster
blobEntry.PageBlob.Default.Name,
1000 + (int)length / 1000,
true,
failIfReadonly: true,
async (numAttempts) =>
{
if (numAttempts > 0)
@ -519,6 +523,7 @@ namespace DurableTask.Netherite.Faster
blob.Default.Name,
1000 + (int)length / 1000,
true,
failIfReadonly: false,
async (numAttempts) =>
{
if (numAttempts > 0)

Просмотреть файл

@ -71,6 +71,7 @@ namespace DurableTask.Netherite.Faster
pageBlob.Default.Name,
3000,
true,
failIfReadonly: true,
async (numAttempts) =>
{
var client = (numAttempts > 1) ? pageBlob.Default : pageBlob.Aggressive;

Просмотреть файл

@ -31,6 +31,7 @@ namespace DurableTask.Netherite.Faster
readonly uint partitionId;
readonly CancellationTokenSource shutDownOrTermination;
readonly string taskHubPrefix;
readonly bool readOnlyMode;
BlobUtilsV12.ServiceClients blockBlobAccount;
BlobUtilsV12.ServiceClients pageBlobAccount;
@ -62,6 +63,8 @@ namespace DurableTask.Netherite.Faster
public string ContainerName { get; }
public bool ReadonlyMode => this.readOnlyMode;
internal BlobUtilsV12.ContainerClients BlockBlobContainer => this.blockBlobContainer;
internal BlobUtilsV12.ContainerClients PageBlobContainer => this.pageBlobContainer;
@ -349,6 +352,11 @@ namespace DurableTask.Netherite.Faster
this.TraceHelper = new FasterTraceHelper(logger, logLevelLimit, performanceLogger, this.partitionId, this.UseLocalFiles ? "none" : this.settings.StorageAccountName, taskHubName);
this.PartitionErrorHandler = errorHandler;
this.shutDownOrTermination = CancellationTokenSource.CreateLinkedTokenSource(errorHandler.Token);
if (this.settings.PartitionManagement == PartitionManagementOptions.RecoveryTester)
{
this.readOnlyMode = true;
}
}
string PartitionFolderName => $"{this.taskHubPrefix}p{this.partitionId:D2}";
@ -589,6 +597,7 @@ namespace DurableTask.Netherite.Faster
this.eventLogCommitBlob.Default.Name,
2000,
true,
failIfReadonly: true,
async (numAttempts) =>
{
try
@ -769,7 +778,7 @@ namespace DurableTask.Netherite.Faster
//TODO
return;
}
else
else if (!this.readOnlyMode)
{
string token1 = this.CheckpointInfo.LogToken.ToString();
string token2 = this.CheckpointInfo.IndexToken.ToString();
@ -804,6 +813,7 @@ namespace DurableTask.Netherite.Faster
directory.Prefix,
1000,
false,
failIfReadonly: true,
async (numAttempts) =>
{
results = await directory.GetBlobsAsync(this.shutDownOrTermination.Token);
@ -841,6 +851,7 @@ namespace DurableTask.Netherite.Faster
blobName,
1000,
false,
failIfReadonly: true,
async (numAttempts) => (await BlobUtilsV12.ForceDeleteAsync(directory.Client.Default, blobName) ? 1 : 0)));
}
await Task.WhenAll(deletionTasks);
@ -889,6 +900,7 @@ namespace DurableTask.Netherite.Faster
this.eventLogCommitBlob.Default.Name,
1000,
true,
failIfReadonly: true,
(int numAttempts) =>
{
try
@ -967,6 +979,7 @@ namespace DurableTask.Netherite.Faster
this.eventLogCommitBlob.Name,
1000,
true,
failIfReadonly: false,
(int numAttempts) =>
{
if (numAttempts > 0)
@ -1074,6 +1087,7 @@ namespace DurableTask.Netherite.Faster
checkpointCompletedBlob.Name,
1000,
true,
failIfReadonly: false,
async (numAttempts) =>
{
try
@ -1125,6 +1139,7 @@ namespace DurableTask.Netherite.Faster
metaFileBlob.Name,
1000,
true,
failIfReadonly: true,
(numAttempts) =>
{
var client = metaFileBlob.WithRetries;
@ -1162,6 +1177,7 @@ namespace DurableTask.Netherite.Faster
metaFileBlob.Name,
1000,
true,
failIfReadonly: true,
(numAttempts) =>
{
var client = metaFileBlob.WithRetries;
@ -1205,6 +1221,7 @@ namespace DurableTask.Netherite.Faster
metaFileBlob.Name,
1000,
true,
failIfReadonly: false,
(numAttempts) =>
{
var client = metaFileBlob.WithRetries;
@ -1242,6 +1259,7 @@ namespace DurableTask.Netherite.Faster
metaFileBlob.Name,
1000,
true,
failIfReadonly: false,
(numAttempts) =>
{
var client = metaFileBlob.WithRetries;
@ -1369,6 +1387,7 @@ namespace DurableTask.Netherite.Faster
singletonsBlob.Name,
1000 + singletons.Length / 5000,
false,
failIfReadonly: true,
async (numAttempts) =>
{
var client = singletonsBlob.WithRetries;
@ -1402,6 +1421,7 @@ namespace DurableTask.Netherite.Faster
singletonsBlob.Name,
20000,
true,
failIfReadonly: false,
async (numAttempts) =>
{
@ -1435,6 +1455,7 @@ namespace DurableTask.Netherite.Faster
checkpointCompletedBlob.Name,
1000,
true,
failIfReadonly: true,
async (numAttempts) =>
{
var client = numAttempts > 1 ? checkpointCompletedBlob.Default : checkpointCompletedBlob.Aggressive;

Просмотреть файл

@ -25,9 +25,18 @@ namespace DurableTask.Netherite.Faster
string target,
int expectedLatencyBound,
bool isCritical,
bool failIfReadonly,
Func<int, Task<long>> operationAsync,
Func<Task> readETagAsync = null)
{
if (this.readOnlyMode && failIfReadonly)
{
string message = $"storage operation {name} ({intent}) cannot be performed in read-only mode";
this.StorageTracer?.FasterStorageProgress(message);
this.HandleStorageError(name, message, target, null, isCritical, false);
throw new OperationCanceledException(message);
}
try
{
if (semaphore != null)
@ -138,8 +147,17 @@ namespace DurableTask.Netherite.Faster
string target,
int expectedLatencyBound,
bool isCritical,
bool failIfReadonly,
Func<int,(long,bool)> operation)
{
if (this.readOnlyMode && failIfReadonly)
{
string message = $"storage operation {name} ({intent}) cannot be performed in read-only mode";
this.StorageTracer?.FasterStorageProgress(message);
this.HandleStorageError(name, message, target, null, isCritical, false);
throw new OperationCanceledException(message);
}
Stopwatch stopwatch = new Stopwatch();
int numAttempts = 0;

Просмотреть файл

@ -124,6 +124,14 @@ namespace DurableTask.Netherite.Faster
async Task<bool> IStorageLayer.CreateTaskhubIfNotExistsAsync()
{
if (this.settings.PartitionManagement == PartitionManagementOptions.RecoveryTester)
{
// we do NOT create any resources during recovery testing
await ((IStorageLayer)this).TryLoadTaskhubAsync(true);
this.traceHelper.TraceProgress("Confirmed existing taskhub");
return false;
}
bool containerCreated = await (await this.cloudBlobContainer).CreateIfNotExistsAsync();
if (containerCreated)
{

Просмотреть файл

@ -206,8 +206,13 @@ namespace DurableTask.Netherite.Faster
throw;
}
this.TraceHelper.FasterProgress("Replay complete");
// restart pending activities, timers, work items etc.
this.storeWorker.RestartThingsAtEndOfRecovery(inputQueueFingerprint, this.blobManager.IncarnationTimestamp);
if (!this.blobManager.ReadonlyMode)
{
this.storeWorker.RestartThingsAtEndOfRecovery(inputQueueFingerprint, this.blobManager.IncarnationTimestamp);
}
this.TraceHelper.FasterProgress("Recovery complete");
}
@ -217,8 +222,11 @@ namespace DurableTask.Netherite.Faster
public void StartProcessing()
{
this.storeWorker.StartProcessing();
this.logWorker.StartProcessing();
if (!this.blobManager.ReadonlyMode)
{
this.storeWorker.StartProcessing();
this.logWorker.StartProcessing();
}
}
internal void CheckForStuckWorkers(object _)
@ -252,21 +260,24 @@ namespace DurableTask.Netherite.Faster
public async Task CleanShutdown(bool takeFinalCheckpoint)
{
this.TraceHelper.FasterProgress("Stopping workers");
// in parallel, finish processing log requests and stop processing store requests
Task t1 = this.logWorker.PersistAndShutdownAsync();
Task t2 = this.storeWorker.CancelAndShutdown();
// observe exceptions if the clean shutdown is not working correctly
await this.TerminationWrapper(t1);
await this.TerminationWrapper(t2);
// if the settings indicate we want to take a final checkpoint, do so now.
if (takeFinalCheckpoint)
if (!this.blobManager.ReadonlyMode)
{
this.TraceHelper.FasterProgress("Writing final checkpoint");
await this.TerminationWrapper(this.storeWorker.TakeFullCheckpointAsync("final checkpoint").AsTask());
this.TraceHelper.FasterProgress("Stopping workers");
// in parallel, finish processing log requests and stop processing store requests
Task t1 = this.logWorker.PersistAndShutdownAsync();
Task t2 = this.storeWorker.CancelAndShutdown();
// observe exceptions if the clean shutdown is not working correctly
await this.TerminationWrapper(t1);
await this.TerminationWrapper(t2);
// if the settings indicate we want to take a final checkpoint, do so now.
if (takeFinalCheckpoint)
{
this.TraceHelper.FasterProgress("Writing final checkpoint");
await this.TerminationWrapper(this.storeWorker.TakeFullCheckpointAsync("final checkpoint").AsTask());
}
}
this.TraceHelper.FasterProgress("Stopping BlobManager");

Просмотреть файл

@ -0,0 +1,169 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite.EventHubsTransport
{
using DurableTask.Core.Common;
using DurableTask.Netherite.Abstractions;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// The partition manager natively provided by EH. It registers two <see cref="EventProcessorHost"/>
/// instances: one for the partition hub and one for the load monitor hub.
/// </summary>
class EventHubsPartitionManager : IPartitionManager
{
    readonly TransportAbstraction.IHost host;
    readonly EventHubsConnections connections;
    readonly TaskhubParameters parameters;
    readonly NetheriteOrchestrationServiceSettings settings;
    readonly EventHubsTraceHelper traceHelper;
    readonly CloudBlobContainer cloudBlobContainer;
    readonly string pathPrefix;
    readonly EventHubsTransport transport;
    readonly CancellationToken shutdownToken;

    // Both hosts stay null until the corresponding start method has obtained them.
    EventProcessorHost eventProcessorHost;
    EventProcessorHost loadMonitorHost;

    /// <summary>
    /// Creates the partition manager. Nothing is registered with EventHubs until
    /// <see cref="StartHostingAsync"/> is called.
    /// </summary>
    /// <param name="host">The host that is handed to every created event processor.</param>
    /// <param name="connections">The EventHubs connections; supplies the fingerprint and the creation timestamp.</param>
    /// <param name="parameters">The taskhub parameters handed to every created event processor.</param>
    /// <param name="settings">The service settings; supplies the EventHubs and blob storage connections.</param>
    /// <param name="traceHelper">The logger used for progress messages.</param>
    /// <param name="cloudBlobContainer">The container whose name is used for the EventHubs checkpoints.</param>
    /// <param name="pathPrefix">The prefix prepended to the checkpoint paths inside the container.</param>
    /// <param name="transport">The transport that is handed to every created event processor.</param>
    /// <param name="shutdownToken">The token handed to every created event processor.</param>
    public EventHubsPartitionManager(
        TransportAbstraction.IHost host,
        EventHubsConnections connections,
        TaskhubParameters parameters,
        NetheriteOrchestrationServiceSettings settings,
        EventHubsTraceHelper traceHelper,
        CloudBlobContainer cloudBlobContainer,
        string pathPrefix,
        EventHubsTransport transport,
        CancellationToken shutdownToken)
    {
        this.host = host;
        this.connections = connections;
        this.parameters = parameters;
        this.settings = settings;
        this.traceHelper = traceHelper;
        this.cloudBlobContainer = cloudBlobContainer;
        this.pathPrefix = pathPrefix;
        this.transport = transport;
        this.shutdownToken = shutdownToken;
    }

    /// <summary>
    /// The fingerprint of the underlying EventHubs connections.
    /// </summary>
    public string Fingerprint => this.connections.Fingerprint;

    /// <summary>
    /// Registers the partition host and the load monitor host concurrently.
    /// </summary>
    /// <returns>A task that completes once both registrations have completed.</returns>
    public Task StartHostingAsync()
    {
        return Task.WhenAll(this.StartHostingPartitionsAsync(), this.StartHostingLoadMonitorAsync());
    }

    /// <summary>
    /// Unregisters whichever hosts were obtained by <see cref="StartHostingAsync"/>.
    /// </summary>
    /// <returns>A task that completes once all obtained hosts have been unregistered.</returns>
    public Task StopHostingAsync()
    {
        // A host is still null if StartHostingAsync was never called, or failed before the host was
        // obtained. Skipping it here avoids a NullReferenceException that would otherwise be thrown
        // synchronously into the caller's shutdown sequence.
        Task stopPartitions = this.eventProcessorHost?.UnregisterEventProcessorAsync() ?? Task.CompletedTask;
        Task stopLoadMonitor = this.loadMonitorHost?.UnregisterEventProcessorAsync() ?? Task.CompletedTask;
        return Task.WhenAll(stopPartitions, stopLoadMonitor);
    }

    /// <summary>
    /// Obtains the event processor host for the partition hub and registers the partition processor factory.
    /// </summary>
    async Task StartHostingPartitionsAsync()
    {
        this.traceHelper.LogInformation("EventHubsTransport is registering EventProcessorHost");

        // the checkpoint path for the partition hub includes the creation timestamp of the connections
        string formattedCreationDate = this.connections.CreationTimestamp.ToString("o").Replace("/", "-");

        this.eventProcessorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
            Guid.NewGuid().ToString(),
            EventHubsTransport.PartitionHub,
            EventHubsTransport.PartitionConsumerGroup,
            this.settings.BlobStorageConnection,
            this.cloudBlobContainer.Name,
            $"{this.pathPrefix}eh-checkpoints/{(EventHubsTransport.PartitionHub)}/{formattedCreationDate}");

        var processorOptions = new EventProcessorOptions()
        {
            MaxBatchSize = 300,
            PrefetchCount = 500,
        };

        await this.eventProcessorHost.RegisterEventProcessorFactoryAsync(
            new PartitionEventProcessorFactory(this),
            processorOptions);

        this.traceHelper.LogInformation("EventHubsTransport started EventProcessorHost");
    }

    /// <summary>
    /// Obtains the event processor host for the load monitor hub and registers the load monitor processor factory.
    /// </summary>
    async Task StartHostingLoadMonitorAsync()
    {
        this.traceHelper.LogInformation("EventHubsTransport is registering LoadMonitorHost");

        this.loadMonitorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
            Guid.NewGuid().ToString(),
            EventHubsTransport.LoadMonitorHub,
            EventHubsTransport.LoadMonitorConsumerGroup,
            this.settings.BlobStorageConnection,
            this.cloudBlobContainer.Name,
            $"{this.pathPrefix}eh-checkpoints/{EventHubsTransport.LoadMonitorHub}");

        var processorOptions = new EventProcessorOptions()
        {
            // the initial position is 30 seconds before the current UTC time
            InitialOffsetProvider = (s) => EventPosition.FromEnqueuedTime(DateTime.UtcNow - TimeSpan.FromSeconds(30)),
            MaxBatchSize = 500,
            PrefetchCount = 500,
        };

        await this.loadMonitorHost.RegisterEventProcessorFactoryAsync(
            new LoadMonitorEventProcessorFactory(this),
            processorOptions);

        this.traceHelper.LogInformation("EventHubsTransport started LoadMonitorHost");
    }

    /// <summary>
    /// Creates an <see cref="EventHubsProcessor"/> for each partition context handed out by the partition host.
    /// </summary>
    class PartitionEventProcessorFactory : IEventProcessorFactory
    {
        readonly EventHubsPartitionManager manager;

        public PartitionEventProcessorFactory(EventHubsPartitionManager manager)
        {
            this.manager = manager;
        }

        public IEventProcessor CreateEventProcessor(PartitionContext context)
        {
            // the transport is deliberately passed twice, matching the two transport-typed
            // constructor arguments of EventHubsProcessor
            return new EventHubsProcessor(
                this.manager.host,
                this.manager.transport,
                this.manager.parameters,
                context,
                this.manager.settings,
                this.manager.transport,
                this.manager.traceHelper,
                this.manager.shutdownToken);
        }
    }

    /// <summary>
    /// Creates a <see cref="LoadMonitorProcessor"/> for each partition context handed out by the load monitor host.
    /// </summary>
    class LoadMonitorEventProcessorFactory : IEventProcessorFactory
    {
        readonly EventHubsPartitionManager manager;

        public LoadMonitorEventProcessorFactory(EventHubsPartitionManager manager)
        {
            this.manager = manager;
        }

        public IEventProcessor CreateEventProcessor(PartitionContext context)
        {
            return new LoadMonitorProcessor(
                this.manager.host,
                this.manager.transport,
                this.manager.parameters,
                context,
                this.manager.settings,
                this.manager.traceHelper,
                this.manager.shutdownToken);
        }
    }
}
}

Просмотреть файл

@ -33,9 +33,8 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly EventHubsTraceHelper traceHelper;
readonly IStorageLayer storage;
readonly string shortClientId;
readonly bool useRealEventHubsConnection;
EventProcessorHost eventProcessorHost;
EventProcessorHost loadMonitorHost;
TransportAbstraction.IClient client;
bool hasWorkers;
@ -50,8 +49,7 @@ namespace DurableTask.Netherite.EventHubsTransport
CancellationTokenSource shutdownSource;
CloudBlobContainer cloudBlobContainer;
CloudBlockBlob partitionScript;
ScriptedEventProcessorHost scriptedEventProcessorHost;
IPartitionManager partitionManager;
int shutdownTriggered;
@ -76,6 +74,7 @@ namespace DurableTask.Netherite.EventHubsTransport
this.traceHelper = new EventHubsTraceHelper(this.logger, settings.TransportLogLevelLimit, null, settings.StorageAccountName, settings.HubName, namespaceName);
this.ClientId = Guid.NewGuid();
this.shortClientId = Client.GetShortId(this.ClientId);
this.useRealEventHubsConnection = this.settings.PartitionManagement != PartitionManagementOptions.RecoveryTester;
}
// these are hardcoded now but we may turn them into settings
@ -105,18 +104,20 @@ namespace DurableTask.Netherite.EventHubsTransport
var cloudStorageAccount = await this.settings.BlobStorageConnection.GetAzureStorageV11AccountAsync();
var cloudBlobClient = cloudStorageAccount.CreateCloudBlobClient();
this.cloudBlobContainer = cloudBlobClient.GetContainerReference(containerName);
this.partitionScript = this.cloudBlobContainer.GetBlockBlobReference("partitionscript.json");
// check that the storage format is supported, and load the relevant FASTER tuning parameters
BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning);
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.shutdownSource.Token)
if (this.useRealEventHubsConnection)
{
Host = host,
TraceHelper = this.traceHelper,
};
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.shutdownSource.Token)
{
Host = this.host,
TraceHelper = this.traceHelper,
};
await this.connections.StartAsync(this.parameters);
await this.connections.StartAsync(this.parameters);
}
return this.parameters;
}
@ -125,29 +126,32 @@ namespace DurableTask.Netherite.EventHubsTransport
{
this.client = this.host.AddClient(this.ClientId, this.parameters.TaskhubGuid, this);
var channel = Channel.CreateBounded<ClientEvent>(new BoundedChannelOptions(500)
if (this.useRealEventHubsConnection)
{
SingleReader = true,
AllowSynchronousContinuations = true
});
var channel = Channel.CreateBounded<ClientEvent>(new BoundedChannelOptions(500)
{
SingleReader = true,
AllowSynchronousContinuations = true
});
var clientReceivers = this.connections.CreateClientReceivers(this.ClientId, EventHubsTransport.ClientConsumerGroup);
var clientReceivers = this.connections.CreateClientReceivers(this.ClientId, EventHubsTransport.ClientConsumerGroup);
this.clientConnectionsEstablished = Enumerable
.Range(0, EventHubsConnections.NumClientChannels)
.Select(i => this.ClientEstablishConnectionAsync(i, clientReceivers[i]))
.ToArray();
this.clientReceiveLoops = Enumerable
.Range(0, EventHubsConnections.NumClientChannels)
.Select(i => this.ClientReceiveLoopAsync(i, clientReceivers[i], channel.Writer))
.ToArray();
this.clientConnectionsEstablished = Enumerable
.Range(0, EventHubsConnections.NumClientChannels)
.Select(i => this.ClientEstablishConnectionAsync(i, clientReceivers[i]))
.ToArray();
this.clientProcessTask = this.ClientProcessLoopAsync(channel.Reader);
this.clientReceiveLoops = Enumerable
.Range(0, EventHubsConnections.NumClientChannels)
.Select(i => this.ClientReceiveLoopAsync(i, clientReceivers[i], channel.Writer))
.ToArray();
// we must wait for the client receive connections to be established before continuing
// otherwise, we may miss messages that are sent before the client receiver establishes the receive position
await Task.WhenAll(this.clientConnectionsEstablished);
this.clientProcessTask = this.ClientProcessLoopAsync(channel.Reader);
// we must wait for the client receive connections to be established before continuing
// otherwise, we may miss messages that are sent before the client receiver establishes the receive position
await Task.WhenAll(this.clientConnectionsEstablished);
}
}
async Task ITransportLayer.StartWorkersAsync()
@ -156,91 +160,55 @@ namespace DurableTask.Netherite.EventHubsTransport
{
throw new InvalidOperationException("client must be started before partition hosting is started.");
}
if (this.settings.PartitionManagement != PartitionManagementOptions.ClientOnly)
switch (this.settings.PartitionManagement)
{
this.hasWorkers = true;
await Task.WhenAll(StartPartitionHost(), StartLoadMonitorHost());
}
case (PartitionManagementOptions.ClientOnly):
this.hasWorkers = false;
break;
async Task StartPartitionHost()
{
if (this.settings.PartitionManagement != PartitionManagementOptions.Scripted)
{
this.traceHelper.LogInformation($"EventHubsTransport is registering PartitionHost");
case (PartitionManagementOptions.EventProcessorHost):
this.hasWorkers = true;
this.partitionManager = new EventHubsPartitionManager(
this.host,
this.connections,
this.parameters,
this.settings,
this.traceHelper,
this.cloudBlobContainer,
this.pathPrefix,
this,
this.shutdownSource.Token);
break;
string formattedCreationDate = this.connections.CreationTimestamp.ToString("o").Replace("/", "-");
this.eventProcessorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
Guid.NewGuid().ToString(),
EventHubsTransport.PartitionHub,
EventHubsTransport.PartitionConsumerGroup,
this.settings.BlobStorageConnection,
this.cloudBlobContainer.Name,
$"{this.pathPrefix}eh-checkpoints/{(EventHubsTransport.PartitionHub)}/{formattedCreationDate}");
var processorOptions = new EventProcessorOptions()
{
MaxBatchSize = 300,
PrefetchCount = 500,
};
await this.eventProcessorHost.RegisterEventProcessorFactoryAsync(
new PartitionEventProcessorFactory(this),
processorOptions);
this.traceHelper.LogInformation($"EventHubsTransport started PartitionHost");
}
else
{
this.traceHelper.LogInformation($"EventHubsTransport is starting scripted partition host");
this.scriptedEventProcessorHost = new ScriptedEventProcessorHost(
EventHubsTransport.PartitionHub,
EventHubsTransport.PartitionConsumerGroup,
this.settings.EventHubsConnection,
this.settings.BlobStorageConnection,
this.cloudBlobContainer.Name,
case (PartitionManagementOptions.RecoveryTester):
this.hasWorkers = true;
this.partitionManager = new RecoveryTester(
this.host,
this,
this.connections,
this.parameters,
this.settings,
this.traceHelper,
this.settings.WorkerId);
this.traceHelper);
break;
var thread = TrackedThreads.MakeTrackedThread(() => this.scriptedEventProcessorHost.StartEventProcessing(this.settings, this.partitionScript), "ScriptedEventProcessorHost");
thread.Start();
}
case (PartitionManagementOptions.Scripted):
// we retired this feature since it was only meant for internal development, and we did not use it much
throw new NetheriteConfigurationException($"the partition management setting {PartitionManagementOptions.Scripted} is no longer supported.");
default:
throw new NetheriteConfigurationException($"unknown partition management setting: {this.settings.PartitionManagement}");
}
async Task StartLoadMonitorHost()
if (this.hasWorkers)
{
this.traceHelper.LogInformation("EventHubsTransport is registering LoadMonitorHost");
this.loadMonitorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
Guid.NewGuid().ToString(),
LoadMonitorHub,
LoadMonitorConsumerGroup,
this.settings.BlobStorageConnection,
this.cloudBlobContainer.Name,
$"{this.pathPrefix}eh-checkpoints/{LoadMonitorHub}");
var processorOptions = new EventProcessorOptions()
{
InitialOffsetProvider = (s) => EventPosition.FromEnqueuedTime(DateTime.UtcNow - TimeSpan.FromSeconds(30)),
MaxBatchSize = 500,
PrefetchCount = 500,
};
await this.loadMonitorHost.RegisterEventProcessorFactoryAsync(
new LoadMonitorEventProcessorFactory(this),
processorOptions);
this.traceHelper.LogDebug("EventHubsTransport is starting hosting work");
await this.partitionManager.StartHostingAsync();
this.traceHelper.LogDebug("EventHubsTransport started hosting work");
}
}
internal async Task ExitProcess(bool deletePartitionsFirst)
{
if (deletePartitionsFirst)
if (deletePartitionsFirst && this.useRealEventHubsConnection)
{
await this.connections.DeletePartitions();
}
@ -250,51 +218,6 @@ namespace DurableTask.Netherite.EventHubsTransport
System.Environment.Exit(222);
}
class PartitionEventProcessorFactory : IEventProcessorFactory
{
readonly EventHubsTransport transport;
public PartitionEventProcessorFactory(EventHubsTransport transport)
{
this.transport = transport;
}
public IEventProcessor CreateEventProcessor(PartitionContext context)
{
return new EventHubsProcessor(
this.transport.host,
this.transport,
this.transport.parameters,
context,
this.transport.settings,
this.transport,
this.transport.traceHelper,
this.transport.shutdownSource.Token);
}
}
class LoadMonitorEventProcessorFactory : IEventProcessorFactory
{
readonly EventHubsTransport transport;
public LoadMonitorEventProcessorFactory(EventHubsTransport transport)
{
this.transport = transport;
}
public IEventProcessor CreateEventProcessor(PartitionContext context)
{
return new LoadMonitorProcessor(
this.transport.host,
this.transport,
this.transport.parameters,
context,
this.transport.settings,
this.transport.traceHelper,
this.transport.shutdownSource.Token);
}
}
async Task ITransportLayer.StopAsync(bool fatalExceptionObserved)
{
if (Interlocked.CompareExchange(ref this.shutdownTriggered, 1, 0) == 0)
@ -304,7 +227,7 @@ namespace DurableTask.Netherite.EventHubsTransport
this.shutdownSource.Cancel(); // immediately initiates shutdown of client and of all partitions
await Task.WhenAll(
this.hasWorkers ? this.StopWorkersAsync() : Task.CompletedTask,
this.hasWorkers ? this.partitionManager.StopHostingAsync() : Task.CompletedTask,
this.StopClientsAndConnectionsAsync());
this.traceHelper.LogInformation("EventHubsTransport is shut down");
@ -313,45 +236,28 @@ namespace DurableTask.Netherite.EventHubsTransport
async Task StopWorkersAsync()
{
this.traceHelper.LogDebug("EventHubsTransport is stopping partition and loadmonitor hosts");
await Task.WhenAll(
this.StopPartitionHostAsync(),
this.StopLoadMonitorHostAsync());
this.traceHelper.LogDebug("EventHubsTransport is stopping hosting work");
await this.partitionManager.StopHostingAsync();
this.traceHelper.LogDebug("EventHubsTransport stopped hosting work");
}
async Task StopClientsAndConnectionsAsync()
{
this.traceHelper.LogDebug("EventHubsTransport is stopping client process loop");
await this.clientProcessTask;
if (this.useRealEventHubsConnection)
{
this.traceHelper.LogDebug("EventHubsTransport is stopping client process loop");
await this.clientProcessTask;
this.traceHelper.LogDebug("EventHubsTransport is closing connections");
await this.connections.StopAsync();
this.traceHelper.LogDebug("EventHubsTransport is closing connections");
await this.connections.StopAsync();
}
this.traceHelper.LogDebug("EventHubsTransport is stopping client");
await this.client.StopAsync();
this.traceHelper.LogDebug("EventHubsTransport stopped clients");
}
async Task StopPartitionHostAsync()
{
if (this.settings.PartitionManagement != PartitionManagementOptions.Scripted)
{
await this.eventProcessorHost.UnregisterEventProcessorAsync();
}
else
{
await this.scriptedEventProcessorHost.StopAsync();
}
this.traceHelper.LogDebug("EventHubsTransport stopped partition host");
}
async Task StopLoadMonitorHostAsync()
{
await this.loadMonitorHost.UnregisterEventProcessorAsync();
this.traceHelper.LogDebug("EventHubsTransport stopped loadmonitor host");
}
IEventProcessor IEventProcessorFactory.CreateEventProcessor(PartitionContext partitionContext)
{
var processor = new EventHubsProcessor(this.host, this, this.parameters, partitionContext, this.settings, this, this.traceHelper, this.shutdownSource.Token);
@ -360,6 +266,12 @@ namespace DurableTask.Netherite.EventHubsTransport
void TransportAbstraction.ISender.Submit(Event evt)
{
if (!this.useRealEventHubsConnection)
{
DurabilityListeners.ReportException(evt, new InvalidOperationException("client is not functional - are you running in RecoveryTester mode?"));
return;
}
switch (evt)
{
case ClientEvent clientEvent:

Просмотреть файл

@ -0,0 +1,17 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite.EventHubsTransport
{
using System.Threading.Tasks;
/// <summary>
/// General interface for starting and stopping the logic that activates the partition and load monitor on hosts.
/// </summary>
interface IPartitionManager
{
/// <summary>
/// Starts the hosting logic of this partition manager.
/// </summary>
/// <returns>A task that completes when the implementation has finished starting.</returns>
Task StartHostingAsync();
/// <summary>
/// Stops the hosting logic of this partition manager.
/// </summary>
/// <returns>A task that completes when the implementation has finished stopping.</returns>
Task StopHostingAsync();
}
}

Просмотреть файл

@ -8,7 +8,7 @@ namespace DurableTask.Netherite.EventHubsTransport
using System.IO;
/// <summary>
/// Functionality for parsing the partition scripts used by <see cref="ScriptedEventProcessorHost"/>.
/// Functionality for parsing the partition scripts used by <see cref="ScriptedPartitionManager"/>.
/// </summary>
static class PartitionScript
{

Просмотреть файл

@ -0,0 +1,133 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite.EventHubsTransport
{
using DurableTask.Core.Common;
using DurableTask.Netherite.Abstractions;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// A "partition manager" for diagnostic purposes, which recovers one or more partitions, but does not modify any files.
/// </summary>
class RecoveryTester : IPartitionManager
{
readonly TransportAbstraction.IHost host;
readonly TaskhubParameters parameters;
readonly NetheriteOrchestrationServiceSettings settings;
readonly EventHubsTraceHelper logger;
readonly List<uint> partitionsToTest;
readonly List<TransportAbstraction.IPartition> partitions;
public RecoveryTester(
TransportAbstraction.IHost host,
TaskhubParameters parameters,
NetheriteOrchestrationServiceSettings settings,
EventHubsTraceHelper logger)
{
this.host = host;
this.parameters = parameters;
this.settings = settings;
this.logger = logger;
this.ParseParameters(out this.partitionsToTest);
this.partitions = new List<TransportAbstraction.IPartition>();
}
public async Task StartHostingAsync()
{
this.logger.LogWarning($"RecoveryTester is testing partition recovery, parameters={this.settings.PartitionManagementParameters ?? "null"}");
this.ParseParameters(out List<uint> partitionsToTest);
foreach(uint partitionId in partitionsToTest)
{
try
{
this.logger.LogInformation("RecoveryTester is starting partition {partitionId}", partitionId);
// start this partition (which may include waiting for the lease to become available)
var partition = this.host.AddPartition(partitionId, new NullSender());
var errorHandler = this.host.CreateErrorHandler(partitionId);
var nextPacketToReceive = await partition.CreateOrRestoreAsync(errorHandler, this.parameters, "").ConfigureAwait(false);
this.logger.LogInformation("RecoveryTester recovered partition {partitionId} successfully", partitionId);
this.partitions.Add(partition);
}
catch (Exception e)
{
this.logger.LogError("RecoveryTester failed while recovering partition {partitionId}: {exception}", partitionId, e);
}
}
this.logger.LogWarning($"RecoveryTester completed testing partition recovery");
}
public async Task StopHostingAsync()
{
foreach (var partition in this.partitions)
{
try
{
this.logger.LogInformation("RecoveryTester is stopping partition {partitionId}", partition.PartitionId);
await partition.StopAsync(true);
this.logger.LogInformation("RecoveryTester stopped partition {partitionId} successfully", partition.PartitionId);
}
catch (Exception e)
{
this.logger.LogError("RecoveryTester failed while stopping partition {partitionId}: {exception}", partition.PartitionId, e);
}
}
}
/// <summary>
/// Determines which partitions are to be tested, based on the configured partition management parameters.
/// </summary>
/// <param name="partitionsToTest">
/// Receives the ids of the partitions to test: all partitions of the task hub if no parameter
/// is configured, otherwise the single partition whose number is given by the parameter.
/// </param>
/// <exception cref="NetheriteConfigurationException">
/// The configured parameter is not an integer, or does not denote a partition of this task hub.
/// </exception>
public void ParseParameters(out List<uint> partitionsToTest)
{
    partitionsToTest = new List<uint>();
    string parameter = this.settings.PartitionManagementParameters;

    if (parameter == null)
    {
        // test all partitions
        for (uint i = 0; i < this.parameters.PartitionCount; i++)
        {
            partitionsToTest.Add(i);
        }

        return;
    }

    // test only one partition as specified by the parameter;
    // the value is configuration data, so it is parsed independently of the current culture
    Exception problem = null;
    if (!int.TryParse(parameter, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out int num))
    {
        problem = new FormatException("partition is not a valid integer");
    }
    else if (num < 0 || num >= this.parameters.PartitionCount)
    {
        problem = new ArgumentOutOfRangeException(nameof(this.settings.PartitionManagementParameters), num, "partition out of range");
    }

    if (problem != null)
    {
        // callers see the same exception type and message as before; the cause travels as the inner exception
        throw new NetheriteConfigurationException($"invalid parameter for {nameof(RecoveryTester)}: {parameter}", problem);
    }

    partitionsToTest.Add((uint)num);
}
/// <summary>
/// A sender that discards every event submitted to it.
/// </summary>
class NullSender : TransportAbstraction.ISender
{
    /// <summary>
    /// Accepts the event and does nothing with it.
    /// </summary>
    /// <param name="element">The event to discard.</param>
    public void Submit(Event element)
    {
        // intentionally empty
    }
}
}
}

Просмотреть файл

@ -1,411 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite.EventHubsTransport
{
using DurableTask.Core.Common;
using DurableTask.Netherite.Abstractions;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// An alternate event processor host where partitions are placed (started and stopped)
/// according to a script that is read from a blob, as opposed to automatically load balanced.
/// It is intended for benchmarking and testing scenarios only, not production.
/// </summary>
class ScriptedEventProcessorHost
{
readonly string eventHubPath;
readonly string consumerGroupName;
readonly ConnectionInfo eventHubConnection;
readonly ConnectionInfo storageConnection;
readonly string leaseContainerName;
readonly string workerId;
readonly TransportAbstraction.IHost host;
readonly TransportAbstraction.ISender sender;
readonly EventHubsConnections connections;
readonly TaskhubParameters parameters;
readonly byte[] taskHubGuid;
readonly NetheriteOrchestrationServiceSettings settings;
readonly EventHubsTraceHelper logger;
readonly List<PartitionInstance> partitionInstances = new List<PartitionInstance>();
int numberOfPartitions;
/// <summary>
/// Creates a scripted event processor host. The arguments are stored for later use;
/// nothing is started until <see cref="StartEventProcessing"/> is called.
/// </summary>
public ScriptedEventProcessorHost(
    string eventHubPath,
    string consumerGroupName,
    ConnectionInfo eventHubConnection,
    ConnectionInfo storageConnection,
    string leaseContainerName,
    TransportAbstraction.IHost host,
    TransportAbstraction.ISender sender,
    EventHubsConnections connections,
    TaskhubParameters parameters,
    NetheriteOrchestrationServiceSettings settings,
    EventHubsTraceHelper logger,
    string workerId)
{
    // identity of this worker and diagnostics
    this.workerId = workerId;
    this.logger = logger;
    this.settings = settings;

    // task hub description; the guid is kept as a byte array as well
    this.parameters = parameters;
    this.taskHubGuid = parameters.TaskhubGuid.ToByteArray();

    // transport objects
    this.connections = connections;
    this.sender = sender;
    this.host = host;

    // event hub and storage coordinates
    this.leaseContainerName = leaseContainerName;
    this.storageConnection = storageConnection;
    this.eventHubConnection = eventHubConnection;
    this.consumerGroupName = consumerGroupName;
    this.eventHubPath = eventHubPath;
}
/// <summary>
/// The fingerprint reported by the event hubs connections.
/// </summary>
public string Fingerprint
{
    get { return this.connections.Fingerprint; }
}
/// <summary>
/// Waits for the partition script blob to exist, parses it, and then executes the host events
/// it contains, grouped by their time. This method blocks until all groups have been processed.
/// </summary>
/// <param name="settings">The settings whose worker id is handed to the script parser.</param>
/// <param name="partitionScript">The blob that contains the partition script.</param>
public void StartEventProcessing(NetheriteOrchestrationServiceSettings settings, CloudBlockBlob partitionScript)
{
    // block until the script shows up, checking once per second
    if (!partitionScript.Exists())
    {
        this.logger.LogInformation("ScriptedEventProcessorHost workerId={workerId} is waiting for script", this.workerId);

        while (!partitionScript.Exists())
        {
            Thread.Sleep(TimeSpan.FromSeconds(1));
        }
    }

    // the UTC modification timestamp of the script serves as the scenario start time
    DateTime scenarioStartTimeUtc = partitionScript.Properties.LastModified.Value.UtcDateTime;

    // the number of partitions matters only if the script contains wildcards
    this.numberOfPartitions = this.parameters.PartitionCount;

    // reserve one (initially empty) slot per partition
    for (int remaining = this.numberOfPartitions; remaining > 0; remaining--)
    {
        this.partitionInstances.Add(null);
    }

    var hostEvents = new List<PartitionScript.ProcessorHostEvent>();

    try
    {
        using (var buffer = new System.IO.MemoryStream())
        {
            partitionScript.DownloadRangeToStream(buffer, null, null);
            buffer.Seek(0, System.IO.SeekOrigin.Begin);
            hostEvents.AddRange(PartitionScript.ParseEvents(scenarioStartTimeUtc, settings.WorkerId, this.numberOfPartitions, buffer));
        }

        this.logger.LogInformation("ScriptedEventProcessorHost workerId={workerId} started.", this.workerId);
    }
    catch(Exception e)
    {
        // a script that cannot be parsed is reported, and whatever was collected so far is still processed
        this.logger.LogError($"ScriptedEventProcessorHost workerId={this.workerId} failed to parse partitionscript: {e}");
    }

    // consecutive events that carry the same time form one group and are processed together
    int groupTime = 0;
    var group = new List<PartitionScript.ProcessorHostEvent>();

    foreach (var hostEvent in hostEvents)
    {
        if (hostEvent.TimeSeconds != groupTime)
        {
            // the time changed: flush the group collected so far and begin a new one
            this.Process(group);
            group.Clear();
            groupTime = hostEvent.TimeSeconds;
        }

        group.Add(hostEvent);
    }

    // flush the final group
    this.Process(group);
}
/// <summary>
/// Currently does nothing and returns an already completed task.
/// </summary>
// TODO implement this. Not urgent since this class is currently only used for testing/benchmarking
public Task StopAsync() => Task.CompletedTask;
/// <summary>
/// Waits until the time of the first event in the group, then performs all events of the group
/// and blocks until they have finished. An empty group is ignored.
/// </summary>
/// <param name="ready">The group of host events to perform.</param>
void Process(List<PartitionScript.ProcessorHostEvent> ready)
{
    if (ready.Count == 0)
    {
        return;
    }

    // sleep until the group is due, if it is not due already
    int waitMs = (int)(ready[0].TimeUtc - DateTime.UtcNow).TotalMilliseconds;
    if (waitMs > 0)
    {
        this.logger.LogInformation("ScriptedEventProcessorHost workerId={workerId} is waiting for {delay} ms until next hostEvent", this.workerId, waitMs);
        Thread.Sleep(waitMs);
    }

    Stopwatch elapsed = Stopwatch.StartNew();

    // switch that selects between running the actions of a group concurrently or one at a time
    bool parallel = true;

    var pending = new List<Task>();
    int lastTime = 0;

    // announce every action of the group before any of them is started
    foreach (var hostEvent in ready)
    {
        this.logger.LogWarning("ScriptedEventProcessorHost workerId={workerId} performs action={action} partition={partition} time={time}.", this.workerId, hostEvent.Action, hostEvent.PartitionId, hostEvent.TimeSeconds);
        lastTime = hostEvent.TimeSeconds;
    }

    foreach (var hostEvent in ready)
    {
        Task action = this.ProcessHostEvent(hostEvent);

        if (parallel)
        {
            pending.Add(action);
        }
        else
        {
            action.GetAwaiter().GetResult();
        }
    }

    // block until everything that was started concurrently has finished
    Task.WhenAll(pending).GetAwaiter().GetResult();

    this.logger.LogWarning("ScriptedEventProcessorHost workerId={workerId} finished all actions for time={time} in {elapsedSeconds}s.", this.workerId, lastTime, elapsed.Elapsed.TotalSeconds);
}
/// <summary>
/// Performs a single host event ("start", "stop" or "restart") on the partition it names.
/// Non-fatal failures, including an unknown action, are logged and not propagated.
/// </summary>
/// <param name="timestep">The host event to perform.</param>
async Task ProcessHostEvent(PartitionScript.ProcessorHostEvent timestep)
{
    try
    {
        int index = timestep.PartitionId;

        switch (timestep.Action)
        {
            case "restart":
            {
                // the replacement is started while the previous instance is being stopped
                PartitionInstance previous = this.partitionInstances[index];
                PartitionInstance replacement = new PartitionInstance((uint) index, previous.Incarnation + 1, this);
                this.partitionInstances[index] = replacement;
                await Task.WhenAll(replacement.StartAsync(), previous.StopAsync());
                break;
            }

            case "start":
            {
                // the slot may still be empty, in which case the first incarnation is number 1
                PartitionInstance previous = this.partitionInstances[index];
                int incarnation = (previous?.Incarnation ?? 0) + 1;
                PartitionInstance fresh = new PartitionInstance((uint)index, incarnation, this);
                this.partitionInstances[index] = fresh;
                await fresh.StartAsync();
                break;
            }

            case "stop":
            {
                PartitionInstance current = this.partitionInstances[index];
                await current.StopAsync();
                break;
            }

            default:
                throw new InvalidOperationException($"Unknown action: {timestep.Action}");
        }

        this.logger.LogWarning("ScriptedEventProcessorHost workerId={workerId} successfully performed action={action} partition={partition} time={time}.", this.workerId, timestep.Action, timestep.PartitionId, timestep.TimeSeconds);
    }
    catch (Exception e) when (!Utils.IsFatal(e))
    {
        // TODO: Maybe in the future we would like to actually do something in case of failure.
        // For now it is fine to ignore them.
        this.logger.LogError("ScriptedEventProcessorHost workerId={workerId} failed on action={action} partition={partition} time={time} exception={exception}", this.workerId, timestep.Action, timestep.PartitionId, timestep.TimeSeconds, e);
    }
}
/// <summary>
/// Represents a particular instance of a partition that is being managed by a CustomEventProcessor host.
/// </summary>
class PartitionInstance
{
readonly uint partitionId;
readonly ScriptedEventProcessorHost host;
readonly BlobBatchReceiver<PartitionEvent> blobBatchReceiver;
TransportAbstraction.IPartition partition;
Task partitionEventLoop;
PartitionReceiver partitionReceiver;
CancellationTokenSource shutdownSource;
Task shutdownTask;
// Just copied from EventHubsTransport
const int MaxReceiveBatchSize = 1000; // actual batches are typically much smaller
/// <summary>
/// Creates an instance for the given partition and incarnation. The partition is not started here.
/// </summary>
/// <param name="partitionId">The id of the partition.</param>
/// <param name="incarnation">The incarnation number of this instance.</param>
/// <param name="eventProcessorHost">The host that owns this instance and supplies logger, settings and connections.</param>
public PartitionInstance(uint partitionId, int incarnation, ScriptedEventProcessorHost eventProcessorHost)
{
    this.host = eventProcessorHost;
    this.Incarnation = incarnation;
    this.partitionId = partitionId;

    // the receiver is labeled with event hub path, partition and incarnation for tracing
    string label = $"PartitionInstance {eventProcessorHost.eventHubPath}/{partitionId}({incarnation})";
    this.blobBatchReceiver = new BlobBatchReceiver<PartitionEvent>(
        label,
        eventProcessorHost.logger,
        eventProcessorHost.settings,
        keepUntilConfirmed: true);
}
public int Incarnation { get; }
/// <summary>
/// Adds the partition to the host, creates or restores its state, and launches the receive loop
/// at the position returned by the restore.
/// </summary>
/// <returns>A task that completes once the partition has started and the receive loop has been launched.</returns>
public async Task StartAsync()
{
    this.shutdownSource = new CancellationTokenSource();
    this.shutdownTask = this.WaitForShutdownAsync();

    ScriptedEventProcessorHost owner = this.host;

    try
    {
        owner.logger.LogDebug("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) is starting partition", owner.eventHubPath, this.partitionId, this.Incarnation);

        // start this partition (which may include waiting for the lease to become available)
        this.partition = owner.host.AddPartition(this.partitionId, owner.sender);
        var errorHandler = owner.host.CreateErrorHandler(this.partitionId);
        var resumeFrom = await this.partition.CreateOrRestoreAsync(errorHandler, owner.parameters, owner.Fingerprint).ConfigureAwait(false);

        owner.logger.LogInformation("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) started partition, next expected packet is #{nextSeqno}.{batchPos}", owner.eventHubPath, this.partitionId, this.Incarnation, resumeFrom.Item1, resumeFrom.Item2);

        // the receive loop runs on the thread pool and is awaited when the instance is stopped
        this.partitionEventLoop = Task.Run(() => this.PartitionEventLoop(resumeFrom));
    }
    catch (Exception e) when (!Utils.IsFatal(e))
    {
        // report the failure here, then let the caller see it as well
        owner.logger.LogError("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) failed to start partition: {exception}", owner.eventHubPath, this.partitionId, this.Incarnation, e);
        throw;
    }
}
/// <summary>
/// Completes once cancellation of the shutdown source has been requested.
/// </summary>
/// <returns>A task that finishes when shutdown is signaled, or immediately if it already was.</returns>
async Task WaitForShutdownAsync()
{
    if (this.shutdownSource.IsCancellationRequested)
    {
        return;
    }

    var signal = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

    // the registration is released as soon as the signal has been observed
    using (this.shutdownSource.Token.Register(() => { signal.TrySetResult(true); }))
    {
        await signal.Task;
    }
}
// TODO: Handle errors
/// <summary>
/// Stops the partition, then the receive loop, and finally closes the partition receiver.
/// </summary>
/// <remarks>
/// Each step is skipped if the corresponding object was never created, for example because
/// starting the partition failed or because the receive loop could not create its receiver.
/// Non-fatal failures are logged and rethrown.
/// </remarks>
/// <returns>A task that completes once all steps have finished.</returns>
public async Task StopAsync()
{
    try
    {
        // First stop the partition. We need to wait until it shutdowns before closing the receiver, since it needs to receive confirmation events.
        if (this.partition != null)
        {
            this.host.logger.LogDebug("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) stopping partition", this.host.eventHubPath, this.partitionId, this.Incarnation);
            await this.partition.StopAsync(false).ConfigureAwait(false);
            this.host.logger.LogDebug("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) stopped partition", this.host.eventHubPath, this.partitionId, this.Incarnation);
        }

        // wait for the receiver loop to terminate
        this.host.logger.LogDebug("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) stopping receiver loop", this.host.eventHubPath, this.partitionId, this.Incarnation);
        this.shutdownSource?.Cancel();
        if (this.partitionEventLoop != null)
        {
            // the loop is only assigned at the very end of a successful start
            await this.partitionEventLoop.ConfigureAwait(false);
        }

        // shut down the partition receiver (eventHubs complains if more than 5 of these are active per partition)
        if (this.partitionReceiver != null)
        {
            // the receiver is created inside the receive loop, so it is missing if that creation failed
            this.host.logger.LogDebug("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) closing the partition receiver", this.host.eventHubPath, this.partitionId, this.Incarnation);
            await this.partitionReceiver.CloseAsync().ConfigureAwait(false);
            this.host.logger.LogDebug("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) closed the partition receiver", this.host.eventHubPath, this.partitionId, this.Incarnation);
        }
    }
    catch (Exception e) when (!Utils.IsFatal(e))
    {
        this.host.logger.LogError("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) failed to stop partition: {exception}", this.host.eventHubPath, this.partitionId, this.Incarnation, e);
        throw;
    }
}
// TODO: Update all the logging messages
/// <summary>
/// Receives batches from the partition receiver until shutdown is requested, and submits the
/// events they contain to the partition.
/// </summary>
/// <param name="nextPacketToReceive">
/// The position at which to resume: a sequence number plus the number of events of that packet
/// to skip. NOTE(review): the sequence number is handed to CreatePartitionReceiver; its exact
/// interpretation there is not visible from this method - confirm.
/// </param>
/// <returns>A task that completes when the loop exits, whether by shutdown or by failure.</returns>
async Task PartitionEventLoop((long seqNo, int batchPos) nextPacketToReceive)
{
this.host.logger.LogDebug("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) starting receive loop", this.host.eventHubPath, this.partitionId, this.Incarnation);
try
{
// the receiver is stored in a field so that it can be closed when the instance is stopped
this.partitionReceiver = this.host.connections.CreatePartitionReceiver((int)this.partitionId, this.host.consumerGroupName, nextPacketToReceive.Item1);
List<PartitionEvent> batch = new List<PartitionEvent>();
while (!this.shutdownSource.IsCancellationRequested)
{
this.host.logger.LogTrace("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) trying to receive eventdata from position {position}", this.host.eventHubPath, this.partitionId, this.Incarnation, nextPacketToReceive.Item1);
IEnumerable<EventData> hubMessages;
try
{
// wait for whichever comes first, a received batch or the shutdown signal;
// on shutdown the check below throws and the loop is left via the OperationCanceledException handler
var receiveTask = this.partitionReceiver.ReceiveAsync(MaxReceiveBatchSize, TimeSpan.FromMinutes(1));
await Task.WhenAny(receiveTask, this.shutdownTask).ConfigureAwait(false);
this.shutdownSource.Token.ThrowIfCancellationRequested();
hubMessages = await receiveTask.ConfigureAwait(false);
}
catch (TimeoutException exception)
{
// not sure that we should be seeing this, but we do.
this.host.logger.LogWarning("Retrying after transient(?) TimeoutException in ReceiveAsync {exception}", exception);
hubMessages = null;
}
// a null result means there is nothing to process in this iteration
if (hubMessages != null)
{
this.host.logger.LogDebug("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) received eventdata from position {position}", this.host.eventHubPath, this.partitionId, this.Incarnation, nextPacketToReceive.Item1);
int totalEvents = 0;
Stopwatch stopwatch = Stopwatch.StartNew();
var receivedTimestamp = this.partition.CurrentTimeMs;
// we need to update the next expected seqno even if the iterator returns no packets, since it may have discarded some
// iterators do not support ref arguments, so we use a simple wrapper class to work around this limitation
MutableLong nextPacket = new MutableLong() { Value = nextPacketToReceive.seqNo };
await foreach ((EventData eventData, PartitionEvent[] events, long seqNo) in this.blobBatchReceiver.ReceiveEventsAsync(this.host.taskHubGuid, hubMessages, this.shutdownSource.Token, nextPacket))
{
// stamp every event with the input queue position that follows it
for (int i = 0; i < events.Length; i++)
{
PartitionEvent evt = events[i];
if (i < events.Length - 1)
{
// not the last event of the packet: the next position is inside the same packet
evt.NextInputQueuePosition = seqNo;
evt.NextInputQueueBatchPosition = i + 1;
}
else
{
// last event of the packet: the next position is the following packet
evt.NextInputQueuePosition = seqNo + 1;
}
if (this.host.logger.IsEnabled(LogLevel.Trace))
{
this.host.logger.LogTrace("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) received packet #{seqno}.{subSeqNo} {event} id={eventId}", this.host.eventHubPath, this.partitionId, this.Incarnation, seqNo, i, evt, evt.EventIdString);
}
totalEvents++;
}
// a nonzero batch position means that the leading events of this packet are not submitted again
if (nextPacketToReceive.batchPos == 0)
{
this.partition.SubmitEvents(events);
}
else
{
this.host.logger.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) skipping {batchPos} events in batch #{seqno} because they are already processed", this.host.eventHubPath, this.partitionId, this.Incarnation, nextPacketToReceive.batchPos, seqNo);
this.partition.SubmitEvents(events.Skip(nextPacketToReceive.batchPos).ToList());
}
// continue at the position reported through the wrapper; the batch position is reset to zero,
// so the skipping above can only apply to the first packet that is handled
nextPacketToReceive = (nextPacket.Value, 0);
}
this.host.logger.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) received {totalEvents} events in {latencyMs:F2}ms, next expected packet is #{nextSeqno}", this.host.eventHubPath, this.partitionId, this.Incarnation, totalEvents, stopwatch.Elapsed.TotalMilliseconds, nextPacketToReceive.seqNo);
}
}
}
catch (OperationCanceledException)
{
// expected when shutdown is requested while receiving or while iterating over events
this.host.logger.LogInformation("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) was terminated", this.host.eventHubPath, this.partitionId, this.Incarnation);
}
catch (Exception exception)
{
// any other failure is logged and passed to the partition's error handler; it is not rethrown
this.host.logger.LogError("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) encountered an exception while processing packets : {exception}", this.host.eventHubPath, this.partitionId, this.Incarnation, exception);
this.partition.ErrorHandler.HandleError("IEventProcessor.ProcessEventsAsync", "Encountered exception while processing events", exception, true, false);
}
this.host.logger.LogInformation("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) ReceiverLoop exits", this.host.eventHubPath, this.partitionId, this.Incarnation);
}
}
}
}

Просмотреть файл

@ -21,7 +21,7 @@
// --- the levels below are used to control the Netherite tracing.
"DurableTask.Netherite": "Information",
"DurableTask.Netherite.FasterStorage": "Warning",
"DurableTask.Netherite.FasterStorage": "Information",
"DurableTask.Netherite.FasterStorage.Performance": "Error",
"DurableTask.Netherite.EventHubsTransport": "Warning",
"DurableTask.Netherite.Events": "Warning",
@ -90,10 +90,16 @@
// can change this to use a different table, or blobs
//"LoadInformationAzureTableName": "",
// set this to "Scripted" to control the scenario with a partition script
// or to "ClientOnly" to run only the client
// default partition management: use native EH partition management
"PartitionManagement": "EventProcessorHost",
// EXAMPLE alternative: run only a client, but no work items
// "PartitionManagement": "ClientOnly",
// EXAMPLE alternative: test recovery of partition 7 (without modifying any data)
// "PartitionManagement": "RecoveryTester",
// "PartitionManagementParameters": "7",
// set this to "Local" to disable the global activity distribution algorithm
// options: "Local", "Static", "Locavore"
"ActivityScheduler": "Locavore",