Overhaul FASTER page and segment size parameters (#230)

* record FASTER page and segment sizes in task hubs, and change default for hybrid log segment size

* address PR feedback

* move partition count warning to an earlier location (so it is generated even if using client only)
This commit is contained in:
Sebastian Burckhardt 2023-03-09 14:17:57 -08:00 коммит произвёл GitHub
Родитель 8c6afeb3e4
Коммит 1bc736b896
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 102 добавлений и 31 удалений

Просмотреть файл

@ -56,6 +56,12 @@ namespace DurableTask.Netherite
/// </summary>
/// <param name="partitionId">The partition id.</param>
IPartitionErrorHandler CreateErrorHandler(uint partitionId);
/// <summary>
/// Trace a warning to the host logs
/// </summary>
/// <param name="message"></param>
void TraceWarning(string message);
}
/// <summary>

Просмотреть файл

@ -366,6 +366,11 @@ namespace DurableTask.Netherite
(this.ContainerName, this.PathPrefix) = this.storage.GetTaskhubPathPrefix(this.TaskhubParameters);
this.NumberPartitions = (uint) this.TaskhubParameters.PartitionCount;
if (this.Settings.PartitionCount != this.NumberPartitions)
{
this.TraceHelper.TraceWarning($"Ignoring configuration setting partitionCount={this.Settings.PartitionCount} because existing TaskHub has {this.NumberPartitions} partitions");
}
await this.transport.StartClientAsync();
System.Diagnostics.Debug.Assert(this.client != null, "transport layer should have added client");
@ -422,11 +427,6 @@ namespace DurableTask.Netherite
await this.transport.StartWorkersAsync();
if (this.Settings.PartitionCount != this.NumberPartitions)
{
this.TraceHelper.TraceWarning($"Ignoring configuration setting partitionCount={this.Settings.PartitionCount} because existing TaskHub has {this.NumberPartitions} partitions");
}
if (this.threadWatcher == null)
{
this.threadWatcher = new Timer(this.WatchThreads, null, 0, 120000);
@ -557,6 +557,11 @@ namespace DurableTask.Netherite
return new PartitionErrorHandler((int) partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName);
}
void TransportAbstraction.IHost.TraceWarning(string message)
{
this.TraceHelper.TraceWarning(message);
}
/******************************/
// client methods
/******************************/

Просмотреть файл

