tune Azure Storage accesses and FasterKV parameters.
This commit is contained in:
Родитель
beb0a6cf6b
Коммит
c30a121db4
|
@ -34,6 +34,7 @@ namespace DurableTask.Netherite.Faster
|
|||
// we use an even smaller value to improve retry/timeout behavior in highly contended situations
|
||||
// Also, this allows us to use aggressive timeouts to kill stragglers
|
||||
const uint MAX_UPLOAD_SIZE = 1024 * 1024;
|
||||
const uint MAX_DOWNLOAD_SIZE = 1024 * 1024;
|
||||
|
||||
const long MAX_PAGEBLOB_SIZE = 512L * 1024 * 1024 * 1024; // set this at 512 GB for now TODO consider implications
|
||||
|
||||
|
@ -319,53 +320,62 @@ namespace DurableTask.Netherite.Faster
|
|||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return (long) length;
|
||||
return (long)length;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
unsafe Task ReadFromBlobUnsafeAsync(CloudPageBlob blob, long sourceAddress, long destinationAddress, uint readLength)
|
||||
{
|
||||
return this.ReadFromBlobAsync(new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write), blob, sourceAddress, destinationAddress, readLength);
|
||||
return this.ReadFromBlobAsync(new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write), blob, sourceAddress, readLength);
|
||||
}
|
||||
|
||||
async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, CloudPageBlob blob, long sourceAddress, long destinationAddress, uint readLength)
|
||||
async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, CloudPageBlob blob, long sourceAddress, uint readLength)
|
||||
{
|
||||
using (stream)
|
||||
{
|
||||
await this.BlobManager.PerformWithRetriesAsync(
|
||||
BlobManager.AsynchronousStorageReadMaxConcurrency,
|
||||
true,
|
||||
"CloudPageBlob.DownloadRangeToStreamAsync",
|
||||
"ReadFromDevice",
|
||||
$"readLength={readLength} sourceAddress={sourceAddress}",
|
||||
blob.Name,
|
||||
1000 + (int) readLength / 1000,
|
||||
true,
|
||||
async (numAttempts) =>
|
||||
{
|
||||
if (numAttempts > 0)
|
||||
long offset = 0;
|
||||
while (readLength > 0)
|
||||
{
|
||||
var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE);
|
||||
|
||||
await this.BlobManager.PerformWithRetriesAsync(
|
||||
BlobManager.AsynchronousStorageReadMaxConcurrency,
|
||||
true,
|
||||
"CloudPageBlob.DownloadRangeToStreamAsync",
|
||||
"ReadFromDevice",
|
||||
$"readLength={length} sourceAddress={sourceAddress + offset}",
|
||||
blob.Name,
|
||||
1000 + (int)length / 1000,
|
||||
true,
|
||||
async (numAttempts) =>
|
||||
{
|
||||
stream.Seek(0, SeekOrigin.Begin); // must go back to original position before retrying
|
||||
}
|
||||
if (numAttempts > 0)
|
||||
{
|
||||
stream.Seek(offset, SeekOrigin.Begin); // must go back to original position before retrying
|
||||
}
|
||||
|
||||
if (readLength > 0)
|
||||
{
|
||||
var blobRequestOptions = (numAttempts > 1 || readLength > MAX_UPLOAD_SIZE)
|
||||
? BlobManager.BlobRequestOptionsDefault : BlobManager.BlobRequestOptionsAggressiveTimeout;
|
||||
if (length > 0)
|
||||
{
|
||||
var blobRequestOptions = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE)
|
||||
? BlobManager.BlobRequestOptionsDefault : BlobManager.BlobRequestOptionsAggressiveTimeout;
|
||||
|
||||
await blob
|
||||
.DownloadRangeToStreamAsync(stream, sourceAddress, readLength, accessCondition: null, options: blobRequestOptions, operationContext: null, cancellationToken: this.PartitionErrorHandler.Token)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
await blob
|
||||
.DownloadRangeToStreamAsync(stream, sourceAddress + offset, length, accessCondition: null, options: blobRequestOptions, operationContext: null, cancellationToken: this.PartitionErrorHandler.Token)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (stream.Position != readLength)
|
||||
{
|
||||
throw new InvalidDataException($"wrong amount of data received from page blob, expected={readLength}, actual={stream.Position}");
|
||||
}
|
||||
if (stream.Position != offset + length)
|
||||
{
|
||||
throw new InvalidDataException($"wrong amount of data received from page blob, expected={length}, actual={stream.Position}");
|
||||
}
|
||||
|
||||
return readLength;
|
||||
});
|
||||
return length;
|
||||
});
|
||||
|
||||
readLength -= length;
|
||||
offset += length;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -62,11 +62,14 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
public IPartitionErrorHandler PartitionErrorHandler { get; private set; }
|
||||
|
||||
internal static SemaphoreSlim AsynchronousStorageReadMaxConcurrency = new SemaphoreSlim(Environment.ProcessorCount * 25);
|
||||
internal static SemaphoreSlim AsynchronousStorageWriteMaxConcurrency = new SemaphoreSlim(Environment.ProcessorCount * 25);
|
||||
internal static SemaphoreSlim AsynchronousStorageReadMaxConcurrency = new SemaphoreSlim(Math.Min(100, Environment.ProcessorCount * 10));
|
||||
internal static SemaphoreSlim AsynchronousStorageWriteMaxConcurrency = new SemaphoreSlim(Math.Min(50, Environment.ProcessorCount * 7));
|
||||
|
||||
volatile System.Diagnostics.Stopwatch leaseTimer;
|
||||
|
||||
internal const long HashTableSize = 1L << 14; // 16 k buckets, 1 GB
|
||||
//internal const long HashTableSize = 1L << 14; // 8 M buckets, 512 GB
|
||||
|
||||
public FasterLogSettings EventLogSettings(bool usePremiumStorage) => new FasterLogSettings
|
||||
{
|
||||
LogDevice = this.EventLogDevice,
|
||||
|
|
|
@ -24,8 +24,6 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
ClientSession<Key, Value, EffectTracker, TrackedObject, object, IFunctions<Key, Value, EffectTracker, TrackedObject, object>> mainSession;
|
||||
|
||||
internal const long HashTableSize = 1L << 16;
|
||||
|
||||
#if FASTER_SUPPORTS_PSF
|
||||
// We currently place all PSFs into a single group with a single TPSFKey type
|
||||
internal const int PSFCount = 1;
|
||||
|
@ -42,7 +40,7 @@ namespace DurableTask.Netherite.Faster
|
|||
partition.ErrorHandler.Token.ThrowIfCancellationRequested();
|
||||
|
||||
this.fht = new FasterKV<Key, Value>(
|
||||
HashTableSize,
|
||||
BlobManager.HashTableSize,
|
||||
blobManager.StoreLogSettings(partition.Settings.UsePremiumStorage, partition.NumberPartitions()),
|
||||
blobManager.StoreCheckpointSettings,
|
||||
new SerializerSettings<Key, Value>
|
||||
|
|
|
@ -73,7 +73,7 @@ namespace DurableTask.Netherite
|
|||
}
|
||||
|
||||
// Empirically observed transient cancellation exceptions that are not application initiated
|
||||
if (e.InnerException is TaskCanceledException && !token.IsCancellationRequested)
|
||||
if (e.InnerException is OperationCanceledException && !token.IsCancellationRequested)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче