Improve error handling for FASTER IO completion callbacks (#349)
* wrap FASTER callbacks so exceptions are logged, and release semaphores prior to callback. * address PR feedback, add two more lines of tracing, and use simpler trace message * refactor control path for FASTER callbacks. Eliminate callbacks with nonzero error code. * immediately cancel all requests waiting for the semaphore when partition is terminated * cancel faster callbacks prior to disposing FasterKV and FasterLog (to prevent hangs) * revert reorganization of cancellation since it appears to lead to issues with hanging dispose calls
This commit is contained in:
Родитель
a74178bc6b
Коммит
7e16a84d12
|
@ -210,6 +210,12 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
internal void DetectHangs(object _)
|
||||
{
|
||||
if (this.PartitionErrorHandler.IsTerminated)
|
||||
{
|
||||
this.hangCheckTimer.Dispose();
|
||||
return;
|
||||
}
|
||||
|
||||
DateTime threshold = DateTime.UtcNow - (Debugger.IsAttached ? TimeSpan.FromMinutes(30) : this.limit);
|
||||
|
||||
foreach (var kvp in this.pendingReadWriteOperations)
|
||||
|
@ -236,23 +242,32 @@ namespace DurableTask.Netherite.Faster
|
|||
{
|
||||
if (this.pendingReadWriteOperations.TryRemove(id, out var request))
|
||||
{
|
||||
if (request.IsRead)
|
||||
try
|
||||
{
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} (Canceled)");
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned (Cancelled) id={id}");
|
||||
request.Callback(uint.MaxValue, request.NumBytes, request.Context);
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"FasterCallbackCancellation Completed id={id}");
|
||||
}
|
||||
else
|
||||
catch (Exception ex)
|
||||
{
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id} (Canceled)");
|
||||
this.BlobManager.StorageTracer?.FasterStorageError($"FasterCallbackCancellation Failed id={id}", ex);
|
||||
}
|
||||
request.Callback(uint.MaxValue, request.NumBytes, request.Context);
|
||||
}
|
||||
}
|
||||
foreach (var id in this.pendingRemoveOperations.Keys.ToList())
|
||||
{
|
||||
if (this.pendingRemoveOperations.TryRemove(id, out var request))
|
||||
{
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.RemoveSegmentAsync id={id} (Canceled)");
|
||||
request.Callback(request.Result);
|
||||
try
|
||||
{
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned (Cancelled) id={id}");
|
||||
request.Callback(request.Result);
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"FasterCallbackCancellation Completed id={id}");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
this.BlobManager.StorageTracer?.FasterStorageError($"FasterCallbackCancellation Failed id={id}", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -383,23 +398,9 @@ namespace DurableTask.Netherite.Faster
|
|||
throw exception;
|
||||
}
|
||||
|
||||
this.ReadFromBlobUnsafeAsync(blobEntry.PageBlob, (long)sourceAddress, (long)destinationAddress, readLength, id)
|
||||
.ContinueWith((Task t) =>
|
||||
{
|
||||
if (this.pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
|
||||
{
|
||||
if (t.IsFaulted)
|
||||
{
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} (Failure)");
|
||||
request.Callback(uint.MaxValue, request.NumBytes, request.Context);
|
||||
}
|
||||
else
|
||||
{
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id}");
|
||||
request.Callback(0, request.NumBytes, request.Context);
|
||||
}
|
||||
}
|
||||
}, TaskContinuationOptions.ExecuteSynchronously);
|
||||
// we are not awaiting this task because it uses FASTER's callback mechanism
|
||||
// when the access is completed.
|
||||
this.ReadFromBlobUnsafeAsync(blobEntry.PageBlob, (long)sourceAddress, (long)destinationAddress, readLength, id);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -437,7 +438,7 @@ namespace DurableTask.Netherite.Faster
|
|||
// Otherwise, some other thread beat us to it. Okay to use their blobs.
|
||||
blobEntry = this.blobs[segmentId];
|
||||
}
|
||||
this.TryWriteAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id);
|
||||
this.TryWriteToBlob(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id);
|
||||
}
|
||||
|
||||
//---- The actual read and write accesses to the page blobs
|
||||
|
@ -496,9 +497,9 @@ namespace DurableTask.Netherite.Faster
|
|||
}
|
||||
}
|
||||
|
||||
unsafe Task ReadFromBlobUnsafeAsync(BlobUtilsV12.PageBlobClients blob, long sourceAddress, long destinationAddress, uint readLength, long id)
|
||||
unsafe void ReadFromBlobUnsafeAsync(BlobUtilsV12.PageBlobClients blob, long sourceAddress, long destinationAddress, uint readLength, long id)
|
||||
{
|
||||
return this.ReadFromBlobAsync(new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write), blob, sourceAddress, readLength, id);
|
||||
Task _ = this.ReadFromBlobAsync(new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write), blob, sourceAddress, readLength, id);
|
||||
}
|
||||
|
||||
async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobUtilsV12.PageBlobClients blob, long sourceAddress, uint readLength, long id)
|
||||
|
@ -508,114 +509,164 @@ namespace DurableTask.Netherite.Faster
|
|||
string operationReadRange = $"[{readRangeStart}, {readRangeEnd}]";
|
||||
using (stream)
|
||||
{
|
||||
long offset = 0;
|
||||
while (readLength > 0)
|
||||
try
|
||||
{
|
||||
var position = sourceAddress + offset;
|
||||
var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE);
|
||||
long offset = 0;
|
||||
while (readLength > 0)
|
||||
{
|
||||
var position = sourceAddress + offset;
|
||||
var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE);
|
||||
|
||||
await this.BlobManager.PerformWithRetriesAsync(
|
||||
BlobManager.AsynchronousStorageReadMaxConcurrency,
|
||||
true,
|
||||
"PageBlobClient.DownloadStreamingAsync",
|
||||
"ReadFromDevice",
|
||||
$"id={id} position={position} length={length} operationReadRange={operationReadRange}",
|
||||
blob.Default.Name,
|
||||
1000 + (int)length / 1000,
|
||||
true,
|
||||
failIfReadonly: false,
|
||||
async (numAttempts) =>
|
||||
{
|
||||
if (numAttempts > 0)
|
||||
await this.BlobManager.PerformWithRetriesAsync(
|
||||
BlobManager.AsynchronousStorageReadMaxConcurrency,
|
||||
true,
|
||||
"PageBlobClient.DownloadStreamingAsync",
|
||||
"ReadFromDevice",
|
||||
$"id={id} position={position} length={length} operationReadRange={operationReadRange}",
|
||||
blob.Default.Name,
|
||||
1000 + (int)length / 1000,
|
||||
true,
|
||||
failIfReadonly: false,
|
||||
async (numAttempts) =>
|
||||
{
|
||||
stream.Seek(offset, SeekOrigin.Begin); // must go back to original position before retrying
|
||||
}
|
||||
|
||||
if (length > 0)
|
||||
{
|
||||
var client = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE) ? blob.Default : blob.Aggressive;
|
||||
|
||||
var response = await client.DownloadStreamingAsync(
|
||||
range: new Azure.HttpRange(sourceAddress + offset, length),
|
||||
conditions: null,
|
||||
rangeGetContentHash: false,
|
||||
cancellationToken: this.PartitionErrorHandler.Token)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
using (var streamingResult = response.Value)
|
||||
if (numAttempts > 0)
|
||||
{
|
||||
await streamingResult.Content.CopyToAsync(stream).ConfigureAwait(false);
|
||||
stream.Seek(offset, SeekOrigin.Begin); // must go back to original position before retrying
|
||||
}
|
||||
}
|
||||
|
||||
if (stream.Position != offset + length)
|
||||
{
|
||||
throw new InvalidDataException($"wrong amount of data received from page blob, expected={length}, actual={stream.Position}");
|
||||
}
|
||||
if (length > 0)
|
||||
{
|
||||
var client = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE) ? blob.Default : blob.Aggressive;
|
||||
|
||||
return length;
|
||||
});
|
||||
var response = await client.DownloadStreamingAsync(
|
||||
range: new Azure.HttpRange(sourceAddress + offset, length),
|
||||
conditions: null,
|
||||
rangeGetContentHash: false,
|
||||
cancellationToken: this.PartitionErrorHandler.Token)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
readLength -= length;
|
||||
offset += length;
|
||||
using (var streamingResult = response.Value)
|
||||
{
|
||||
await streamingResult.Content.CopyToAsync(stream).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
if (stream.Position != offset + length)
|
||||
{
|
||||
throw new InvalidDataException($"wrong amount of data received from page blob, expected={length}, actual={stream.Position}");
|
||||
}
|
||||
|
||||
return length;
|
||||
});
|
||||
|
||||
readLength -= length;
|
||||
offset += length;
|
||||
}
|
||||
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id}");
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} ({e.GetType().Name})");
|
||||
|
||||
// the partition should have already been terminated if there was an exception but just in case this did not happen, terminate it now
|
||||
if (!this.PartitionErrorHandler.IsTerminated)
|
||||
{
|
||||
this.PartitionErrorHandler.HandleError(nameof(WriteToBlobAsync), $"unexpected exception id={id}", e, true, false);
|
||||
}
|
||||
}
|
||||
|
||||
if (!this.PartitionErrorHandler.IsTerminated && this.pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
|
||||
{
|
||||
try
|
||||
{
|
||||
request.Callback(0, request.NumBytes, request.Context);
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpCallbackCompleted id={id}");
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.PartitionErrorHandler.HandleError(nameof(ReadFromBlobAsync), $"FASTER callback failed id={id}", e, true, this.PartitionErrorHandler.IsTerminated);
|
||||
}
|
||||
}
|
||||
}
|
||||
// this task is not awaited, so it must never throw exceptions.
|
||||
}
|
||||
|
||||
void TryWriteAsync(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, long id)
|
||||
|
||||
void TryWriteToBlob(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, long id)
|
||||
{
|
||||
// If pageBlob is null, it is being created. Attempt to queue the write for the creator to complete after it is done
|
||||
if (blobEntry.PageBlob.Default == null
|
||||
&& blobEntry.TryQueueAction(() => this.WriteToBlobAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id)))
|
||||
&& blobEntry.TryQueueAction(() => this.WriteToBlob(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id)))
|
||||
{
|
||||
return;
|
||||
}
|
||||
// Otherwise, invoke directly.
|
||||
this.WriteToBlobAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id);
|
||||
this.WriteToBlob(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id);
|
||||
}
|
||||
|
||||
unsafe void WriteToBlobAsync(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, long id)
|
||||
unsafe void WriteToBlob(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, long id)
|
||||
{
|
||||
this.WriteToBlobAsync(blobEntry, sourceAddress, (long)destinationAddress, numBytesToWrite, id)
|
||||
.ContinueWith((Task t) =>
|
||||
{
|
||||
if (this.pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
|
||||
{
|
||||
if (t.IsFaulted)
|
||||
{
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id} (Failure)");
|
||||
request.Callback(uint.MaxValue, request.NumBytes, request.Context);
|
||||
}
|
||||
else
|
||||
{
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id}");
|
||||
request.Callback(0, request.NumBytes, request.Context);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.underLease)
|
||||
{
|
||||
this.SingleWriterSemaphore.Release();
|
||||
}
|
||||
|
||||
}, TaskContinuationOptions.ExecuteSynchronously);
|
||||
// we are not awaiting this task because it uses FASTER's callback mechanism
|
||||
// when the access is completed.
|
||||
Task _ = this.WriteToBlobAsync(blobEntry, sourceAddress, (long)destinationAddress, numBytesToWrite, id);
|
||||
}
|
||||
|
||||
async Task WriteToBlobAsync(BlobEntry blobEntry, IntPtr sourceAddress, long destinationAddress, uint numBytesToWrite, long id)
|
||||
{
|
||||
if (this.underLease)
|
||||
{
|
||||
// this semaphore is needed to avoid ambiguous e-tags under concurrent writes
|
||||
await this.SingleWriterSemaphore.WaitAsync();
|
||||
}
|
||||
|
||||
long offset = 0;
|
||||
while (numBytesToWrite > 0)
|
||||
try
|
||||
{
|
||||
var length = Math.Min(numBytesToWrite, MAX_UPLOAD_SIZE);
|
||||
await this.WritePortionToBlobUnsafeAsync(blobEntry, sourceAddress, destinationAddress, offset, length, id).ConfigureAwait(false);
|
||||
numBytesToWrite -= length;
|
||||
offset += length;
|
||||
long offset = 0;
|
||||
while (numBytesToWrite > 0)
|
||||
{
|
||||
var length = Math.Min(numBytesToWrite, MAX_UPLOAD_SIZE);
|
||||
await this.WritePortionToBlobUnsafeAsync(blobEntry, sourceAddress, destinationAddress, offset, length, id).ConfigureAwait(false);
|
||||
numBytesToWrite -= length;
|
||||
offset += length;
|
||||
}
|
||||
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id}");
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id} ({e.GetType().Name})");
|
||||
|
||||
// the partition should have already been terminated if there was an exception but just in case this did not happen, terminate it now
|
||||
if (!this.PartitionErrorHandler.IsTerminated)
|
||||
{
|
||||
this.PartitionErrorHandler.HandleError(nameof(WriteToBlobAsync), $"unexpected exception id={id}", e, true, false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (this.underLease)
|
||||
{
|
||||
// always release this semaphore again
|
||||
this.SingleWriterSemaphore.Release();
|
||||
}
|
||||
}
|
||||
|
||||
// now that the write to storage has completed, invoke the FASTER callback to signal completion
|
||||
if (!this.PartitionErrorHandler.IsTerminated && this.pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
|
||||
{
|
||||
try
|
||||
{
|
||||
request.Callback(0, request.NumBytes, request.Context);
|
||||
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpCallbackCompleted id={id}");
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
this.PartitionErrorHandler.HandleError(nameof(WriteToBlobAsync), $"FASTER callback failed id={id}", e, true, this.PartitionErrorHandler.IsTerminated);
|
||||
}
|
||||
}
|
||||
|
||||
// this task is not awaited, so it must never throw exceptions.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ namespace DurableTask.Netherite.Faster
|
|||
{
|
||||
if (semaphore != null)
|
||||
{
|
||||
await semaphore.WaitAsync();
|
||||
await semaphore.WaitAsync(this.PartitionErrorHandler.Token);
|
||||
}
|
||||
|
||||
Stopwatch stopwatch = new Stopwatch();
|
||||
|
|
Загрузка…
Ссылка в новой задаче