@ -20,6 +20,7 @@ namespace DurableTask.Netherite.Faster
using Azure.Storage.Blobs.Models;
using System.Net;
using System.Text;
using DurableTask.Netherite.Abstractions;
/// <summary>
/// Provides management of blobs and blob names associated with a partition, and logic for partition lease maintenance and termination.
@ -91,25 +92,26 @@ namespace DurableTask.Netherite.Faster
public int? NumPagesToPreload;
}
public FasterLogSettings GetDefaultEventLogSettings(bool useSeparatePageBlobStorage, FasterTuningParameters tuningParameters) => new FasterLogSettings
public FasterLogSettings GetEventLogSettings(bool useSeparatePageBlobStorage, FasterTuningParameters tuningParameters)
{
LogDevice = this.EventLogDevice,
LogCommitManager = this.UseLocalFiles
? null // TODO: fix this: new LocalLogCommitManager($"{this.LocalDirectoryPath}\\{this.PartitionFolderName}\\{CommitBlobName}")
: (ILogCommitManager)this,
PageSizeBits = tuningParameters?.EventLogPageSizeBits ?? 21, // 2MB
SegmentSizeBits = tuningParameters?.EventLogSegmentSizeBits ??
(useSeparatePageBlobStorage ? 35 // 32 GB
: 26), // 64 MB
MemorySizeBits = tuningParameters?.EventLogMemorySizeBits ?? 22, // 2MB
};
(int pageSizeBits, int segmentSizeBits) = BlobManager.GetImmutableEventLogParameters(useSeparatePageBlobStorage, tuningParameters);
public LogSettings GetDefaultStoreLogSettings(
bool useSeparatePageBlobStorage,
long upperBoundOnAvailable,
FasterTuningParameters tuningParameters)
return new FasterLogSettings
{
PageSizeBits = pageSizeBits,
SegmentSizeBits = segmentSizeBits,
LogDevice = this.EventLogDevice,
LogCommitManager = this.UseLocalFiles
? null // TODO: fix this: new LocalLogCommitManager($"{this.LocalDirectoryPath}\\{this.PartitionFolderName}\\{CommitBlobName}")
: (ILogCommitManager)this,
MemorySizeBits = tuningParameters?.EventLogMemorySizeBits ?? 22, // 2MB
};
}
public LogSettings GetStoreLogSettings(bool useSeparatePageBlobStorage, long upperBoundOnAvailable, FasterTuningParameters tuningParameters)
{
int pageSizeBits = tuningParameters?.StoreLogPageSizeBits ?? 10; // default page size is 1k
// start with page and segment sizes
(int pageSizeBits, int segmentSizeBits) = BlobManager.GetImmutableStoreLogParameters(tuningParameters);
// compute a reasonable memory size for the log considering maximally available memory, and expansion factor
int memorybits = 0;
@ -134,11 +136,9 @@ namespace DurableTask.Netherite.Faster
{
LogDevice = this.HybridLogDevice,
ObjectLogDevice = this.ObjectLogDevice,
PageSizeBits = pageSizeBits,
PageSizeBits = pageSizeBits,
MutableFraction = tuningParameters?.StoreLogMutableFraction ?? 0.9,
SegmentSizeBits = tuningParameters?.StoreLogSegmentSizeBits ??
(useSeparatePageBlobStorage ? 35 // 32 GB
: 32), // 4 GB
SegmentSizeBits = segmentSizeBits,
PreallocateLog = false,
ReadFlags = ReadFlags.None,
ReadCacheSettings = null, // no read cache
@ -146,6 +146,24 @@ namespace DurableTask.Netherite.Faster
};
}
public static (int pageSizeBits, int segmentSizeBits) GetImmutableEventLogParameters(bool useSeparatePageBlobStorage, FasterTuningParameters tuningParameters)
{
int pageSizeBits = tuningParameters?.EventLogPageSizeBits ?? 21; // 2MB
int segmentSizeBits = tuningParameters?.EventLogSegmentSizeBits ??
(useSeparatePageBlobStorage ? 35 // 32 GB
: 26); // 64 MB
return (pageSizeBits, segmentSizeBits);
}
public static (int pageSizeBits, int segmentSizeBits) GetImmutableStoreLogParameters(FasterTuningParameters tuningParameters)
{
int pageSizeBits = tuningParameters?.StoreLogPageSizeBits ?? 10; // 1kB
int segmentSizeBits = tuningParameters?.StoreLogSegmentSizeBits ?? 19; // 512 kB
return (pageSizeBits, segmentSizeBits);
}
static readonly int[] StorageFormatVersion = new int[] {
1, //initial version
@ -157,10 +175,21 @@ namespace DurableTask.Netherite.Faster
public static string GetStorageFormat(NetheriteOrchestrationServiceSettings settings)
{
var eventLogSettings = BlobManager.GetImmutableEventLogParameters(settings.UseSeparatePageBlobStorage, settings.FasterTuningParameters);
var storeLogSettings = BlobManager.GetImmutableStoreLogParameters(settings.FasterTuningParameters);
int[] pageAndSegmentSizes = new int[4]
{
eventLogSettings.pageSizeBits,
eventLogSettings.segmentSizeBits,
storeLogSettings.pageSizeBits,
storeLogSettings.segmentSizeBits
};
return JsonConvert.SerializeObject(new StorageFormatSettings()
{
UseAlternateObjectStore = settings.UseAlternateObjectStore,
FormatVersion = StorageFormatVersion.Last(),
PageAndSegmentSizes = pageAndSegmentSizes,
},
serializerSettings);
}
@ -177,6 +206,9 @@ namespace DurableTask.Netherite.Faster
[JsonProperty("UseAlternateObjectStore", DefaultValueHandling = DefaultValueHandling.Ignore)]
public bool? UseAlternateObjectStore { get; set; }
[JsonProperty("PageAndSegmentSizes", DefaultValueHandling = DefaultValueHandling.Ignore)]
public int[] PageAndSegmentSizes { get; set; }
}
static readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings()
@ -187,7 +219,7 @@ namespace DurableTask.Netherite.Faster
Formatting = Formatting.None,
};
public static void CheckStorageFormat(string format, NetheriteOrchestrationServiceSettings settings)
public static void LoadAndCheckStorageFormat(string format, NetheriteOrchestrationServiceSettings settings, Action<string> traceWarning)
{
try
{
@ -201,6 +233,34 @@ namespace DurableTask.Netherite.Faster
{
throw new NetheriteConfigurationException($"The current storage format version (={StorageFormatVersion.Last()}) is incompatible with the existing taskhub (={taskhubFormat.FormatVersion}).");
}
// take the parameters we loaded from storage, and use them as a tuning parameter for FASTER
settings.FasterTuningParameters ??= new FasterTuningParameters();
Set(nameof(FasterTuningParameters.EventLogPageSizeBits), ref settings.FasterTuningParameters.EventLogPageSizeBits, taskhubFormat.PageAndSegmentSizes?[0], 21);
Set(nameof(FasterTuningParameters.EventLogSegmentSizeBits), ref settings.FasterTuningParameters.EventLogSegmentSizeBits, taskhubFormat.PageAndSegmentSizes?[1], 26);
Set(nameof(FasterTuningParameters.StoreLogPageSizeBits), ref settings.FasterTuningParameters.StoreLogPageSizeBits, taskhubFormat.PageAndSegmentSizes?[2], 10);
Set(nameof(FasterTuningParameters.StoreLogSegmentSizeBits), ref settings.FasterTuningParameters.StoreLogSegmentSizeBits, taskhubFormat.PageAndSegmentSizes?[3], 32);
void Set(string name, ref int? specified, int? loaded, int oldDefault)
{
if (specified.HasValue)
{
if (loaded.HasValue && loaded.Value != specified.Value)
{
// we ignore the user-specified setting because this parameter cannot be changed after a task hub is created
traceWarning($"Ignoring configuration setting FasterTuningParameters.{name}={specified.Value} because existing TaskHub uses value {loaded.Value}.");
specified = loaded;
}
}
else
{
// we use the loaded value, but if there is none, for backward compatibility, use the default that
// was in place before we started storing this parameter inside the taskhubFormat record
specified = loaded ?? oldDefault;
}
}
}
catch (Exception e)
{

Просмотреть файл

@ -86,7 +86,7 @@ namespace DurableTask.Netherite.Faster
partition.ErrorHandler.Token.ThrowIfCancellationRequested();
this.storelogsettings = blobManager.GetDefaultStoreLogSettings(
this.storelogsettings = blobManager.GetStoreLogSettings(
partition.Settings.UseSeparatePageBlobStorage,
memoryTracker.MaxCacheSize,
partition.Settings.FasterTuningParameters);

Просмотреть файл

@ -18,7 +18,7 @@ namespace DurableTask.Netherite.Faster
public FasterLog(BlobManager blobManager, NetheriteOrchestrationServiceSettings settings)
{
this.blobManager = blobManager;
var eventlogsettings = blobManager.GetDefaultEventLogSettings(settings.UseSeparatePageBlobStorage, settings.FasterTuningParameters);
var eventlogsettings = blobManager.GetEventLogSettings(settings.UseSeparatePageBlobStorage, settings.FasterTuningParameters);
this.log = new FASTER.core.FasterLog(eventlogsettings);
blobManager.PartitionErrorHandler.OnShutdown += this.Shutdown;
this.terminationToken = blobManager.PartitionErrorHandler.Token;

Просмотреть файл

@ -35,7 +35,7 @@ namespace DurableTask.Netherite.Faster
this.traceHelper = traceHelper;
this.intakeWorker = new IntakeWorker(cancellationToken, this, partition.TraceHelper);
this.maxFragmentSize = (int) this.blobManager.GetDefaultEventLogSettings(partition.Settings.UseSeparatePageBlobStorage, partition.Settings.FasterTuningParameters).PageSize - 64; // faster needs some room for header, 64 bytes is conservative
this.maxFragmentSize = (int) this.blobManager.GetEventLogSettings(partition.Settings.UseSeparatePageBlobStorage, partition.Settings.FasterTuningParameters).PageSize - 64; // faster needs some room for header, 64 bytes is conservative
}
public const byte first = 0x1;

Просмотреть файл

@ -99,8 +99,8 @@ namespace DurableTask.Netherite.EventHubsTransport
this.cloudBlobContainer = cloudBlobClient.GetContainerReference(containerName);
this.partitionScript = this.cloudBlobContainer.GetBlockBlobReference("partitionscript.json");
// check that the storage format is supported
BlobManager.CheckStorageFormat(this.parameters.StorageFormat, this.settings);
// check that the storage format is supported, and load the relevant FASTER tuning parameters
BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning);
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub)
{