This commit is contained in:
sebastianburckhardt 2023-07-07 12:59:46 -07:00
Parent 21d7b5a18c ee47f0c9f8
Commit 0b8291a105
45 changed files: 1136 additions and 438 deletions

View file

@@ -63,7 +63,7 @@ For some other considerations about how to choose the engine, see [the documenta
## Status
The current version of Netherite is *1.3.5*. Netherite supports almost all of the DT and DF APIs.
The current version of Netherite is *1.4.0*. Netherite supports almost all of the DT and DF APIs.
Some notable differences to the default Azure Table storage provider include:
- Instance queries and purge requests are not issued directly against Azure Storage, but are processed by the function app. Thus, the performance (latency and throughput) of queries heavily depends on

View file

@@ -2,7 +2,7 @@
<PropertyGroup>
<!--The target netcoreapp2.2 is not functional, but just generates a runtime error when used.-->
<TargetFrameworks>netstandard2.1;netstandard2.0;netcoreapp2.2;netcoreapp3.1;net5.0</TargetFrameworks>
<TargetFrameworks>netstandard2.1;netstandard2.0;netcoreapp2.2;netcoreapp3.1</TargetFrameworks>
<LangVersion>latest</LangVersion>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<IncludeSymbols>true</IncludeSymbols>
@@ -25,8 +25,8 @@
<!-- Version settings: https://andrewlock.net/version-vs-versionsuffix-vs-packageversion-what-do-they-all-mean/ -->
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>3</MinorVersion>
<PatchVersion>5</PatchVersion>
<MinorVersion>4</MinorVersion>
<PatchVersion>0</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
@@ -51,8 +51,8 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.11.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.8.1" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.13.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.9.6" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' != 'netcoreapp2.2' ">

View file

@@ -49,7 +49,7 @@ namespace DurableTask.Netherite
public abstract ValueTask RemoveFromStore(IEnumerable<TrackedObjectKey> keys);
public abstract (long, long) GetPositions();
public abstract (long, (long,int)) GetPositions();
public abstract Partition Partition { get; }
@@ -187,7 +187,7 @@ namespace DurableTask.Netherite
public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key, TrackedObject target)
{
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
(long commitLogPosition, (long,int) inputQueuePosition) = this.GetPositions();
this.Assert(!this.IsReplaying, "read events are not part of the replay");
double startedTimestamp = this.CurrentTimeMs;
@@ -254,7 +254,7 @@ namespace DurableTask.Netherite
public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsyncEnumerable<(string, OrchestrationState)> instances, DateTime attempt)
{
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
(long commitLogPosition, (long,int) inputQueuePosition) = this.GetPositions();
this.Assert(!this.IsReplaying, "query events are never part of the replay");
double startedTimestamp = this.CurrentTimeMs;

View file

@@ -21,7 +21,7 @@ namespace DurableTask.Netherite
/// <param name="inputQueueFingerprint">A fingerprint for the input queue.</param>
/// <returns>the input queue position from which to resume input processing</returns>
/// <exception cref="OperationCanceledException">Indicates that termination was signaled before the operation completed.</exception>
Task<long> CreateOrRestoreAsync(Partition localPartition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint);
Task<(long,int)> CreateOrRestoreAsync(Partition localPartition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint);
/// <summary>
/// Starts processing, after creating or restoring the partition state.

View file

@@ -62,6 +62,11 @@ namespace DurableTask.Netherite
/// </summary>
/// <param name="message"></param>
void TraceWarning(string message);
/// <summary>
/// Called when some component observes a fatal exception. The host may take action to initiate a fast shutdown.
/// </summary>
void OnFatalExceptionObserved(Exception e);
}
/// <summary>
@@ -85,7 +90,7 @@ namespace DurableTask.Netherite
/// Also, it can be used to detect that the partition has terminated for any other reason,
/// be it cleanly (after StopAsync) or uncleanly (after losing a lease or hitting a fatal error).
/// </remarks>
Task<long> CreateOrRestoreAsync(IPartitionErrorHandler termination, TaskhubParameters parameters, string inputQueueFingerprint);
Task<(long,int)> CreateOrRestoreAsync(IPartitionErrorHandler termination, TaskhubParameters parameters, string inputQueueFingerprint);
/// <summary>
/// Clean shutdown: stop processing, save partition state to storage, and release ownership.

View file

@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netstandard2.1;netstandard2.0;netcoreapp3.1;net5.0</TargetFrameworks>
<TargetFrameworks>netstandard2.1;netstandard2.0;netcoreapp3.1</TargetFrameworks>
<LangVersion>latest</LangVersion>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<IncludeSymbols>true</IncludeSymbols>
@@ -25,8 +25,8 @@
<!-- Version settings: https://andrewlock.net/version-vs-versionsuffix-vs-packageversion-what-do-they-all-mean/ -->
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>3</MinorVersion>
<PatchVersion>5</PatchVersion>
<MinorVersion>4</MinorVersion>
<PatchVersion>0</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
@@ -50,14 +50,14 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Azure.Core" Version="1.25.0" />
<PackageReference Include="Azure.Data.Tables" Version="12.6.1" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.7.2" />
<PackageReference Include="Azure.Core" Version="1.33.0" />
<PackageReference Include="Azure.Data.Tables" Version="12.8.0" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.9.2" />
<PackageReference Include="Microsoft.Azure.EventHubs.Processor" Version="4.3.2" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.2.3" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.13.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.16.0" />
<PackageReference Include="Microsoft.FASTER.Core" Version="2.0.16" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.11.1" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.13.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />

View file

@@ -3,6 +3,7 @@
namespace DurableTask.Netherite
{
using Azure.Storage.Blobs.Models;
using DurableTask.Core.Common;
using DurableTask.Core.Exceptions;
using Newtonsoft.Json;
@@ -18,49 +19,114 @@ namespace DurableTask.Netherite
/// </summary>
static class Packet
{
// we prefix packets with a byte indicating the version, to facilitate format changes in the future
static readonly byte version = 2;
// we prefix packets with a byte indicating the packet type and whether it contains a guid
// (we can also use this for version changes over time)
static readonly byte eventWithGuid = 2;
static readonly byte batchWithGuid = 3;
static readonly byte eventWithoutGuid = 4;
public static void Serialize(Event evt, Stream stream, byte[] taskHubGuid)
public static void Serialize(Event evt, Stream stream, byte[] guid)
{
var writer = new BinaryWriter(stream, Encoding.UTF8);
// first come the version and the taskhub
writer.Write(Packet.version);
writer.Write(taskHubGuid);
// first write the packet type and the taskhub
writer.Write(Packet.eventWithGuid);
writer.Write(guid);
writer.Flush();
// then we write the binary serialization to the stream
Serializer.SerializeEvent(evt, stream);
}
public static void Deserialize<TEvent>(Stream stream, out TEvent evt, byte[] taskHubGuid) where TEvent : Event
public static void Serialize(Event evt, Stream stream)
{
var writer = new BinaryWriter(stream, Encoding.UTF8);
// first write the packet type (this packet carries no taskhub guid)
writer.Write(Packet.eventWithoutGuid);
writer.Flush();
// then we write the binary serialization to the stream
Serializer.SerializeEvent(evt, stream);
}
public static void Serialize(string blobAddress, List<int> packetOffsets, Stream stream, byte[] guid)
{
var writer = new BinaryWriter(stream, Encoding.UTF8);
// first write the packet type and the taskhub
writer.Write(Packet.batchWithGuid);
writer.Write(guid);
// then write the blob address and the packet offsets
writer.Write(blobAddress);
writer.Write(packetOffsets.Count);
foreach(var p in packetOffsets)
{
writer.Write(p);
}
writer.Flush();
}
public class BlobReference
{
public string BlobName;
public List<int> PacketOffsets;
}
public static void Deserialize<TEvent>(Stream stream, out TEvent evt, out BlobReference blobReference, byte[] guid) where TEvent : Event
{
var reader = new BinaryReader(stream);
var version = reader.ReadByte();
var destinationTaskHubGuid = reader.ReadBytes(16);
var packetType = reader.ReadByte();
evt = null;
blobReference = null;
if (taskHubGuid != null && !GuidMatches(taskHubGuid, destinationTaskHubGuid))
if (packetType == Packet.eventWithGuid)
{
evt = null;
return;
byte[] destinationTaskHubId = reader.ReadBytes(16);
if (guid != null && !GuidMatches(guid, destinationTaskHubId))
{
return;
}
evt = (TEvent)Serializer.DeserializeEvent(stream);
}
if (version == Packet.version)
else if (packetType == Packet.batchWithGuid)
{
byte[] destinationTaskHubId = reader.ReadBytes(16);
if (guid != null && !GuidMatches(guid, destinationTaskHubId))
{
return;
}
string blobName = reader.ReadString();
int numEvents = reader.ReadInt32();
List<int> packetOffsets = new List<int>(numEvents);
for (int i = 0; i < numEvents; i++)
{
packetOffsets.Add(reader.ReadInt32());
}
blobReference = new BlobReference()
{
BlobName = blobName,
PacketOffsets = packetOffsets
};
}
else if (packetType == Packet.eventWithoutGuid)
{
evt = (TEvent)Serializer.DeserializeEvent(stream);
}
else
{
throw new VersionNotFoundException($"Received packet with unsupported version {version} - likely a versioning issue");
}
throw new VersionNotFoundException($"Received packet with unsupported packet type {packetType} - likely a versioning issue");
}
}
public static void Deserialize<TEvent>(ArraySegment<byte> arraySegment, out TEvent evt, byte[] taskHubGuid) where TEvent : Event
public static void Deserialize<TEvent>(ArraySegment<byte> arraySegment, out TEvent evt, out BlobReference blobReference, byte[] taskHubGuid) where TEvent : Event
{
using (var stream = new MemoryStream(arraySegment.Array, arraySegment.Offset, arraySegment.Count, false))
{
Packet.Deserialize(stream, out evt, taskHubGuid);
Packet.Deserialize(stream, out evt, out blobReference, taskHubGuid);
}
}
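The net effect of this rewrite: the single version byte becomes a packet-type byte, giving three packet kinds on the wire: a single event prefixed with the taskhub guid, a reference to a blob batch prefixed with the guid, and a bare event without a guid (used for packets nested inside a blob batch, whose guid was already checked). The following standalone sketch illustrates the framing of a batch reference; the class and method names are invented for illustration and are not part of the commit:

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

class PacketFramingSketch
{
    // packet-type bytes, matching the constants in Packet above
    const byte EventWithGuid = 2, BatchWithGuid = 3, EventWithoutGuid = 4;

    // frames a batch reference: [type byte][16-byte guid][blob name][offset count][offsets...]
    static byte[] FrameBatchReference(byte[] taskHubGuid, string blobName, List<int> packetOffsets)
    {
        var stream = new MemoryStream();
        var writer = new BinaryWriter(stream, Encoding.UTF8);
        writer.Write(BatchWithGuid);
        writer.Write(taskHubGuid);           // 16 bytes, read back by ReadBytes(16)
        writer.Write(blobName);              // length-prefixed UTF8 string
        writer.Write(packetOffsets.Count);
        foreach (int offset in packetOffsets)
        {
            writer.Write(offset);
        }
        writer.Flush();
        return stream.ToArray();
    }

    static void Main()
    {
        // a blob containing three packets, starting at offsets 0, 100, and 250
        byte[] framed = FrameBatchReference(
            Guid.NewGuid().ToByteArray(),
            "2023-06-13T23:28:55.5043743Z-2CA224EC", // blob names look like this, per BlobBatchReceiver below
            new List<int> { 100, 250 });
        Console.WriteLine($"batch reference packet: {framed.Length} bytes");
    }
}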

View file

@@ -21,6 +21,15 @@ namespace DurableTask.Netherite
[DataMember]
public long NextInputQueuePosition { get; set; }
/// <summary>
/// For events coming from batches in the input queue, the batch position.
/// </summary>
[DataMember(EmitDefaultValue = false)]
public int NextInputQueueBatchPosition { get; set; }
[IgnoreDataMember]
public (long,int) NextInputQueuePositionTuple => (this.NextInputQueuePosition, this.NextInputQueueBatchPosition);
[IgnoreDataMember]
public double ReceivedTimestamp { get; set; }
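Input queue positions are now (long, int) value tuples: the event hub sequence number plus the index of the packet within a blob batch. C# value tuples compare lexicographically (Item1, then Item2), which is what the duplicate detection and checkpointing code in the StoreWorker below relies on when it calls CompareTo. A minimal sketch of that ordering:

using System;

class PositionOrderingSketch
{
    static void Main()
    {
        // (event hub sequence number, index of the packet within the blob batch)
        (long, int) lastProcessed = (41, 7);
        (long, int) nextMessage = (42, 0);
        (long, int) nextInBatch = (41, 8);
        Console.WriteLine(nextMessage.CompareTo(lastProcessed) > 0);   // True: next event hub message
        Console.WriteLine(nextInBatch.CompareTo(lastProcessed) > 0);   // True: later packet in the same batch
        Console.WriteLine(lastProcessed.CompareTo(lastProcessed) > 0); // False: duplicate delivery
    }
}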

View file

@@ -80,8 +80,6 @@ namespace DurableTask.Netherite
// cancel the token, if not already cancelled.
this.cts.Cancel();
await this.ResponseTimeouts.StopAsync();
// We now enter the final stage of client shutdown, where we forcefully cancel
// all requests that have not completed yet.
this.allRemainingRequestsAreNowBeingCancelled = true;
@@ -98,6 +96,8 @@
}
}
await this.ResponseTimeouts.StopAsync();
this.cts.Dispose();
this.traceHelper.TraceProgress("Stopped");

View file

@@ -472,7 +472,7 @@ namespace DurableTask.Netherite
this.serviceShutdownSource.Dispose();
this.serviceShutdownSource = null;
await this.transport.StopAsync();
await this.transport.StopAsync(fatalExceptionObserved: false);
this.ActivityWorkItemQueue.Dispose();
this.OrchestrationWorkItemQueue.Dispose();
@@ -551,7 +551,7 @@ namespace DurableTask.Netherite
IPartitionErrorHandler TransportAbstraction.IHost.CreateErrorHandler(uint partitionId)
{
return new PartitionErrorHandler((int) partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName);
return new PartitionErrorHandler((int) partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this);
}
void TransportAbstraction.IHost.TraceWarning(string message)
@@ -559,6 +559,25 @@ namespace DurableTask.Netherite
this.TraceHelper.TraceWarning(message);
}
void TransportAbstraction.IHost.OnFatalExceptionObserved(Exception e)
{
if (this.Settings.EmergencyShutdownOnFatalExceptions)
{
Task.Run(async() =>
{
this.TraceHelper.TraceError($"OrchestrationService is initiating an emergency shutdown due to a fatal {e.GetType().FullName}", e);
// try to stop the transport as quickly as possible, and don't wait longer than 30 seconds
await Task.WhenAny(this.transport.StopAsync(fatalExceptionObserved: true), Task.Delay(TimeSpan.FromSeconds(30)));
this.TraceHelper.TraceWarning($"OrchestrationService is killing process in 10 seconds");
await Task.Delay(TimeSpan.FromSeconds(10));
System.Environment.Exit(333);
});
}
}
/******************************/
// client methods
/******************************/

View file

@@ -150,6 +150,12 @@ namespace DurableTask.Netherite
/// </summary>
public bool KeepInstanceIdsInMemory = true;
/// <summary>
/// Whether to immediately shut down the transport layer and terminate the process when a fatal exception is observed.
/// This is true by default, so that failing hosts exit quickly, which allows other hosts to take over their partitions sooner.
/// </summary>
public bool EmergencyShutdownOnFatalExceptions = true;
/// <summary>
/// Forces steps to be persisted before applying their effects, disabling all pipelining.
/// </summary>
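For hosts that would rather stay alive and handle fatal exceptions themselves, a sketch of opting out of the new emergency-shutdown path (a fragment, assuming the DurableTask.Netherite namespace is imported and that HubName is settable as in the released package; other required settings such as connection information are omitted):

// fragment: disable the emergency-shutdown behavior introduced in this commit
var settings = new NetheriteOrchestrationServiceSettings
{
    HubName = "myTaskHub",                       // placeholder name
    EmergencyShutdownOnFatalExceptions = false,  // default is true
};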

View file

@@ -88,7 +88,7 @@ namespace DurableTask.Netherite
this.LastTransition = this.CurrentTimeMs;
}
public async Task<long> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler, TaskhubParameters parameters, string inputQueueFingerprint)
public async Task<(long, int)> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler, TaskhubParameters parameters, string inputQueueFingerprint)
{
EventTraceContext.Clear();
@@ -129,7 +129,7 @@
// start processing the worker queues
this.State.StartProcessing();
this.TraceHelper.TracePartitionProgress("Started", ref this.LastTransition, this.CurrentTimeMs, $"nextInputQueuePosition={inputQueuePosition}");
this.TraceHelper.TracePartitionProgress("Started", ref this.LastTransition, this.CurrentTimeMs, $"nextInputQueuePosition={inputQueuePosition.Item1}.{inputQueuePosition.Item2}");
return inputQueuePosition;
}
catch (OperationCanceledException) when (errorHandler.IsTerminated)

View file

@@ -4,8 +4,10 @@
namespace DurableTask.Netherite
{
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core.Common;
using Microsoft.Extensions.Logging;
// For indicating and initiating termination, and for tracing errors and warnings relating to a partition.
@@ -19,6 +21,7 @@ namespace DurableTask.Netherite
readonly string account;
readonly string taskHub;
readonly TaskCompletionSource<object> shutdownComplete;
readonly TransportAbstraction.IHost host;
public event Action OnShutdown;
@@ -46,7 +49,7 @@
const int TerminatedWithError = 1;
const int TerminatedNormally = 2;
public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelLimit, string storageAccountName, string taskHubName)
public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelLimit, string storageAccountName, string taskHubName, TransportAbstraction.IHost host)
{
this.cts = new CancellationTokenSource();
this.partitionId = partitionId;
@@ -55,14 +58,19 @@
this.account = storageAccountName;
this.taskHub = taskHubName;
this.shutdownComplete = new TaskCompletionSource<object>();
this.host = host;
}
public void HandleError(string context, string message, Exception exception, bool terminatePartition, bool isWarning)
{
bool isFatal = exception != null && Utils.IsFatal(exception);
isWarning = isWarning && !isFatal;
terminatePartition = terminatePartition || isFatal;
this.TraceError(isWarning, context, message, exception, terminatePartition);
// terminate this partition in response to the error
// if necessary, terminate this partition in response to the error
if (terminatePartition && this.terminationStatus == NotTerminated)
{
if (Interlocked.CompareExchange(ref this.terminationStatus, TerminatedWithError, NotTerminated) == NotTerminated)
@@ -70,6 +78,12 @@ namespace DurableTask.Netherite
this.Terminate();
}
}
// if fatal, notify host, because it may start a quick shutdown path in response
if (isFatal)
{
this.host.OnFatalExceptionObserved(exception);
}
}
public void TerminateNormally()

View file

@@ -19,7 +19,7 @@ namespace DurableTask.Netherite
public Dictionary<uint, (long Position, int SubPosition)> LastProcessed { get; set; } = new Dictionary<uint, (long,int)>();
[DataMember]
public (long, long) Positions; // used by FasterAlt to persist positions
public (long, (long,int)) Positions; // used by FasterAlt to persist positions
[IgnoreDataMember]
public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Dedup);

View file

@@ -640,7 +640,7 @@ namespace DurableTask.Netherite.Faster
}
continue;
}
catch (Exception e) when (!Utils.IsFatal(e))
catch (Exception e)
{
this.PartitionErrorHandler.HandleError(nameof(AcquireOwnership), "Could not acquire partition lease", e, true, false);
throw;
@@ -720,7 +720,7 @@ namespace DurableTask.Netherite.Faster
// We lost the lease to someone else. Terminate ownership immediately.
this.PartitionErrorHandler.HandleError(nameof(MaintenanceLoopAsync), "Lost partition lease", ex, true, true);
}
catch (Exception e) when (!Utils.IsFatal(e))
catch (Exception e)
{
this.PartitionErrorHandler.HandleError(nameof(MaintenanceLoopAsync), "Could not maintain partition lease", e, true, false);
}

View file

@@ -22,6 +22,9 @@ namespace DurableTask.Netherite.Faster
[JsonProperty]
public long InputQueuePosition { get; set; }
[JsonProperty]
public int InputQueueBatchPosition { get; set; }
[JsonProperty]
public string InputQueueFingerprint { get; set; }

View file

@@ -103,7 +103,7 @@ namespace DurableTask.Netherite.Faster
return Task.FromResult(!logIsEmpty);
}
public override Task<(long commitLogPosition, long inputQueuePosition, string inputQueueFingerprint)> RecoverAsync()
public override Task<(long commitLogPosition, (long,int) inputQueuePosition, string inputQueueFingerprint)> RecoverAsync()
{
foreach (var guid in this.ReadCheckpointIntentions())
{
@@ -157,7 +157,7 @@ namespace DurableTask.Netherite.Faster
}
}
public override bool TakeFullCheckpoint(long commitLogPosition, long inputQueuePosition, string inputQueueFingerprint, out Guid checkpointGuid)
public override bool TakeFullCheckpoint(long commitLogPosition, (long,int) inputQueuePosition, string inputQueueFingerprint, out Guid checkpointGuid)
{
checkpointGuid = Guid.NewGuid();
this.StartStoreCheckpoint(commitLogPosition, inputQueuePosition, checkpointGuid);
@@ -181,14 +181,14 @@ namespace DurableTask.Netherite.Faster
return default;
}
public override Guid? StartStoreCheckpoint(long commitLogPosition, long inputQueuePosition, string inputQueueFingerprint, long? shiftBeginAddress)
public override Guid? StartStoreCheckpoint(long commitLogPosition, (long,int) inputQueuePosition, string inputQueueFingerprint, long? shiftBeginAddress)
{
var guid = Guid.NewGuid();
this.StartStoreCheckpoint(commitLogPosition, inputQueuePosition, guid);
return guid;
}
internal void StartStoreCheckpoint(long commitLogPosition, long inputQueuePosition, Guid guid)
internal void StartStoreCheckpoint(long commitLogPosition, (long,int) inputQueuePosition, Guid guid)
{
// update the positions
var dedupState = this.cache[TrackedObjectKey.Dedup];
@@ -472,7 +472,7 @@ namespace DurableTask.Netherite.Faster
await Task.Delay(nextRetryIn);
continue;
}
catch (Exception exception) when (!Utils.IsFatal(exception))
catch (Exception exception)
{
this.blobManager.PartitionErrorHandler.HandleError(nameof(LoadAsync), "Could not read object from storage", exception, true, this.blobManager.PartitionErrorHandler.IsTerminated);
throw;
@@ -562,7 +562,7 @@ namespace DurableTask.Netherite.Faster
await Task.Delay(nextRetryIn);
continue;
}
catch (Exception exception) when (!Utils.IsFatal(exception))
catch (Exception exception)
{
this.blobManager?.HandleStorageError(nameof(StoreAsync), "could not write object to storage", blob.Name, exception, true, this.blobManager.PartitionErrorHandler.IsTerminated);
throw;
@@ -592,7 +592,7 @@ namespace DurableTask.Netherite.Faster
{
throw new OperationCanceledException("Partition was terminated.", this.terminationToken);
}
catch (Exception e) when (!Utils.IsFatal(e))
catch (Exception e)
{
this.blobManager.PartitionErrorHandler.HandleError(nameof(WriteCheckpointIntention), "Failed to write checkpoint intention to storage", e, true, this.blobManager.PartitionErrorHandler.IsTerminated);
throw;
@@ -616,7 +616,7 @@ namespace DurableTask.Netherite.Faster
{
throw new OperationCanceledException("Partition was terminated.", this.terminationToken);
}
catch (Exception e) when (!Utils.IsFatal(e))
catch (Exception e)
{
this.blobManager.PartitionErrorHandler.HandleError(nameof(RemoveCheckpointIntention), "Failed to remove checkpoint intention from storage", e, true, false);
throw;
@@ -648,7 +648,7 @@ namespace DurableTask.Netherite.Faster
{
throw new OperationCanceledException("Partition was terminated.", this.terminationToken);
}
catch (Exception e) when (!Utils.IsFatal(e))
catch (Exception e)
{
this.blobManager.PartitionErrorHandler.HandleError(nameof(ReadCheckpointIntentions), "Failed to read checkpoint intentions from storage", e, true, false);
throw;

View file

@@ -232,7 +232,7 @@ namespace DurableTask.Netherite.Faster
return this.blobManager.FindCheckpointsAsync(logIsEmpty);
}
public override async Task<(long commitLogPosition, long inputQueuePosition, string inputQueueFingerprint)> RecoverAsync()
public override async Task<(long commitLogPosition, (long,int) inputQueuePosition, string inputQueueFingerprint)> RecoverAsync()
{
try
{
@@ -258,7 +258,10 @@ namespace DurableTask.Netherite.Faster
this.cacheTracker.MeasureCacheSize(true);
this.CheckInvariants();
return (this.blobManager.CheckpointInfo.CommitLogPosition, this.blobManager.CheckpointInfo.InputQueuePosition, this.blobManager.CheckpointInfo.InputQueueFingerprint);
return (
this.blobManager.CheckpointInfo.CommitLogPosition,
(this.blobManager.CheckpointInfo.InputQueuePosition, this.blobManager.CheckpointInfo.InputQueueBatchPosition),
this.blobManager.CheckpointInfo.InputQueueFingerprint);
}
catch (Exception exception)
when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception))
@@ -293,12 +296,13 @@ namespace DurableTask.Netherite.Faster
return this.mainSession.ReadyToCompletePendingAsync(token);
}
public override bool TakeFullCheckpoint(long commitLogPosition, long inputQueuePosition, string inputQueueFingerprint, out Guid checkpointGuid)
public override bool TakeFullCheckpoint(long commitLogPosition, (long,int) inputQueuePosition, string inputQueueFingerprint, out Guid checkpointGuid)
{
try
{
this.blobManager.CheckpointInfo.CommitLogPosition = commitLogPosition;
this.blobManager.CheckpointInfo.InputQueuePosition = inputQueuePosition;
this.blobManager.CheckpointInfo.InputQueuePosition = inputQueuePosition.Item1;
this.blobManager.CheckpointInfo.InputQueueBatchPosition = inputQueuePosition.Item2;
this.blobManager.CheckpointInfo.InputQueueFingerprint = inputQueueFingerprint;
if (this.fht.TryInitiateFullCheckpoint(out checkpointGuid, CheckpointType.FoldOver))
{
@@ -398,12 +402,13 @@ namespace DurableTask.Netherite.Faster
}
}
public override Guid? StartStoreCheckpoint(long commitLogPosition, long inputQueuePosition, string inputQueueFingerprint, long? shiftBeginAddress)
public override Guid? StartStoreCheckpoint(long commitLogPosition, (long,int) inputQueuePosition, string inputQueueFingerprint, long? shiftBeginAddress)
{
try
{
this.blobManager.CheckpointInfo.CommitLogPosition = commitLogPosition;
this.blobManager.CheckpointInfo.InputQueuePosition = inputQueuePosition;
this.blobManager.CheckpointInfo.InputQueuePosition = inputQueuePosition.Item1;
this.blobManager.CheckpointInfo.InputQueueBatchPosition = inputQueuePosition.Item2;
this.blobManager.CheckpointInfo.InputQueueFingerprint = inputQueueFingerprint;
if (shiftBeginAddress > this.fht.Log.BeginAddress)
@@ -451,11 +456,10 @@ namespace DurableTask.Netherite.Faster
var stats = (StatsState) this.singletons[(int)TrackedObjectKey.Stats.ObjectType];
long actualLogSize = this.fht.Log.TailAddress - this.fht.Log.BeginAddress;
long minimalLogSize = this.MinimalLogSize;
long compactionAreaSize = (long)(0.5 * (this.fht.Log.SafeReadOnlyAddress - this.fht.Log.BeginAddress));
long mutableSectionSize = (this.fht.Log.TailAddress - this.fht.Log.SafeReadOnlyAddress);
long compactionAreaSize = Math.Min(50000, this.fht.Log.SafeReadOnlyAddress - this.fht.Log.BeginAddress);
if (actualLogSize > 2 * minimalLogSize // there must be significant bloat
&& mutableSectionSize < compactionAreaSize) // the potential size reduction must outweigh the cost of a foldover
&& compactionAreaSize >= 5000) // and enough compaction area to justify the overhead
{
return this.fht.Log.BeginAddress + compactionAreaSize;
}
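The compaction trigger thus moves from a relative test (mutable section smaller than half the safe read-only region) to absolute byte bounds: compact at most 50000 bytes from the log head, and only if at least 5000 bytes are available. A worked example with illustrative numbers (not taken from the commit):

using System;

class CompactionTriggerSketch
{
    static void Main()
    {
        // hypothetical FASTER log addresses, in bytes
        long beginAddress = 0, safeReadOnlyAddress = 400_000, tailAddress = 900_000;
        long minimalLogSize = 300_000; // placeholder; computed elsewhere by the store

        long actualLogSize = tailAddress - beginAddress;                                // 900000
        long compactionAreaSize = Math.Min(50_000, safeReadOnlyAddress - beginAddress); // 50000
        bool shouldCompact = actualLogSize > 2 * minimalLogSize   // there must be significant bloat
                          && compactionAreaSize >= 5_000;         // enough area to justify the overhead
        Console.WriteLine(shouldCompact
            ? $"compact until address {beginAddress + compactionAreaSize}" // 50000
            : "no compaction");
    }
}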
@@ -674,7 +678,7 @@
{
// partition is terminating
}
catch (Exception e) when (!Utils.IsFatal(e))
catch (Exception e)
{
this.partition.ErrorHandler.HandleError(nameof(RunPrefetchSession), "PrefetchSession {sessionId} encountered exception", e, false, this.partition.ErrorHandler.IsTerminated);
}

View file

@@ -29,29 +29,29 @@ namespace DurableTask.Netherite.Faster
// ----- faster storage layer events
public void FasterStoreCreated(long inputQueuePosition, long latencyMs)
public void FasterStoreCreated((long,int) inputQueuePosition, long latencyMs)
{
if (this.logLevelLimit <= LogLevel.Information)
{
this.logger.LogInformation("Part{partition:D2} Created Store, inputQueuePosition={inputQueuePosition} latencyMs={latencyMs}", this.partitionId, inputQueuePosition, latencyMs);
EtwSource.Log.FasterStoreCreated(this.account, this.taskHub, this.partitionId, inputQueuePosition, latencyMs, TraceUtils.AppName, TraceUtils.ExtensionVersion);
this.logger.LogInformation("Part{partition:D2} Created Store, inputQueuePosition={inputQueuePosition}.{inputQueueBatchPosition} latencyMs={latencyMs}", this.partitionId, inputQueuePosition.Item1, inputQueuePosition.Item2, latencyMs);
EtwSource.Log.FasterStoreCreated(this.account, this.taskHub, this.partitionId, inputQueuePosition.Item1, inputQueuePosition.Item2, latencyMs, TraceUtils.AppName, TraceUtils.ExtensionVersion);
}
}
public void FasterCheckpointStarted(Guid checkpointId, string details, string storeStats, long commitLogPosition, long inputQueuePosition)
public void FasterCheckpointStarted(Guid checkpointId, string details, string storeStats, long commitLogPosition, (long, int) inputQueuePosition)
{
if (this.logLevelLimit <= LogLevel.Information)
{
this.logger.LogInformation("Part{partition:D2} Started Checkpoint {checkpointId}, details={details}, storeStats={storeStats}, commitLogPosition={commitLogPosition} inputQueuePosition={inputQueuePosition}", this.partitionId, checkpointId, details, storeStats, commitLogPosition, inputQueuePosition);
EtwSource.Log.FasterCheckpointStarted(this.account, this.taskHub, this.partitionId, checkpointId, details, storeStats, commitLogPosition, inputQueuePosition, TraceUtils.AppName, TraceUtils.ExtensionVersion);
this.logger.LogInformation("Part{partition:D2} Started Checkpoint {checkpointId}, details={details}, storeStats={storeStats}, commitLogPosition={commitLogPosition} inputQueuePosition={inputQueuePosition}.{inputQueueBatchPosition}", this.partitionId, checkpointId, details, storeStats, commitLogPosition, inputQueuePosition.Item1, inputQueuePosition.Item2);
EtwSource.Log.FasterCheckpointStarted(this.account, this.taskHub, this.partitionId, checkpointId, details, storeStats, commitLogPosition, inputQueuePosition.Item1, inputQueuePosition.Item2, TraceUtils.AppName, TraceUtils.ExtensionVersion);
}
}
public void FasterCheckpointPersisted(Guid checkpointId, string details, long commitLogPosition, long inputQueuePosition, long latencyMs)
public void FasterCheckpointPersisted(Guid checkpointId, string details, long commitLogPosition, (long,int) inputQueuePosition, long latencyMs)
{
if (this.logLevelLimit <= LogLevel.Information)
{
this.logger.LogInformation("Part{partition:D2} Persisted Checkpoint {checkpointId}, details={details}, commitLogPosition={commitLogPosition} inputQueuePosition={inputQueuePosition} latencyMs={latencyMs}", this.partitionId, checkpointId, details, commitLogPosition, inputQueuePosition, latencyMs);
EtwSource.Log.FasterCheckpointPersisted(this.account, this.taskHub, this.partitionId, checkpointId, details, commitLogPosition, inputQueuePosition, latencyMs, TraceUtils.AppName, TraceUtils.ExtensionVersion);
this.logger.LogInformation("Part{partition:D2} Persisted Checkpoint {checkpointId}, details={details}, commitLogPosition={commitLogPosition} inputQueuePosition={inputQueuePosition}.{inputQueueBatchPosition} latencyMs={latencyMs}", this.partitionId, checkpointId, details, commitLogPosition, inputQueuePosition.Item1, inputQueuePosition.Item2, latencyMs);
EtwSource.Log.FasterCheckpointPersisted(this.account, this.taskHub, this.partitionId, checkpointId, details, commitLogPosition, inputQueuePosition.Item1, inputQueuePosition.Item2, latencyMs, TraceUtils.AppName, TraceUtils.ExtensionVersion);
}
if (latencyMs > 10000)
@@ -83,21 +83,21 @@ namespace DurableTask.Netherite.Faster
}
}
public void FasterCheckpointLoaded(long commitLogPosition, long inputQueuePosition, string storeStats, long latencyMs)
public void FasterCheckpointLoaded(long commitLogPosition, (long,int) inputQueuePosition, string storeStats, long latencyMs)
{
if (this.logLevelLimit <= LogLevel.Information)
{
this.logger.LogInformation("Part{partition:D2} Loaded Checkpoint, commitLogPosition={commitLogPosition} inputQueuePosition={inputQueuePosition} storeStats={storeStats} latencyMs={latencyMs}", this.partitionId, commitLogPosition, inputQueuePosition, storeStats, latencyMs);
EtwSource.Log.FasterCheckpointLoaded(this.account, this.taskHub, this.partitionId, commitLogPosition, inputQueuePosition, storeStats, latencyMs, TraceUtils.AppName, TraceUtils.ExtensionVersion);
this.logger.LogInformation("Part{partition:D2} Loaded Checkpoint, commitLogPosition={commitLogPosition} inputQueuePosition={inputQueuePosition}.{inputQueueBatchPosition} storeStats={storeStats} latencyMs={latencyMs}", this.partitionId, commitLogPosition, inputQueuePosition.Item1, inputQueuePosition.Item2, storeStats, latencyMs);
EtwSource.Log.FasterCheckpointLoaded(this.account, this.taskHub, this.partitionId, commitLogPosition, inputQueuePosition.Item1, inputQueuePosition.Item2, storeStats, latencyMs, TraceUtils.AppName, TraceUtils.ExtensionVersion);
}
}
public void FasterLogReplayed(long commitLogPosition, long inputQueuePosition, long numberEvents, long sizeInBytes, string storeStats, long latencyMs)
public void FasterLogReplayed(long commitLogPosition, (long,int) inputQueuePosition, long numberEvents, long sizeInBytes, string storeStats, long latencyMs)
{
if (this.logLevelLimit <= LogLevel.Information)
{
this.logger.LogInformation("Part{partition:D2} Replayed CommitLog, commitLogPosition={commitLogPosition} inputQueuePosition={inputQueuePosition} numberEvents={numberEvents} sizeInBytes={sizeInBytes} storeStats={storeStats} latencyMs={latencyMs}", this.partitionId, commitLogPosition, inputQueuePosition, numberEvents, sizeInBytes, storeStats, latencyMs);
EtwSource.Log.FasterLogReplayed(this.account, this.taskHub, this.partitionId, commitLogPosition, inputQueuePosition, numberEvents, sizeInBytes, storeStats, latencyMs, TraceUtils.AppName, TraceUtils.ExtensionVersion);
this.logger.LogInformation("Part{partition:D2} Replayed CommitLog, commitLogPosition={commitLogPosition} inputQueuePosition={inputQueuePosition}.{inputQueueBatchPosition} numberEvents={numberEvents} sizeInBytes={sizeInBytes} storeStats={storeStats} latencyMs={latencyMs}", this.partitionId, commitLogPosition, inputQueuePosition.Item1, inputQueuePosition.Item2, numberEvents, sizeInBytes, storeStats, latencyMs);
EtwSource.Log.FasterLogReplayed(this.account, this.taskHub, this.partitionId, commitLogPosition, inputQueuePosition.Item1, inputQueuePosition.Item2, numberEvents, sizeInBytes, storeStats, latencyMs, TraceUtils.AppName, TraceUtils.ExtensionVersion);
}
}

View file

@@ -72,7 +72,7 @@ namespace DurableTask.Netherite.Faster
await what;
}
public async Task<long> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint)
public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint)
{
this.partition = partition;
this.terminationToken = errorHandler.Token;

View file

@@ -35,7 +35,7 @@ namespace DurableTask.Netherite.Faster
public Partition Partition;
public Dictionary<TrackedObjectKey, string> Store;
public long CommitLogPosition;
public long InputQueuePosition;
public (long,int) InputQueuePosition;
public EffectTracker EffectTracker;
}
@@ -131,7 +131,7 @@
PartitionUpdateEvent DeserializePartitionUpdateEvent(string content)
=> (PartitionUpdateEvent) JsonConvert.DeserializeObject(content, this.settings);
internal void PartitionStarting(Partition partition, TrackedObjectStore store, long CommitLogPosition, long InputQueuePosition)
internal void PartitionStarting(Partition partition, TrackedObjectStore store, long CommitLogPosition, (long,int) InputQueuePosition)
{
var info = new Info()
{
@@ -286,7 +286,7 @@
return default;
}
public override (long, long) GetPositions()
public override (long, (long,int)) GetPositions()
{
return (this.info.CommitLogPosition, this.info.InputQueuePosition);
}

View file

@@ -23,7 +23,7 @@ namespace DurableTask.Netherite.Faster
bool isShuttingDown;
public string InputQueueFingerprint { get; private set; }
public long InputQueuePosition { get; private set; }
public (long,int) InputQueuePosition { get; private set; }
public long CommitLogPosition { get; private set; }
public LogWorker LogWorker { get; set; }
@@ -31,8 +31,8 @@
// periodic index and store checkpointing
CheckpointTrigger pendingCheckpointTrigger;
Task pendingIndexCheckpoint;
Task<(long, long)> pendingStoreCheckpoint;
long lastCheckpointedInputQueuePosition;
Task<(long, (long,int))> pendingStoreCheckpoint;
(long,int) lastCheckpointedInputQueuePosition;
long lastCheckpointedCommitLogPosition;
long numberEventsSinceLastCheckpoint;
DateTime timeOfNextIdleCheckpoint;
@@ -77,7 +77,7 @@
this.partition.ErrorHandler.Token.ThrowIfCancellationRequested();
this.InputQueueFingerprint = fingerprint;
this.InputQueuePosition = 0;
this.InputQueuePosition = (0,0);
this.CommitLogPosition = initialCommitLogPosition;
this.store.InitMainSession();
@@ -101,7 +101,7 @@
var pokeLoop = this.PokeLoop();
}
public void SetCheckpointPositionsAfterRecovery(long commitLogPosition, long inputQueuePosition, string inputQueueFingerprint)
public void SetCheckpointPositionsAfterRecovery(long commitLogPosition, (long,int) inputQueuePosition, string inputQueueFingerprint)
{
this.CommitLogPosition = commitLogPosition;
this.InputQueuePosition = inputQueuePosition;
@@ -225,9 +225,9 @@
this.loadInfo.CommitLogPosition = this.CommitLogPosition;
publish = true;
}
if (this.loadInfo.InputQueuePosition < this.InputQueuePosition)
if (this.loadInfo.InputQueuePosition < this.InputQueuePosition.Item1)
{
this.loadInfo.InputQueuePosition = this.InputQueuePosition;
this.loadInfo.InputQueuePosition = this.InputQueuePosition.Item1;
publish = true;
}
@@ -293,7 +293,7 @@
compactUntil = null;
long inputQueuePositionLag =
this.InputQueuePosition - Math.Max(this.lastCheckpointedInputQueuePosition, this.LogWorker.LastCommittedInputQueuePosition);
this.InputQueuePosition.Item1 - Math.Max(this.lastCheckpointedInputQueuePosition.Item1, this.LogWorker.LastCommittedInputQueuePosition);
if (this.lastCheckpointedCommitLogPosition + this.partition.Settings.MaxNumberBytesBetweenCheckpoints <= this.CommitLogPosition)
{
@@ -398,8 +398,8 @@
// we can persist it as part of a snapshot
if (partitionEvent.NextInputQueuePosition > 0)
{
this.partition.Assert(partitionEvent.NextInputQueuePosition > this.InputQueuePosition, "partitionEvent.NextInputQueuePosition > this.InputQueuePosition");
this.InputQueuePosition = partitionEvent.NextInputQueuePosition;
this.partition.Assert(partitionEvent.NextInputQueuePositionTuple.CompareTo(this.InputQueuePosition) > 0, "partitionEvent.NextInputQueuePosition > this.InputQueuePosition");
this.InputQueuePosition = partitionEvent.NextInputQueuePositionTuple;
}
// if we are processing events that count as activity, our latency category is at least "low"
@@ -518,7 +518,7 @@
}
}
public async Task<(long,long)> WaitForCheckpointAsync(bool isIndexCheckpoint, Guid checkpointToken, bool removeObsoleteCheckpoints)
public async Task<(long,(long,int))> WaitForCheckpointAsync(bool isIndexCheckpoint, Guid checkpointToken, bool removeObsoleteCheckpoints)
{
var stopwatch = new System.Diagnostics.Stopwatch();
stopwatch.Start();
@@ -589,7 +589,7 @@
if (queueChange)
{
this.InputQueuePosition = 0;
this.InputQueuePosition = (0,0);
this.InputQueueFingerprint = inputQueueFingerprint;
this.traceHelper.FasterProgress($"Resetting input queue position because of new fingerprint {inputQueueFingerprint}");
@@ -618,15 +618,15 @@
// update the input queue position if larger
// it can be smaller since the checkpoint can store positions advanced by non-update events
if (partitionUpdateEvent.NextInputQueuePosition > this.InputQueuePosition)
if (partitionUpdateEvent.NextInputQueuePositionTuple.CompareTo(this.InputQueuePosition) > 0)
{
this.InputQueuePosition = partitionUpdateEvent.NextInputQueuePosition;
this.InputQueuePosition = partitionUpdateEvent.NextInputQueuePositionTuple;
}
// must keep track of queue fingerprint changes detected in previous recoveries
else if (partitionUpdateEvent.ResetInputQueue)
{
this.InputQueuePosition = 0;
this.InputQueuePosition = (0,0);
this.InputQueueFingerprint = ((RecoveryCompleted) partitionUpdateEvent).ChangedFingerprint;
}
@@ -644,7 +644,7 @@
{
// the transport layer should always deliver a fresh event; if it repeats itself that's a bug
// (note that it may not be the very next in the sequence since readonly events are not persisted in the log)
if (partitionUpdateEvent.NextInputQueuePosition > 0 && partitionUpdateEvent.NextInputQueuePosition <= this.InputQueuePosition)
if (partitionUpdateEvent.NextInputQueuePosition > 0 && partitionUpdateEvent.NextInputQueuePositionTuple.CompareTo(this.InputQueuePosition) <= 0)
{
this.partition.ErrorHandler.HandleError(nameof(ProcessUpdate), "Duplicate event detected", null, false, false);
return;

View file

@@ -17,7 +17,7 @@ namespace DurableTask.Netherite.Faster
public abstract Task<bool> FindCheckpointAsync(bool logIsEmpty);
public abstract Task<(long commitLogPosition, long inputQueuePosition, string inputQueueFingerprint)> RecoverAsync();
public abstract Task<(long commitLogPosition, (long,int) inputQueuePosition, string inputQueueFingerprint)> RecoverAsync();
public abstract bool CompletePending();
@@ -25,13 +25,13 @@ namespace DurableTask.Netherite.Faster
public abstract void AdjustCacheSize();
public abstract bool TakeFullCheckpoint(long commitLogPosition, long inputQueuePosition, string inputQueueFingerprint, out Guid checkpointGuid);
public abstract bool TakeFullCheckpoint(long commitLogPosition, (long,int) inputQueuePosition, string inputQueueFingerprint, out Guid checkpointGuid);
public abstract Task RemoveObsoleteCheckpoints();
public abstract Guid? StartIndexCheckpoint();
public abstract Guid? StartStoreCheckpoint(long commitLogPosition, long inputQueuePosition, string inputQueueFingerprint, long? shiftBeginAddress);
public abstract Guid? StartStoreCheckpoint(long commitLogPosition, (long,int) inputQueuePosition, string inputQueueFingerprint, long? shiftBeginAddress);
public abstract ValueTask CompleteCheckpointAsync();

View file

@@ -26,7 +26,7 @@ namespace DurableTask.Netherite.Faster
return this.store.ProcessEffectOnTrackedObject(key, tracker);
}
public override (long, long) GetPositions()
public override (long,(long,int)) GetPositions()
{
return (this.storeWorker.CommitLogPosition, this.storeWorker.InputQueuePosition);
}

View file

@@ -23,7 +23,7 @@ namespace DurableTask.Netherite
Partition partition;
EffectTracker effects;
long commitPosition = 0;
long inputQueuePosition = 0;
(long,int) inputQueuePosition = (0,0);
public MemoryStorage(ILogger logger) : base(nameof(MemoryStorageLayer), true, int.MaxValue, CancellationToken.None, null)
{
@@ -52,7 +52,7 @@ namespace DurableTask.Netherite
base.SubmitBatch(entries);
}
public async Task<long> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler termination, string fingerprint)
public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler termination, string fingerprint)
{
await Task.Yield();
this.partition = partition;
@@ -69,7 +69,7 @@
}
this.commitPosition = 1;
this.inputQueuePosition = 0;
this.inputQueuePosition = (0,0);
return this.inputQueuePosition;
}
@@ -173,8 +173,8 @@ namespace DurableTask.Netherite
if (partitionEvent.NextInputQueuePosition > 0)
{
this.partition.Assert(partitionEvent.NextInputQueuePosition > this.inputQueuePosition, "partitionEvent.NextInputQueuePosition > this.inputQueuePosition in MemoryStorage");
this.inputQueuePosition = partitionEvent.NextInputQueuePosition;
this.partition.Assert(partitionEvent.NextInputQueuePositionTuple.CompareTo(this.inputQueuePosition) > 0, "partitionEvent.NextInputQueuePosition > this.inputQueuePosition in MemoryStorage");
this.inputQueuePosition = partitionEvent.NextInputQueuePositionTuple;
}
}
catch (Exception e)
@@ -211,7 +211,7 @@ namespace DurableTask.Netherite
return default;
}
public override (long, long) GetPositions()
public override (long, (long,int)) GetPositions()
{
return (this.memoryStorage.commitPosition, this.memoryStorage.inputQueuePosition);
}

View file

@@ -273,25 +273,25 @@ namespace DurableTask.Netherite
// ----- Faster Storage
[Event(250, Level = EventLevel.Informational, Version = 2)]
public void FasterStoreCreated(string Account, string TaskHub, int PartitionId, long InputQueuePosition, long ElapsedMs, string AppName, string ExtensionVersion)
[Event(250, Level = EventLevel.Informational, Version = 3)]
public void FasterStoreCreated(string Account, string TaskHub, int PartitionId, long InputQueuePosition, long InputQueueBatchPosition, long ElapsedMs, string AppName, string ExtensionVersion)
{
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(250, Account, TaskHub, PartitionId, InputQueuePosition, ElapsedMs, AppName, ExtensionVersion);
this.WriteEvent(250, Account, TaskHub, PartitionId, InputQueuePosition, InputQueueBatchPosition, ElapsedMs, AppName, ExtensionVersion);
}
[Event(251, Level = EventLevel.Informational, Version = 1)]
public void FasterCheckpointStarted(string Account, string TaskHub, int PartitionId, Guid CheckpointId, string Details, string StoreStats, long CommitLogPosition, long InputQueuePosition, string AppName, string ExtensionVersion)
[Event(251, Level = EventLevel.Informational, Version = 2)]
public void FasterCheckpointStarted(string Account, string TaskHub, int PartitionId, Guid CheckpointId, string Details, string StoreStats, long CommitLogPosition, long InputQueuePosition, long InputQueueBatchPosition, string AppName, string ExtensionVersion)
{
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(251, Account, TaskHub, PartitionId, CheckpointId, Details, StoreStats, CommitLogPosition, InputQueuePosition, AppName, ExtensionVersion);
this.WriteEvent(251, Account, TaskHub, PartitionId, CheckpointId, Details, StoreStats, CommitLogPosition, InputQueuePosition, InputQueueBatchPosition, AppName, ExtensionVersion);
}
[Event(252, Level = EventLevel.Informational, Version = 2)]
public void FasterCheckpointPersisted(string Account, string TaskHub, int PartitionId, Guid CheckpointId, string Details, long CommitLogPosition, long InputQueuePosition, long ElapsedMs, string AppName, string ExtensionVersion)
[Event(252, Level = EventLevel.Informational, Version = 3)]
public void FasterCheckpointPersisted(string Account, string TaskHub, int PartitionId, Guid CheckpointId, string Details, long CommitLogPosition, long InputQueuePosition, long InputQueueBatchPosition, long ElapsedMs, string AppName, string ExtensionVersion)
{
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(252, Account, TaskHub, PartitionId, CheckpointId, Details, CommitLogPosition, InputQueuePosition, ElapsedMs, AppName, ExtensionVersion);
this.WriteEvent(252, Account, TaskHub, PartitionId, CheckpointId, Details, CommitLogPosition, InputQueuePosition, InputQueueBatchPosition, ElapsedMs, AppName, ExtensionVersion);
}
[Event(253, Level = EventLevel.Verbose, Version = 2)]
@@ -301,18 +301,18 @@ namespace DurableTask.Netherite
this.WriteEvent(253, Account, TaskHub, PartitionId, CommitLogPosition, NumberEvents, SizeInBytes, ElapsedMs, AppName, ExtensionVersion);
}
[Event(254, Level = EventLevel.Informational, Version = 2)]
public void FasterCheckpointLoaded(string Account, string TaskHub, int PartitionId, long CommitLogPosition, long InputQueuePosition, string StoreStats, long ElapsedMs, string AppName, string ExtensionVersion)
[Event(254, Level = EventLevel.Informational, Version = 3)]
public void FasterCheckpointLoaded(string Account, string TaskHub, int PartitionId, long CommitLogPosition, long InputQueuePosition, long InputQueueBatchPosition, string StoreStats, long ElapsedMs, string AppName, string ExtensionVersion)
{
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(254, Account, TaskHub, PartitionId, CommitLogPosition, InputQueuePosition, StoreStats, ElapsedMs, AppName, ExtensionVersion);
this.WriteEvent(254, Account, TaskHub, PartitionId, CommitLogPosition, InputQueuePosition, InputQueueBatchPosition, StoreStats, ElapsedMs, AppName, ExtensionVersion);
}
[Event(255, Level = EventLevel.Informational, Version = 2)]
public void FasterLogReplayed(string Account, string TaskHub, int PartitionId, long CommitLogPosition, long InputQueuePosition, long NumberEvents, long SizeInBytes, string StoreStats, long ElapsedMs, string AppName, string ExtensionVersion)
[Event(255, Level = EventLevel.Informational, Version = 3)]
public void FasterLogReplayed(string Account, string TaskHub, int PartitionId, long CommitLogPosition, long InputQueuePosition, long InputQueueBatchPosition, long NumberEvents, long SizeInBytes, string StoreStats, long ElapsedMs, string AppName, string ExtensionVersion)
{
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(255, Account, TaskHub, PartitionId, CommitLogPosition, InputQueuePosition, NumberEvents, SizeInBytes, StoreStats, ElapsedMs, AppName, ExtensionVersion);
this.WriteEvent(255, Account, TaskHub, PartitionId, CommitLogPosition, InputQueuePosition, InputQueueBatchPosition, NumberEvents, SizeInBytes, StoreStats, ElapsedMs, AppName, ExtensionVersion);
}
[Event(256, Level = EventLevel.Error, Version = 1)]
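The recurring pattern in this file: each ETW event that gains the InputQueueBatchPosition payload field also bumps its Version attribute, so that trace consumers re-read the updated manifest instead of decoding the payload with the old field layout. A minimal sketch of that rule, using a hypothetical event source (not part of the commit):

using System.Diagnostics.Tracing;

[EventSource(Name = "Example-PositionTracing")]
sealed class ExampleEventSource : EventSource
{
    // this event previously had Version = 2 with a single position payload;
    // adding InputQueueBatchPosition requires bumping Version to 3
    [Event(1, Level = EventLevel.Informational, Version = 3)]
    public void PositionChanged(long InputQueuePosition, long InputQueueBatchPosition)
    {
        this.WriteEvent(1, InputQueuePosition, InputQueueBatchPosition);
    }
}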

View file

@@ -0,0 +1,365 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite.EventHubsTransport
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using DurableTask.Netherite.Faster;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.Storage;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Logging;
class BlobBatchReceiver<TEvent> where TEvent : Event
{
readonly string traceContext;
readonly EventHubsTraceHelper traceHelper;
readonly EventHubsTraceHelper lowestTraceLevel;
readonly BlobContainerClient containerClient;
readonly bool keepUntilConfirmed;
readonly bool isClientReceiver;
// Event Hubs discards messages after 24h, so we can throw away batches that are older than that
readonly static TimeSpan expirationTimeSpan = TimeSpan.FromHours(24) + TimeSpan.FromMinutes(1);
BlobDeletions blobDeletions;
public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper, NetheriteOrchestrationServiceSettings settings, bool keepUntilConfirmed)
{
this.traceContext = traceContext;
this.traceHelper = traceHelper;
this.lowestTraceLevel = traceHelper.IsEnabled(Microsoft.Extensions.Logging.LogLevel.Trace) ? traceHelper : null;
var serviceClient = BlobUtilsV12.GetServiceClients(settings.BlobStorageConnection).WithRetries;
string containerName = BlobManager.GetContainerName(settings.HubName);
this.containerClient = serviceClient.GetBlobContainerClient(containerName);
this.keepUntilConfirmed = keepUntilConfirmed;
this.blobDeletions = this.keepUntilConfirmed ? new BlobDeletions(this) : null;
this.isClientReceiver = typeof(TEvent) == typeof(ClientEvent);
}
public async IAsyncEnumerable<(EventData eventData, TEvent[] events, long)> ReceiveEventsAsync(
byte[] guid,
IEnumerable<EventData> hubMessages,
[EnumeratorCancellation] CancellationToken token,
long? nextPacketToReceive = null)
{
int ignoredPacketCount = 0;
foreach (var eventData in hubMessages)
{
var seqno = eventData.SystemProperties.SequenceNumber;
if (nextPacketToReceive.HasValue)
{
if (seqno < nextPacketToReceive.Value)
{
this.lowestTraceLevel?.LogTrace("{context} discarded packet #{seqno} because it is already processed", this.traceContext, seqno);
continue;
}
else if (seqno > nextPacketToReceive.Value)
{
this.traceHelper.LogError("{context} received wrong packet, #{seqno} instead of #{expected} ", this.traceContext, seqno, nextPacketToReceive.Value);
// this should never happen, as EventHubs guarantees in-order delivery of packets
throw new InvalidOperationException("EventHubs Out-Of-Order Packet");
}
}
TEvent evt;
Packet.BlobReference blobReference;
try
{
Packet.Deserialize(eventData.Body, out evt, out blobReference, guid);
}
catch (Exception)
{
this.traceHelper.LogError("{context} could not deserialize packet #{seqno} ({size} bytes)", this.traceContext, seqno, eventData.Body.Count);
throw;
}
if (blobReference == null)
{
if (evt == null)
{
this.lowestTraceLevel?.LogTrace("{context} ignored packet #{seqno} ({size} bytes) because its guid does not match taskhub/client", this.traceContext, seqno, eventData.Body.Count);
ignoredPacketCount++;
}
else
{
yield return (eventData, new TEvent[1] { evt }, seqno);
}
}
else // we have to read messages from a blob batch
{
string blobPath = $"{BlobBatchSender.PathPrefix}{blobReference.BlobName}";
BlockBlobClient blobClient = this.containerClient.GetBlockBlobClient(blobPath);
await BlobManager.AsynchronousStorageReadMaxConcurrency.WaitAsync();
byte[] blobContent;
token.ThrowIfCancellationRequested();
try
{
this.lowestTraceLevel?.LogTrace("{context} downloading blob {blobName}", this.traceContext, blobClient.Name);
Azure.Response<BlobDownloadResult> downloadResult = await blobClient.DownloadContentAsync(token);
blobContent = downloadResult.Value.Content.ToArray();
this.lowestTraceLevel?.LogTrace("{context} downloaded blob {blobName} ({size} bytes, {count} packets)", this.traceContext, blobClient.Name, blobContent.Length, blobReference.PacketOffsets.Count + 1);
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
// normal during shutdown
throw;
}
catch (Exception exception)
{
this.traceHelper.LogError("{context} failed to read blob {blobName} for #{seqno}: {exception}", this.traceContext, blobClient.Name, seqno, exception);
throw;
}
finally
{
BlobManager.AsynchronousStorageReadMaxConcurrency.Release();
}
TEvent[] result = new TEvent[blobReference.PacketOffsets.Count + 1];
for (int i = 0; i < result.Length; i++)
{
var offset = i == 0 ? 0 : blobReference.PacketOffsets[i - 1];
var nextOffset = i < blobReference.PacketOffsets.Count ? blobReference.PacketOffsets[i] : blobContent.Length;
var length = nextOffset - offset;
using var m = new MemoryStream(blobContent, offset, length, writable: false);
token.ThrowIfCancellationRequested();
try
{
Packet.Deserialize(m, out result[i], out _, null); // no need to check task hub match again
}
catch (Exception)
{
this.traceHelper.LogError("{context} could not deserialize packet from blob {blobName} at #{seqno}.{subSeqNo} offset={offset} length={length}", this.traceContext, blobClient.Name, seqno, i, offset, length);
throw;
}
}
yield return (eventData, result, seqno);
// we issue a deletion task; there is no strong guarantee that deletion will successfully complete
// which means some blobs can be left behind temporarily.
// This is fine because we have a second deletion path, a scan that removes old
// blobs that are past EH's expiration date.
if (!this.keepUntilConfirmed)
{
var bgTask = Task.Run(() => this.DeleteBlobAsync(new BlockBlobClient[1] { blobClient }));
}
else
{
// we cannot delete the blob until the partition has persisted an input queue position
// past this blob, so that we know we will not need to read it again
if (this.blobDeletions.TryRegister(result, blobClient))
{
this.blobDeletions = new BlobDeletions(this);
}
else
{
// the current batch will be registered along with the next
// batch that successfully registers
}
}
}
if (nextPacketToReceive.HasValue)
{
nextPacketToReceive = seqno + 1;
}
}
if (ignoredPacketCount > 0)
{
if (this.isClientReceiver)
{
// Ignored packets are very common for clients because multiple clients may share the same partition. We log this only for debug purposes.
this.traceHelper.LogDebug("{context} ignored {count} packets for different client", this.traceContext, ignoredPacketCount);
}
else
{
// Ignored packets may indicate misconfiguration (multiple taskhubs using same EH namespace). We create a visible warning.
this.traceHelper.LogWarning("{context} ignored {count} packets for different taskhub", this.traceContext, ignoredPacketCount);
}
}
}
public async Task<int> DeleteBlobAsync(IEnumerable<BlockBlobClient> blobClients)
{
int deletedCount = 0;
foreach (var blobClient in blobClients)
{
await BlobManager.AsynchronousStorageWriteMaxConcurrency.WaitAsync();
try
{
this.lowestTraceLevel?.LogTrace("{context} deleting blob {blobName}", this.traceContext, blobClient.Name);
Azure.Response response = await blobClient.DeleteAsync();
this.lowestTraceLevel?.LogTrace("{context} deleted blob {blobName}", this.traceContext, blobClient.Name);
deletedCount++;
}
catch (Azure.RequestFailedException e) when (BlobUtilsV12.BlobDoesNotExist(e))
{
this.lowestTraceLevel?.LogTrace("{context} blob {blobName} was already deleted", this.traceContext, blobClient.Name);
}
catch (Exception exception)
{
this.traceHelper.LogError("{context} failed to delete blob {blobName} : {exception}", this.traceContext, blobClient.Name, exception);
}
finally
{
BlobManager.AsynchronousStorageWriteMaxConcurrency.Release();
}
}
return deletedCount;
}
class BlobDeletions : TransportAbstraction.IDurabilityListener
{
readonly BlobBatchReceiver<TEvent> blobBatchReceiver;
readonly List<BlockBlobClient> blobClients;
public BlobDeletions(BlobBatchReceiver<TEvent> blobBatchReceiver)
{
this.blobBatchReceiver = blobBatchReceiver;
this.blobClients = new List<BlockBlobClient>();
}
public bool TryRegister(TEvent[] events, BlockBlobClient blobClient)
{
this.blobClients.Add(blobClient);
for(int i = events.Length - 1; i >= 0; i--)
{
if (events[i] is PartitionUpdateEvent e)
{
// we can register a callback
// to be invoked after the event has been persisted in the log
DurabilityListeners.Register(e, this);
return true;
}
}
return false; // only read or query events in this batch, cannot register a callback
}
public void ConfirmDurable(Event evt)
{
Task.Run(() => this.blobBatchReceiver.DeleteBlobAsync(this.blobClients));
}
}
public async Task<int> RemoveGarbageAsync(CancellationToken token)
{
async IAsyncEnumerable<Azure.Page<BlobItem>> GetExpiredBlobs()
{
// use a small first page since most of the time the query will
// return blobs that have not expired yet, so a large page
// would waste time and space
var firstpage = await this.containerClient.GetBlobsAsync(
prefix: BlobBatchSender.PathPrefix,
cancellationToken: token)
.AsPages(continuationToken: null, pageSizeHint: 5)
.FirstAsync();
yield return firstpage;
if (firstpage.ContinuationToken != null)
{
// for the remaining pages, use regular page size to reduce cost
var remainingPages = this.containerClient.GetBlobsAsync(
prefix: BlobBatchSender.PathPrefix,
cancellationToken: token)
.AsPages(continuationToken: firstpage.ContinuationToken, pageSizeHint: 100);
await foreach (var page in remainingPages)
{
yield return page;
}
}
}
int deletedCount = 0;
try
{
await foreach (Azure.Page<BlobItem> page in GetExpiredBlobs())
{
List<BlockBlobClient> blobs = new List<BlockBlobClient>();
bool completed = false;
foreach (var blob in page.Values)
{
if (IsExpired(blob.Name))
{
blobs.Add(this.containerClient.GetBlockBlobClient(blob.Name));
}
else
{
// blobs are sorted in ascending time order, so once we find one that is not
// expired yet we can stop enumerating
completed = true;
break;
}
}
// delete the expired blobs collected from this page
deletedCount += await this.DeleteBlobAsync(blobs);
if (completed)
{
return deletedCount;
}
bool IsExpired(string path)
{
// {PathPrefix}2023-06-13T23:28:55.5043743Z-2CA224EC
var name = path.Substring(BlobBatchSender.PathPrefix.Length);
// 2023-06-13T23:28:55.5043743Z-2CA224EC
var date = name.Substring(0, name.Length - 9);
// 2023-06-13T23:28:55.5043743Z
if (DateTime.TryParse(date, out DateTime result))
{
return (DateTime.Now - result) > expirationTimeSpan;
}
else
{
this.traceHelper.LogError("{context} failed to parse blob name {blobName} : '{date}' is not a DateTime", this.traceContext, name, date);
return false;
}
}
}
}
catch(OperationCanceledException) when (token.IsCancellationRequested)
{
// normal during shutdown;
}
catch (Exception exception)
{
this.traceHelper.LogError("{context} encountered exception while removing expired blob batches : {exception}", this.traceContext, exception);
}
return deletedCount;
}
}
}
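
For illustration, the offset arithmetic above in a minimal standalone sketch (hypothetical names): the first packet's offset is implicitly zero, so the offsets list has one fewer entry than there are packets.

using System;
using System.Collections.Generic;

class OffsetDemo
{
    // splits a buffer holding concatenated packets, given the offsets of all packets but the first
    static List<byte[]> Split(byte[] buffer, List<int> packetOffsets)
    {
        var packets = new List<byte[]>(packetOffsets.Count + 1);
        for (int i = 0; i <= packetOffsets.Count; i++)
        {
            int offset = i == 0 ? 0 : packetOffsets[i - 1];
            int nextOffset = i < packetOffsets.Count ? packetOffsets[i] : buffer.Length;
            packets.Add(buffer[offset..nextOffset]); // the slice [offset, nextOffset)
        }
        return packets;
    }

    static void Main()
    {
        byte[] buffer = { 1, 2, 3, 4, 5, 6 };
        foreach (var p in Split(buffer, new List<int> { 2, 5 })) // packets [1,2], [3,4,5], [6]
            Console.WriteLine(string.Join(",", p));
    }
}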

View file

@@ -0,0 +1,100 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite.EventHubsTransport
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using DurableTask.Netherite.Abstractions;
using DurableTask.Netherite.Faster;
using Microsoft.Azure.EventHubs;
using Microsoft.Extensions.Logging;
class BlobBatchSender
{
readonly string traceContext;
readonly EventHubsTraceHelper traceHelper;
readonly EventHubsTraceHelper lowestTraceLevel;
readonly BlobContainerClient containerClient;
readonly Random random = new Random();
readonly BlobUploadOptions options;
public const string PathPrefix = "eh-batches/";
public BlobBatchSender(string traceContext, EventHubsTraceHelper traceHelper, NetheriteOrchestrationServiceSettings settings)
{
this.traceContext = traceContext;
this.traceHelper = traceHelper;
this.lowestTraceLevel = traceHelper.IsEnabled(LogLevel.Trace) ? traceHelper : null;
var serviceClient = BlobUtilsV12.GetServiceClients(settings.BlobStorageConnection).WithRetries;
string containerName = BlobManager.GetContainerName(settings.HubName);
this.containerClient = serviceClient.GetBlobContainerClient(containerName);
this.options = new BlobUploadOptions() { };
}
// these constants influence the max size of batches transmitted in EH and via blobs.
// note that they cannot be made arbitrarily large (EH batches cannot exceed the EH maximum message size, and neither can the indexes of blob batches)
public int MaxEventHubsBatchBytes = 30 * 1024;
public int MaxEventHubsBatchEvents = 300;
public int MaxBlobBatchBytes = 500 * 1024;
public int MaxBlobBatchEvents = 5000;
string GetRandomBlobName()
{
uint random = (uint)this.random.Next();
return $"{DateTime.UtcNow:o}-{random:X8}";
}
public async Task<EventData> UploadEventsAsync(MemoryStream stream, List<int> packetOffsets, byte[] guid, CancellationToken token)
{
string blobName = this.GetRandomBlobName();
string blobPath = $"{PathPrefix}{blobName}";
BlockBlobClient blobClient = this.containerClient.GetBlockBlobClient(blobPath);
long totalBytes = stream.Position;
stream.Seek(0, SeekOrigin.Begin);
stream.SetLength(totalBytes);
this.lowestTraceLevel?.LogTrace("{context} is writing blob {blobName} ({size} bytes)", this.traceContext, blobClient.Name, totalBytes);
await BlobManager.AsynchronousStorageWriteMaxConcurrency.WaitAsync();
try
{
await blobClient.UploadAsync(stream, this.options, token).ConfigureAwait(false);
this.lowestTraceLevel?.LogTrace("{context} wrote blob {blobName}", this.traceContext, blobClient.Name);
// create a message to send via event hubs
stream.SetLength(0);
Packet.Serialize(blobName, packetOffsets, stream, guid);
var arraySegment = new ArraySegment<byte>(stream.GetBuffer(), 0, (int)stream.Position);
var eventData = new EventData(arraySegment);
return eventData;
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
// normal during shutdown
throw;
}
catch (Exception exception)
{
this.traceHelper.LogError("{context} failed to write blob {blobName} : {exception}", this.traceContext, blobClient.Name, exception);
throw;
}
finally
{
BlobManager.AsynchronousStorageWriteMaxConcurrency.Release();
}
}
}
}
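
A rough sketch (hypothetical names and an illustrative expiration value) of the blob naming scheme produced by GetRandomBlobName and parsed back out by the receiver's IsExpired:

using System;

class BlobNameDemo
{
    static readonly TimeSpan expiration = TimeSpan.FromHours(1); // illustrative value, not the actual setting

    // mirrors GetRandomBlobName: ISO-8601 UTC timestamp plus "-" and 8 hex digits
    static string MakeName(Random random) => $"{DateTime.UtcNow:o}-{(uint)random.Next():X8}";

    // mirrors the receiver's IsExpired: strip the 9-character "-XXXXXXXX" suffix and parse the rest
    static bool IsExpired(string name)
    {
        string date = name.Substring(0, name.Length - 9);
        return DateTime.TryParse(date, out DateTime t) && (DateTime.Now - t) > expiration;
    }

    static void Main()
    {
        string name = MakeName(new Random());
        Console.WriteLine($"{name} expired={IsExpired(name)}");
    }
}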

View file

@@ -19,12 +19,12 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly EventHubsSender<ClientEvent>[] channels;
int roundRobin;
public EventHubsClientSender(TransportAbstraction.IHost host, byte[] taskHubGuid, Guid clientId, PartitionSender[] senders, EventHubsTraceHelper traceHelper)
public EventHubsClientSender(TransportAbstraction.IHost host, Guid clientId, PartitionSender[] senders, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper, NetheriteOrchestrationServiceSettings settings)
{
this.channels = new Netherite.EventHubsTransport.EventHubsSender<ClientEvent>[senders.Length];
for (int i = 0; i < senders.Length; i++)
{
this.channels[i] = new EventHubsSender<ClientEvent>(host, taskHubGuid, senders[i], traceHelper);
this.channels[i] = new EventHubsSender<ClientEvent>(host, clientId.ToByteArray(), senders[i], shutdownToken, traceHelper, settings);
}
}
@@ -41,5 +41,10 @@ namespace DurableTask.Netherite.EventHubsTransport
var channel = this.channels.FirstOrDefault(this.Idle) ?? this.NextChannel();
channel.Submit(toSend);
}
public Task WaitForShutdownAsync()
{
return Task.WhenAll(this.channels.Select(sender => sender.WaitForShutdownAsync()));
}
}
}
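
The selection policy in Submit above (prefer an idle channel, otherwise rotate) in isolation, with simplified hypothetical types:

using System;
using System.Linq;

class ChannelPicker<T> where T : class
{
    readonly T[] channels;
    readonly Func<T, bool> isIdle;
    int roundRobin;

    public ChannelPicker(T[] channels, Func<T, bool> isIdle)
    {
        this.channels = channels;
        this.isIdle = isIdle;
    }

    // prefer an idle channel so a submission never queues behind a busy one;
    // if all channels are busy, rotate to spread the load evenly
    public T Next()
    {
        T idle = this.channels.FirstOrDefault(this.isIdle);
        if (idle != null)
        {
            return idle;
        }
        this.roundRobin = (this.roundRobin + 1) % this.channels.Length;
        return this.channels[this.roundRobin];
    }
}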

View file

@@ -19,6 +19,7 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly string[] clientHubs;
readonly string partitionHub;
readonly string loadMonitorHub;
readonly CancellationToken shutdownToken;
EventHubClient partitionClient;
List<EventHubClient> clientClients;
@@ -44,12 +45,14 @@
ConnectionInfo connectionInfo,
string partitionHub,
string[] clientHubs,
string loadMonitorHub)
string loadMonitorHub,
CancellationToken shutdownToken)
{
this.connectionInfo = connectionInfo;
this.partitionHub = partitionHub;
this.clientHubs = clientHubs;
this.loadMonitorHub = loadMonitorHub;
this.shutdownToken = shutdownToken;
}
public string Fingerprint => $"{this.connectionInfo.HostName}{this.partitionHub}/{this.CreationTimestamp:o}";
@@ -62,30 +65,43 @@
this.EnsureLoadMonitorAsync());
}
public async Task StopAsync()
public Task StopAsync()
{
IEnumerable<EventHubClient> Clients()
return Task.WhenAll(
this.StopClientClients(),
this.StopPartitionClients(),
this.StopLoadMonitorClients()
);
}
async Task StopClientClients()
{
await Task.WhenAll(this._clientSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());
if (this.clientClients != null)
{
if (this.partitionClient != null)
{
yield return this.partitionClient;
}
if (this.clientClients != null)
{
foreach (var client in this.clientClients)
{
yield return client;
}
}
if (this.loadMonitorHub != null)
{
yield return this.loadMonitorClient;
}
await Task.WhenAll(this.clientClients.Select(client => client.CloseAsync()).ToList());
}
}
await Task.WhenAll(Clients().Select(client => client.CloseAsync()).ToList());
async Task StopPartitionClients()
{
await Task.WhenAll(this._partitionSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());
if (this.partitionClient != null)
{
await this.partitionClient.CloseAsync();
}
}
async Task StopLoadMonitorClients()
{
await Task.WhenAll(this._loadMonitorSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());
if (this.loadMonitorHub != null)
{
await this.loadMonitorClient.CloseAsync();
}
}
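
A minimal sketch of the drain-then-close pattern these Stop methods follow (Drainable and Connection are hypothetical stand-ins): first wait for the senders to finish, then close the underlying client, with the groups shutting down in parallel:

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class Drainable
{
    public Task WaitForShutdownAsync() => Task.Delay(10); // stand-in for flushing queued sends
}

class Connection
{
    public Task CloseAsync() => Task.Delay(10); // stand-in for closing the EH client
}

class ShutdownDemo
{
    static async Task StopGroupAsync(IEnumerable<Drainable> senders, Connection connection)
    {
        // drain first, so no sender is still using the connection when it closes
        await Task.WhenAll(senders.Select(s => s.WaitForShutdownAsync()));
        await connection.CloseAsync();
    }

    static Task Main()
    {
        var groups = Enumerable.Range(0, 3)
            .Select(_ => (senders: new[] { new Drainable(), new Drainable() }, conn: new Connection()));

        // the groups stop in parallel, mirroring the Task.WhenAll in StopAsync
        return Task.WhenAll(groups.Select(g => StopGroupAsync(g.senders, g.conn)));
    }
}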
const int EventHubCreationRetries = 5;
@@ -258,7 +274,7 @@
}
public EventHubsSender<PartitionUpdateEvent> GetPartitionSender(int partitionId, byte[] taskHubGuid)
public EventHubsSender<PartitionUpdateEvent> GetPartitionSender(int partitionId, byte[] taskHubGuid, NetheriteOrchestrationServiceSettings settings)
{
return this._partitionSenders.GetOrAdd(partitionId, (key) => {
(EventHubClient client, string id) = this.partitionPartitions[partitionId];
@@ -267,13 +283,15 @@
this.Host,
taskHubGuid,
partitionSender,
this.TraceHelper);
this.shutdownToken,
this.TraceHelper,
settings);
this.TraceHelper.LogDebug("Created PartitionSender {sender} from {clientId}", partitionSender.ClientId, client.ClientId);
return sender;
});
}
public EventHubsClientSender GetClientSender(Guid clientId, byte[] taskHubGuid)
public EventHubsClientSender GetClientSender(Guid clientId, NetheriteOrchestrationServiceSettings settings)
{
return this._clientSenders.GetOrAdd(clientId, (key) =>
{
@@ -287,10 +305,11 @@
}
var sender = new EventHubsClientSender(
this.Host,
taskHubGuid,
clientId,
partitionSenders,
this.TraceHelper);
this.shutdownToken,
this.TraceHelper,
settings);
return sender;
});
}
@@ -304,6 +323,7 @@
this.Host,
taskHubGuid,
loadMonitorSender,
this.shutdownToken,
this.TraceHelper);
this.TraceHelper.LogDebug("Created LoadMonitorSender {sender} from {clientId}", loadMonitorSender.ClientId, this.loadMonitorClient.ClientId);
return sender;

View file

@@ -30,6 +30,8 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly string eventHubPartition;
readonly byte[] taskHubGuid;
readonly uint partitionId;
readonly CancellationToken shutdownToken;
readonly BlobBatchReceiver<PartitionEvent> blobBatchReceiver;
//private uint partitionId;
CancellationTokenSource eventProcessorShutdown;
@@ -45,7 +47,7 @@
// since EventProcessorHost does not redeliver packets, we need to keep them around until we are sure
// they are processed durably, so we can redeliver them when recycling/recovering a partition
// we make this a concurrent queue so we can remove confirmed events concurrently with receiving new ones
readonly ConcurrentQueue<(PartitionEvent evt, long offset, long seqno)> pendingDelivery;
readonly ConcurrentQueue<(PartitionEvent[] events, long offset, long seqno)> pendingDelivery;
AsyncLock deliveryLock;
// this points to the latest incarnation of this partition; it gets
@@ -62,7 +64,7 @@
public IPartitionErrorHandler ErrorHandler;
public TransportAbstraction.IPartition Partition;
public Task<PartitionIncarnation> Next;
public long NextPacketToReceive;
public (long seqNo, int batchPos) NextPacketToReceive;
public int SuccessiveStartupFailures;
}
@@ -81,7 +83,7 @@
this.host = host;
this.sender = sender;
this.parameters = parameters;
this.pendingDelivery = new ConcurrentQueue<(PartitionEvent evt, long offset, long seqno)>();
this.pendingDelivery = new ConcurrentQueue<(PartitionEvent[] events, long offset, long seqno)>();
this.partitionContext = partitionContext;
this.settings = settings;
this.eventHubsTransport = eventHubsTransport;
@@ -90,9 +92,12 @@
this.taskHubGuid = parameters.TaskhubGuid.ToByteArray();
this.partitionId = uint.Parse(this.eventHubPartition);
this.traceHelper = new EventHubsTraceHelper(traceHelper, this.partitionId);
this.shutdownToken = shutdownToken;
string traceContext = $"EventHubsProcessor {this.eventHubName}/{this.eventHubPartition}";
this.blobBatchReceiver = new BlobBatchReceiver<PartitionEvent>(traceContext, this.traceHelper, this.settings, keepUntilConfirmed: true);
var _ = shutdownToken.Register(
() => { var _ = Task.Run(() => this.IdempotentShutdown("shutdownToken", false)); },
() => { var _ = Task.Run(() => this.IdempotentShutdown("shutdownToken", eventHubsTransport.FatalExceptionObserved)); },
useSynchronizationContext: false);
}
@@ -120,7 +125,7 @@
// this is called after an event has committed (i.e. has been durably persisted in the recovery log).
// so we know we will never need to deliver it again. We remove it from the local buffer, and also checkpoint
// with EventHubs occasionally.
while (this.pendingDelivery.TryPeek(out var front) && front.evt.NextInputQueuePosition <= ((PartitionEvent)evt).NextInputQueuePosition)
while (this.pendingDelivery.TryPeek(out var front) && front.events[front.events.Length - 1].NextInputQueuePosition <= ((PartitionEvent)evt).NextInputQueuePosition)
{
if (this.pendingDelivery.TryDequeue(out var candidate))
{
@@ -224,10 +229,14 @@
using (await this.deliveryLock.LockAsync())
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} checking for packets requiring redelivery (incarnation {incarnation})", this.eventHubName, this.eventHubPartition, c.Incarnation);
var batch = this.pendingDelivery.Select(triple => triple.Item1).Where(evt => evt.NextInputQueuePosition > c.NextPacketToReceive).ToList();
var batch = this.pendingDelivery
.SelectMany(x => x.Item1)
.Where(evt => (evt.NextInputQueuePosition, evt.NextInputQueueBatchPosition).CompareTo(c.NextPacketToReceive) > 0)
.ToList();
if (batch.Count > 0)
{
c.NextPacketToReceive = batch[batch.Count - 1].NextInputQueuePosition;
var lastInBatch = batch[batch.Count - 1];
c.NextPacketToReceive = (lastInBatch.NextInputQueuePosition, lastInBatch.NextInputQueueBatchPosition);
c.Partition.SubmitEvents(batch);
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} redelivered {batchsize} packets, starting with #{seqno}, next expected packet is #{nextSeqno} (incarnation {incarnation})", this.eventHubName, this.eventHubPartition, batch.Count, batch[0].NextInputQueuePosition - 1, c.NextPacketToReceive, c.Incarnation);
}
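
The redelivery filter above relies on System.ValueTuple comparing lexicographically, so the (sequence number, batch position) pair acts as a single composite queue position; a small illustration:

using System;

class PositionDemo
{
    static void Main()
    {
        (long seqNo, int batchPos) resumeAt = (10, 2);

        // an event earlier in the same EventHubs message: not redelivered
        Console.WriteLine((10L, 1).CompareTo(resumeAt) > 0); // False

        // later in the same message, or in any later message: redelivered
        Console.WriteLine((10L, 3).CompareTo(resumeAt) > 0); // True
        Console.WriteLine((11L, 0).CompareTo(resumeAt) > 0); // True
    }
}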
@@ -244,7 +253,7 @@
// the partition startup was canceled
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} canceled partition startup (incarnation {incarnation})", this.eventHubName, this.eventHubPartition, c.Incarnation);
}
catch (Exception e) when (!Utils.IsFatal(e))
catch (Exception e)
{
c.SuccessiveStartupFailures = 1 + (prior?.SuccessiveStartupFailures ?? 0);
c.ErrorHandler.HandleError("EventHubsProcessor.StartPartitionAsync", "failed to start partition", e, true, false);
@@ -329,11 +338,16 @@
await context.CheckpointAsync(checkpoint);
this.lastCheckpointedOffset = long.Parse(checkpoint.Offset);
}
catch (Exception e) when (!Utils.IsFatal(e))
catch (Exception e)
{
// updating EventHubs checkpoints has been known to fail occasionally due to leases shifting around; since it is optional anyway
// we don't want this exception to cause havoc
this.traceHelper.LogWarning("EventHubsProcessor {eventHubName}/{eventHubPartition} failed to checkpoint receive position: {e}", this.eventHubName, this.eventHubPartition, e);
if (Utils.IsFatal(e))
{
this.host.OnFatalExceptionObserved(e);
}
}
}
}
@@ -375,9 +389,9 @@
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> packets)
{
var first = packets.FirstOrDefault();
long sequenceNumber = first?.SystemProperties.SequenceNumber ?? 0;
long firstSequenceNumber = first?.SystemProperties.SequenceNumber ?? 0;
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} is receiving events starting with #{seqno}", this.eventHubName, this.eventHubPartition, sequenceNumber);
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} is receiving events starting with #{seqno}", this.eventHubName, this.eventHubPartition, firstSequenceNumber);
PartitionIncarnation current = await this.currentIncarnation;
@@ -402,99 +416,82 @@
this.lastCheckpointedOffset = first == null ? null : long.Parse(first.SystemProperties.Offset);
// we may be missing packets if the service was down for longer than EH retention
if (sequenceNumber > current.NextPacketToReceive)
if (firstSequenceNumber > current.NextPacketToReceive.seqNo)
{
this.traceHelper.LogError("EventHubsProcessor {eventHubName}/{eventHubPartition} missing packets in sequence, #{seqno} instead of #{expected}. Initiating recovery via delete and restart.", this.eventHubName, this.eventHubPartition, sequenceNumber, current.NextPacketToReceive);
this.traceHelper.LogError("EventHubsProcessor {eventHubName}/{eventHubPartition} missing packets in sequence, #{seqno} instead of #{expected}. Initiating recovery via delete and restart.", this.eventHubName, this.eventHubPartition, firstSequenceNumber, current.NextPacketToReceive);
await this.eventHubsTransport.ExitProcess(true);
}
}
try
{
var batch = new List<PartitionEvent>();
var receivedTimestamp = current.Partition.CurrentTimeMs;
int totalEvents = 0;
Stopwatch stopwatch = Stopwatch.StartNew();
using (await this.deliveryLock.LockAsync()) // must prevent rare race with a partition that is currently restarting. Contention is very unlikely.
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} is processing packets (incarnation {seqno})", this.eventHubName, this.eventHubPartition, current.Incarnation);
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) is processing packets", this.eventHubName, this.eventHubPartition, current.Incarnation);
foreach (var eventData in packets)
await foreach ((EventData eventData, PartitionEvent[] events, long seqNo) in this.blobBatchReceiver.ReceiveEventsAsync(this.taskHubGuid, packets, current.ErrorHandler.Token, current.NextPacketToReceive.seqNo))
{
var seqno = eventData.SystemProperties.SequenceNumber;
if (seqno == current.NextPacketToReceive)
for (int i = 0; i < events.Length; i++)
{
PartitionEvent partitionEvent = null;
PartitionEvent evt = events[i];
try
if (i < events.Length - 1)
{
Packet.Deserialize(eventData.Body, out partitionEvent, this.taskHubGuid);
}
catch (Exception)
{
this.traceHelper.LogError("EventHubsProcessor {eventHubName}/{eventHubPartition} could not deserialize packet #{seqno} ({size} bytes)", this.eventHubName, this.eventHubPartition, seqno, eventData.Body.Count);
throw;
}
current.NextPacketToReceive = seqno + 1;
if (partitionEvent != null)
{
this.traceHelper.LogTrace("EventHubsProcessor {eventHubName}/{eventHubPartition} received packet #{seqno} ({size} bytes) {event}", this.eventHubName, this.eventHubPartition, seqno, eventData.Body.Count, partitionEvent);
evt.NextInputQueuePosition = seqNo;
evt.NextInputQueueBatchPosition = i + 1;
}
else
{
this.traceHelper.LogWarning("EventHubsProcessor {eventHubName}/{eventHubPartition} ignored packet #{seqno} for different taskhub", this.eventHubName, this.eventHubPartition, seqno);
continue;
evt.NextInputQueuePosition = seqNo + 1;
}
partitionEvent.NextInputQueuePosition = current.NextPacketToReceive;
batch.Add(partitionEvent);
this.pendingDelivery.Enqueue((partitionEvent, long.Parse(eventData.SystemProperties.Offset), eventData.SystemProperties.SequenceNumber));
DurabilityListeners.Register(partitionEvent, this);
partitionEvent.ReceivedTimestamp = current.Partition.CurrentTimeMs;
//partitionEvent.ReceivedTimestampUnixMs = DateTimeOffset.Now.ToUnixTimeMilliseconds();
if (this.traceHelper.IsEnabled(LogLevel.Trace))
{
this.traceHelper.LogTrace("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) received packet #{seqno}.{subSeqNo} {event} id={eventId}", this.eventHubName, this.eventHubPartition, current.Incarnation, seqNo, i, evt, evt.EventIdString);
}
// Output the time it took for the event to go through eventhubs.
//if (partitionEvent.SentTimestampUnixMs != 0)
//{
// long duration = partitionEvent.ReceivedTimestampUnixMs - partitionEvent.SentTimestampUnixMs;
// this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} received packet #{seqno} eventId={eventId} with {eventHubsLatencyMs} ms latency", this.eventHubName, this.eventHubPartition, seqno, partitionEvent.EventIdString, duration);
//}
totalEvents++;
}
else if (seqno > current.NextPacketToReceive)
// add events to redelivery queue, and attach durability listener to last update event in the batch
this.pendingDelivery.Enqueue((events, long.Parse(eventData.SystemProperties.Offset), eventData.SystemProperties.SequenceNumber));
for (int i = events.Length - 1; i >= 0; i--)
{
this.traceHelper.LogError("EventHubsProcessor {eventHubName}/{eventHubPartition} received wrong packet, #{seqno} instead of #{expected}", this.eventHubName, this.eventHubPartition, seqno, current.NextPacketToReceive);
// this should never happen, as EventHubs guarantees in-order delivery of packets
throw new InvalidOperationException("EventHubs Out-Of-Order Packet");
if (events[i] is PartitionUpdateEvent partitionUpdateEvent)
{
DurabilityListeners.Register(partitionUpdateEvent, this);
}
}
if (current.NextPacketToReceive.batchPos == 0)
{
current.Partition.SubmitEvents(events);
}
else
{
this.traceHelper.LogTrace("EventHubsProcessor {eventHubName}/{eventHubPartition} discarded packet #{seqno} because it is already processed", this.eventHubName, this.eventHubPartition, seqno);
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) skipping {batchPos} events in batch #{seqno} because they are already processed", this.eventHubName, this.eventHubPartition, current.Incarnation, current.NextPacketToReceive.batchPos, seqNo);
current.Partition.SubmitEvents(events.Skip(current.NextPacketToReceive.batchPos).ToList());
}
current.NextPacketToReceive = (seqNo + 1, 0);
}
}
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} finished processing packets", this.eventHubName, this.eventHubPartition);
if (batch.Count > 0)
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} received batch of {batchsize} packets, starting with #{seqno}, next expected packet is #{nextSeqno}", this.eventHubName, this.eventHubPartition, batch.Count, batch[0].NextInputQueuePosition - 1, current.NextPacketToReceive);
current.Partition.SubmitEvents(batch);
}
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) received {totalEvents} events in {latencyMs:F2}ms, starting with #{seqno}, next expected packet is #{nextSeqno}", this.eventHubName, this.eventHubPartition, current.Incarnation, totalEvents, stopwatch.Elapsed.TotalMilliseconds, firstSequenceNumber, current.NextPacketToReceive.seqNo);
await this.SaveEventHubsReceiverCheckpoint(context, 600000);
// can use this for testing: terminates the partition after each packet received; the
// packet is then processed once the partition recovers, so in the end there is progress
// throw new InvalidOperationException("error injection");
}
catch (OperationCanceledException)
{
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} was terminated", this.eventHubName, this.eventHubPartition);
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) was terminated", this.eventHubName, this.eventHubPartition, current.Incarnation);
}
catch (Exception exception)
{
this.traceHelper.LogError("EventHubsProcessor {eventHubName}/{eventHubPartition} encountered an exception while processing packets : {exception}", this.eventHubName, this.eventHubPartition, exception);
this.traceHelper.LogError("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) encountered an exception while processing packets : {exception}", this.eventHubName, this.eventHubPartition, current.Incarnation, exception);
current?.ErrorHandler.HandleError("IEventProcessor.ProcessEventsAsync", "Encountered exception while processing events", exception, true, false);
}
}
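
The pendingDelivery queue follows the usual at-least-once pattern: keep every received batch until the recovery log confirms a position at or past it. A simplified model (string[] stands in for an event batch):

using System.Collections.Concurrent;

class RedeliveryBuffer
{
    // batches are kept until confirmed durable; nextPosition is the queue position reached after the batch
    readonly ConcurrentQueue<(string[] events, long nextPosition)> pending = new();

    public void Received(string[] events, long nextPosition)
        => this.pending.Enqueue((events, nextPosition));

    // called once the recovery log has persisted everything up to durablePosition
    public void ConfirmDurable(long durablePosition)
    {
        while (this.pending.TryPeek(out var front) && front.nextPosition <= durablePosition)
        {
            this.pending.TryDequeue(out _);
        }
    }

    // on partition restart, everything still pending is submitted again
    public int PendingCount => this.pending.Count;
}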

View file

@@ -3,39 +3,42 @@
namespace DurableTask.Netherite.EventHubsTransport
{
using DurableTask.Core.Common;
using Microsoft.Azure.EventHubs;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;
using DurableTask.Core.Common;
using Microsoft.Azure.EventHubs;
using Microsoft.Extensions.Logging;
class EventHubsSender<T> : BatchWorker<Event> where T: Event
class EventHubsSender<T> : BatchWorker<Event> where T : Event
{
readonly PartitionSender sender;
readonly TransportAbstraction.IHost host;
readonly byte[] taskHubGuid;
readonly byte[] guid;
readonly EventHubsTraceHelper traceHelper;
readonly EventHubsTraceHelper lowestTraceLevel;
readonly string eventHubName;
readonly string eventHubPartition;
readonly TimeSpan backoff = TimeSpan.FromSeconds(5);
int maxMessageSize = 900 * 1024; // we keep this slightly below the official limit since we have observed exceptions
int maxFragmentSize => this.maxMessageSize / 2; // we keep this lower than maxMessageSize because of serialization overhead
readonly MemoryStream stream = new MemoryStream(); // reused for all packets
readonly Stopwatch stopwatch = new Stopwatch();
readonly BlobBatchSender blobBatchSender;
public EventHubsSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, CancellationToken.None, traceHelper)
public EventHubsSender(TransportAbstraction.IHost host, byte[] guid, PartitionSender sender, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper, NetheriteOrchestrationServiceSettings settings)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, shutdownToken, traceHelper)
{
this.host = host;
this.taskHubGuid = taskHubGuid;
this.guid = guid;
this.sender = sender;
this.traceHelper = traceHelper;
this.lowestTraceLevel = traceHelper.IsEnabled(LogLevel.Trace) ? traceHelper : null;
this.eventHubName = this.sender.EventHubClient.EventHubName;
this.eventHubPartition = this.sender.PartitionId;
this.blobBatchSender = new BlobBatchSender($"EventHubsSender {this.eventHubName}/{this.eventHubPartition}", this.traceHelper, settings);
}
protected override async Task Process(IList<Event> toSend)
@@ -50,105 +53,130 @@ namespace DurableTask.Netherite.EventHubsTransport
var maybeSent = -1;
Exception senderException = null;
EventDataBatch CreateBatch() => this.sender.CreateBatch(new BatchOptions() { MaxMessageSize = maxMessageSize });
// track current position in toSend
int index = 0;
try
// track offsets of the packets in the stream
// since the first offset is always zero, there is one fewer than the number of packets
List<int> packetOffsets = new List<int>(Math.Min(toSend.Count, this.blobBatchSender.MaxBlobBatchEvents) - 1);
void CollectBatchContent(bool specificallyForBlob)
{
var batch = CreateBatch();
int maxEvents = specificallyForBlob ? this.blobBatchSender.MaxBlobBatchEvents : this.blobBatchSender.MaxEventHubsBatchEvents;
int maxBytes = specificallyForBlob ? this.blobBatchSender.MaxBlobBatchBytes : this.blobBatchSender.MaxEventHubsBatchBytes;
async Task SendBatch(int lastPosition)
this.stream.Seek(0, SeekOrigin.Begin);
packetOffsets.Clear();
for (; index < toSend.Count && index < maxEvents && this.stream.Position < maxBytes; index++)
{
maybeSent = lastPosition;
this.stopwatch.Restart();
await this.sender.SendAsync(batch).ConfigureAwait(false);
this.stopwatch.Stop();
sentSuccessfully = lastPosition;
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} sent batch of {numPackets} packets ({size} bytes) in {latencyMs:F2}ms, throughput={throughput:F2}MB/s", this.eventHubName, this.eventHubPartition, batch.Count, batch.Size, this.stopwatch.Elapsed.TotalMilliseconds, batch.Size/(1024*1024*this.stopwatch.Elapsed.TotalSeconds));
batch.Dispose();
}
for (int i = 0; i < toSend.Count; i++)
{
long startPos = this.stream.Position;
var evt = toSend[i];
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} is sending event {evt} id={eventId}", this.eventHubName, this.eventHubPartition, evt, evt.EventIdString);
Packet.Serialize(evt, this.stream, this.taskHubGuid);
int length = (int)(this.stream.Position - startPos);
var arraySegment = new ArraySegment<byte>(this.stream.GetBuffer(), (int)startPos, length);
var eventData = new EventData(arraySegment);
bool tooBig = length > this.maxFragmentSize;
if (!tooBig && batch.TryAdd(eventData))
int currentOffset = (int) this.stream.Position;
if (currentOffset > 0)
{
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} added packet to batch ({size} bytes) {evt} id={eventId}", this.eventHubName, this.eventHubPartition, eventData.Body.Count, evt, evt.EventIdString);
continue;
packetOffsets.Add(currentOffset);
}
var evt = toSend[index];
this.lowestTraceLevel?.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} is sending event {evt} id={eventId}", this.eventHubName, this.eventHubPartition, evt, evt.EventIdString);
if (!specificallyForBlob)
{
Packet.Serialize(evt, this.stream, this.guid);
}
else
{
if (batch.Count > 0)
{
// send the batch we have so far
await SendBatch(i - 1);
// we don't need to include the task hub guid if the event is sent via blob
Packet.Serialize(evt, this.stream);
}
}
}
// create a fresh batch
batch = CreateBatch();
}
try
{
// unless the total number of events is above the max already, we always check first if we can avoid using a blob
bool usingBlobBatches = toSend.Count > this.blobBatchSender.MaxEventHubsBatchEvents;
if (tooBig)
while (index < toSend.Count)
{
CollectBatchContent(usingBlobBatches);
if (!usingBlobBatches)
{
if (index == toSend.Count
&& index <= this.blobBatchSender.MaxEventHubsBatchEvents
&& this.stream.Position <= this.blobBatchSender.MaxEventHubsBatchBytes)
{
// the message is too big. Break it into fragments, and send each individually.
Guid groupId = Guid.NewGuid();
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} fragmenting large event ({size} bytes) id={eventId} groupId={group:N}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString, groupId);
var fragments = FragmentationAndReassembly.Fragment(arraySegment, evt, groupId, this.maxFragmentSize);
maybeSent = i;
for (int k = 0; k < fragments.Count; k++)
// we don't have a lot of bytes or messages to send
// send them all in a single EH batch
using var batch = this.sender.CreateBatch();
long maxPosition = this.stream.Position;
this.stream.Seek(0, SeekOrigin.Begin);
var buffer = this.stream.GetBuffer();
for (int j = 0; j < index; j++)
{
//TODO send bytes directly instead of as events (which causes significant space overhead)
this.stream.Seek(0, SeekOrigin.Begin);
var fragment = fragments[k];
Packet.Serialize((Event)fragment, this.stream, this.taskHubGuid);
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} sending fragment {index}/{total} ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, k, fragments.Count, length, ((Event)fragment).EventIdString);
length = (int)this.stream.Position;
await this.sender.SendAsync(new EventData(new ArraySegment<byte>(this.stream.GetBuffer(), 0, length))).ConfigureAwait(false);
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} sent fragment {index}/{total} ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, k, fragments.Count, length, ((Event)fragment).EventIdString);
int offset = j == 0 ? 0 : packetOffsets[j - 1];
int nextOffset = j < packetOffsets.Count ? packetOffsets[j] : (int) maxPosition;
var length = nextOffset - offset;
var arraySegment = new ArraySegment<byte>(buffer, offset, length);
var eventData = new EventData(arraySegment);
if (batch.TryAdd(eventData))
{
Event evt = toSend[j];
this.lowestTraceLevel?.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} added packet to batch offset={offset} length={length} {evt} id={eventId}", this.eventHubName, this.eventHubPartition, offset, length, evt, evt.EventIdString);
}
else
{
throw new InvalidOperationException("could not add event to batch"); // should never happen as max send size is very small
}
}
sentSuccessfully = i;
maybeSent = index - 1;
this.stopwatch.Restart();
await this.sender.SendAsync(batch).ConfigureAwait(false);
this.stopwatch.Stop();
sentSuccessfully = index - 1;
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} sent batch of {numPackets} packets ({size} bytes) in {latencyMs:F2}ms, throughput={throughput:F2}MB/s", this.eventHubName, this.eventHubPartition, batch.Count, batch.Size, this.stopwatch.Elapsed.TotalMilliseconds, batch.Size / (1024 * 1024 * this.stopwatch.Elapsed.TotalSeconds));
break; // all messages were sent
}
else
{
// back up one
i--;
usingBlobBatches = true;
}
// the buffer can be reused now
this.stream.Seek(0, SeekOrigin.Begin);
}
}
if (batch.Count > 0)
{
await SendBatch(toSend.Count - 1);
// the buffer can be reused now
this.stream.Seek(0, SeekOrigin.Begin);
// send the event(s) as a blob batch
this.stopwatch.Restart();
EventData blobMessage = await this.blobBatchSender.UploadEventsAsync(this.stream, packetOffsets, this.guid, this.cancellationToken);
maybeSent = index - 1;
await this.sender.SendAsync(blobMessage);
this.stopwatch.Stop();
sentSuccessfully = index - 1;
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} sent blob-batch of {numPackets} packets ({size} bytes) in {latencyMs:F2}ms, throughput={throughput:F2}MB/s", this.eventHubName, this.eventHubPartition, packetOffsets.Count + 1, this.stream.Position, this.stopwatch.Elapsed.TotalMilliseconds, this.stream.Position / (1024 * 1024 * this.stopwatch.Elapsed.TotalSeconds));
}
}
catch(Microsoft.Azure.EventHubs.MessageSizeExceededException)
catch (OperationCanceledException) when (this.cancellationToken.IsCancellationRequested)
{
this.maxMessageSize = 200 * 1024;
this.traceHelper.LogWarning("EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send due to message size, reducing to {maxMessageSize}kB",
this.eventHubName, this.eventHubPartition, this.maxMessageSize / 1024);
// normal during shutdown
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} was cancelled", this.eventHubName, this.eventHubPartition);
return;
}
catch (OperationCanceledException) when (this.cancellationToken.IsCancellationRequested)
{
// normal during shutdown
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} was cancelled", this.eventHubName, this.eventHubPartition);
return;
}
catch (Exception e)
{
this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition);
this.traceHelper.LogWarning("EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send: {e}", this.eventHubName, this.eventHubPartition, e);
senderException = e;
if (Utils.IsFatal(e))
{
this.host.OnFatalExceptionObserved(e);
}
}
finally
{
// we don't need the contents of the stream anymore.
this.stream.SetLength(0);
this.stream.SetLength(0);
}
// Confirm all sent events, and retry or report maybe-sent ones
@@ -198,10 +226,15 @@ namespace DurableTask.Netherite.EventHubsTransport
else
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} has confirmed {confirmed}, requeued {requeued}, dropped {dropped} outbound events", this.eventHubName, this.eventHubPartition, confirmed, requeued, dropped);
}
catch (Exception exception) when (!Utils.IsFatal(exception))
catch (Exception exception)
{
this.traceHelper.LogError("EventHubsSender {eventHubName}/{eventHubPartitionId} encountered an error while trying to confirm messages: {exception}", this.eventHubName, this.eventHubPartition, exception);
if (Utils.IsFatal(exception))
{
this.host.OnFatalExceptionObserved(exception);
}
}
}
}
}
}
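
The sender's routing decision reduces to a small rule; a sketch using the default thresholds above (the helper and its names are hypothetical): only a complete batch within both EventHubs limits is sent directly, everything else goes through a blob:

using System;

class BatchRouteDemo
{
    const int MaxEventHubsBatchEvents = 300;      // mirrors the BlobBatchSender defaults
    const int MaxEventHubsBatchBytes = 30 * 1024;

    enum Route { DirectEventHubsBatch, BlobBatch }

    static Route Choose(int collectedEvents, long collectedBytes, bool allEventsCollected)
    {
        // only a batch that is complete AND within both limits avoids the blob path
        bool fitsDirect = allEventsCollected
            && collectedEvents <= MaxEventHubsBatchEvents
            && collectedBytes <= MaxEventHubsBatchBytes;
        return fitsDirect ? Route.DirectEventHubsBatch : Route.BlobBatch;
    }

    static void Main()
    {
        Console.WriteLine(Choose(12, 8 * 1024, true));     // DirectEventHubsBatch
        Console.WriteLine(Choose(1200, 90 * 1024, false)); // BlobBatch
    }
}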

View file

@@ -17,6 +17,7 @@ namespace DurableTask.Netherite.EventHubsTransport
using System.Linq;
using System.Threading.Channels;
using DurableTask.Netherite.Abstractions;
using System.Diagnostics;
/// <summary>
/// The EventHubs transport implementation.
@@ -31,6 +32,7 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly ILogger logger;
readonly EventHubsTraceHelper traceHelper;
readonly IStorageLayer storage;
readonly string shortClientId;
EventProcessorHost eventProcessorHost;
EventProcessorHost loadMonitorHost;
@@ -51,9 +53,14 @@
CloudBlockBlob partitionScript;
ScriptedEventProcessorHost scriptedEventProcessorHost;
int shutdownTriggered;
public Guid ClientId { get; private set; }
public string Fingerprint => this.connections.Fingerprint;
public bool FatalExceptionObserved { get; private set; }
public EventHubsTransport(TransportAbstraction.IHost host, NetheriteOrchestrationServiceSettings settings, IStorageLayer storage, ILoggerFactory loggerFactory)
{
if (storage is MemoryStorageLayer)
@@ -68,6 +75,7 @@
this.logger = EventHubsTraceHelper.CreateLogger(loggerFactory);
this.traceHelper = new EventHubsTraceHelper(this.logger, settings.TransportLogLevelLimit, null, settings.StorageAccountName, settings.HubName, namespaceName);
this.ClientId = Guid.NewGuid();
this.shortClientId = Client.GetShortId(this.ClientId);
}
// these are hardcoded now but we may turn them into settings
@@ -102,7 +110,7 @@
// check that the storage format is supported, and load the relevant FASTER tuning parameters
BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning);
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub)
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.shutdownSource.Token)
{
Host = host,
TraceHelper = this.traceHelper,
@@ -287,16 +295,20 @@
}
}
async Task ITransportLayer.StopAsync()
async Task ITransportLayer.StopAsync(bool fatalExceptionObserved)
{
this.traceHelper.LogInformation("EventHubsTransport is shutting down");
this.shutdownSource.Cancel(); // immediately initiates shutdown of client and of all partitions
if (Interlocked.CompareExchange(ref this.shutdownTriggered, 1, 0) == 0)
{
this.traceHelper.LogInformation("EventHubsTransport is shutting down");
this.FatalExceptionObserved = fatalExceptionObserved;
this.shutdownSource.Cancel(); // immediately initiates shutdown of client and of all partitions
await Task.WhenAll(
this.hasWorkers ? this.StopWorkersAsync() : Task.CompletedTask,
this.StopClientsAndConnectionsAsync());
await Task.WhenAll(
this.hasWorkers ? this.StopWorkersAsync() : Task.CompletedTask,
this.StopClientsAndConnectionsAsync());
this.traceHelper.LogInformation("EventHubsTransport is shut down");
this.traceHelper.LogInformation("EventHubsTransport is shut down");
}
}
async Task StopWorkersAsync()
@@ -352,13 +364,13 @@
{
case ClientEvent clientEvent:
var clientId = clientEvent.ClientId;
var clientSender = this.connections.GetClientSender(clientEvent.ClientId, this.taskhubGuid);
var clientSender = this.connections.GetClientSender(clientEvent.ClientId, this.settings);
clientSender.Submit(clientEvent);
break;
case PartitionEvent partitionEvent:
var partitionId = partitionEvent.PartitionId;
var partitionSender = this.connections.GetPartitionSender((int) partitionId, this.taskhubGuid);
var partitionSender = this.connections.GetPartitionSender((int) partitionId, this.taskhubGuid, this.settings);
partitionSender.Submit(partitionEvent);
break;
@@ -376,15 +388,15 @@
{
try
{
this.traceHelper.LogDebug("Client.{clientId}.ch{index} establishing connection", Client.GetShortId(this.ClientId), index);
this.traceHelper.LogDebug("Client.{clientId}.ch{index} establishing connection", this.shortClientId, index);
// receive a dummy packet to establish connection
// (the packet, if any, cannot be for this receiver because it is fresh)
await receiver.ReceiveAsync(1, TimeSpan.FromMilliseconds(1));
this.traceHelper.LogDebug("Client.{clientId}.ch{index} connection established", Client.GetShortId(this.ClientId), index);
this.traceHelper.LogDebug("Client.{clientId}.ch{index} connection established", this.shortClientId, index);
}
catch (Exception exception)
{
this.traceHelper.LogError("Client.{clientId}.ch{index} could not connect: {exception}", Client.GetShortId(this.ClientId), index, exception);
this.traceHelper.LogError("Client.{clientId}.ch{index} could not connect: {exception}", this.shortClientId, index, exception);
throw;
}
}
@@ -393,21 +405,23 @@
{
try
{
byte[] taskHubGuid = this.parameters.TaskhubGuid.ToByteArray();
byte[] clientGuid = this.ClientId.ToByteArray();
TimeSpan longPollingInterval = TimeSpan.FromMinutes(1);
var backoffDelay = TimeSpan.Zero;
string context = $"Client{this.shortClientId}.ch{index}";
var blobBatchReceiver = new BlobBatchReceiver<ClientEvent>(context, this.traceHelper, this.settings, keepUntilConfirmed: false);
await this.clientConnectionsEstablished[index];
while (!this.shutdownSource.IsCancellationRequested)
{
IEnumerable<EventData> eventData;
IEnumerable<EventData> packets;
try
{
this.traceHelper.LogTrace("Client{clientId}.ch{index} waiting for new packets", Client.GetShortId(this.ClientId), index);
this.traceHelper.LogTrace("Client{clientId}.ch{index} waiting for new packets", this.shortClientId, index);
eventData = await receiver.ReceiveAsync(1000, longPollingInterval);
packets = await receiver.ReceiveAsync(1000, longPollingInterval);
backoffDelay = TimeSpan.Zero;
}
@@ -419,40 +433,43 @@
}
// if we lose access to storage temporarily, we back off, but don't quit
this.traceHelper.LogError("Client.{clientId}.ch{index} backing off for {backoffDelay} after error in receive loop: {exception}", Client.GetShortId(this.ClientId), index, backoffDelay, exception);
this.traceHelper.LogError("Client.{clientId}.ch{index} backing off for {backoffDelay} after error in receive loop: {exception}", this.shortClientId, index, backoffDelay, exception);
await Task.Delay(backoffDelay);
continue; // retry
}
if (eventData != null)
if (packets != null)
{
foreach (var ed in eventData)
{
this.shutdownSource.Token.ThrowIfCancellationRequested();
ClientEvent clientEvent = null;
int totalEvents = 0;
var stopwatch = Stopwatch.StartNew();
try
await foreach ((EventData eventData, ClientEvent[] events, long seqNo) in blobBatchReceiver.ReceiveEventsAsync(clientGuid, packets, this.shutdownSource.Token))
{
for (int i = 0; i < events.Length; i++)
{
this.traceHelper.LogDebug("Client.{clientId}.ch{index} received packet #{seqno} ({size} bytes)", Client.GetShortId(this.ClientId), index, ed.SystemProperties.SequenceNumber, ed.Body.Count);
Packet.Deserialize(ed.Body, out clientEvent, taskHubGuid);
var clientEvent = events[i];
clientEvent.ReceiveChannel = index;
if (clientEvent != null && clientEvent.ClientId == this.ClientId)
if (clientEvent.ClientId == this.ClientId)
{
this.traceHelper.LogTrace("Client.{clientId}.ch{index} receiving event {evt} id={eventId}]", Client.GetShortId(this.ClientId), index, clientEvent, clientEvent.EventIdString);
this.traceHelper.LogTrace("Client.{clientId}.ch{index} receiving packet #{seqno}.{subSeqNo} {event} id={eventId}", this.shortClientId, index, seqNo, i, clientEvent, clientEvent.EventIdString);
await channelWriter.WriteAsync(clientEvent, this.shutdownSource.Token);
totalEvents++;
}
else
{
this.traceHelper.LogError("Client.{clientId}.ch{index} received packet #{seqno}.{subSeqNo} for client {otherClient}", this.shortClientId, index, seqNo, i, Client.GetShortId(clientEvent.ClientId));
}
}
catch (Exception)
{
this.traceHelper.LogError("Client.{clientId}.ch{index} could not deserialize packet #{seqno} ({size} bytes)", Client.GetShortId(this.ClientId), index, ed.SystemProperties.SequenceNumber, ed.Body.Count);
}
}
this.traceHelper.LogDebug("Client{clientId}.ch{index} received {totalEvents} events in {latencyMs:F2}ms", this.shortClientId, index, totalEvents, stopwatch.Elapsed.TotalMilliseconds);
}
else
{
this.traceHelper.LogTrace("Client{clientId}.ch{index} no new packets for last {longPollingInterval}", Client.GetShortId(this.ClientId), index, longPollingInterval);
this.traceHelper.LogTrace("Client{clientId}.ch{index} no new packets for last {longPollingInterval}", this.shortClientId, index, longPollingInterval);
}
}
}
@@ -462,11 +479,11 @@
}
catch (Exception exception)
{
this.traceHelper.LogError("Client.{clientId}.ch{index} event processing exception: {exception}", Client.GetShortId(this.ClientId), index, exception);
this.traceHelper.LogError("Client.{clientId}.ch{index} event processing exception: {exception}", this.shortClientId, index, exception);
}
finally
{
this.traceHelper.LogInformation("Client.{clientId}.ch{index} event processing terminated", Client.GetShortId(this.ClientId), index);
this.traceHelper.LogInformation("Client.{clientId}.ch{index} exits receive loop", this.shortClientId, index);
}
}
@@ -486,11 +503,11 @@
}
catch(Exception exception)
{
this.traceHelper.LogError("Client.{clientId} event processing exception: {exception}", Client.GetShortId(this.ClientId), exception);
this.traceHelper.LogError("Client.{clientId} event processing exception: {exception}", this.shortClientId, exception);
}
finally
{
this.traceHelper.LogInformation("Client.{clientId} event processing terminated", Client.GetShortId(this.ClientId));
this.traceHelper.LogInformation("Client.{clientId} exits process loop", this.shortClientId);
}
}
}
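
The shape of this receive loop in isolation (the exponential delay policy is illustrative, not the exact backoff used above): errors back off and retry, a success resets the backoff, cancellation ends the loop:

using System;
using System.Threading;
using System.Threading.Tasks;

class ReceiveLoopDemo
{
    static async Task RunAsync(Func<Task<string>> receive, CancellationToken token)
    {
        TimeSpan backoff = TimeSpan.Zero;
        while (!token.IsCancellationRequested)
        {
            string packet;
            try
            {
                packet = await receive();
                backoff = TimeSpan.Zero; // success resets the backoff
            }
            catch (Exception) when (!token.IsCancellationRequested)
            {
                // transient failure: wait a bit longer each time, but never give up
                backoff = backoff == TimeSpan.Zero
                    ? TimeSpan.FromSeconds(1)
                    : TimeSpan.FromTicks(Math.Min(backoff.Ticks * 2, TimeSpan.FromMinutes(1).Ticks));
                await Task.Delay(backoff);
                continue;
            }
            if (packet != null)
            {
                Console.WriteLine($"processing {packet}");
            }
        }
    }

    static async Task Main()
    {
        using var cts = new CancellationTokenSource(millisecondsDelay: 500);
        var random = new Random();
        await RunAsync(
            () => random.Next(3) == 0
                ? Task.FromException<string>(new TimeoutException())
                : Task.FromResult("packet"),
            cts.Token);
    }
}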

View file

@@ -29,8 +29,13 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly string eventHubPartition;
readonly byte[] taskHubGuid;
readonly uint partitionId;
readonly CancellationToken shutdownToken;
readonly BlobBatchReceiver<LoadMonitorEvent> blobBatchReceiver;
TransportAbstraction.ILoadMonitor loadMonitor;
DateTime lastGarbageCheck = DateTime.MinValue;
readonly static TimeSpan GarbageCheckFrequency = TimeSpan.FromMinutes(30);
public LoadMonitorProcessor(
TransportAbstraction.IHost host,
@@ -51,21 +56,24 @@
this.taskHubGuid = parameters.TaskhubGuid.ToByteArray();
this.partitionId = uint.Parse(this.eventHubPartition);
this.traceHelper = new EventHubsTraceHelper(traceHelper, this.partitionId);
this.shutdownToken = shutdownToken;
this.blobBatchReceiver = new BlobBatchReceiver<LoadMonitorEvent>("LoadMonitor", traceHelper, settings, keepUntilConfirmed: false);
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} is opening", this.eventHubName, this.eventHubPartition);
this.traceHelper.LogInformation("LoadMonitor is opening", this.eventHubName, this.eventHubPartition);
this.loadMonitor = this.host.AddLoadMonitor(this.parameters.TaskhubGuid, this.sender);
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} opened", this.eventHubName, this.eventHubPartition);
this.traceHelper.LogInformation("LoadMonitor opened", this.eventHubName, this.eventHubPartition);
this.PeriodicGarbageCheck();
return Task.CompletedTask;
}
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} is closing", this.eventHubName, this.eventHubPartition);
this.traceHelper.LogInformation("LoadMonitor is closing", this.eventHubName, this.eventHubPartition);
await this.loadMonitor.StopAsync();
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} closed", this.eventHubName, this.eventHubPartition);
this.traceHelper.LogInformation("LoadMonitor closed", this.eventHubName, this.eventHubPartition);
}
Task IEventProcessor.ProcessErrorAsync(PartitionContext context, Exception exception)
@@ -84,67 +92,67 @@
break;
}
this.traceHelper.Log(logLevel, "EventHubsProcessor {eventHubName}/{eventHubPartition} received internal error indication from EventProcessorHost: {exception}", this.eventHubName, this.eventHubPartition, exception);
this.traceHelper.Log(logLevel, "LoadMonitor received internal error indication from EventProcessorHost: {exception}", exception);
return Task.CompletedTask;
}
Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> packets)
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> packets)
{
this.traceHelper.LogTrace("EventHubsProcessor {eventHubName}/{eventHubPartition} receiving #{seqno}", this.eventHubName, this.eventHubPartition, packets.First().SystemProperties.SequenceNumber);
this.traceHelper.LogTrace("LoadMonitor receiving #{seqno}", packets.First().SystemProperties.SequenceNumber);
try
{
EventData last = null;
int count = 0;
int ignored = 0;
foreach (var eventData in packets)
int totalEvents = 0;
var stopwatch = Stopwatch.StartNew();
await foreach ((EventData eventData, LoadMonitorEvent[] events, long seqNo) in this.blobBatchReceiver.ReceiveEventsAsync(this.taskHubGuid, packets, this.shutdownToken))
{
var seqno = eventData.SystemProperties.SequenceNumber;
LoadMonitorEvent loadMonitorEvent = null;
last = eventData;
try
for (int i = 0; i < events.Length; i++)
{
Packet.Deserialize(eventData.Body, out loadMonitorEvent, this.taskHubGuid);
}
catch (Exception)
{
this.traceHelper.LogError("EventHubsProcessor {eventHubName}/{eventHubPartition} could not deserialize packet #{seqno} ({size} bytes)", this.eventHubName, this.eventHubPartition, seqno, eventData.Body.Count);
throw;
}
if (loadMonitorEvent != null)
{
this.traceHelper.LogTrace("EventHubsProcessor {eventHubName}/{eventHubPartition} received packet #{seqno} ({size} bytes) {event}", this.eventHubName, this.eventHubPartition, seqno, eventData.Body.Count, loadMonitorEvent);
count++;
var loadMonitorEvent = events[i];
this.traceHelper.LogTrace("LoadMonitor receiving packet #{seqno}.{subSeqNo} {event} id={eventId}", seqNo, i, loadMonitorEvent, loadMonitorEvent.EventIdString);
this.loadMonitor.Process(loadMonitorEvent);
totalEvents++;
}
else
{
this.traceHelper.LogWarning("EventHubsProcessor {eventHubName}/{eventHubPartition} ignored packet #{seqno} for different taskhub", this.eventHubName, this.eventHubPartition, seqno);
ignored++;
continue;
}
if (context.CancellationToken.IsCancellationRequested)
{
break;
}
last = eventData;
}
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} received batch of {batchsize} packets, through #{seqno}", this.eventHubName, this.eventHubPartition, count, last.SystemProperties.SequenceNumber);
this.traceHelper.LogDebug("LoadMonitor received {totalEvents} events in {latencyMs:F2}ms, through #{seqno}", totalEvents, stopwatch.Elapsed.TotalMilliseconds, last.SystemProperties.SequenceNumber);
this.PeriodicGarbageCheck();
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (this.shutdownToken.IsCancellationRequested)
{
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} was terminated", this.eventHubName, this.eventHubPartition);
// normal during shutdown
}
catch (Exception exception)
{
this.traceHelper.LogError("EventHubsProcessor {eventHubName}/{eventHubPartition} encountered an exception while processing packets : {exception}", this.eventHubName, this.eventHubPartition, exception);
this.traceHelper.LogError("LoadMonitor encountered an exception while processing packets : {exception}", exception);
throw;
}
finally
{
this.traceHelper.LogInformation("LoadMonitor exits receive loop");
}
}
return Task.CompletedTask;
void PeriodicGarbageCheck()
{
if (DateTime.UtcNow - this.lastGarbageCheck > GarbageCheckFrequency)
{
this.lastGarbageCheck = DateTime.UtcNow;
Task.Run(async () =>
{
var stopwatch = Stopwatch.StartNew();
int deletedCount = await this.blobBatchReceiver.RemoveGarbageAsync(this.shutdownToken);
this.traceHelper.LogInformation("LoadMonitor removed {deletedCount} expired blob batches in {elapsed:F2}s", deletedCount, stopwatch.Elapsed.TotalSeconds);
});
}
}
}
}
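
A minimal sketch of the throttling pattern in PeriodicGarbageCheck (hypothetical work item): a timestamp comparison gates the expensive scan, which then runs fire-and-forget so event processing is never blocked:

using System;
using System.Threading.Tasks;

class PeriodicWork
{
    static readonly TimeSpan Frequency = TimeSpan.FromMinutes(30);
    DateTime last = DateTime.MinValue;

    public void MaybeRun(Func<Task> work)
    {
        if (DateTime.UtcNow - this.last > Frequency)
        {
            this.last = DateTime.UtcNow; // updated before starting, so overlapping calls skip
            _ = Task.Run(work);          // fire-and-forget; the caller is not delayed
        }
    }
}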

View file

@@ -26,8 +26,8 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly MemoryStream stream = new MemoryStream(); // reused for all packets
readonly Stopwatch stopwatch = new Stopwatch();
public LoadMonitorSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, CancellationToken.None, traceHelper)
public LoadMonitorSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, shutdownToken, traceHelper)
{
this.host = host;
this.taskHubGuid = taskHubGuid;
@@ -89,6 +89,7 @@
int length = (int)(this.stream.Position);
var arraySegment = new ArraySegment<byte>(this.stream.GetBuffer(), 0, length);
var eventData = new EventData(arraySegment);
this.cancellationToken.ThrowIfCancellationRequested();
await this.sender.SendAsync(eventData);
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} sent packet ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString);
this.stream.Seek(0, SeekOrigin.Begin);
@@ -106,6 +107,12 @@
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} iteration padded to latencyMs={latencyMs}", this.eventHubName, this.eventHubPartition, this.stopwatch.ElapsedMilliseconds);
}
}
catch (OperationCanceledException) when (this.cancellationToken.IsCancellationRequested)
{
// normal during shutdown
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} was cancelled", this.eventHubName, this.eventHubPartition);
return;
}
catch (Exception e)
{
this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition);
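Because this EventHubs client's PartitionSender.SendAsync takes no cancellation token, the ThrowIfCancellationRequested call added above is the last point at which shutdown can be honored before committing to a send. A self-contained sketch of that idiom; ISender is an illustrative stand-in for the sender:

using System.Threading;
using System.Threading.Tasks;

interface ISender
{
    Task SendAsync(byte[] payload); // like PartitionSender.SendAsync, no token parameter
}

static class SendSketch
{
    public static async Task SendCheckedAsync(ISender sender, byte[] payload, CancellationToken token)
    {
        token.ThrowIfCancellationRequested(); // last chance to observe shutdown
        await sender.SendAsync(payload);      // cannot be canceled once started
    }
}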

View file

@@ -11,6 +11,7 @@ namespace DurableTask.Netherite.EventHubsTransport
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -220,6 +221,7 @@ namespace DurableTask.Netherite.EventHubsTransport
{
readonly uint partitionId;
readonly ScriptedEventProcessorHost host;
readonly BlobBatchReceiver<PartitionEvent> blobBatchReceiver;
TransportAbstraction.IPartition partition;
Task partitionEventLoop;
@@ -234,6 +236,8 @@ namespace DurableTask.Netherite.EventHubsTransport
this.partitionId = partitionId;
this.Incarnation = incarnation;
this.host = eventProcessorHost;
string traceContext = $"PartitionInstance {this.host.eventHubPath}/{this.partitionId}({this.Incarnation})";
this.blobBatchReceiver = new BlobBatchReceiver<PartitionEvent>(traceContext, this.host.logger, this.host.settings, keepUntilConfirmed: true);
}
public int Incarnation { get; }
@@ -253,7 +257,7 @@ namespace DurableTask.Netherite.EventHubsTransport
var errorHandler = this.host.host.CreateErrorHandler(this.partitionId);
var nextPacketToReceive = await this.partition.CreateOrRestoreAsync(errorHandler, this.host.parameters, this.host.Fingerprint).ConfigureAwait(false);
this.host.logger.LogInformation("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) started partition, next expected packet is #{nextSeqno}", this.host.eventHubPath, this.partitionId, this.Incarnation, nextPacketToReceive);
this.host.logger.LogInformation("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) started partition, next expected packet is #{nextSeqno}.{batchPos}", this.host.eventHubPath, this.partitionId, this.Incarnation, nextPacketToReceive.Item1, nextPacketToReceive.Item2);
this.partitionEventLoop = Task.Run(() => this.PartitionEventLoop(nextPacketToReceive));
}
@@ -307,89 +311,81 @@ namespace DurableTask.Netherite.EventHubsTransport
}
// TODO: Update all the logging messages
async Task PartitionEventLoop(long nextPacketToReceive)
async Task PartitionEventLoop((long seqNo, int batchPos) nextPacketToReceive)
{
this.host.logger.LogDebug("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) starting receive loop", this.host.eventHubPath, this.partitionId, this.Incarnation);
try
{
this.partitionReceiver = this.host.connections.CreatePartitionReceiver((int)this.partitionId, this.host.consumerGroupName, nextPacketToReceive);
this.partitionReceiver = this.host.connections.CreatePartitionReceiver((int)this.partitionId, this.host.consumerGroupName, nextPacketToReceive.Item1);
List<PartitionEvent> batch = new List<PartitionEvent>();
while (!this.shutdownSource.IsCancellationRequested)
{
this.host.logger.LogTrace("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) trying to receive eventdata from position {position}", this.host.eventHubPath, this.partitionId, this.Incarnation, nextPacketToReceive);
this.host.logger.LogTrace("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) trying to receive eventdata from position {position}", this.host.eventHubPath, this.partitionId, this.Incarnation, nextPacketToReceive.Item1);
IEnumerable<EventData> eventData;
IEnumerable<EventData> hubMessages;
try
{
var receiveTask = this.partitionReceiver.ReceiveAsync(MaxReceiveBatchSize, TimeSpan.FromMinutes(1));
await Task.WhenAny(receiveTask, this.shutdownTask).ConfigureAwait(false);
this.shutdownSource.Token.ThrowIfCancellationRequested();
eventData = await receiveTask.ConfigureAwait(false);
hubMessages = await receiveTask.ConfigureAwait(false);
}
catch (TimeoutException exception)
{
// not sure that we should be seeing this, but we do.
this.host.logger.LogWarning("Retrying after transient(?) TimeoutException in ReceiveAsync {exception}", exception);
eventData = null;
hubMessages = null;
}
if (eventData != null)
if (hubMessages != null)
{
this.host.logger.LogDebug("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) received eventdata from position {position}", this.host.eventHubPath, this.partitionId, this.Incarnation, nextPacketToReceive);
this.host.logger.LogDebug("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) received eventdata from position {position}", this.host.eventHubPath, this.partitionId, this.Incarnation, nextPacketToReceive.Item1);
int totalEvents = 0;
Stopwatch stopwatch = Stopwatch.StartNew();
var batch = new List<PartitionEvent>();
var receivedTimestamp = this.partition.CurrentTimeMs;
foreach (var eventDatum in eventData)
await foreach ((EventData eventData, PartitionEvent[] events, long seqNo) in this.blobBatchReceiver.ReceiveEventsAsync(this.host.taskHubGuid, hubMessages, this.shutdownSource.Token, nextPacketToReceive.seqNo))
{
var seqno = eventDatum.SystemProperties.SequenceNumber;
if (seqno == nextPacketToReceive)
for (int i = 0; i < events.Length; i++)
{
PartitionEvent partitionEvent = null;
try
{
Packet.Deserialize(eventDatum.Body, out partitionEvent, this.host.taskHubGuid);
}
catch (Exception)
{
this.host.logger.LogError("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) could not deserialize packet #{seqno} ({size} bytes)", this.host.eventHubPath, this.partitionId, this.Incarnation, seqno, eventDatum.Body.Count);
throw;
}
PartitionEvent evt = events[i];
nextPacketToReceive = seqno + 1;
if (partitionEvent != null)
if (i < events.Length - 1)
{
this.host.logger.LogTrace("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) received packet #{seqno} ({size} bytes) {event} id={eventId}", this.host.eventHubPath, this.partitionId, this.Incarnation, seqno, eventDatum.Body.Count, partitionEvent, partitionEvent.EventIdString);
evt.NextInputQueuePosition = seqNo;
evt.NextInputQueueBatchPosition = i + 1;
}
else
{
this.host.logger.LogWarning("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) ignored packet #{seqno} for different taskhub", this.host.eventHubPath, this.partitionId, this.Incarnation, seqno);
continue;
evt.NextInputQueuePosition = seqNo + 1;
}
partitionEvent.NextInputQueuePosition = nextPacketToReceive;
batch.Add(partitionEvent);
partitionEvent.ReceivedTimestamp = this.partition.CurrentTimeMs;
if (this.host.logger.IsEnabled(LogLevel.Trace))
{
this.host.logger.LogTrace("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) received packet #{seqno}.{subSeqNo} {event} id={eventId}", this.host.eventHubPath, this.partitionId, this.Incarnation, seqNo, i, evt, evt.EventIdString);
}
totalEvents++;
}
else if (seqno > nextPacketToReceive)
if (nextPacketToReceive.batchPos == 0)
{
this.host.logger.LogError("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) received wrong packet, #{seqno} instead of #{expected}", this.host.eventHubPath, this.partitionId, this.Incarnation, seqno, nextPacketToReceive);
// this should never happen, as EventHubs guarantees in-order delivery of packets
throw new InvalidOperationException("EventHubs Out-Of-Order Packet");
this.partition.SubmitEvents(events);
}
else
{
this.host.logger.LogTrace("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) discarded packet #{seqno} because it is already processed", this.host.eventHubPath, this.partitionId, this.Incarnation, seqno);
this.host.logger.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) skipping {batchPos} events in batch #{seqno} because they are already processed", this.host.eventHubPath, this.partitionId, this.Incarnation, nextPacketToReceive.batchPos, seqNo);
this.partition.SubmitEvents(events.Skip(nextPacketToReceive.batchPos).ToList());
}
nextPacketToReceive = (seqNo + 1, 0);
}
if (batch.Count > 0 && !this.shutdownSource.IsCancellationRequested)
{
this.host.logger.LogDebug("PartitionInstance {eventHubName}/{eventHubPartition}({incarnation}) received batch of {batchsize} packets, starting with #{seqno}, next expected packet is #{nextSeqno}", this.host.eventHubPath, this.partitionId, this.Incarnation, batch.Count, batch[0].NextInputQueuePosition - 1, nextPacketToReceive);
this.partition.SubmitEvents(batch);
}
this.host.logger.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) received {totalEvents} events in {latencyMs:F2}ms, next expected packet is #{nextSeqno}", this.host.eventHubPath, this.partitionId, this.Incarnation, totalEvents, stopwatch.Elapsed.TotalMilliseconds, nextPacketToReceive.seqNo);
}
}
}
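The resume cursor is now a pair: the EventHubs sequence number plus an offset into the blob batch delivered under that number, so a partially processed batch can be resumed mid-batch. A self-contained sketch of the skip decision made in the loop above; the string event type and names are illustrative:

using System;
using System.Collections.Generic;
using System.Linq;

static class BatchResumeSketch
{
    // Which events of a batch delivered at 'seqNo' still need processing,
    // given a resume cursor of (resume.seqNo, resume.batchPos)?
    public static IReadOnlyList<string> StillToProcess(
        string[] batch, long seqNo, (long seqNo, int batchPos) resume)
    {
        if (seqNo < resume.seqNo)
        {
            return Array.Empty<string>();                // already fully processed
        }
        if (seqNo == resume.seqNo && resume.batchPos > 0)
        {
            return batch.Skip(resume.batchPos).ToList(); // skip the processed prefix
        }
        return batch;                                    // untouched batch
    }
}

After submitting the remaining events, the caller advances the cursor to (seqNo + 1, 0), exactly as the loop above does.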

View file

@@ -33,7 +33,8 @@ namespace DurableTask.Netherite
/// <summary>
/// Stops the transport backend.
/// </summary>
/// <param name="fatalExceptionObserved">Whether this stop was initiated because we have observed a fatal exception.</param>
/// <returns>A task that completes after the transport backend has stopped.</returns>
Task StopAsync();
Task StopAsync(bool fatalExceptionObserved);
}
}
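A hypothetical call site for the new signature; the transport variable and the comments on what each path skips are illustrative, not mandated by the interface:

// orderly shutdown: no fatal exception was observed
await transport.StopAsync(fatalExceptionObserved: false);

// fault path: an implementation may skip steps that assume healthy state
await transport.StopAsync(fatalExceptionObserved: true);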

View file

@@ -87,7 +87,7 @@ namespace DurableTask.Netherite.SingleHostTransport
this.Notify();
};
var nextInputQueuePosition = await this.Partition.CreateOrRestoreAsync(errorHandler, this.parameters, this.fingerPrint);
var (nextInputQueuePosition, _) = await this.Partition.CreateOrRestoreAsync(errorHandler, this.parameters, this.fingerPrint);
while(this.redeliverQueuePosition < nextInputQueuePosition)
{
@@ -144,7 +144,7 @@ namespace DurableTask.Netherite.SingleHostTransport
stream.Seek(0, SeekOrigin.Begin);
Packet.Serialize(evt, stream, this.taskhubGuid);
stream.Seek(0, SeekOrigin.Begin);
Packet.Deserialize(stream, out PartitionEvent freshEvent, null);
Packet.Deserialize(stream, out PartitionEvent freshEvent, out _, null);
DurabilityListeners.Register(freshEvent, this);
freshEvent.NextInputQueuePosition = ++position;
list.Add(freshEvent);
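The loop above clones each event through a serialize/deserialize round trip so the redelivered copy shares no mutable state with the original. A generic illustration of that cloning idiom, using System.Text.Json as a stand-in for Packet.Serialize/Deserialize:

using System.Text.Json;

static class CloneSketch
{
    // Round-trip cloning: mutations to the returned copy cannot
    // affect the original instance.
    public static T Clone<T>(T original) =>
        JsonSerializer.Deserialize<T>(JsonSerializer.Serialize(original))!;
}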

View file

@@ -95,7 +95,7 @@ namespace DurableTask.Netherite.SingleHostTransport
return Task.CompletedTask;
}
async Task ITransportLayer.StopAsync()
async Task ITransportLayer.StopAsync(bool fatalExceptionObserved)
{
var tasks = new List<Task>();
tasks.Add(this.clientQueue.Client.StopAsync());

View file

@@ -39,6 +39,8 @@ namespace DurableTask.Netherite
bool processingBatch;
public TimeSpan? ProcessingBatchSince => this.processingBatch ? this.stopwatch.Elapsed : null;
volatile TaskCompletionSource<object> shutdownCompletionSource;
/// <summary>
/// Constructor including a cancellation token.
/// </summary>
@@ -93,6 +95,22 @@ namespace DurableTask.Netherite
return tcs.Task;
}
public Task WaitForShutdownAsync()
{
if (!this.cancellationToken.IsCancellationRequested)
{
throw new InvalidOperationException("must call this only after canceling the token");
}
if (this.shutdownCompletionSource == null)
{
Interlocked.CompareExchange(ref this.shutdownCompletionSource, new TaskCompletionSource<object>(), null);
this.NotifyInternal();
}
return this.shutdownCompletionSource.Task;
}
readonly List<T> batch = new List<T>();
readonly List<TaskCompletionSource<bool>> waiters = new List<TaskCompletionSource<bool>>();
IList<T> requeued = null;
@@ -226,6 +244,11 @@ namespace DurableTask.Netherite
this.processingBatch = false;
previousBatch = this.batch.Count;
}
if (this.cancellationToken.IsCancellationRequested)
{
this.shutdownCompletionSource?.TrySetResult(null);
}
}
public void Resume()
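Per the guard above, the worker's token must be canceled before waiting; the completion source is then resolved at the end of the processing loop. A hypothetical shutdown sequence, where cts is the CancellationTokenSource whose token was passed to the worker:

cts.Cancel();                         // 1. request shutdown first
await worker.WaitForShutdownAsync();  // 2. completes once the loop observes cancellation
// reversing the order throws InvalidOperationException, by design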

View file

@@ -73,7 +73,7 @@ namespace DurableTask.Netherite
{
stream.Write(lastFragment.Bytes, 0, lastFragment.Bytes.Length);
stream.Seek(0, SeekOrigin.Begin);
Packet.Deserialize(stream, out TEvent evt, null);
Packet.Deserialize(stream, out TEvent evt, out _, null);
stream.Dispose();
return evt;
}
@@ -91,7 +91,7 @@ namespace DurableTask.Netherite
}
stream.Write(lastFragment.Bytes, 0, lastFragment.Bytes.Length);
stream.Seek(0, SeekOrigin.Begin);
Packet.Deserialize(stream, out TEvent evt, null);
Packet.Deserialize(stream, out TEvent evt, out _, null);
return evt;
}
}
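For context on these call sites: fragments are appended to a single MemoryStream, the stream is rewound, and the packet is deserialized in one pass (the out parameter added by this change is discarded here). A generic sketch of the accumulate-and-rewind step, with illustrative names:

using System.Collections.Generic;
using System.IO;

static class ReassemblySketch
{
    public static MemoryStream Reassemble(IEnumerable<byte[]> fragments)
    {
        var stream = new MemoryStream();
        foreach (byte[] fragment in fragments)
        {
            stream.Write(fragment, 0, fragment.Length);
        }
        stream.Seek(0, SeekOrigin.Begin); // rewind before deserializing
        return stream;
    }
}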

View file

@@ -26,8 +26,8 @@
<!-- This version MUST be kept constant with DurableTask.Netherite.AzureFunctions -->
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>3</MinorVersion>
<PatchVersion>5</PatchVersion>
<MinorVersion>4</MinorVersion>
<PatchVersion>0</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>

View file

@@ -4,4 +4,4 @@
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
// This must be updated when updating the version of the package
[assembly: ExtensionInformation("Microsoft.Azure.DurableTask.Netherite.AzureFunctions", "1.3.5", true)]
[assembly: ExtensionInformation("Microsoft.Azure.DurableTask.Netherite.AzureFunctions", "1.4.0", true)]

View file

@@ -24,7 +24,7 @@
<ItemGroup>
<ProjectReference Include="..\..\src\DurableTask.Netherite.AzureFunctions\DurableTask.Netherite.AzureFunctions.csproj" />
<ProjectReference Include="..\DurableTask.Netherite.Tests\DurableTask.Netherite.Tests.csproj" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.11.1" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.13.0" />
</ItemGroup>
</Project>

View file

@@ -4,9 +4,9 @@
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.7.2" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.9.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="4.0.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.8.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.9.6" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="5.1.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="4.0.4" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.1.3" />