diff --git a/README.md b/README.md index 2922de0..010f615 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ Netherite is a distributed workflow execution engine for [Durable Functions](htt It is of potential interest to anyone developing applications on those platforms who has an appetite for performance, scalability, and reliability. As Netherite is intended to be a drop-in backend replacement, it does not modify the application API. Existing DF and DTFx applications can switch to this backend with little effort. +However, we do not support migrating existing task hub contents between different backends. ## Getting Started @@ -62,7 +63,7 @@ For some other considerations about how to choose the engine, see [the documenta ## Status -The current version of Netherite is *1.3.1*. Netherite supports almost all of the DT and DF APIs. +The current version of Netherite is *1.3.2*. Netherite supports almost all of the DT and DF APIs. Some notable differences to the default Azure Table storage provider include: - Instance queries and purge requests are not issued directly against Azure Storage, but are processed by the function app. Thus, the performance (latency and throughput) of queries heavily depends on diff --git a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj index 2f3bbb8..49464ad 100644 --- a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj +++ b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj @@ -26,7 +26,7 @@ 1 3 - 1 + 2 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 diff --git a/src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs b/src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs index b0e0c92..5c96ac0 100644 --- a/src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs +++ b/src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs @@ -56,6 +56,12 @@ namespace DurableTask.Netherite /// /// The partition id. IPartitionErrorHandler CreateErrorHandler(uint partitionId); + + /// + /// Trace a warning to the host logs + /// + /// + void TraceWarning(string message); } /// diff --git a/src/DurableTask.Netherite/DurableTask.Netherite.csproj b/src/DurableTask.Netherite/DurableTask.Netherite.csproj index 5bcf854..a01c87f 100644 --- a/src/DurableTask.Netherite/DurableTask.Netherite.csproj +++ b/src/DurableTask.Netherite/DurableTask.Netherite.csproj @@ -26,7 +26,7 @@ 1 3 - 1 + 2 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs index 3d27f3c..1f480b4 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs @@ -366,6 +366,11 @@ namespace DurableTask.Netherite (this.ContainerName, this.PathPrefix) = this.storage.GetTaskhubPathPrefix(this.TaskhubParameters); this.NumberPartitions = (uint) this.TaskhubParameters.PartitionCount; + if (this.Settings.PartitionCount != this.NumberPartitions) + { + this.TraceHelper.TraceWarning($"Ignoring configuration setting partitionCount={this.Settings.PartitionCount} because existing TaskHub has {this.NumberPartitions} partitions"); + } + await this.transport.StartClientAsync(); System.Diagnostics.Debug.Assert(this.client != null, "transport layer should have added client"); @@ -422,11 +427,6 @@ namespace DurableTask.Netherite await this.transport.StartWorkersAsync(); - if (this.Settings.PartitionCount != this.NumberPartitions) - { - this.TraceHelper.TraceWarning($"Ignoring configuration setting partitionCount={this.Settings.PartitionCount} because existing TaskHub has {this.NumberPartitions} partitions"); - } - if (this.threadWatcher == null) { this.threadWatcher = new Timer(this.WatchThreads, null, 0, 120000); @@ -557,6 +557,11 @@ namespace DurableTask.Netherite return new PartitionErrorHandler((int) partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName); } + void TransportAbstraction.IHost.TraceWarning(string message) + { + this.TraceHelper.TraceWarning(message); + } + /******************************/ // client methods /******************************/ diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs index 8323423..6c580b7 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs @@ -20,6 +20,7 @@ namespace DurableTask.Netherite.Faster using Azure.Storage.Blobs.Models; using System.Net; using System.Text; + using DurableTask.Netherite.Abstractions; /// /// Provides management of blobs and blob names associated with a partition, and logic for partition lease maintenance and termination. @@ -91,25 +92,26 @@ namespace DurableTask.Netherite.Faster public int? NumPagesToPreload; } - public FasterLogSettings GetDefaultEventLogSettings(bool useSeparatePageBlobStorage, FasterTuningParameters tuningParameters) => new FasterLogSettings + public FasterLogSettings GetEventLogSettings(bool useSeparatePageBlobStorage, FasterTuningParameters tuningParameters) { - LogDevice = this.EventLogDevice, - LogCommitManager = this.UseLocalFiles - ? null // TODO: fix this: new LocalLogCommitManager($"{this.LocalDirectoryPath}\\{this.PartitionFolderName}\\{CommitBlobName}") - : (ILogCommitManager)this, - PageSizeBits = tuningParameters?.EventLogPageSizeBits ?? 21, // 2MB - SegmentSizeBits = tuningParameters?.EventLogSegmentSizeBits ?? - (useSeparatePageBlobStorage ? 35 // 32 GB - : 26), // 64 MB - MemorySizeBits = tuningParameters?.EventLogMemorySizeBits ?? 22, // 2MB - }; + (int pageSizeBits, int segmentSizeBits) = BlobManager.GetImmutableEventLogParameters(useSeparatePageBlobStorage, tuningParameters); - public LogSettings GetDefaultStoreLogSettings( - bool useSeparatePageBlobStorage, - long upperBoundOnAvailable, - FasterTuningParameters tuningParameters) + return new FasterLogSettings + { + PageSizeBits = pageSizeBits, + SegmentSizeBits = segmentSizeBits, + LogDevice = this.EventLogDevice, + LogCommitManager = this.UseLocalFiles + ? null // TODO: fix this: new LocalLogCommitManager($"{this.LocalDirectoryPath}\\{this.PartitionFolderName}\\{CommitBlobName}") + : (ILogCommitManager)this, + MemorySizeBits = tuningParameters?.EventLogMemorySizeBits ?? 22, // 2MB + }; + } + + public LogSettings GetStoreLogSettings(bool useSeparatePageBlobStorage, long upperBoundOnAvailable, FasterTuningParameters tuningParameters) { - int pageSizeBits = tuningParameters?.StoreLogPageSizeBits ?? 10; // default page size is 1k + // start with page and segment sizes + (int pageSizeBits, int segmentSizeBits) = BlobManager.GetImmutableStoreLogParameters(tuningParameters); // compute a reasonable memory size for the log considering maximally available memory, and expansion factor int memorybits = 0; @@ -134,11 +136,9 @@ namespace DurableTask.Netherite.Faster { LogDevice = this.HybridLogDevice, ObjectLogDevice = this.ObjectLogDevice, - PageSizeBits = pageSizeBits, + PageSizeBits = pageSizeBits, MutableFraction = tuningParameters?.StoreLogMutableFraction ?? 0.9, - SegmentSizeBits = tuningParameters?.StoreLogSegmentSizeBits ?? - (useSeparatePageBlobStorage ? 35 // 32 GB - : 32), // 4 GB + SegmentSizeBits = segmentSizeBits, PreallocateLog = false, ReadFlags = ReadFlags.None, ReadCacheSettings = null, // no read cache @@ -146,6 +146,24 @@ namespace DurableTask.Netherite.Faster }; } + public static (int pageSizeBits, int segmentSizeBits) GetImmutableEventLogParameters(bool useSeparatePageBlobStorage, FasterTuningParameters tuningParameters) + { + int pageSizeBits = tuningParameters?.EventLogPageSizeBits ?? 21; // 2MB + + int segmentSizeBits = tuningParameters?.EventLogSegmentSizeBits ?? + (useSeparatePageBlobStorage ? 35 // 32 GB + : 26); // 64 MB + + return (pageSizeBits, segmentSizeBits); + } + + public static (int pageSizeBits, int segmentSizeBits) GetImmutableStoreLogParameters(FasterTuningParameters tuningParameters) + { + int pageSizeBits = tuningParameters?.StoreLogPageSizeBits ?? 10; // 1kB + int segmentSizeBits = tuningParameters?.StoreLogSegmentSizeBits ?? 19; // 512 kB + + return (pageSizeBits, segmentSizeBits); + } static readonly int[] StorageFormatVersion = new int[] { 1, //initial version @@ -157,10 +175,21 @@ namespace DurableTask.Netherite.Faster public static string GetStorageFormat(NetheriteOrchestrationServiceSettings settings) { + var eventLogSettings = BlobManager.GetImmutableEventLogParameters(settings.UseSeparatePageBlobStorage, settings.FasterTuningParameters); + var storeLogSettings = BlobManager.GetImmutableStoreLogParameters(settings.FasterTuningParameters); + int[] pageAndSegmentSizes = new int[4] + { + eventLogSettings.pageSizeBits, + eventLogSettings.segmentSizeBits, + storeLogSettings.pageSizeBits, + storeLogSettings.segmentSizeBits + }; + return JsonConvert.SerializeObject(new StorageFormatSettings() { UseAlternateObjectStore = settings.UseAlternateObjectStore, FormatVersion = StorageFormatVersion.Last(), + PageAndSegmentSizes = pageAndSegmentSizes, }, serializerSettings); } @@ -177,6 +206,9 @@ namespace DurableTask.Netherite.Faster [JsonProperty("UseAlternateObjectStore", DefaultValueHandling = DefaultValueHandling.Ignore)] public bool? UseAlternateObjectStore { get; set; } + + [JsonProperty("PageAndSegmentSizes", DefaultValueHandling = DefaultValueHandling.Ignore)] + public int[] PageAndSegmentSizes { get; set; } } static readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings() @@ -187,7 +219,7 @@ namespace DurableTask.Netherite.Faster Formatting = Formatting.None, }; - public static void CheckStorageFormat(string format, NetheriteOrchestrationServiceSettings settings) + public static void LoadAndCheckStorageFormat(string format, NetheriteOrchestrationServiceSettings settings, Action traceWarning) { try { @@ -201,6 +233,34 @@ namespace DurableTask.Netherite.Faster { throw new NetheriteConfigurationException($"The current storage format version (={StorageFormatVersion.Last()}) is incompatible with the existing taskhub (={taskhubFormat.FormatVersion})."); } + + // take the parameters we loaded from storage, and use them as a tuning parameter for FASTER + settings.FasterTuningParameters ??= new FasterTuningParameters(); + + Set(nameof(FasterTuningParameters.EventLogPageSizeBits), ref settings.FasterTuningParameters.EventLogPageSizeBits, taskhubFormat.PageAndSegmentSizes?[0], 21); + Set(nameof(FasterTuningParameters.EventLogSegmentSizeBits), ref settings.FasterTuningParameters.EventLogSegmentSizeBits, taskhubFormat.PageAndSegmentSizes?[1], 26); + Set(nameof(FasterTuningParameters.StoreLogPageSizeBits), ref settings.FasterTuningParameters.StoreLogPageSizeBits, taskhubFormat.PageAndSegmentSizes?[2], 10); + Set(nameof(FasterTuningParameters.StoreLogSegmentSizeBits), ref settings.FasterTuningParameters.StoreLogSegmentSizeBits, taskhubFormat.PageAndSegmentSizes?[3], 32); + + void Set(string name, ref int? specified, int? loaded, int oldDefault) + { + if (specified.HasValue) + { + if (loaded.HasValue && loaded.Value != specified.Value) + { + // we ignore the user-specified setting because this parameter cannot be changed after a task hub is created + traceWarning($"Ignoring configuration setting FasterTuningParameters.{name}={specified.Value} because existing TaskHub uses value {loaded.Value}."); + specified = loaded; + } + } + else + { + // we use the loaded value, but if there is none, for backward compatibility, use the default that + // was in place before we started storing this parameter inside the taskhubFormat record + specified = loaded ?? oldDefault; + } + } + } catch (Exception e) { diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs index 06dfad1..4b6b07a 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs @@ -86,7 +86,7 @@ namespace DurableTask.Netherite.Faster partition.ErrorHandler.Token.ThrowIfCancellationRequested(); - this.storelogsettings = blobManager.GetDefaultStoreLogSettings( + this.storelogsettings = blobManager.GetStoreLogSettings( partition.Settings.UseSeparatePageBlobStorage, memoryTracker.MaxCacheSize, partition.Settings.FasterTuningParameters); diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs index 59ca14b..2094973 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs @@ -18,7 +18,7 @@ namespace DurableTask.Netherite.Faster public FasterLog(BlobManager blobManager, NetheriteOrchestrationServiceSettings settings) { this.blobManager = blobManager; - var eventlogsettings = blobManager.GetDefaultEventLogSettings(settings.UseSeparatePageBlobStorage, settings.FasterTuningParameters); + var eventlogsettings = blobManager.GetEventLogSettings(settings.UseSeparatePageBlobStorage, settings.FasterTuningParameters); this.log = new FASTER.core.FasterLog(eventlogsettings); blobManager.PartitionErrorHandler.OnShutdown += this.Shutdown; this.terminationToken = blobManager.PartitionErrorHandler.Token; diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs b/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs index 1721441..eb6bca5 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs @@ -35,7 +35,7 @@ namespace DurableTask.Netherite.Faster this.traceHelper = traceHelper; this.intakeWorker = new IntakeWorker(cancellationToken, this, partition.TraceHelper); - this.maxFragmentSize = (int) this.blobManager.GetDefaultEventLogSettings(partition.Settings.UseSeparatePageBlobStorage, partition.Settings.FasterTuningParameters).PageSize - 64; // faster needs some room for header, 64 bytes is conservative + this.maxFragmentSize = (int) this.blobManager.GetEventLogSettings(partition.Settings.UseSeparatePageBlobStorage, partition.Settings.FasterTuningParameters).PageSize - 64; // faster needs some room for header, 64 bytes is conservative } public const byte first = 0x1; diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs index 87bc6ab..f447fde 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs @@ -99,8 +99,8 @@ namespace DurableTask.Netherite.EventHubsTransport this.cloudBlobContainer = cloudBlobClient.GetContainerReference(containerName); this.partitionScript = this.cloudBlobContainer.GetBlockBlobReference("partitionscript.json"); - // check that the storage format is supported - BlobManager.CheckStorageFormat(this.parameters.StorageFormat, this.settings); + // check that the storage format is supported, and load the relevant FASTER tuning parameters + BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning); this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub) { diff --git a/src/DurableTask.Netherite/Util/BatchTimer.cs b/src/DurableTask.Netherite/Util/BatchTimer.cs index 6af684c..7c8a04c 100644 --- a/src/DurableTask.Netherite/Util/BatchTimer.cs +++ b/src/DurableTask.Netherite/Util/BatchTimer.cs @@ -90,7 +90,7 @@ namespace DurableTask.Netherite while (!this.cancellationToken.IsCancellationRequested) { // wait for the next expiration time or cleanup, but cut the wait short if notified - if (this.RequiresDelay(out var delay, out var due)) + if (this.RequiresDelay(out int delay, out var due)) { var startWait = DateTime.UtcNow; this.notify.Wait(delay); // blocks thread until delay is over, or until notified @@ -140,13 +140,15 @@ namespace DurableTask.Netherite } } - bool RequiresDelay(out TimeSpan delay, out DateTime due) + static readonly TimeSpan MaxDelay = TimeSpan.FromDays(1); // we cap delays at 1 day, so we will never exceed int.MaxValue milliseconds + + bool RequiresDelay(out int delay, out DateTime due) { lock (this.thisLock) { if (this.schedule.Count == 0) { - delay = TimeSpan.FromMilliseconds(-1); // represents infinite delay + delay = -1; // represents infinite delay due = DateTime.MaxValue; return true; } @@ -157,7 +159,13 @@ namespace DurableTask.Netherite if (next.Key.due > now) { due = next.Key.due; - delay = due - now; + + if (due - now > MaxDelay) + { + due = now + MaxDelay; + } + + delay = (int) (due - now).TotalMilliseconds; return true; } else