diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
index 687a767..46a14f8 100644
--- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
@@ -91,6 +91,11 @@ namespace DurableTask.Netherite
[JsonConverter(typeof(StringEnumConverter))]
public PartitionManagementOptions PartitionManagement { get; set; } = PartitionManagementOptions.EventProcessorHost;
+ /// <summary>
+ /// Gets or sets additional parameters for the selected partition management option, if it needs any. Defaults to null.
+ /// </summary>
+ public string PartitionManagementParameters { get; set; } = null;
+
///
/// Gets or sets the activity scheduler option
///
diff --git a/src/DurableTask.Netherite/OrchestrationService/PartitionManagementOptions.cs b/src/DurableTask.Netherite/OrchestrationService/PartitionManagementOptions.cs
index 0470a1a..949aae7 100644
--- a/src/DurableTask.Netherite/OrchestrationService/PartitionManagementOptions.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/PartitionManagementOptions.cs
@@ -18,13 +18,20 @@ namespace DurableTask.Netherite
ClientOnly,
///
- /// Use the event processor host implementation provided by the EventHubs client library.
+ /// Use the event processor host implementation provided by the EventHubs client library. This dynamically
+ /// balances the partitions across all connected hosts.
///
EventProcessorHost,
///
/// Follow a predefined partition management script. This is meant to be used for testing and benchmarking scenarios.
+ /// This was an internal feature and is no longer supported.
///
Scripted,
+
+ ///
+ /// Test the recovery without modifying storage. Useful for diagnosing recovery problems. Not functional as an orchestration service!
+ ///
+ RecoveryTester,
}
}
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs
index 0dbe9c6..bf37ef2 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs
@@ -119,6 +119,7 @@ namespace DurableTask.Netherite.Faster
this.pageBlobDirectory.ToString(),
2000,
true,
+ failIfReadonly: false,
async (numAttempts) =>
{
var client = this.pageBlobDirectory.Client.WithRetries;
@@ -299,6 +300,7 @@ namespace DurableTask.Netherite.Faster
entry.PageBlob.Default.Name,
5000,
true,
+ failIfReadonly: true,
async (numAttempts) =>
{
var client = (numAttempts > 1) ? entry.PageBlob.Default : entry.PageBlob.Aggressive;
@@ -334,6 +336,7 @@ namespace DurableTask.Netherite.Faster
entry.PageBlob.Default.Name,
5000,
false,
+ failIfReadonly: true,
async (numAttempts) =>
{
var client = (numAttempts > 1) ? entry.PageBlob.Default : entry.PageBlob.Aggressive;
@@ -459,6 +462,7 @@ namespace DurableTask.Netherite.Faster
blobEntry.PageBlob.Default.Name,
1000 + (int)length / 1000,
true,
+ failIfReadonly: true,
async (numAttempts) =>
{
if (numAttempts > 0)
@@ -519,6 +523,7 @@ namespace DurableTask.Netherite.Faster
blob.Default.Name,
1000 + (int)length / 1000,
true,
+ failIfReadonly: false,
async (numAttempts) =>
{
if (numAttempts > 0)
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobEntry.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobEntry.cs
index 54fd854..438a059 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobEntry.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobEntry.cs
@@ -71,6 +71,7 @@ namespace DurableTask.Netherite.Faster
pageBlob.Default.Name,
3000,
true,
+ failIfReadonly: true,
async (numAttempts) =>
{
var client = (numAttempts > 1) ? pageBlob.Default : pageBlob.Aggressive;
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs
index 453c491..cf7a2a7 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs
@@ -31,6 +31,7 @@ namespace DurableTask.Netherite.Faster
readonly uint partitionId;
readonly CancellationTokenSource shutDownOrTermination;
readonly string taskHubPrefix;
+ readonly bool readOnlyMode;
BlobUtilsV12.ServiceClients blockBlobAccount;
BlobUtilsV12.ServiceClients pageBlobAccount;
@@ -62,6 +63,8 @@ namespace DurableTask.Netherite.Faster
public string ContainerName { get; }
+ public bool ReadonlyMode => this.readOnlyMode; // set once in the constructor: true only when the PartitionManagement setting is RecoveryTester
+
internal BlobUtilsV12.ContainerClients BlockBlobContainer => this.blockBlobContainer;
internal BlobUtilsV12.ContainerClients PageBlobContainer => this.pageBlobContainer;
@@ -349,6 +352,11 @@ namespace DurableTask.Netherite.Faster
this.TraceHelper = new FasterTraceHelper(logger, logLevelLimit, performanceLogger, this.partitionId, this.UseLocalFiles ? "none" : this.settings.StorageAccountName, taskHubName);
this.PartitionErrorHandler = errorHandler;
this.shutDownOrTermination = CancellationTokenSource.CreateLinkedTokenSource(errorHandler.Token);
+
+ if (this.settings.PartitionManagement == PartitionManagementOptions.RecoveryTester)
+ {
+ this.readOnlyMode = true;
+ }
}
string PartitionFolderName => $"{this.taskHubPrefix}p{this.partitionId:D2}";
@@ -589,6 +597,7 @@ namespace DurableTask.Netherite.Faster
this.eventLogCommitBlob.Default.Name,
2000,
true,
+ failIfReadonly: true,
async (numAttempts) =>
{
try
@@ -769,7 +778,7 @@ namespace DurableTask.Netherite.Faster
//TODO
return;
}
- else
+ else if (!this.readOnlyMode)
{
string token1 = this.CheckpointInfo.LogToken.ToString();
string token2 = this.CheckpointInfo.IndexToken.ToString();
@@ -804,6 +813,7 @@ namespace DurableTask.Netherite.Faster
directory.Prefix,
1000,
false,
+ failIfReadonly: true,
async (numAttempts) =>
{
results = await directory.GetBlobsAsync(this.shutDownOrTermination.Token);
@@ -841,6 +851,7 @@ namespace DurableTask.Netherite.Faster
blobName,
1000,
false,
+ failIfReadonly: true,
async (numAttempts) => (await BlobUtilsV12.ForceDeleteAsync(directory.Client.Default, blobName) ? 1 : 0)));
}
await Task.WhenAll(deletionTasks);
@@ -889,6 +900,7 @@ namespace DurableTask.Netherite.Faster
this.eventLogCommitBlob.Default.Name,
1000,
true,
+ failIfReadonly: true,
(int numAttempts) =>
{
try
@@ -967,6 +979,7 @@ namespace DurableTask.Netherite.Faster
this.eventLogCommitBlob.Name,
1000,
true,
+ failIfReadonly: false,
(int numAttempts) =>
{
if (numAttempts > 0)
@@ -1074,6 +1087,7 @@ namespace DurableTask.Netherite.Faster
checkpointCompletedBlob.Name,
1000,
true,
+ failIfReadonly: false,
async (numAttempts) =>
{
try
@@ -1125,6 +1139,7 @@ namespace DurableTask.Netherite.Faster
metaFileBlob.Name,
1000,
true,
+ failIfReadonly: true,
(numAttempts) =>
{
var client = metaFileBlob.WithRetries;
@@ -1162,6 +1177,7 @@ namespace DurableTask.Netherite.Faster
metaFileBlob.Name,
1000,
true,
+ failIfReadonly: true,
(numAttempts) =>
{
var client = metaFileBlob.WithRetries;
@@ -1205,6 +1221,7 @@ namespace DurableTask.Netherite.Faster
metaFileBlob.Name,
1000,
true,
+ failIfReadonly: false,
(numAttempts) =>
{
var client = metaFileBlob.WithRetries;
@@ -1242,6 +1259,7 @@ namespace DurableTask.Netherite.Faster
metaFileBlob.Name,
1000,
true,
+ failIfReadonly: false,
(numAttempts) =>
{
var client = metaFileBlob.WithRetries;
@@ -1369,6 +1387,7 @@ namespace DurableTask.Netherite.Faster
singletonsBlob.Name,
1000 + singletons.Length / 5000,
false,
+ failIfReadonly: true,
async (numAttempts) =>
{
var client = singletonsBlob.WithRetries;
@@ -1402,6 +1421,7 @@ namespace DurableTask.Netherite.Faster
singletonsBlob.Name,
20000,
true,
+ failIfReadonly: false,
async (numAttempts) =>
{
@@ -1435,6 +1455,7 @@ namespace DurableTask.Netherite.Faster
checkpointCompletedBlob.Name,
1000,
true,
+ failIfReadonly: true,
async (numAttempts) =>
{
var client = numAttempts > 1 ? checkpointCompletedBlob.Default : checkpointCompletedBlob.Aggressive;
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs
index a07acbb..8aa5712 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs
@@ -25,9 +25,18 @@ namespace DurableTask.Netherite.Faster
string target,
int expectedLatencyBound,
bool isCritical,
+ bool failIfReadonly,
Func> operationAsync,
Func readETagAsync = null)
{
+ if (this.readOnlyMode && failIfReadonly)
+ {
+ string message = $"storage operation {name} ({intent}) cannot be performed in read-only mode";
+ this.StorageTracer?.FasterStorageProgress(message);
+ this.HandleStorageError(name, message, target, null, isCritical, false);
+ throw new OperationCanceledException(message);
+ }
+
try
{
if (semaphore != null)
@@ -138,8 +147,17 @@ namespace DurableTask.Netherite.Faster
string target,
int expectedLatencyBound,
bool isCritical,
+ bool failIfReadonly,
Func operation)
{
+ if (this.readOnlyMode && failIfReadonly)
+ {
+ string message = $"storage operation {name} ({intent}) cannot be performed in read-only mode";
+ this.StorageTracer?.FasterStorageProgress(message);
+ this.HandleStorageError(name, message, target, null, isCritical, false);
+ throw new OperationCanceledException(message);
+ }
+
Stopwatch stopwatch = new Stopwatch();
int numAttempts = 0;
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs
index 37ce09a..7d7bf28 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs
@@ -124,6 +124,14 @@ namespace DurableTask.Netherite.Faster
async Task IStorageLayer.CreateTaskhubIfNotExistsAsync()
{
+ if (this.settings.PartitionManagement == PartitionManagementOptions.RecoveryTester)
+ {
+ // we do NOT create any resources during recovery testing
+ await ((IStorageLayer)this).TryLoadTaskhubAsync(true);
+ this.traceHelper.TraceProgress("Confirmed existing taskhub");
+ return false;
+ }
+
bool containerCreated = await (await this.cloudBlobContainer).CreateIfNotExistsAsync();
if (containerCreated)
{
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
index f94e465..c856fc8 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
@@ -206,8 +206,13 @@ namespace DurableTask.Netherite.Faster
throw;
}
+ this.TraceHelper.FasterProgress("Replay complete");
+
// restart pending actitivities, timers, work items etc.
- this.storeWorker.RestartThingsAtEndOfRecovery(inputQueueFingerprint, this.blobManager.IncarnationTimestamp);
+ if (!this.blobManager.ReadonlyMode)
+ {
+ this.storeWorker.RestartThingsAtEndOfRecovery(inputQueueFingerprint, this.blobManager.IncarnationTimestamp);
+ }
this.TraceHelper.FasterProgress("Recovery complete");
}
@@ -217,8 +222,11 @@ namespace DurableTask.Netherite.Faster
public void StartProcessing()
{
- this.storeWorker.StartProcessing();
- this.logWorker.StartProcessing();
+ if (!this.blobManager.ReadonlyMode) // in read-only mode neither the store worker nor the log worker is started
+ {
+ this.storeWorker.StartProcessing();
+ this.logWorker.StartProcessing();
+ }
}
internal void CheckForStuckWorkers(object _)
@@ -252,21 +260,24 @@ namespace DurableTask.Netherite.Faster
public async Task CleanShutdown(bool takeFinalCheckpoint)
{
- this.TraceHelper.FasterProgress("Stopping workers");
-
- // in parallel, finish processing log requests and stop processing store requests
- Task t1 = this.logWorker.PersistAndShutdownAsync();
- Task t2 = this.storeWorker.CancelAndShutdown();
-
- // observe exceptions if the clean shutdown is not working correctly
- await this.TerminationWrapper(t1);
- await this.TerminationWrapper(t2);
-
- // if the the settings indicate we want to take a final checkpoint, do so now.
- if (takeFinalCheckpoint)
+ if (!this.blobManager.ReadonlyMode)
{
- this.TraceHelper.FasterProgress("Writing final checkpoint");
- await this.TerminationWrapper(this.storeWorker.TakeFullCheckpointAsync("final checkpoint").AsTask());
+ this.TraceHelper.FasterProgress("Stopping workers");
+
+ // in parallel, finish processing log requests and stop processing store requests
+ Task t1 = this.logWorker.PersistAndShutdownAsync();
+ Task t2 = this.storeWorker.CancelAndShutdown();
+
+ // observe exceptions if the clean shutdown is not working correctly
+ await this.TerminationWrapper(t1);
+ await this.TerminationWrapper(t2);
+
+ // if the settings indicate we want to take a final checkpoint, do so now.
+ if (takeFinalCheckpoint)
+ {
+ this.TraceHelper.FasterProgress("Writing final checkpoint");
+ await this.TerminationWrapper(this.storeWorker.TakeFullCheckpointAsync("final checkpoint").AsTask());
+ }
}
this.TraceHelper.FasterProgress("Stopping BlobManager");
diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsPartitionManager.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsPartitionManager.cs
new file mode 100644
index 0000000..8bc58ff
--- /dev/null
+++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsPartitionManager.cs
@@ -0,0 +1,204 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace DurableTask.Netherite.EventHubsTransport
+{
+    using DurableTask.Core.Common;
+    using DurableTask.Netherite.Abstractions;
+    using Microsoft.Azure.EventHubs;
+    using Microsoft.Azure.EventHubs.Processor;
+    using Microsoft.Azure.Storage.Blob;
+    using Microsoft.Extensions.Logging;
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    /// <summary>
+    /// The partition manager natively provided by EH. It hosts the partitions and the load monitor
+    /// by registering one <see cref="EventProcessorHost"/> for each of the two event hubs.
+    /// </summary>
+    class EventHubsPartitionManager : IPartitionManager
+    {
+        readonly TransportAbstraction.IHost host;
+        readonly EventHubsConnections connections;
+        readonly TaskhubParameters parameters;
+        readonly NetheriteOrchestrationServiceSettings settings;
+        readonly EventHubsTraceHelper traceHelper;
+        readonly CloudBlobContainer cloudBlobContainer;
+        readonly string pathPrefix;
+        readonly EventHubsTransport transport;
+        readonly CancellationToken shutdownToken;
+
+        // each host stays null until the corresponding StartHosting* method has obtained it
+        EventProcessorHost eventProcessorHost;
+        EventProcessorHost loadMonitorHost;
+
+        /// <summary>
+        /// Stores the dependencies needed to create the event processor hosts and their processors.
+        /// Nothing is registered until <see cref="StartHostingAsync"/> is called.
+        /// </summary>
+        public EventHubsPartitionManager(
+            TransportAbstraction.IHost host,
+            EventHubsConnections connections,
+            TaskhubParameters parameters,
+            NetheriteOrchestrationServiceSettings settings,
+            EventHubsTraceHelper traceHelper,
+            CloudBlobContainer cloudBlobContainer,
+            string pathPrefix,
+            EventHubsTransport transport,
+            CancellationToken shutdownToken)
+        {
+            this.host = host;
+            this.connections = connections;
+            this.parameters = parameters;
+            this.settings = settings;
+            this.traceHelper = traceHelper;
+            this.cloudBlobContainer = cloudBlobContainer;
+            this.pathPrefix = pathPrefix;
+            this.transport = transport;
+            this.shutdownToken = shutdownToken;
+        }
+
+        /// <summary>
+        /// Forwards the fingerprint exposed by the EventHubs connections.
+        /// </summary>
+        public string Fingerprint => this.connections.Fingerprint;
+
+        /// <summary>
+        /// Registers the partition host and the load monitor host; both registrations run concurrently.
+        /// </summary>
+        /// <returns>A task that completes when both registrations have completed.</returns>
+        public Task StartHostingAsync()
+        {
+            return Task.WhenAll(this.StartHostingPartitionsAsync(), this.StartHostingLoadMonitorAsync());
+        }
+
+        /// <summary>
+        /// Unregisters the event processors of every host that was created by <see cref="StartHostingAsync"/>.
+        /// </summary>
+        /// <returns>A task that completes when all created hosts have been unregistered.</returns>
+        public Task StopHostingAsync()
+        {
+            // A host is still null if hosting was never started, or if startup failed before the host was
+            // obtained. Skip such a host instead of failing the shutdown with a NullReferenceException.
+            return Task.WhenAll(
+                this.eventProcessorHost?.UnregisterEventProcessorAsync() ?? Task.CompletedTask,
+                this.loadMonitorHost?.UnregisterEventProcessorAsync() ?? Task.CompletedTask);
+        }
+
+        /// <summary>
+        /// Creates the event processor host for the partition hub and registers the partition processor factory.
+        /// </summary>
+        async Task StartHostingPartitionsAsync()
+        {
+            this.traceHelper.LogInformation("EventHubsTransport is registering EventProcessorHost");
+
+            // the formatted timestamp is embedded in the '/'-separated checkpoint path below, so any '/' in it is replaced
+            string formattedCreationDate = this.connections.CreationTimestamp.ToString("o").Replace("/", "-");
+
+            this.eventProcessorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
+                Guid.NewGuid().ToString(),
+                EventHubsTransport.PartitionHub,
+                EventHubsTransport.PartitionConsumerGroup,
+                this.settings.BlobStorageConnection,
+                this.cloudBlobContainer.Name,
+                $"{this.pathPrefix}eh-checkpoints/{EventHubsTransport.PartitionHub}/{formattedCreationDate}");
+
+            var processorOptions = new EventProcessorOptions()
+            {
+                MaxBatchSize = 300,
+                PrefetchCount = 500,
+            };
+
+            await this.eventProcessorHost.RegisterEventProcessorFactoryAsync(
+                new PartitionEventProcessorFactory(this),
+                processorOptions);
+
+            this.traceHelper.LogInformation("EventHubsTransport started EventProcessorHost");
+        }
+
+        /// <summary>
+        /// Creates the event processor host for the load monitor hub and registers the load monitor processor factory.
+        /// </summary>
+        async Task StartHostingLoadMonitorAsync()
+        {
+            this.traceHelper.LogInformation("EventHubsTransport is registering LoadMonitorHost");
+
+            this.loadMonitorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
+                Guid.NewGuid().ToString(),
+                EventHubsTransport.LoadMonitorHub,
+                EventHubsTransport.LoadMonitorConsumerGroup,
+                this.settings.BlobStorageConnection,
+                this.cloudBlobContainer.Name,
+                $"{this.pathPrefix}eh-checkpoints/{EventHubsTransport.LoadMonitorHub}");
+
+            var processorOptions = new EventProcessorOptions()
+            {
+                // initial position is the enqueue time 30 seconds before the provider is invoked
+                InitialOffsetProvider = (s) => EventPosition.FromEnqueuedTime(DateTime.UtcNow - TimeSpan.FromSeconds(30)),
+                MaxBatchSize = 500,
+                PrefetchCount = 500,
+            };
+
+            await this.loadMonitorHost.RegisterEventProcessorFactoryAsync(
+                new LoadMonitorEventProcessorFactory(this),
+                processorOptions);
+
+            this.traceHelper.LogInformation("EventHubsTransport started LoadMonitorHost");
+        }
+
+        /// <summary>
+        /// Creates an <see cref="EventHubsProcessor"/> for each partition context handed out by the partition host.
+        /// </summary>
+        class PartitionEventProcessorFactory : IEventProcessorFactory
+        {
+            readonly EventHubsPartitionManager manager;
+
+            public PartitionEventProcessorFactory(EventHubsPartitionManager manager)
+            {
+                this.manager = manager;
+            }
+
+            public IEventProcessor CreateEventProcessor(PartitionContext context)
+            {
+                return new EventHubsProcessor(
+                    this.manager.host,
+                    this.manager.transport,
+                    this.manager.parameters,
+                    context,
+                    this.manager.settings,
+                    this.manager.transport,
+                    this.manager.traceHelper,
+                    this.manager.shutdownToken);
+            }
+        }
+
+        /// <summary>
+        /// Creates a <see cref="LoadMonitorProcessor"/> for each partition context handed out by the load monitor host.
+        /// </summary>
+        class LoadMonitorEventProcessorFactory : IEventProcessorFactory
+        {
+            readonly EventHubsPartitionManager manager;
+
+            public LoadMonitorEventProcessorFactory(EventHubsPartitionManager manager)
+            {
+                this.manager = manager;
+            }
+
+            public IEventProcessor CreateEventProcessor(PartitionContext context)
+            {
+                return new LoadMonitorProcessor(
+                    this.manager.host,
+                    this.manager.transport,
+                    this.manager.parameters,
+                    context,
+                    this.manager.settings,
+                    this.manager.traceHelper,
+                    this.manager.shutdownToken);
+            }
+        }
+    }
+}
diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs
index ad2ac87..6bc74dc 100644
--- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs
+++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs
@@ -33,9 +33,8 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly EventHubsTraceHelper traceHelper;
readonly IStorageLayer storage;
readonly string shortClientId;
+ readonly bool useRealEventHubsConnection;
- EventProcessorHost eventProcessorHost;
- EventProcessorHost loadMonitorHost;
TransportAbstraction.IClient client;
bool hasWorkers;
@@ -50,8 +49,7 @@ namespace DurableTask.Netherite.EventHubsTransport
CancellationTokenSource shutdownSource;
CloudBlobContainer cloudBlobContainer;
- CloudBlockBlob partitionScript;
- ScriptedEventProcessorHost scriptedEventProcessorHost;
+ IPartitionManager partitionManager;
int shutdownTriggered;
@@ -76,6 +74,7 @@ namespace DurableTask.Netherite.EventHubsTransport
this.traceHelper = new EventHubsTraceHelper(this.logger, settings.TransportLogLevelLimit, null, settings.StorageAccountName, settings.HubName, namespaceName);
this.ClientId = Guid.NewGuid();
this.shortClientId = Client.GetShortId(this.ClientId);
+ this.useRealEventHubsConnection = this.settings.PartitionManagement != PartitionManagementOptions.RecoveryTester;
}
// these are hardcoded now but we may turn them into settings
@@ -105,18 +104,20 @@ namespace DurableTask.Netherite.EventHubsTransport
var cloudStorageAccount = await this.settings.BlobStorageConnection.GetAzureStorageV11AccountAsync();
var cloudBlobClient = cloudStorageAccount.CreateCloudBlobClient();
this.cloudBlobContainer = cloudBlobClient.GetContainerReference(containerName);
- this.partitionScript = this.cloudBlobContainer.GetBlockBlobReference("partitionscript.json");
// check that the storage format is supported, and load the relevant FASTER tuning parameters
BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning);
- this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.shutdownSource.Token)
+ if (this.useRealEventHubsConnection)
{
- Host = host,
- TraceHelper = this.traceHelper,
- };
+ this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.shutdownSource.Token)
+ {
+ Host = this.host,
+ TraceHelper = this.traceHelper,
+ };
- await this.connections.StartAsync(this.parameters);
+ await this.connections.StartAsync(this.parameters);
+ }
return this.parameters;
}
@@ -125,29 +126,32 @@ namespace DurableTask.Netherite.EventHubsTransport
{
this.client = this.host.AddClient(this.ClientId, this.parameters.TaskhubGuid, this);
- var channel = Channel.CreateBounded(new BoundedChannelOptions(500)
+ if (this.useRealEventHubsConnection)
{
- SingleReader = true,
- AllowSynchronousContinuations = true
- });
+ var channel = Channel.CreateBounded(new BoundedChannelOptions(500)
+ {
+ SingleReader = true,
+ AllowSynchronousContinuations = true
+ });
- var clientReceivers = this.connections.CreateClientReceivers(this.ClientId, EventHubsTransport.ClientConsumerGroup);
+ var clientReceivers = this.connections.CreateClientReceivers(this.ClientId, EventHubsTransport.ClientConsumerGroup);
- this.clientConnectionsEstablished = Enumerable
- .Range(0, EventHubsConnections.NumClientChannels)
- .Select(i => this.ClientEstablishConnectionAsync(i, clientReceivers[i]))
- .ToArray();
-
- this.clientReceiveLoops = Enumerable
- .Range(0, EventHubsConnections.NumClientChannels)
- .Select(i => this.ClientReceiveLoopAsync(i, clientReceivers[i], channel.Writer))
- .ToArray();
+ this.clientConnectionsEstablished = Enumerable
+ .Range(0, EventHubsConnections.NumClientChannels)
+ .Select(i => this.ClientEstablishConnectionAsync(i, clientReceivers[i]))
+ .ToArray();
- this.clientProcessTask = this.ClientProcessLoopAsync(channel.Reader);
+ this.clientReceiveLoops = Enumerable
+ .Range(0, EventHubsConnections.NumClientChannels)
+ .Select(i => this.ClientReceiveLoopAsync(i, clientReceivers[i], channel.Writer))
+ .ToArray();
- // we must wait for the client receive connections to be established before continuing
- // otherwise, we may miss messages that are sent before the client receiver establishes the receive position
- await Task.WhenAll(this.clientConnectionsEstablished);
+ this.clientProcessTask = this.ClientProcessLoopAsync(channel.Reader);
+
+ // we must wait for the client receive connections to be established before continuing
+ // otherwise, we may miss messages that are sent before the client receiver establishes the receive position
+ await Task.WhenAll(this.clientConnectionsEstablished);
+ }
}
async Task ITransportLayer.StartWorkersAsync()
@@ -156,91 +160,55 @@ namespace DurableTask.Netherite.EventHubsTransport
{
throw new InvalidOperationException("client must be started before partition hosting is started.");
}
-
- if (this.settings.PartitionManagement != PartitionManagementOptions.ClientOnly)
+
+ switch (this.settings.PartitionManagement)
{
- this.hasWorkers = true;
- await Task.WhenAll(StartPartitionHost(), StartLoadMonitorHost());
- }
+ case (PartitionManagementOptions.ClientOnly):
+ this.hasWorkers = false;
+ break;
- async Task StartPartitionHost()
- {
- if (this.settings.PartitionManagement != PartitionManagementOptions.Scripted)
- {
- this.traceHelper.LogInformation($"EventHubsTransport is registering PartitionHost");
+ case (PartitionManagementOptions.EventProcessorHost):
+ this.hasWorkers = true;
+ this.partitionManager = new EventHubsPartitionManager(
+ this.host,
+ this.connections,
+ this.parameters,
+ this.settings,
+ this.traceHelper,
+ this.cloudBlobContainer,
+ this.pathPrefix,
+ this,
+ this.shutdownSource.Token);
+ break;
- string formattedCreationDate = this.connections.CreationTimestamp.ToString("o").Replace("/", "-");
-
- this.eventProcessorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
- Guid.NewGuid().ToString(),
- EventHubsTransport.PartitionHub,
- EventHubsTransport.PartitionConsumerGroup,
- this.settings.BlobStorageConnection,
- this.cloudBlobContainer.Name,
- $"{this.pathPrefix}eh-checkpoints/{(EventHubsTransport.PartitionHub)}/{formattedCreationDate}");
-
- var processorOptions = new EventProcessorOptions()
- {
- MaxBatchSize = 300,
- PrefetchCount = 500,
- };
-
- await this.eventProcessorHost.RegisterEventProcessorFactoryAsync(
- new PartitionEventProcessorFactory(this),
- processorOptions);
-
- this.traceHelper.LogInformation($"EventHubsTransport started PartitionHost");
- }
- else
- {
- this.traceHelper.LogInformation($"EventHubsTransport is starting scripted partition host");
- this.scriptedEventProcessorHost = new ScriptedEventProcessorHost(
- EventHubsTransport.PartitionHub,
- EventHubsTransport.PartitionConsumerGroup,
- this.settings.EventHubsConnection,
- this.settings.BlobStorageConnection,
- this.cloudBlobContainer.Name,
+ case (PartitionManagementOptions.RecoveryTester):
+ this.hasWorkers = true;
+ this.partitionManager = new RecoveryTester(
this.host,
- this,
- this.connections,
this.parameters,
this.settings,
- this.traceHelper,
- this.settings.WorkerId);
+ this.traceHelper);
+ break;
- var thread = TrackedThreads.MakeTrackedThread(() => this.scriptedEventProcessorHost.StartEventProcessing(this.settings, this.partitionScript), "ScriptedEventProcessorHost");
- thread.Start();
- }
+ case (PartitionManagementOptions.Scripted):
+ // we retired this feature since it was only meant for internal development, and we did not use it much
+ throw new NetheriteConfigurationException($"the partition management setting {PartitionManagementOptions.Scripted} is no longer supported.");
+
+ default:
+ throw new NetheriteConfigurationException($"unknown partition management setting: {this.settings.PartitionManagement}");
}
- async Task StartLoadMonitorHost()
+ if (this.hasWorkers)
{
- this.traceHelper.LogInformation("EventHubsTransport is registering LoadMonitorHost");
-
- this.loadMonitorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
- Guid.NewGuid().ToString(),
- LoadMonitorHub,
- LoadMonitorConsumerGroup,
- this.settings.BlobStorageConnection,
- this.cloudBlobContainer.Name,
- $"{this.pathPrefix}eh-checkpoints/{LoadMonitorHub}");
-
- var processorOptions = new EventProcessorOptions()
- {
- InitialOffsetProvider = (s) => EventPosition.FromEnqueuedTime(DateTime.UtcNow - TimeSpan.FromSeconds(30)),
- MaxBatchSize = 500,
- PrefetchCount = 500,
- };
-
- await this.loadMonitorHost.RegisterEventProcessorFactoryAsync(
- new LoadMonitorEventProcessorFactory(this),
- processorOptions);
+ this.traceHelper.LogDebug("EventHubsTransport is starting hosting work");
+ await this.partitionManager.StartHostingAsync();
+ this.traceHelper.LogDebug("EventHubsTransport started hosting work");
}
}
internal async Task ExitProcess(bool deletePartitionsFirst)
{
- if (deletePartitionsFirst)
+ if (deletePartitionsFirst && this.useRealEventHubsConnection)
{
await this.connections.DeletePartitions();
}
@@ -250,51 +218,6 @@ namespace DurableTask.Netherite.EventHubsTransport
System.Environment.Exit(222);
}
- class PartitionEventProcessorFactory : IEventProcessorFactory
- {
- readonly EventHubsTransport transport;
-
- public PartitionEventProcessorFactory(EventHubsTransport transport)
- {
- this.transport = transport;
- }
-
- public IEventProcessor CreateEventProcessor(PartitionContext context)
- {
- return new EventHubsProcessor(
- this.transport.host,
- this.transport,
- this.transport.parameters,
- context,
- this.transport.settings,
- this.transport,
- this.transport.traceHelper,
- this.transport.shutdownSource.Token);
- }
- }
-
- class LoadMonitorEventProcessorFactory : IEventProcessorFactory
- {
- readonly EventHubsTransport transport;
-
- public LoadMonitorEventProcessorFactory(EventHubsTransport transport)
- {
- this.transport = transport;
- }
-
- public IEventProcessor CreateEventProcessor(PartitionContext context)
- {
- return new LoadMonitorProcessor(
- this.transport.host,
- this.transport,
- this.transport.parameters,
- context,
- this.transport.settings,
- this.transport.traceHelper,
- this.transport.shutdownSource.Token);
- }
- }
-
async Task ITransportLayer.StopAsync(bool fatalExceptionObserved)
{
if (Interlocked.CompareExchange(ref this.shutdownTriggered, 1, 0) == 0)
@@ -304,7 +227,7 @@ namespace DurableTask.Netherite.EventHubsTransport
this.shutdownSource.Cancel(); // immediately initiates shutdown of client and of all partitions
await Task.WhenAll(
- this.hasWorkers ? this.StopWorkersAsync() : Task.CompletedTask,
+ this.hasWorkers ? this.partitionManager.StopHostingAsync() : Task.CompletedTask,
this.StopClientsAndConnectionsAsync());
this.traceHelper.LogInformation("EventHubsTransport is shut down");
@@ -313,45 +236,28 @@ namespace DurableTask.Netherite.EventHubsTransport
async Task StopWorkersAsync()
{
- this.traceHelper.LogDebug("EventHubsTransport is stopping partition and loadmonitor hosts");
- await Task.WhenAll(
- this.StopPartitionHostAsync(),
- this.StopLoadMonitorHostAsync());
+ this.traceHelper.LogDebug("EventHubsTransport is stopping hosting work");
+ await this.partitionManager.StopHostingAsync(); // NOTE(review): StopAsync now awaits partitionManager.StopHostingAsync() directly, so this wrapper and its debug logging appear to have no caller in the visible hunks - confirm, then either call it from StopAsync or remove it
+ this.traceHelper.LogDebug("EventHubsTransport stopped hosting work");
}
async Task StopClientsAndConnectionsAsync()
{
- this.traceHelper.LogDebug("EventHubsTransport is stopping client process loop");
- await this.clientProcessTask;
+ if (this.useRealEventHubsConnection) // the client process loop and the connections are only created when a real EventHubs connection is used, i.e. not in RecoveryTester mode
+ {
+ this.traceHelper.LogDebug("EventHubsTransport is stopping client process loop");
+ await this.clientProcessTask;
- this.traceHelper.LogDebug("EventHubsTransport is closing connections");
- await this.connections.StopAsync();
+ this.traceHelper.LogDebug("EventHubsTransport is closing connections");
+ await this.connections.StopAsync();
+ }
this.traceHelper.LogDebug("EventHubsTransport is stopping client");
await this.client.StopAsync();
-
+
this.traceHelper.LogDebug("EventHubsTransport stopped clients");
}
- async Task StopPartitionHostAsync()
- {
- if (this.settings.PartitionManagement != PartitionManagementOptions.Scripted)
- {
- await this.eventProcessorHost.UnregisterEventProcessorAsync();
- }
- else
- {
- await this.scriptedEventProcessorHost.StopAsync();
- }
- this.traceHelper.LogDebug("EventHubsTransport stopped partition host");
- }
-
- async Task StopLoadMonitorHostAsync()
- {
- await this.loadMonitorHost.UnregisterEventProcessorAsync();
- this.traceHelper.LogDebug("EventHubsTransport stopped loadmonitor host");
- }
-
IEventProcessor IEventProcessorFactory.CreateEventProcessor(PartitionContext partitionContext)
{
var processor = new EventHubsProcessor(this.host, this, this.parameters, partitionContext, this.settings, this, this.traceHelper, this.shutdownSource.Token);
@@ -360,6 +266,12 @@ namespace DurableTask.Netherite.EventHubsTransport
void TransportAbstraction.ISender.Submit(Event evt)
{
+ if (!this.useRealEventHubsConnection)
+ {
+ DurabilityListeners.ReportException(evt, new InvalidOperationException("client is not functional - are you running in RecoveryTester mode?"));
+ return;
+ }
+
switch (evt)
{
case ClientEvent clientEvent:
diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/IPartitionManager.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/IPartitionManager.cs
new file mode 100644
index 0000000..12f4ad9
--- /dev/null
+++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/IPartitionManager.cs
@@ -0,0 +1,17 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace DurableTask.Netherite.EventHubsTransport
+{
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// General interface for starting and stopping the logic that activates the partition and load monitor on hosts.
+ /// </summary>
+ interface IPartitionManager
+ {
+ Task StartHostingAsync(); // starts the hosting logic; the returned task is awaited by the caller
+
+ Task StopHostingAsync(); // stops the hosting logic; awaited when the transport stops its hosting work
+ }
+}
diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/PartitionScript.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/PartitionScript.cs
index 53cac2c..b43d342 100644
--- a/src/DurableTask.Netherite/TransportLayer/EventHubs/PartitionScript.cs
+++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/PartitionScript.cs
@@ -8,7 +8,7 @@ namespace DurableTask.Netherite.EventHubsTransport
using System.IO;
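/// <summary>
- /// Functionality for parsing the partition scripts used by <see cref="ScriptedEventProcessorHost"/>.
+ /// Functionality for parsing the partition scripts used by <see cref="PartitionManagementOptions.Scripted"/>.
/// </summary>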
static class PartitionScript
{
diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/RecoveryTester.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/RecoveryTester.cs
new file mode 100644
index 0000000..8594420
--- /dev/null
+++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/RecoveryTester.cs
@@ -0,0 +1,173 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace DurableTask.Netherite.EventHubsTransport
+{
+    using DurableTask.Core.Common;
+    using DurableTask.Netherite.Abstractions;
+    using Microsoft.Azure.EventHubs;
+    using Microsoft.Azure.Storage.Blob;
+    using Microsoft.Extensions.Logging;
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    /// <summary>
+    /// A "partition manager" for diagnostic purposes, which recovers one or more partitions, but does not modify any files.
+    /// </summary>
+    class RecoveryTester : IPartitionManager
+    {
+        readonly TransportAbstraction.IHost host;
+        readonly TaskhubParameters parameters;
+        readonly NetheriteOrchestrationServiceSettings settings;
+        readonly EventHubsTraceHelper logger;
+
+        // ids of the partitions to recover, as determined by ParseParameters at construction time
+        readonly List<uint> partitionsToTest;
+
+        // partitions that were recovered successfully and must be stopped by StopHostingAsync
+        readonly List<TransportAbstraction.IPartition> partitions;
+
+        /// <summary>
+        /// Creates a recovery tester and determines which partitions it will test.
+        /// </summary>
+        /// <param name="host">The host used to create the partitions and their error handlers.</param>
+        /// <param name="parameters">The taskhub parameters, passed to each partition when recovering it.</param>
+        /// <param name="settings">The settings; <c>PartitionManagementParameters</c> selects the partition(s) to test.</param>
+        /// <param name="logger">The logger used for reporting progress and failures.</param>
+        /// <exception cref="NetheriteConfigurationException">The partition management parameter is invalid.</exception>
+        public RecoveryTester(
+            TransportAbstraction.IHost host,
+            TaskhubParameters parameters,
+            NetheriteOrchestrationServiceSettings settings,
+            EventHubsTraceHelper logger)
+        {
+            this.host = host;
+            this.parameters = parameters;
+            this.settings = settings;
+            this.logger = logger;
+
+            // parse eagerly, so that an invalid parameter is reported when the tester is constructed
+            this.ParseParameters(out this.partitionsToTest);
+            this.partitions = new List<TransportAbstraction.IPartition>();
+        }
+
+        /// <summary>
+        /// Recovers the selected partitions one after another. A failure to recover a partition is logged
+        /// and does not prevent the remaining partitions from being tested.
+        /// </summary>
+        public async Task StartHostingAsync()
+        {
+            this.logger.LogWarning($"RecoveryTester is testing partition recovery, parameters={this.settings.PartitionManagementParameters ?? "null"}");
+
+            // use the list that was parsed and validated by the constructor, instead of parsing the settings a second time
+            foreach (uint partitionId in this.partitionsToTest)
+            {
+                try
+                {
+                    this.logger.LogInformation("RecoveryTester is starting partition {partitionId}", partitionId);
+
+                    // start this partition (which may include waiting for the lease to become available)
+                    var partition = this.host.AddPartition(partitionId, new NullSender());
+
+                    var errorHandler = this.host.CreateErrorHandler(partitionId);
+
+                    // the result (next packet to receive) is not needed, since the tester never delivers any packets
+                    await partition.CreateOrRestoreAsync(errorHandler, this.parameters, "").ConfigureAwait(false);
+
+                    this.logger.LogInformation("RecoveryTester recovered partition {partitionId} successfully", partitionId);
+
+                    this.partitions.Add(partition);
+                }
+                catch (Exception e)
+                {
+                    this.logger.LogError("RecoveryTester failed while recovering partition {partitionId}: {exception}", partitionId, e);
+                }
+            }
+
+            this.logger.LogWarning("RecoveryTester completed testing partition recovery");
+        }
+
+        /// <summary>
+        /// Stops all partitions that were recovered successfully. Failures are logged and do not
+        /// prevent the remaining partitions from being stopped.
+        /// </summary>
+        public async Task StopHostingAsync()
+        {
+            foreach (var partition in this.partitions)
+            {
+                try
+                {
+                    this.logger.LogInformation("RecoveryTester is stopping partition {partitionId}", partition.PartitionId);
+
+                    await partition.StopAsync(true).ConfigureAwait(false);
+
+                    this.logger.LogInformation("RecoveryTester stopped partition {partitionId} successfully", partition.PartitionId);
+                }
+                catch (Exception e)
+                {
+                    this.logger.LogError("RecoveryTester failed while stopping partition {partitionId}: {exception}", partition.PartitionId, e);
+                }
+            }
+
+            // forget the stopped partitions, so that they are not stopped again if this method is called more than once
+            this.partitions.Clear();
+        }
+
+        /// <summary>
+        /// Determines the partitions to test from <c>PartitionManagementParameters</c>: all partitions
+        /// if it is null, otherwise the single partition whose id it specifies.
+        /// </summary>
+        /// <param name="partitionsToTest">Receives the ids of the partitions to test.</param>
+        /// <exception cref="NetheriteConfigurationException">The parameter is not an integer, or not a valid partition id.</exception>
+        public void ParseParameters(out List<uint> partitionsToTest)
+        {
+            partitionsToTest = new List<uint>();
+            if (this.settings.PartitionManagementParameters == null)
+            {
+                // test all partitions
+                for (uint i = 0; i < this.parameters.PartitionCount; i++)
+                {
+                    partitionsToTest.Add(i);
+                }
+            }
+            else
+            {
+                // test only one partition as specified in this.settings.PartitionManagementParameters
+                try
+                {
+                    // this is a configuration value, not user-facing text, so it is parsed independently of the current culture
+                    int num = int.Parse(
+                        this.settings.PartitionManagementParameters,
+                        System.Globalization.NumberStyles.Integer,
+                        System.Globalization.CultureInfo.InvariantCulture);
+
+                    if (num < 0 || num >= this.parameters.PartitionCount)
+                    {
+                        throw new IndexOutOfRangeException("partition out of range");
+                    }
+
+                    partitionsToTest.Add((uint)num);
+                }
+                catch (Exception ex)
+                {
+                    throw new NetheriteConfigurationException($"invalid parameter for {nameof(RecoveryTester)}: {this.settings.PartitionManagementParameters}", ex);
+                }
+            }
+        }
+
+        /// <summary>
+        /// A sender that discards all events submitted to it.
+        /// </summary>
+        class NullSender : TransportAbstraction.ISender
+        {
+            public void Submit(Event element)
+            {
+                // intentionally empty: submitted events are dropped
+            }
+        }
+    }
+}
diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/ScriptedEventProcessorHost.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/ScriptedEventProcessorHost.cs
deleted file mode 100644
index c22ff64..0000000
--- a/src/DurableTask.Netherite/TransportLayer/EventHubs/ScriptedEventProcessorHost.cs
+++ /dev/null
@@ -1,411 +0,0 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT License.
-
-namespace DurableTask.Netherite.EventHubsTransport
-{
- using DurableTask.Core.Common;
- using DurableTask.Netherite.Abstractions;
- using Microsoft.Azure.EventHubs;
- using Microsoft.Azure.Storage.Blob;
- using Microsoft.Extensions.Logging;
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
-
- ///
- /// An alternate event processor host where partitions are placed (started and stopped)
- /// according to a script that is read from a blob, as opposed to automatically load balanced.
- /// It is intended for benchmarking and testing scenarios only, not production.
- ///
- class ScriptedEventProcessorHost
- {
- readonly string eventHubPath;
- readonly string consumerGroupName;
- readonly ConnectionInfo eventHubConnection;
- readonly ConnectionInfo storageConnection;
- readonly string leaseContainerName;
- readonly string workerId;
- readonly TransportAbstraction.IHost host;
- readonly TransportAbstraction.ISender sender;
- readonly EventHubsConnections connections;
- readonly TaskhubParameters parameters;
- readonly byte[] taskHubGuid;
- readonly NetheriteOrchestrationServiceSettings settings;
- readonly EventHubsTraceHelper logger;
- readonly List partitionInstances = new List();
-
- int numberOfPartitions;
-
- public ScriptedEventProcessorHost(
- string eventHubPath,
- string consumerGroupName,
- ConnectionInfo eventHubConnection,
- ConnectionInfo storageConnection,
- string leaseContainerName,
- TransportAbstraction.IHost host,
- TransportAbstraction.ISender sender,
- EventHubsConnections connections,
- TaskhubParameters parameters,
- NetheriteOrchestrationServiceSettings settings,
- EventHubsTraceHelper logger,
- string workerId)
- {
- this.eventHubPath = eventHubPath;
- this.consumerGroupName = consumerGroupName;
- this.eventHubConnection = eventHubConnection;
- this.storageConnection = storageConnection;
- this.leaseContainerName = leaseContainerName;
- this.host = host;
- this.sender = sender;
- this.connections = connections;
- this.parameters = parameters;
- this.taskHubGuid = parameters.TaskhubGuid.ToByteArray();
- this.settings = settings;
- this.logger = logger;
- this.workerId = workerId;
- }
-
- public string Fingerprint => this.connections.Fingerprint;
-
- public void StartEventProcessing(NetheriteOrchestrationServiceSettings settings, CloudBlockBlob partitionScript)
- {
- if (!partitionScript.Exists())
- {
- this.logger.LogInformation("ScriptedEventProcessorHost workerId={workerId} is waiting for script", this.workerId);
- while (! partitionScript.Exists())
- {
- Thread.Sleep(TimeSpan.FromSeconds(1));
- }
- }
-
- // we use the UTC modification timestamp on the script as the scenario start time
- DateTime scenarioStartTimeUtc = partitionScript.Properties.LastModified.Value.UtcDateTime;
-
- // the number of partitions matters only if the script contains wildcards
- this.numberOfPartitions = this.parameters.PartitionCount;
- for (var partitionIndex = 0; partitionIndex < this.numberOfPartitions; partitionIndex++)
- {
- this.partitionInstances.Add(null);
- }
-
- List timesteps = new List(); ;
-
- try
- {
- using (var memoryStream = new System.IO.MemoryStream())
- {
- partitionScript.DownloadRangeToStream(memoryStream, null, null);
- memoryStream.Seek(0, System.IO.SeekOrigin.Begin);
- timesteps.AddRange(PartitionScript.ParseEvents(scenarioStartTimeUtc, settings.WorkerId, this.numberOfPartitions, memoryStream));
- }
-
- this.logger.LogInformation("ScriptedEventProcessorHost workerId={workerId} started.", this.workerId);
- }
- catch(Exception e)
- {
- this.logger.LogError($"ScriptedEventProcessorHost workerId={this.workerId} failed to parse partitionscript: {e}");
- }
-
- int nextTime = 0;
- List nextGroup = new List();
-
- foreach (var timestep in timesteps)
- {
- if (nextTime == timestep.TimeSeconds)
- {
- nextGroup.Add(timestep);
- }
- else
- {
- this.Process(nextGroup);
- nextGroup.Clear();
- nextGroup.Add(timestep);
- nextTime = timestep.TimeSeconds;
- }
- }
-
- this.Process(nextGroup);
- }
-
- public Task StopAsync()
- {
- // TODO implement this. Not urgent since this class is currently only used for testing/benchmarking
- return Task.CompletedTask;
- }
-
- void Process(List ready)
- {
- if (ready.Count > 0)
- {
- int delay = (int)(ready[0].TimeUtc - DateTime.UtcNow).TotalMilliseconds;
- if (delay > 0)
- {
- this.logger.LogInformation("ScriptedEventProcessorHost workerId={workerId} is waiting for {delay} ms until next hostEvent", this.workerId, delay);
- Thread.Sleep(delay);
- }
-
- Stopwatch stopwatch = new Stopwatch();
- stopwatch.Start();
-
- bool parallel = true;
-
- var tasks = new List();
- int lasttime = 0;
- foreach (var timestep in ready)
- {
- this.logger.LogWarning("ScriptedEventProcessorHost workerId={workerId} performs action={action} partition={partition} time={time}.", this.workerId, timestep.Action, timestep.PartitionId, timestep.TimeSeconds);
- lasttime = timestep.TimeSeconds;
- }
- foreach (var timestep in ready)
- {
- if (parallel)
- {
- tasks.Add(this.ProcessHostEvent(timestep));
- }
- else
- {
- this.ProcessHostEvent(timestep).GetAwaiter().GetResult();
- }
- }
- Task.WhenAll(tasks).GetAwaiter().GetResult();
- this.logger.LogWarning("ScriptedEventProcessorHost workerId={workerId} finished all actions for time={time} in {elapsedSeconds}s.", this.workerId, lasttime, stopwatch.Elapsed.TotalSeconds);
- }
- }
-
- async Task ProcessHostEvent(PartitionScript.ProcessorHostEvent timestep)
- {
- try
- {
- int partitionId = timestep.PartitionId;
- if (timestep.Action == "restart")
- {
- var oldPartitionInstance = this.partitionInstances[partitionId];
- var newPartitionInstance = new PartitionInstance((uint) partitionId, oldPartitionInstance.Incarnation + 1, this);
- this.partitionInstances[partitionId] = newPartitionInstance;
- await Task.WhenAll(newPartitionInstance.StartAsync(), oldPartitionInstance.StopAsync());
- }
- else if (timestep.Action == "start")
- {
- var oldPartitionInstance = this.partitionInstances[partitionId];
- var newPartitionInstance = new PartitionInstance((uint)partitionId, (oldPartitionInstance?.Incarnation ?? 0) + 1, this);
- this.partitionInstances[partitionId] = newPartitionInstance;
- await newPartitionInstance.StartAsync();
- }
- else if (timestep.Action == "stop")
- {
- var oldPartitionInstance = this.partitionInstances[partitionId];
- await oldPartitionInstance.StopAsync();
- }
- else
- {
- throw new InvalidOperationException($"Unknown action: {timestep.Action}");
- }
-
- this.logger.LogWarning("ScriptedEventProcessorHost workerId={workerId} successfully performed action={action} partition={partition} time={time}.", this.workerId, timestep.Action, timestep.PartitionId, timestep.TimeSeconds);
- }
- catch (Exception e) when (!Utils.IsFatal(e))
- {
- // TODO: Maybe in the future we would like to actually do something in case of failure.
- // For now it is fine to ignore them.
- this.logger.LogError("ScriptedEventProcessorHost workerId={workerId} failed on action={action} partition={partition} time={time} exception={exception}", this.workerId, timestep.Action, timestep.PartitionId, timestep.TimeSeconds, e);
- }
- }
-
- ///
- /// Represents a particular instance of a partition that is being managed by a CustomEventProcessor host.
- ///
- class PartitionInstance
- {
- readonly uint partitionId;
- readonly ScriptedEventProcessorHost host;
- readonly BlobBatchReceiver blobBatchReceiver;
-
- TransportAbstraction.IPartition partition;
- Task partitionEventLoop;
- PartitionReceiver partitionReceiver;
- CancellationTokenSource shutdownSource;
- Task shutdownTask;
- // Just copied from EventHubsTransport
- const int MaxReceiveBatchSize = 1000; // actual batches are typically much smaller
-
- public PartitionInstance(uint partitionId, int incarnation, ScriptedEventProcessorHost eventProcessorHost)
- {
- this.partitionId = partitionId;
- this.Incarnation = incarnation;
- this.host = eventProcessorHost;
- string traceContext = $"PartitionInstance {this.host.eventHubPath}/{this.partitionId}({this.Incarnation})";
- this.blobBatchReceiver = new BlobBatchReceiver(traceContext, this.host.logger, this.host.settings, keepUntilConfirmed: true);
- }
-
- public int Incarnation { get; }
-
- public async Task StartAsync()
- {
- this.shutdownSource = new CancellationTokenSource();
- this.shutdownTask = this.WaitForShutdownAsync();
-
- try
- {
- this.host.logger.LogDebug("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) is starting partition", this.host.eventHubPath, this.partitionId, this.Incarnation);
-
- // start this partition (which may include waiting for the lease to become available)
- this.partition = this.host.host.AddPartition(this.partitionId, this.host.sender);
-
- var errorHandler = this.host.host.CreateErrorHandler(this.partitionId);
-
- var nextPacketToReceive = await this.partition.CreateOrRestoreAsync(errorHandler, this.host.parameters, this.host.Fingerprint).ConfigureAwait(false);
- this.host.logger.LogInformation("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) started partition, next expected packet is #{nextSeqno}.{batchPos}", this.host.eventHubPath, this.partitionId, this.Incarnation, nextPacketToReceive.Item1, nextPacketToReceive.Item2);
-
- this.partitionEventLoop = Task.Run(() => this.PartitionEventLoop(nextPacketToReceive));
- }
- catch (Exception e) when (!Utils.IsFatal(e))
- {
- this.host.logger.LogError("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) failed to start partition: {exception}", this.host.eventHubPath, this.partitionId, this.Incarnation, e);
- throw;
- }
- }
-
- async Task WaitForShutdownAsync()
- {
- if (!this.shutdownSource.IsCancellationRequested)
- {
- var tcs = new TaskCompletionSource