diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs index f6f6d6e..56c5444 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs @@ -210,6 +210,12 @@ namespace DurableTask.Netherite.Faster internal void DetectHangs(object _) { + if (this.PartitionErrorHandler.IsTerminated) + { + this.hangCheckTimer.Dispose(); + return; + } + DateTime threshold = DateTime.UtcNow - (Debugger.IsAttached ? TimeSpan.FromMinutes(30) : this.limit); foreach (var kvp in this.pendingReadWriteOperations) @@ -236,23 +242,32 @@ namespace DurableTask.Netherite.Faster { if (this.pendingReadWriteOperations.TryRemove(id, out var request)) { - if (request.IsRead) + try { - this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} (Canceled)"); + this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned (Cancelled) id={id}"); + request.Callback(uint.MaxValue, request.NumBytes, request.Context); + this.BlobManager?.StorageTracer?.FasterStorageProgress($"FasterCallbackCancellation Completed id={id}"); } - else + catch (Exception ex) { - this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id} (Canceled)"); + this.BlobManager.StorageTracer?.FasterStorageError($"FasterCallbackCancellation Failed id={id}", ex); } - request.Callback(uint.MaxValue, request.NumBytes, request.Context); } } foreach (var id in this.pendingRemoveOperations.Keys.ToList()) { if (this.pendingRemoveOperations.TryRemove(id, out var request)) { - this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.RemoveSegmentAsync id={id} (Canceled)"); - request.Callback(request.Result); + try + { + this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned (Cancelled) id={id}"); + request.Callback(request.Result); + this.BlobManager?.StorageTracer?.FasterStorageProgress($"FasterCallbackCancellation Completed id={id}"); + } + catch (Exception ex) + { + this.BlobManager.StorageTracer?.FasterStorageError($"FasterCallbackCancellation Failed id={id}", ex); + } } } } @@ -383,23 +398,9 @@ namespace DurableTask.Netherite.Faster throw exception; } - this.ReadFromBlobUnsafeAsync(blobEntry.PageBlob, (long)sourceAddress, (long)destinationAddress, readLength, id) - .ContinueWith((Task t) => - { - if (this.pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request)) - { - if (t.IsFaulted) - { - this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} (Failure)"); - request.Callback(uint.MaxValue, request.NumBytes, request.Context); - } - else - { - this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id}"); - request.Callback(0, request.NumBytes, request.Context); - } - } - }, TaskContinuationOptions.ExecuteSynchronously); + // we are not awaiting this task because it uses FASTER's callback mechanism + // when the access is completed. + this.ReadFromBlobUnsafeAsync(blobEntry.PageBlob, (long)sourceAddress, (long)destinationAddress, readLength, id); } /// @@ -437,7 +438,7 @@ namespace DurableTask.Netherite.Faster // Otherwise, some other thread beat us to it. Okay to use their blobs. blobEntry = this.blobs[segmentId]; } - this.TryWriteAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id); + this.TryWriteToBlob(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id); } //---- The actual read and write accesses to the page blobs @@ -496,9 +497,9 @@ namespace DurableTask.Netherite.Faster } } - unsafe Task ReadFromBlobUnsafeAsync(BlobUtilsV12.PageBlobClients blob, long sourceAddress, long destinationAddress, uint readLength, long id) + unsafe void ReadFromBlobUnsafeAsync(BlobUtilsV12.PageBlobClients blob, long sourceAddress, long destinationAddress, uint readLength, long id) { - return this.ReadFromBlobAsync(new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write), blob, sourceAddress, readLength, id); + Task _ = this.ReadFromBlobAsync(new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write), blob, sourceAddress, readLength, id); } async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobUtilsV12.PageBlobClients blob, long sourceAddress, uint readLength, long id) @@ -508,114 +509,164 @@ namespace DurableTask.Netherite.Faster string operationReadRange = $"[{readRangeStart}, {readRangeEnd}]"; using (stream) { - long offset = 0; - while (readLength > 0) + try { - var position = sourceAddress + offset; - var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE); + long offset = 0; + while (readLength > 0) + { + var position = sourceAddress + offset; + var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE); - await this.BlobManager.PerformWithRetriesAsync( - BlobManager.AsynchronousStorageReadMaxConcurrency, - true, - "PageBlobClient.DownloadStreamingAsync", - "ReadFromDevice", - $"id={id} position={position} length={length} operationReadRange={operationReadRange}", - blob.Default.Name, - 1000 + (int)length / 1000, - true, - failIfReadonly: false, - async (numAttempts) => - { - if (numAttempts > 0) + await this.BlobManager.PerformWithRetriesAsync( + BlobManager.AsynchronousStorageReadMaxConcurrency, + true, + "PageBlobClient.DownloadStreamingAsync", + "ReadFromDevice", + $"id={id} position={position} length={length} operationReadRange={operationReadRange}", + blob.Default.Name, + 1000 + (int)length / 1000, + true, + failIfReadonly: false, + async (numAttempts) => { - stream.Seek(offset, SeekOrigin.Begin); // must go back to original position before retrying - } - - if (length > 0) - { - var client = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE) ? blob.Default : blob.Aggressive; - - var response = await client.DownloadStreamingAsync( - range: new Azure.HttpRange(sourceAddress + offset, length), - conditions: null, - rangeGetContentHash: false, - cancellationToken: this.PartitionErrorHandler.Token) - .ConfigureAwait(false); - - using (var streamingResult = response.Value) + if (numAttempts > 0) { - await streamingResult.Content.CopyToAsync(stream).ConfigureAwait(false); + stream.Seek(offset, SeekOrigin.Begin); // must go back to original position before retrying } - } - if (stream.Position != offset + length) - { - throw new InvalidDataException($"wrong amount of data received from page blob, expected={length}, actual={stream.Position}"); - } + if (length > 0) + { + var client = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE) ? blob.Default : blob.Aggressive; - return length; - }); + var response = await client.DownloadStreamingAsync( + range: new Azure.HttpRange(sourceAddress + offset, length), + conditions: null, + rangeGetContentHash: false, + cancellationToken: this.PartitionErrorHandler.Token) + .ConfigureAwait(false); - readLength -= length; - offset += length; + using (var streamingResult = response.Value) + { + await streamingResult.Content.CopyToAsync(stream).ConfigureAwait(false); + } + } + + if (stream.Position != offset + length) + { + throw new InvalidDataException($"wrong amount of data received from page blob, expected={length}, actual={stream.Position}"); + } + + return length; + }); + + readLength -= length; + offset += length; + } + + this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id}"); + } + catch (Exception e) + { + this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} ({e.GetType().Name})"); + + // the partition should have already been terminated if there was an exception but just in case this did not happen, terminate it now + if (!this.PartitionErrorHandler.IsTerminated) + { + this.PartitionErrorHandler.HandleError(nameof(WriteToBlobAsync), $"unexpected exception id={id}", e, true, false); + } + } + + if (!this.PartitionErrorHandler.IsTerminated && this.pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request)) + { + try + { + request.Callback(0, request.NumBytes, request.Context); + this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpCallbackCompleted id={id}"); + } + catch (Exception e) + { + this.PartitionErrorHandler.HandleError(nameof(ReadFromBlobAsync), $"FASTER callback failed id={id}", e, true, this.PartitionErrorHandler.IsTerminated); + } } } + // this task is not awaited, so it must never throw exceptions. } - void TryWriteAsync(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, long id) + + void TryWriteToBlob(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, long id) { // If pageBlob is null, it is being created. Attempt to queue the write for the creator to complete after it is done if (blobEntry.PageBlob.Default == null - && blobEntry.TryQueueAction(() => this.WriteToBlobAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id))) + && blobEntry.TryQueueAction(() => this.WriteToBlob(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id))) { return; } // Otherwise, invoke directly. - this.WriteToBlobAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id); + this.WriteToBlob(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id); } - unsafe void WriteToBlobAsync(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, long id) + unsafe void WriteToBlob(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, long id) { - this.WriteToBlobAsync(blobEntry, sourceAddress, (long)destinationAddress, numBytesToWrite, id) - .ContinueWith((Task t) => - { - if (this.pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request)) - { - if (t.IsFaulted) - { - this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id} (Failure)"); - request.Callback(uint.MaxValue, request.NumBytes, request.Context); - } - else - { - this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id}"); - request.Callback(0, request.NumBytes, request.Context); - } - } - - if (this.underLease) - { - this.SingleWriterSemaphore.Release(); - } - - }, TaskContinuationOptions.ExecuteSynchronously); + // we are not awaiting this task because it uses FASTER's callback mechanism + // when the access is completed. + Task _ = this.WriteToBlobAsync(blobEntry, sourceAddress, (long)destinationAddress, numBytesToWrite, id); } async Task WriteToBlobAsync(BlobEntry blobEntry, IntPtr sourceAddress, long destinationAddress, uint numBytesToWrite, long id) { if (this.underLease) { + // this semaphore is needed to avoid ambiguous e-tags under concurrent writes await this.SingleWriterSemaphore.WaitAsync(); } - long offset = 0; - while (numBytesToWrite > 0) + try { - var length = Math.Min(numBytesToWrite, MAX_UPLOAD_SIZE); - await this.WritePortionToBlobUnsafeAsync(blobEntry, sourceAddress, destinationAddress, offset, length, id).ConfigureAwait(false); - numBytesToWrite -= length; - offset += length; + long offset = 0; + while (numBytesToWrite > 0) + { + var length = Math.Min(numBytesToWrite, MAX_UPLOAD_SIZE); + await this.WritePortionToBlobUnsafeAsync(blobEntry, sourceAddress, destinationAddress, offset, length, id).ConfigureAwait(false); + numBytesToWrite -= length; + offset += length; + } + + this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id}"); } + catch (Exception e) + { + this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id} ({e.GetType().Name})"); + + // the partition should have already been terminated if there was an exception but just in case this did not happen, terminate it now + if (!this.PartitionErrorHandler.IsTerminated) + { + this.PartitionErrorHandler.HandleError(nameof(WriteToBlobAsync), $"unexpected exception id={id}", e, true, false); + } + } + finally + { + if (this.underLease) + { + // always release this semaphore again + this.SingleWriterSemaphore.Release(); + } + } + + // now that the write to storage has completed, invoke the FASTER callback to signal completion + if (!this.PartitionErrorHandler.IsTerminated && this.pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request)) + { + try + { + request.Callback(0, request.NumBytes, request.Context); + this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpCallbackCompleted id={id}"); + } + catch (Exception e) + { + this.PartitionErrorHandler.HandleError(nameof(WriteToBlobAsync), $"FASTER callback failed id={id}", e, true, this.PartitionErrorHandler.IsTerminated); + } + } + + // this task is not awaited, so it must never throw exceptions. } } } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs index 7fc6505..91367c7 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs @@ -41,7 +41,7 @@ namespace DurableTask.Netherite.Faster { if (semaphore != null) { - await semaphore.WaitAsync(); + await semaphore.WaitAsync(this.PartitionErrorHandler.Token); } Stopwatch stopwatch = new Stopwatch();