This commit is contained in:
sebastianburckhardt 2023-03-09 14:22:34 -08:00
Родитель c13929a9f4 6d159c6d90
Коммит 6f869c7ec4
11 изменённых файлов: 118 добавлений и 38 удалений

Просмотреть файл

@ -7,6 +7,7 @@ Netherite is a distributed workflow execution engine for [Durable Functions](htt
It is of potential interest to anyone developing applications on those platforms who has an appetite for performance, scalability, and reliability.
As Netherite is intended to be a drop-in backend replacement, it does not modify the application API. Existing DF and DTFx applications can switch to this backend with little effort.
However, we do not support migrating existing task hub contents between different backends.
## Getting Started
@ -62,7 +63,7 @@ For some other considerations about how to choose the engine, see [the documenta
## Status
The current version of Netherite is *1.3.1*. Netherite supports almost all of the DT and DF APIs.
The current version of Netherite is *1.3.2*. Netherite supports almost all of the DT and DF APIs.
Some notable differences to the default Azure Table storage provider include:
- Instance queries and purge requests are not issued directly against Azure Storage, but are processed by the function app. Thus, the performance (latency and throughput) of queries heavily depends on

Просмотреть файл

@ -26,7 +26,7 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>3</MinorVersion>
<PatchVersion>1</PatchVersion>
<PatchVersion>2</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>

Просмотреть файл

@ -56,6 +56,12 @@ namespace DurableTask.Netherite
/// </summary>
/// <param name="partitionId">The partition id.</param>
IPartitionErrorHandler CreateErrorHandler(uint partitionId);
/// <summary>
/// Trace a warning to the host logs
/// </summary>
/// <param name="message"></param>
void TraceWarning(string message);
}
/// <summary>

Просмотреть файл

@ -26,7 +26,7 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>3</MinorVersion>
<PatchVersion>1</PatchVersion>
<PatchVersion>2</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>

Просмотреть файл

@ -366,6 +366,11 @@ namespace DurableTask.Netherite
(this.ContainerName, this.PathPrefix) = this.storage.GetTaskhubPathPrefix(this.TaskhubParameters);
this.NumberPartitions = (uint) this.TaskhubParameters.PartitionCount;
if (this.Settings.PartitionCount != this.NumberPartitions)
{
this.TraceHelper.TraceWarning($"Ignoring configuration setting partitionCount={this.Settings.PartitionCount} because existing TaskHub has {this.NumberPartitions} partitions");
}
await this.transport.StartClientAsync();
System.Diagnostics.Debug.Assert(this.client != null, "transport layer should have added client");
@ -422,11 +427,6 @@ namespace DurableTask.Netherite
await this.transport.StartWorkersAsync();
if (this.Settings.PartitionCount != this.NumberPartitions)
{
this.TraceHelper.TraceWarning($"Ignoring configuration setting partitionCount={this.Settings.PartitionCount} because existing TaskHub has {this.NumberPartitions} partitions");
}
if (this.threadWatcher == null)
{
this.threadWatcher = new Timer(this.WatchThreads, null, 0, 120000);
@ -557,6 +557,11 @@ namespace DurableTask.Netherite
return new PartitionErrorHandler((int) partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName);
}
void TransportAbstraction.IHost.TraceWarning(string message)
{
this.TraceHelper.TraceWarning(message);
}
/******************************/
// client methods
/******************************/

Просмотреть файл

@ -20,6 +20,7 @@ namespace DurableTask.Netherite.Faster
using Azure.Storage.Blobs.Models;
using System.Net;
using System.Text;
using DurableTask.Netherite.Abstractions;
/// <summary>
/// Provides management of blobs and blob names associated with a partition, and logic for partition lease maintenance and termination.
@ -91,25 +92,26 @@ namespace DurableTask.Netherite.Faster
public int? NumPagesToPreload;
}
public FasterLogSettings GetDefaultEventLogSettings(bool useSeparatePageBlobStorage, FasterTuningParameters tuningParameters) => new FasterLogSettings
public FasterLogSettings GetEventLogSettings(bool useSeparatePageBlobStorage, FasterTuningParameters tuningParameters)
{
(int pageSizeBits, int segmentSizeBits) = BlobManager.GetImmutableEventLogParameters(useSeparatePageBlobStorage, tuningParameters);
return new FasterLogSettings
{
PageSizeBits = pageSizeBits,
SegmentSizeBits = segmentSizeBits,
LogDevice = this.EventLogDevice,
LogCommitManager = this.UseLocalFiles
? null // TODO: fix this: new LocalLogCommitManager($"{this.LocalDirectoryPath}\\{this.PartitionFolderName}\\{CommitBlobName}")
: (ILogCommitManager)this,
PageSizeBits = tuningParameters?.EventLogPageSizeBits ?? 21, // 2MB
SegmentSizeBits = tuningParameters?.EventLogSegmentSizeBits ??
(useSeparatePageBlobStorage ? 35 // 32 GB
: 26), // 64 MB
MemorySizeBits = tuningParameters?.EventLogMemorySizeBits ?? 22, // 2MB
};
}
public LogSettings GetDefaultStoreLogSettings(
bool useSeparatePageBlobStorage,
long upperBoundOnAvailable,
FasterTuningParameters tuningParameters)
public LogSettings GetStoreLogSettings(bool useSeparatePageBlobStorage, long upperBoundOnAvailable, FasterTuningParameters tuningParameters)
{
int pageSizeBits = tuningParameters?.StoreLogPageSizeBits ?? 10; // default page size is 1k
// start with page and segment sizes
(int pageSizeBits, int segmentSizeBits) = BlobManager.GetImmutableStoreLogParameters(tuningParameters);
// compute a reasonable memory size for the log considering maximally available memory, and expansion factor
int memorybits = 0;
@ -136,9 +138,7 @@ namespace DurableTask.Netherite.Faster
ObjectLogDevice = this.ObjectLogDevice,
PageSizeBits = pageSizeBits,
MutableFraction = tuningParameters?.StoreLogMutableFraction ?? 0.9,
SegmentSizeBits = tuningParameters?.StoreLogSegmentSizeBits ??
(useSeparatePageBlobStorage ? 35 // 32 GB
: 32), // 4 GB
SegmentSizeBits = segmentSizeBits,
PreallocateLog = false,
ReadFlags = ReadFlags.None,
ReadCacheSettings = null, // no read cache
@ -146,6 +146,24 @@ namespace DurableTask.Netherite.Faster
};
}
public static (int pageSizeBits, int segmentSizeBits) GetImmutableEventLogParameters(bool useSeparatePageBlobStorage, FasterTuningParameters tuningParameters)
{
int pageSizeBits = tuningParameters?.EventLogPageSizeBits ?? 21; // 2MB
int segmentSizeBits = tuningParameters?.EventLogSegmentSizeBits ??
(useSeparatePageBlobStorage ? 35 // 32 GB
: 26); // 64 MB
return (pageSizeBits, segmentSizeBits);
}
public static (int pageSizeBits, int segmentSizeBits) GetImmutableStoreLogParameters(FasterTuningParameters tuningParameters)
{
int pageSizeBits = tuningParameters?.StoreLogPageSizeBits ?? 10; // 1kB
int segmentSizeBits = tuningParameters?.StoreLogSegmentSizeBits ?? 19; // 512 kB
return (pageSizeBits, segmentSizeBits);
}
static readonly int[] StorageFormatVersion = new int[] {
1, //initial version
@ -157,10 +175,21 @@ namespace DurableTask.Netherite.Faster
public static string GetStorageFormat(NetheriteOrchestrationServiceSettings settings)
{
var eventLogSettings = BlobManager.GetImmutableEventLogParameters(settings.UseSeparatePageBlobStorage, settings.FasterTuningParameters);
var storeLogSettings = BlobManager.GetImmutableStoreLogParameters(settings.FasterTuningParameters);
int[] pageAndSegmentSizes = new int[4]
{
eventLogSettings.pageSizeBits,
eventLogSettings.segmentSizeBits,
storeLogSettings.pageSizeBits,
storeLogSettings.segmentSizeBits
};
return JsonConvert.SerializeObject(new StorageFormatSettings()
{
UseAlternateObjectStore = settings.UseAlternateObjectStore,
FormatVersion = StorageFormatVersion.Last(),
PageAndSegmentSizes = pageAndSegmentSizes,
},
serializerSettings);
}
@ -177,6 +206,9 @@ namespace DurableTask.Netherite.Faster
[JsonProperty("UseAlternateObjectStore", DefaultValueHandling = DefaultValueHandling.Ignore)]
public bool? UseAlternateObjectStore { get; set; }
[JsonProperty("PageAndSegmentSizes", DefaultValueHandling = DefaultValueHandling.Ignore)]
public int[] PageAndSegmentSizes { get; set; }
}
static readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings()
@ -187,7 +219,7 @@ namespace DurableTask.Netherite.Faster
Formatting = Formatting.None,
};
public static void CheckStorageFormat(string format, NetheriteOrchestrationServiceSettings settings)
public static void LoadAndCheckStorageFormat(string format, NetheriteOrchestrationServiceSettings settings, Action<string> traceWarning)
{
try
{
@ -201,6 +233,34 @@ namespace DurableTask.Netherite.Faster
{
throw new NetheriteConfigurationException($"The current storage format version (={StorageFormatVersion.Last()}) is incompatible with the existing taskhub (={taskhubFormat.FormatVersion}).");
}
// take the parameters we loaded from storage, and use them as a tuning parameter for FASTER
settings.FasterTuningParameters ??= new FasterTuningParameters();
Set(nameof(FasterTuningParameters.EventLogPageSizeBits), ref settings.FasterTuningParameters.EventLogPageSizeBits, taskhubFormat.PageAndSegmentSizes?[0], 21);
Set(nameof(FasterTuningParameters.EventLogSegmentSizeBits), ref settings.FasterTuningParameters.EventLogSegmentSizeBits, taskhubFormat.PageAndSegmentSizes?[1], 26);
Set(nameof(FasterTuningParameters.StoreLogPageSizeBits), ref settings.FasterTuningParameters.StoreLogPageSizeBits, taskhubFormat.PageAndSegmentSizes?[2], 10);
Set(nameof(FasterTuningParameters.StoreLogSegmentSizeBits), ref settings.FasterTuningParameters.StoreLogSegmentSizeBits, taskhubFormat.PageAndSegmentSizes?[3], 32);
void Set(string name, ref int? specified, int? loaded, int oldDefault)
{
if (specified.HasValue)
{
if (loaded.HasValue && loaded.Value != specified.Value)
{
// we ignore the user-specified setting because this parameter cannot be changed after a task hub is created
traceWarning($"Ignoring configuration setting FasterTuningParameters.{name}={specified.Value} because existing TaskHub uses value {loaded.Value}.");
specified = loaded;
}
}
else
{
// we use the loaded value, but if there is none, for backward compatibility, use the default that
// was in place before we started storing this parameter inside the taskhubFormat record
specified = loaded ?? oldDefault;
}
}
}
catch (Exception e)
{

Просмотреть файл

@ -86,7 +86,7 @@ namespace DurableTask.Netherite.Faster
partition.ErrorHandler.Token.ThrowIfCancellationRequested();
this.storelogsettings = blobManager.GetDefaultStoreLogSettings(
this.storelogsettings = blobManager.GetStoreLogSettings(
partition.Settings.UseSeparatePageBlobStorage,
memoryTracker.MaxCacheSize,
partition.Settings.FasterTuningParameters);

Просмотреть файл

@ -18,7 +18,7 @@ namespace DurableTask.Netherite.Faster
public FasterLog(BlobManager blobManager, NetheriteOrchestrationServiceSettings settings)
{
this.blobManager = blobManager;
var eventlogsettings = blobManager.GetDefaultEventLogSettings(settings.UseSeparatePageBlobStorage, settings.FasterTuningParameters);
var eventlogsettings = blobManager.GetEventLogSettings(settings.UseSeparatePageBlobStorage, settings.FasterTuningParameters);
this.log = new FASTER.core.FasterLog(eventlogsettings);
blobManager.PartitionErrorHandler.OnShutdown += this.Shutdown;
this.terminationToken = blobManager.PartitionErrorHandler.Token;

Просмотреть файл

@ -35,7 +35,7 @@ namespace DurableTask.Netherite.Faster
this.traceHelper = traceHelper;
this.intakeWorker = new IntakeWorker(cancellationToken, this, partition.TraceHelper);
this.maxFragmentSize = (int) this.blobManager.GetDefaultEventLogSettings(partition.Settings.UseSeparatePageBlobStorage, partition.Settings.FasterTuningParameters).PageSize - 64; // faster needs some room for header, 64 bytes is conservative
this.maxFragmentSize = (int) this.blobManager.GetEventLogSettings(partition.Settings.UseSeparatePageBlobStorage, partition.Settings.FasterTuningParameters).PageSize - 64; // faster needs some room for header, 64 bytes is conservative
}
public const byte first = 0x1;

Просмотреть файл

@ -99,8 +99,8 @@ namespace DurableTask.Netherite.EventHubsTransport
this.cloudBlobContainer = cloudBlobClient.GetContainerReference(containerName);
this.partitionScript = this.cloudBlobContainer.GetBlockBlobReference("partitionscript.json");
// check that the storage format is supported
BlobManager.CheckStorageFormat(this.parameters.StorageFormat, this.settings);
// check that the storage format is supported, and load the relevant FASTER tuning parameters
BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning);
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub)
{

Просмотреть файл

@ -90,7 +90,7 @@ namespace DurableTask.Netherite
while (!this.cancellationToken.IsCancellationRequested)
{
// wait for the next expiration time or cleanup, but cut the wait short if notified
if (this.RequiresDelay(out var delay, out var due))
if (this.RequiresDelay(out int delay, out var due))
{
var startWait = DateTime.UtcNow;
this.notify.Wait(delay); // blocks thread until delay is over, or until notified
@ -140,13 +140,15 @@ namespace DurableTask.Netherite
}
}
bool RequiresDelay(out TimeSpan delay, out DateTime due)
static readonly TimeSpan MaxDelay = TimeSpan.FromDays(1); // we cap delays at 1 day, so we will never exceed int.MaxValue milliseconds
bool RequiresDelay(out int delay, out DateTime due)
{
lock (this.thisLock)
{
if (this.schedule.Count == 0)
{
delay = TimeSpan.FromMilliseconds(-1); // represents infinite delay
delay = -1; // represents infinite delay
due = DateTime.MaxValue;
return true;
}
@ -157,7 +159,13 @@ namespace DurableTask.Netherite
if (next.Key.due > now)
{
due = next.Key.due;
delay = due - now;
if (due - now > MaxDelay)
{
due = now + MaxDelay;
}
delay = (int) (due - now).TotalMilliseconds;
return true;
}
else