* intermediate commit

* updates

* fix client path configure await

* fix query implementation of in-memory emulator
This commit is contained in:
Sebastian Burckhardt 2022-11-16 16:56:05 -08:00 коммит произвёл GitHub
Родитель ed721c67f0
Коммит 029991890d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
46 изменённых файлов: 1286 добавлений и 288 удалений

Просмотреть файл

@ -222,7 +222,7 @@ namespace DurableTask.Netherite
}
}
public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsyncEnumerable<OrchestrationState> instances)
public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsyncEnumerable<OrchestrationState> instances, DateTime attempt)
{
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
this.Assert(!this.IsReplaying, "query events are never part of the replay");
@ -233,7 +233,7 @@ namespace DurableTask.Netherite
try
{
this.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, queryEvent, EventTraceHelper.EventCategory.QueryEvent, false);
await queryEvent.OnQueryCompleteAsync(instances, this.Partition);
await queryEvent.OnQueryCompleteAsync(instances, this.Partition, attempt);
}
catch (OperationCanceledException)
{

Просмотреть файл

@ -59,15 +59,7 @@ namespace DurableTask.Netherite
/// <summary>
/// Is called on all singleton objects once at the very beginning
/// </summary>
public virtual void OnFirstInitialization()
{
}
/// <summary>
/// Is automatically called on all singleton objects after recovery. Typically used to
/// restart pending activities, timers, tasks and the like.
/// </summary>
public virtual void OnRecoveryCompleted(EffectTracker effects, RecoveryCompleted evt)
public virtual void OnFirstInitialization(Partition partition)
{
}

Просмотреть файл

@ -15,6 +15,9 @@ namespace DurableTask.Netherite
[DataMember]
public long RequestId { get; set; }
[IgnoreDataMember]
public int ReceiveChannel { get; set; }
[IgnoreDataMember]
public override EventId EventId => EventId.MakeClientResponseEventId(this.ClientId, this.RequestId);
}

Просмотреть файл

@ -0,0 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
interface IPagedResponse
{
string ContinuationToken { get; }
int Count { get; }
}
}

Просмотреть файл

@ -7,11 +7,17 @@ namespace DurableTask.Netherite
using System.Text;
[DataContract]
class PurgeResponseReceived : ClientEvent
class PurgeResponseReceived : ClientEvent, IPagedResponse
{
[DataMember]
public int NumberInstancesPurged { get; set; }
[DataMember]
public string ContinuationToken { get; set; } // null indicates we have reached the end of all instances in this partition
[IgnoreDataMember]
public int Count => this.NumberInstancesPurged;
protected override void ExtraTraceInformation(StringBuilder s)
{
s.Append(" count=");

Просмотреть файл

@ -8,20 +8,43 @@ namespace DurableTask.Netherite
using System.IO;
using DurableTask.Core;
using System;
using System.Text;
[DataContract]
class QueryResponseReceived : ClientEvent
class QueryResponseReceived : ClientEvent, IPagedResponse
{
// for efficiency, we use a binary representation with custom serialization and deserialization (see below)
[IgnoreDataMember]
public List<OrchestrationState> OrchestrationStates { get; set; }
[DataMember]
public DateTime Attempt { get; set; }
[DataMember]
public int? Final { get; set; }
[DataMember]
byte[] BinaryState { get; set; }
[DataMember]
public string ContinuationToken { get; set; } // null indicates we have reached the end of all instances in this partition
[IgnoreDataMember]
public int Count => this.OrchestrationStates.Count;
protected override void ExtraTraceInformation(StringBuilder s)
{
s.Append(" attempt=");
s.Append(this.Attempt.ToString("o"));
if (this.Final.HasValue)
{
s.Append( " final ");
s.Append(this.Final.Value);
s.Append(' ');
s.Append(this.ContinuationToken ?? "null");
}
}
public void SerializeOrchestrationStates(MemoryStream memoryStream, bool includeInput)
{
var writer = new BinaryWriter(memoryStream);

Просмотреть файл

@ -3,6 +3,7 @@
namespace DurableTask.Netherite
{
using System;
using System.Runtime.Serialization;
using System.Text;
@ -23,6 +24,12 @@ namespace DurableTask.Netherite
[DataMember]
public bool IsLast { get; set; }
[DataMember]
public DateTime? Timeout { get; set; } // so we can remove incomplete fragments from client requests after they time out
[DataMember]
public (uint,long,int)? DedupPosition { get; set; } // so we can remove incomplete fragments from other partitions
[IgnoreDataMember]
public PartitionEvent ReassembledEvent;

Просмотреть файл

@ -22,6 +22,12 @@ namespace DurableTask.Netherite
[DataMember]
public int PreviousAttempts { get; set; }
[DataMember]
public string ContinuationToken { get; set; }
[DataMember]
public int PageSize { get; set; }
[IgnoreDataMember]
public override EventId EventId => EventId.MakeClientRequestEventId(this.ClientId, this.RequestId);
@ -33,7 +39,7 @@ namespace DurableTask.Netherite
}
}
public abstract Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> result, Partition partition);
public abstract Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> result, Partition partition, DateTime attempt);
public sealed override void DetermineEffects(EffectTracker effects)
{

Просмотреть файл

@ -15,21 +15,30 @@ namespace DurableTask.Netherite
{
const int batchsize = 11;
public async override Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> instances, Partition partition)
public async override Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> instances, Partition partition, DateTime attempt)
{
int totalcount = 0;
string continuationToken = this.ContinuationToken ?? "";
var response = new QueryResponseReceived
{
ClientId = this.ClientId,
RequestId = this.RequestId,
OrchestrationStates = new List<OrchestrationState>()
Attempt = attempt,
OrchestrationStates = new List<OrchestrationState>(),
};
using var memoryStream = new MemoryStream();
await foreach (var orchestrationState in instances)
{
// a null is used to indicate that we have read the last instance
if (orchestrationState == null)
{
continuationToken = null; // indicates completion
break;
}
if (response.OrchestrationStates.Count == batchsize)
{
response.SerializeOrchestrationStates(memoryStream, this.InstanceQuery.FetchInput);
@ -38,17 +47,22 @@ namespace DurableTask.Netherite
{
ClientId = this.ClientId,
RequestId = this.RequestId,
OrchestrationStates = new List<OrchestrationState>()
Attempt = attempt,
OrchestrationStates = new List<OrchestrationState>(),
};
}
response.OrchestrationStates.Add(orchestrationState);
continuationToken = orchestrationState.OrchestrationInstance.InstanceId;
totalcount++;
}
response.Final = totalcount;
response.ContinuationToken = continuationToken;
response.SerializeOrchestrationStates(memoryStream, this.InstanceQuery.FetchInput);
partition.Send(response);
partition.EventTraceHelper.TraceEventProcessingDetail($"query {this.EventId} attempt {attempt:o} responded totalcount={totalcount} continuationToken={response.ContinuationToken ?? "null"}");
}
public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)

Просмотреть файл

@ -3,6 +3,7 @@
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Threading.Tasks;
@ -17,16 +18,17 @@ namespace DurableTask.Netherite
[DataMember]
public int NumberInstancesPurged { get; set; } = 0;
const int MaxBatchSize = 1000;
const int MaxBatchSize = 200;
public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}
public async override Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> instances, Partition partition)
public async override Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> instances, Partition partition, DateTime attempt)
{
int batchCount = 0;
string continuationToken = null;
PurgeBatchIssued makeNewBatchObject()
=> new PurgeBatchIssued()
@ -41,9 +43,6 @@ namespace DurableTask.Netherite
PurgeBatchIssued batch = makeNewBatchObject();
// TODO : while the request itself is reliable, the client response is not.
// We should probably fix that by using the ClientState to track progress.
async Task ExecuteBatch()
{
await partition.State.Prefetch(batch.KeysToPrefetch);
@ -53,12 +52,23 @@ namespace DurableTask.Netherite
await foreach (var orchestrationState in instances)
{
batch.InstanceIds.Add(orchestrationState.OrchestrationInstance.InstanceId);
if (batch.InstanceIds.Count == MaxBatchSize)
if (orchestrationState != null)
{
await ExecuteBatch();
batch = makeNewBatchObject();
string instanceId = orchestrationState.OrchestrationInstance.InstanceId;
batch.InstanceIds.Add(instanceId);
if (batch.InstanceIds.Count == MaxBatchSize)
{
await ExecuteBatch();
batch = makeNewBatchObject();
}
continuationToken = instanceId;
}
else
{
continuationToken = null;
}
}
@ -72,6 +82,7 @@ namespace DurableTask.Netherite
ClientId = this.ClientId,
RequestId = this.RequestId,
NumberInstancesPurged = this.NumberInstancesPurged,
ContinuationToken = continuationToken,
});
}
}

Просмотреть файл

@ -22,6 +22,9 @@ namespace DurableTask.Netherite
[IgnoreDataMember]
public abstract IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages { get; }
[IgnoreDataMember]
public (uint,long,int) DedupPositionForFragments => (this.OriginPartition, this.DedupPosition.Item1, this.DedupPosition.Item2);
public override void DetermineEffects(EffectTracker effects)
{
effects.Add(TrackedObjectKey.Dedup);

Просмотреть файл

@ -26,9 +26,15 @@ namespace DurableTask.Netherite
[DataMember]
public string ChangedFingerprint { get; set; }
[DataMember]
public bool KeepInstanceIdsInMemory { get; set; }
[IgnoreDataMember]
public override bool ResetInputQueue => !string.IsNullOrEmpty(this.ChangedFingerprint);
[IgnoreDataMember]
public Dictionary<uint, (long Position, int SubPosition)> ReceivePositions;
public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
@ -64,6 +70,7 @@ namespace DurableTask.Netherite
effects.Add(TrackedObjectKey.Timers);
effects.Add(TrackedObjectKey.Prefetch);
effects.Add(TrackedObjectKey.Queries);
effects.Add(TrackedObjectKey.Dedup);
}
}
}

Просмотреть файл

@ -15,9 +15,6 @@ namespace DurableTask.Netherite
[DataMember]
public uint PartitionId { get; set; }
[IgnoreDataMember]
public ArraySegment<byte> Serialized;
/// <summary>
/// For events coming from the input queue, the next input queue position after this event. For internal events, zero.
/// </summary>
@ -52,7 +49,6 @@ namespace DurableTask.Netherite
// clear all the non-data fields
evt.DurabilityListeners.Clear();
evt.Serialized = default;
evt.NextInputQueuePosition = 0;
// clear the timestamp

Просмотреть файл

@ -22,12 +22,23 @@ namespace DurableTask.Netherite
/// </summary>
public abstract DateTime? TimeoutUtc { get; }
/// <summary>
/// The continuation token for the query
/// </summary>
public abstract string ContinuationToken { get; }
/// <summary>
/// The suggested page size returned. Actual size may be more, or less, than the suggested size.
/// </summary>
public abstract int PageSize { get; }
/// <summary>
/// The continuation for the query operation.
/// </summary>
/// <param name="result">The tracked objects returned by this query</param>
/// <param name="exceptionTask">A task that throws an exception if the enumeration fails</param>
/// <param name="partition">The partition</param>
public abstract Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> result, Partition partition);
/// <param name="attempt">The timestamp for this query attempt</param>
public abstract Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> result, Partition partition, DateTime attempt);
}
}

Просмотреть файл

@ -11,8 +11,14 @@ namespace DurableTask.Netherite
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using DurableTask.Core;
using DurableTask.Core.History;
using Microsoft.Azure.Storage;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using static DurableTask.Netherite.TransportAbstraction;
using static Microsoft.Azure.Amqp.Serialization.SerializableType;
class Client : TransportAbstraction.IClient
{
@ -33,7 +39,7 @@ namespace DurableTask.Netherite
readonly BatchTimer<PendingRequest> ResponseTimeouts;
readonly ConcurrentDictionary<long, PendingRequest> ResponseWaiters;
readonly Dictionary<string, MemoryStream> Fragments;
readonly Dictionary<(string,int), (MemoryStream, int)> Fragments;
readonly Dictionary<long, QueryResponseReceived> QueryResponses;
public static string GetShortId(Guid clientId) => clientId.ToString("N").Substring(0, 7);
@ -56,7 +62,7 @@ namespace DurableTask.Netherite
this.shutdownToken = shutdownToken;
this.ResponseTimeouts = new BatchTimer<PendingRequest>(this.shutdownToken, this.Timeout, this.traceHelper.TraceTimerProgress);
this.ResponseWaiters = new ConcurrentDictionary<long, PendingRequest>();
this.Fragments = new Dictionary<string, MemoryStream>();
this.Fragments = new Dictionary<(string,int), (MemoryStream, int)>();
this.QueryResponses = new Dictionary<long, QueryResponseReceived>();
this.ResponseTimeouts.Start("ClientTimer");
this.workItemStopwatch = new Stopwatch();
@ -78,77 +84,123 @@ namespace DurableTask.Netherite
public void Process(ClientEvent clientEvent)
{
if (clientEvent is QueryResponseReceived queryResponseReceived)
if (clientEvent is ClientEventFragment fragment)
{
queryResponseReceived.DeserializeOrchestrationStates();
bool GotAllResults() => queryResponseReceived.Final == queryResponseReceived.OrchestrationStates.Count;
var originalEventString = fragment.OriginalEventId.ToString();
var index = (originalEventString, clientEvent.ReceiveChannel);
if (this.QueryResponses.TryGetValue(queryResponseReceived.RequestId, out QueryResponseReceived prev))
if (this.traceHelper.LogLevelLimit == Microsoft.Extensions.Logging.LogLevel.Trace)
{
// combine all received states
prev.OrchestrationStates.AddRange(queryResponseReceived.OrchestrationStates);
queryResponseReceived.OrchestrationStates = prev.OrchestrationStates;
this.traceHelper.TraceReceive(fragment, ClientTraceHelper.ResponseType.Fragment);
}
// keep the final count, if known
if (prev.Final.HasValue)
if (fragment.IsLast)
{
(MemoryStream stream, int last) = this.Fragments[index];
if (last != fragment.Fragment)
{
queryResponseReceived.Final = prev.Final;
throw new InvalidDataException($"wrong fragment sequence for event id={originalEventString}");
}
if (GotAllResults())
{
this.QueryResponses.Remove(queryResponseReceived.RequestId);
}
}
var reassembledEvent = FragmentationAndReassembly.Reassemble<ClientEvent>(stream, fragment);
if (GotAllResults())
{
this.ProcessInternal(queryResponseReceived);
this.Process(reassembledEvent);
}
else
{
this.QueryResponses[queryResponseReceived.RequestId] = queryResponseReceived;
}
}
else if (clientEvent is ClientEventFragment fragment)
{
var originalEventString = fragment.OriginalEventId.ToString();
if (!fragment.IsLast)
{
MemoryStream stream;
(MemoryStream, int) streamAndPosition;
if (fragment.Fragment == 0)
{
this.Fragments[originalEventString] = stream = new MemoryStream();
this.Fragments[index] = streamAndPosition = (new MemoryStream(), 0);
}
else
{
stream = this.Fragments[originalEventString];
streamAndPosition = this.Fragments[index];
}
if (streamAndPosition.Item2 != fragment.Fragment)
{
throw new InvalidDataException($"wrong fragment sequence for event id={originalEventString}");
}
stream.Write(fragment.Bytes, 0, fragment.Bytes.Length);
}
else
{
var reassembledEvent = FragmentationAndReassembly.Reassemble<ClientEvent>(this.Fragments[originalEventString], fragment);
this.Fragments.Remove(fragment.EventIdString);
this.ProcessInternal(reassembledEvent);
streamAndPosition.Item1.Write(fragment.Bytes, 0, fragment.Bytes.Length);
streamAndPosition.Item2++;
this.Fragments[index] = streamAndPosition;
}
}
else
{
this.ProcessInternal(clientEvent);
this.Fragments.Remove((clientEvent.EventIdString, clientEvent.ReceiveChannel));
if (clientEvent is QueryResponseReceived queryResponseReceived)
{
queryResponseReceived.DeserializeOrchestrationStates();
bool GotAllResults() => queryResponseReceived.Final == queryResponseReceived.OrchestrationStates.Count;
if (this.QueryResponses.TryGetValue(queryResponseReceived.RequestId, out QueryResponseReceived prev))
{
if (prev.Attempt < queryResponseReceived.Attempt)
{
// ignore the previous entry since we are processing a new attempt
this.QueryResponses.Remove(queryResponseReceived.RequestId);
}
else if (prev.Attempt > queryResponseReceived.Attempt)
{
// the response we just received is part of a superseded attempt, so we just ignore it altogether.
return;
}
else
{
// combine the previously stored states and the newly received ones
prev.OrchestrationStates.AddRange(queryResponseReceived.OrchestrationStates);
queryResponseReceived.OrchestrationStates = prev.OrchestrationStates;
// keep the final count and continuation token, if we received it in the previous message
if (prev.Final.HasValue)
{
queryResponseReceived.Final = prev.Final;
queryResponseReceived.ContinuationToken = prev.ContinuationToken;
}
}
if (GotAllResults())
{
this.QueryResponses.Remove(queryResponseReceived.RequestId);
}
}
if (GotAllResults())
{
this.ProcessInternal(queryResponseReceived);
}
else
{
this.traceHelper.TraceReceive(queryResponseReceived, ClientTraceHelper.ResponseType.Partial);
this.QueryResponses[queryResponseReceived.RequestId] = queryResponseReceived;
}
}
else
{
this.ProcessInternal(clientEvent);
}
}
}
void ProcessInternal(ClientEvent clientEvent)
{
this.traceHelper.TraceReceive(clientEvent);
{
if (this.ResponseWaiters.TryRemove(clientEvent.RequestId, out var waiter))
{
this.traceHelper.TraceReceive(clientEvent, ClientTraceHelper.ResponseType.Response);
waiter.Respond(clientEvent);
}
else
{
this.traceHelper.TraceReceive(clientEvent, ClientTraceHelper.ResponseType.Obsolete);
}
}
public void Send(PartitionEvent partitionEvent)
@ -196,8 +248,6 @@ namespace DurableTask.Netherite
public Task<ClientEvent> Task => this.continuation.Task;
public (DateTime, int) TimeoutKey => this.timeoutKey;
public string RequestId => $"{Client.GetShortId(this.client.ClientId)}-{this.requestId}"; // matches EventId
public PendingRequest(long requestId, EventId partitionEventId, uint partitionId, Client client, DateTime due, int timeoutId)
{
this.requestId = requestId;
@ -462,26 +512,28 @@ namespace DurableTask.Netherite
return (response?.ExecutionId, response?.History);
}
public Task<IList<OrchestrationState>> GetOrchestrationStateAsync(CancellationToken cancellationToken)
=> this.RunPartitionQueries<InstanceQueryReceived, QueryResponseReceived, IList<OrchestrationState>>(
partitionId => new InstanceQueryReceived() {
PartitionId = partitionId,
ClientId = this.ClientId,
RequestId = Interlocked.Increment(ref this.SequenceNumber),
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
public Task<List<OrchestrationState>> GetOrchestrationStateAsync(CancellationToken cancellationToken)
=> this.RunPartitionQueries(
this.GetInitialPositions(),
() => new InstanceQueryReceived() {
InstanceQuery = new InstanceQuery(),
},
(IEnumerable<QueryResponseReceived> responses) => responses.SelectMany(response => response.OrchestrationStates).ToList(),
new List<OrchestrationState>(),
(List<OrchestrationState> list, QueryResponseReceived response) =>
{
list.AddRange(response.OrchestrationStates);
return list;
},
(QueryResponseReceived response) => response.ContinuationToken,
pageSize: 500,
keepGoingUntilDone: true,
cancellationToken);
public Task<IList<OrchestrationState>> GetOrchestrationStateAsync(DateTime? createdTimeFrom, DateTime? createdTimeTo,
public Task<List<OrchestrationState>> GetOrchestrationStateAsync(DateTime? createdTimeFrom, DateTime? createdTimeTo,
IEnumerable<OrchestrationStatus> runtimeStatus, string instanceIdPrefix, CancellationToken cancellationToken = default)
=> this.RunPartitionQueries<InstanceQueryReceived,QueryResponseReceived,IList<OrchestrationState>>(
partitionId => new InstanceQueryReceived() {
PartitionId = partitionId,
ClientId = this.ClientId,
RequestId = Interlocked.Increment(ref this.SequenceNumber),
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
=> this.RunPartitionQueries(
this.GetInitialPositions(),
() => new InstanceQueryReceived() {
InstanceQuery = new InstanceQuery(
runtimeStatus?.ToArray(),
createdTimeFrom?.ToUniversalTime(),
@ -489,17 +541,22 @@ namespace DurableTask.Netherite
instanceIdPrefix,
fetchInput: true),
},
(IEnumerable<QueryResponseReceived> responses) => responses.SelectMany(response => response.OrchestrationStates).ToList(),
new List<OrchestrationState>(),
(List<OrchestrationState> list, QueryResponseReceived response) =>
{
list.AddRange(response.OrchestrationStates);
return list;
},
(QueryResponseReceived response) => response.ContinuationToken,
pageSize: 500,
keepGoingUntilDone: true,
cancellationToken);
public Task<int> PurgeInstanceHistoryAsync(DateTime? createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus, CancellationToken cancellationToken = default)
=> this.RunPartitionQueries(
partitionId => new PurgeRequestReceived()
this.GetInitialPositions(),
() => new PurgeRequestReceived()
{
PartitionId = partitionId,
ClientId = this.ClientId,
RequestId = Interlocked.Increment(ref this.SequenceNumber),
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
InstanceQuery = new InstanceQuery(
runtimeStatus?.ToArray(),
createdTimeFrom?.ToUniversalTime(),
@ -508,41 +565,312 @@ namespace DurableTask.Netherite
fetchInput: false)
{ PrefetchHistory = true },
},
(IEnumerable<PurgeResponseReceived> responses) => responses.Sum(response => response.NumberInstancesPurged),
0,
(int sum, PurgeResponseReceived response) => sum + response.NumberInstancesPurged,
(PurgeResponseReceived response) => response.ContinuationToken,
pageSize: 500,
keepGoingUntilDone: true,
cancellationToken);
public Task<InstanceQueryResult> QueryOrchestrationStatesAsync(InstanceQuery instanceQuery, int pageSize, string continuationToken, CancellationToken cancellationToken)
=> this.RunPartitionQueries(
partitionId => new InstanceQueryReceived()
public async Task<InstanceQueryResult> QueryOrchestrationStatesAsync(InstanceQuery instanceQuery, int pageSize, string continuationToken, CancellationToken cancellationToken)
{
var positions = this.ConvertContinuationTokenToPositions(continuationToken);
List<OrchestrationState> instances = await this.RunPartitionQueries(
positions,
() => new InstanceQueryReceived()
{
PartitionId = partitionId,
ClientId = this.ClientId,
RequestId = Interlocked.Increment(ref this.SequenceNumber),
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
InstanceQuery = instanceQuery,
},
(IEnumerable<QueryResponseReceived> responses) => new InstanceQueryResult()
new List<OrchestrationState>(),
(List<OrchestrationState> list, QueryResponseReceived response) =>
{
Instances = responses.SelectMany(response => response.OrchestrationStates),
ContinuationToken = null,
list.AddRange(response.OrchestrationStates);
return list;
},
cancellationToken);
(QueryResponseReceived response) => response.ContinuationToken,
pageSize,
pageSize == 0,
cancellationToken).ConfigureAwait(false);
async Task<TResult> RunPartitionQueries<TRequest,TResponse,TResult>(
Func<uint, TRequest> requestCreator,
Func<IEnumerable<TResponse>,TResult> responseAggregator,
CancellationToken cancellationToken)
where TRequest: IClientRequestEvent
{
IEnumerable<Task<ClientEvent>> launchQueries()
continuationToken = this.ConvertPositionsToContinuationToken(positions);
return new InstanceQueryResult()
{
for (uint partitionId = 0; partitionId < this.host.NumberPartitions; ++partitionId)
Instances = instances,
ContinuationToken = continuationToken,
};
}
string[] GetInitialPositions()
{
string[] positions = new string[this.host.NumberPartitions];
for (int i = 0; i < positions.Length; i++)
{
positions[i] = String.Empty;
}
return positions;
}
string[] ConvertContinuationTokenToPositions(string continuationToken)
{
if (string.IsNullOrEmpty(continuationToken))
{
return this.GetInitialPositions();
}
else
{
string[] positions;
try
{
yield return this.PerformRequestWithTimeoutAndCancellation(cancellationToken, requestCreator(partitionId), false);
positions = JsonConvert.DeserializeObject<string[]>(continuationToken);
}
catch (Exception e)
{
throw new ArgumentException("invalid continuation token: failed to parse: ", e);
}
if (positions.Length != this.host.NumberPartitions)
{
throw new ArgumentException("invalid continuation token: wrong number of partition");
}
if (positions.Count(s => s != null) == 0)
{
throw new ArgumentException("invalid continuation token: no targets");
}
return positions;
}
}
string ConvertPositionsToContinuationToken(string[] positions)
{
return positions.Any(t => t != null) ? JsonConvert.SerializeObject(positions) : null;
}
async Task<TResult> RunPartitionQueries<TRequest, TResponse, TResult>(
string[] partitionPositions,
Func<TRequest> requestCreator,
TResult initialResult,
Func<TResult, TResponse, TResult> responseAggregator,
Func<TResponse, string> continuationToken,
int pageSize,
bool keepGoingUntilDone,
CancellationToken cancellationToken)
where TRequest : ClientRequestEventWithQuery
where TResponse : ClientEvent, IPagedResponse
{
string clientQueryId = $"Q{Interlocked.Increment(ref this.SequenceNumber)}";
string PrintPartitionPositions() => string.Join(",", partitionPositions.Select(s => s ?? "null"));
if (!this.host.Settings.KeepInstanceIdsInMemory)
{
pageSize = 0; // paging is not supported
}
this.traceHelper.TraceProgress($"Query {clientQueryId} type={typeof(TRequest).Name} paging={pageSize > 0} starting at {PrintPartitionPositions()}");
var stopwatch = Stopwatch.StartNew();
try
{
TResult result;
if (pageSize > 0)
{
result = await this.RunPagedPartitionQueries(
clientQueryId,
partitionPositions,
requestCreator,
initialResult,
responseAggregator,
pageSize,
keepGoingUntilDone,
cancellationToken).ConfigureAwait(false);
}
else
{
result = await this.RunUnpagedPartitionQueries(
clientQueryId,
(uint partitionId) =>
{
var request = requestCreator();
request.PartitionId = partitionId;
request.ClientId = this.ClientId;
request.RequestId = Interlocked.Increment(ref this.SequenceNumber);
request.PageSize = 0;
request.TimeoutUtc = this.GetTimeoutBucket(TimeSpan.FromMinutes(60));
return request;
},
(IEnumerable<TResponse> responses) =>
{
TResult result = initialResult;
int i = 0;
foreach (var response in responses)
{
result = responseAggregator(result, response);
partitionPositions[i++] = continuationToken(response);
}
return result;
},
cancellationToken).ConfigureAwait(false);
}
this.traceHelper.TraceProgress($"Query {clientQueryId} type={typeof(TRequest).Name} completed after {stopwatch.Elapsed.TotalSeconds:F2}s at {PrintPartitionPositions()}");
return result;
}
catch (Exception exception)
{
this.traceHelper.TraceError("RunPartitionQueries", $"Query {clientQueryId} type={typeof(TRequest).Name} failed after {stopwatch.Elapsed.TotalSeconds:F2}s", exception);
throw;
}
}
async Task<TResult> RunUnpagedPartitionQueries<TRequest, TResponse, TResult>(
string clientQueryId,
Func<uint, TRequest> requestCreator,
Func<IEnumerable<TResponse>, TResult> responseAggregator,
CancellationToken cancellationToken)
where TRequest : ClientRequestEventWithQuery
where TResponse : ClientEvent, IPagedResponse
{
async Task<TResponse> QueryPartition(uint partitionId)
{
var stopwatch = Stopwatch.StartNew();
var request = requestCreator(partitionId);
var response = (TResponse) await this.PerformRequestWithTimeoutAndCancellation(cancellationToken, request, false).ConfigureAwait(false);
this.traceHelper.TraceQueryProgress(clientQueryId, request.EventIdString, partitionId, stopwatch.Elapsed, request.PageSize, response.Count, response.ContinuationToken);
return response;
}
var tasks = Enumerable.Range(0, (int) this.host.NumberPartitions).Select(i => QueryPartition((uint) i)).ToList();
ClientEvent[] responses = await Task.WhenAll(tasks).ConfigureAwait(false);
return responseAggregator(responses.Cast<TResponse>());
}
public async Task<TResult> RunPagedPartitionQueries<TRequest, TResponse, TResult>(
string clientQueryId,
string[] partitionPositions,
Func<TRequest> requestCreator,
TResult initialResult,
Func<TResult, TResponse, TResult> responseAggregator,
int pageSize,
bool keepGoingUntilDone,
CancellationToken cancellationToken)
where TRequest : ClientRequestEventWithQuery
where TResponse: ClientEvent, IPagedResponse
{
TResult currentResult = initialResult;
var aggregationLock = new object();
// query each partition
var tasks = new Task[partitionPositions.Length];
for (uint i = 0; i < tasks.Length; i++)
{
tasks[i] = QueryPartition(i);
}
await Task.WhenAll(tasks).ConfigureAwait(false);
// return the aggregated result
return currentResult;
async Task QueryPartition(uint partitionId)
{
int retries = 5;
TimeSpan retryDelay = TimeSpan.FromSeconds(2);
Task BackOffAsync()
{
retries--;
retryDelay += retryDelay;
return Task.Delay(retryDelay);
}
void ResetRetries()
{
retries = 5;
retryDelay = TimeSpan.FromSeconds(2);
}
bool hasMore = (partitionPositions[partitionId] != null);
while (hasMore && !cancellationToken.IsCancellationRequested)
{
hasMore = await GetNextPageAsync().ConfigureAwait(false);
if (!keepGoingUntilDone)
{
return; // we take only the first page from each partition
}
}
async Task<bool> GetNextPageAsync()
{
var request = requestCreator();
request.PartitionId = partitionId;
request.ClientId = this.ClientId;
request.RequestId = Interlocked.Increment(ref this.SequenceNumber);
request.PageSize = pageSize;
request.TimeoutUtc = this.GetTimeoutBucket(Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(30));
request.ContinuationToken = partitionPositions[partitionId];
try
{
if (request.ContinuationToken == null)
{
throw new InvalidDataException($"query {clientQueryId} is issuing query for already-completed partition id {partitionId}");
}
var stopwatch = Stopwatch.StartNew();
var response = (TResponse)await this.PerformRequestWithTimeoutAndCancellation(cancellationToken, request, false).ConfigureAwait(false);
this.traceHelper.TraceQueryProgress(clientQueryId, request.EventIdString, partitionId, stopwatch.Elapsed, request.PageSize, response.Count, response.ContinuationToken);
ResetRetries();
lock (aggregationLock)
{
currentResult = responseAggregator(currentResult, response);
}
if (response.ContinuationToken == null)
{
partitionPositions[partitionId] = null;
return false;
}
int progress = response.ContinuationToken.CompareTo(partitionPositions[partitionId]);
if (progress > 0)
{
partitionPositions[partitionId] = response.ContinuationToken;
return true;
}
else if (progress < 0)
{
throw new InvalidDataException($"query {clientQueryId} received invalid continuation token for {request.EventId}");
}
else if (retries > 0)
{
await BackOffAsync().ConfigureAwait(false);
return true;
}
else
{
throw new TimeoutException($"query {clientQueryId} did not make progress in time on partition {partitionId}");
}
}
catch (OperationCanceledException)
{
return false;
}
catch (Exception) when (retries > 0)
{
await BackOffAsync().ConfigureAwait(false);
return true;
}
}
}
ClientEvent[] responses = await Task.WhenAll(launchQueries().ToList()).ConfigureAwait(false);
return responseAggregator(responses.Cast<TResponse>());
}
public Task ForceTerminateTaskOrchestrationAsync(uint partitionId, string instanceId, string message)

Просмотреть файл

@ -311,6 +311,8 @@ namespace DurableTask.Netherite
public async Task TryStartAsync(bool clientOnly)
{
clientOnly = clientOnly || this.Settings.PartitionManagement == PartitionManagementOptions.ClientOnly;
while (true)
{
var currentTransition = this.currentTransition;
@ -370,6 +372,9 @@ namespace DurableTask.Netherite
this.checkedClient = this.client;
this.ActivityWorkItemQueue = new WorkItemQueue<ActivityWorkItem>();
this.OrchestrationWorkItemQueue = new WorkItemQueue<OrchestrationWorkItem>();
this.TraceHelper.TraceProgress($"Started client");
return ServiceState.Client;
@ -406,9 +411,6 @@ namespace DurableTask.Netherite
this.TraceHelper.TraceProgress("Starting Workers");
this.ActivityWorkItemQueue = new WorkItemQueue<ActivityWorkItem>();
this.OrchestrationWorkItemQueue = new WorkItemQueue<OrchestrationWorkItem>();
LeaseTimer.Instance.DelayWarning = (int delay) =>
this.TraceHelper.TraceWarning($"Lease timer is running {delay}s behind schedule");
@ -561,34 +563,35 @@ namespace DurableTask.Netherite
/// <inheritdoc />
async Task IOrchestrationServiceClient.CreateTaskOrchestrationAsync(TaskMessage creationMessage)
=> await (await this.GetClientAsync()).CreateTaskOrchestrationAsync(
=> await (await this.GetClientAsync().ConfigureAwait(false)).CreateTaskOrchestrationAsync(
this.GetPartitionId(creationMessage.OrchestrationInstance.InstanceId),
creationMessage,
null);
null).ConfigureAwait(false);
/// <inheritdoc />
async Task IOrchestrationServiceClient.CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
=> await (await this.GetClientAsync()).CreateTaskOrchestrationAsync(
=> await (await this.GetClientAsync().ConfigureAwait(false)).CreateTaskOrchestrationAsync(
this.GetPartitionId(creationMessage.OrchestrationInstance.InstanceId),
creationMessage,
dedupeStatuses);
dedupeStatuses).ConfigureAwait(false);
/// <inheritdoc />
async Task IOrchestrationServiceClient.SendTaskOrchestrationMessageAsync(TaskMessage message)
=> await (await this.GetClientAsync()).SendTaskOrchestrationMessageBatchAsync(
=> await (await this.GetClientAsync().ConfigureAwait(false)).SendTaskOrchestrationMessageBatchAsync(
this.GetPartitionId(message.OrchestrationInstance.InstanceId),
new[] { message });
new[] { message }).ConfigureAwait(false);
/// <inheritdoc />
async Task IOrchestrationServiceClient.SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages)
{
var client = await this.GetClientAsync();
var client = await this.GetClientAsync().ConfigureAwait(false);
if (messages.Length != 0)
{
await Task.WhenAll(messages
.GroupBy(tm => this.GetPartitionId(tm.OrchestrationInstance.InstanceId))
.Select(group => client.SendTaskOrchestrationMessageBatchAsync(group.Key, group))
.ToList());
.ToList())
.ConfigureAwait(false);
}
}
@ -599,19 +602,19 @@ namespace DurableTask.Netherite
string executionId,
TimeSpan timeout,
CancellationToken cancellationToken)
=> await (await this.GetClientAsync()).WaitForOrchestrationAsync(
=> await (await this.GetClientAsync().ConfigureAwait(false)).WaitForOrchestrationAsync(
this.GetPartitionId(instanceId),
instanceId,
executionId,
timeout,
cancellationToken);
cancellationToken).ConfigureAwait(false);
/// <inheritdoc />
async Task<OrchestrationState> IOrchestrationServiceClient.GetOrchestrationStateAsync(
string instanceId,
string executionId)
{
var state = await (await this.GetClientAsync()).GetOrchestrationStateAsync(this.GetPartitionId(instanceId), instanceId, true).ConfigureAwait(false);
var state = await (await this.GetClientAsync().ConfigureAwait(false)).GetOrchestrationStateAsync(this.GetPartitionId(instanceId), instanceId, true).ConfigureAwait(false);
return state != null && (executionId == null || executionId == state.OrchestrationInstance.ExecutionId)
? state
: null;
@ -623,7 +626,7 @@ namespace DurableTask.Netherite
bool allExecutions)
{
// note: allExecutions is always ignored because storage contains never more than one execution.
var state = await (await this.GetClientAsync()).GetOrchestrationStateAsync(this.GetPartitionId(instanceId), instanceId, true).ConfigureAwait(false);
var state = await (await this.GetClientAsync().ConfigureAwait(false)).GetOrchestrationStateAsync(this.GetPartitionId(instanceId), instanceId, true).ConfigureAwait(false);
return state != null
? (new[] { state })
: (new OrchestrationState[0]);
@ -633,14 +636,14 @@ namespace DurableTask.Netherite
async Task IOrchestrationServiceClient.ForceTerminateTaskOrchestrationAsync(
string instanceId,
string message)
=> await (await this.GetClientAsync()).ForceTerminateTaskOrchestrationAsync(this.GetPartitionId(instanceId), instanceId, message);
=> await (await this.GetClientAsync().ConfigureAwait(false)).ForceTerminateTaskOrchestrationAsync(this.GetPartitionId(instanceId), instanceId, message).ConfigureAwait(false);
/// <inheritdoc />
async Task<string> IOrchestrationServiceClient.GetOrchestrationHistoryAsync(
string instanceId,
string executionId)
{
var client = await this.GetClientAsync();
var client = await this.GetClientAsync().ConfigureAwait(false);
(string actualExecutionId, IList<HistoryEvent> history) =
await client.GetOrchestrationHistoryAsync(this.GetPartitionId(instanceId), instanceId).ConfigureAwait(false);
@ -665,38 +668,38 @@ namespace DurableTask.Netherite
throw new NotSupportedException("Purging is supported only for Orchestration created time filter.");
}
await (await this.GetClientAsync()).PurgeInstanceHistoryAsync(thresholdDateTimeUtc, null, null);
await (await this.GetClientAsync().ConfigureAwait(false)).PurgeInstanceHistoryAsync(thresholdDateTimeUtc, null, null).ConfigureAwait(false);
}
/// <inheritdoc />
async Task<OrchestrationState> IOrchestrationServiceQueryClient.GetOrchestrationStateAsync(string instanceId, bool fetchInput, bool fetchOutput)
{
return await (await this.GetClientAsync()).GetOrchestrationStateAsync(this.GetPartitionId(instanceId), instanceId, fetchInput, fetchOutput);
return await (await this.GetClientAsync().ConfigureAwait(false)).GetOrchestrationStateAsync(this.GetPartitionId(instanceId), instanceId, fetchInput, fetchOutput).ConfigureAwait(false);
}
/// <inheritdoc />
async Task<IList<OrchestrationState>> IOrchestrationServiceQueryClient.GetAllOrchestrationStatesAsync(CancellationToken cancellationToken)
=> await (await this.GetClientAsync()).GetOrchestrationStateAsync(cancellationToken);
=> await (await this.GetClientAsync().ConfigureAwait(false)).GetOrchestrationStateAsync(cancellationToken).ConfigureAwait(false);
/// <inheritdoc />
async Task<IList<OrchestrationState>> IOrchestrationServiceQueryClient.GetOrchestrationStateAsync(DateTime? CreatedTimeFrom, DateTime? CreatedTimeTo, IEnumerable<OrchestrationStatus> RuntimeStatus, string InstanceIdPrefix, CancellationToken CancellationToken)
=> await (await this.GetClientAsync()).GetOrchestrationStateAsync(CreatedTimeFrom, CreatedTimeTo, RuntimeStatus, InstanceIdPrefix, CancellationToken);
=> await (await this.GetClientAsync().ConfigureAwait(false)).GetOrchestrationStateAsync(CreatedTimeFrom, CreatedTimeTo, RuntimeStatus, InstanceIdPrefix, CancellationToken).ConfigureAwait(false);
/// <inheritdoc />
async Task<int> IOrchestrationServiceQueryClient.PurgeInstanceHistoryAsync(string instanceId)
=> await (await this.GetClientAsync()).DeleteAllDataForOrchestrationInstance(this.GetPartitionId(instanceId), instanceId);
=> await (await this.GetClientAsync().ConfigureAwait(false)).DeleteAllDataForOrchestrationInstance(this.GetPartitionId(instanceId), instanceId).ConfigureAwait(false);
/// <inheritdoc />
async Task<int> IOrchestrationServiceQueryClient.PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus)
=> await (await this.GetClientAsync()).PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus);
=> await (await this.GetClientAsync().ConfigureAwait(false)).PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus).ConfigureAwait(false);
/// <inheritdoc />
async Task<InstanceQueryResult> IOrchestrationServiceQueryClient.QueryOrchestrationStatesAsync(InstanceQuery instanceQuery, int pageSize, string continuationToken, CancellationToken cancellationToken)
=> await (await this.GetClientAsync()).QueryOrchestrationStatesAsync(instanceQuery, pageSize, continuationToken, cancellationToken);
=> await (await this.GetClientAsync().ConfigureAwait(false)).QueryOrchestrationStatesAsync(instanceQuery, pageSize, continuationToken, cancellationToken).ConfigureAwait(false);
/// <inheritdoc />
async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(string instanceId)
=> new PurgeResult(await (await this.GetClientAsync()).DeleteAllDataForOrchestrationInstance(this.GetPartitionId(instanceId), instanceId));
=> new PurgeResult(await (await this.GetClientAsync().ConfigureAwait(false)).DeleteAllDataForOrchestrationInstance(this.GetPartitionId(instanceId), instanceId).ConfigureAwait(false));
/// <inheritdoc />
async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter)
@ -930,7 +933,7 @@ namespace DurableTask.Netherite
{
if (nextActivityWorkItem.WaitForDequeueCountPersistence != null)
{
await nextActivityWorkItem.WaitForDequeueCountPersistence.Task;
await nextActivityWorkItem.WaitForDequeueCountPersistence.Task.ConfigureAwait(false);
}
this.workItemTraceHelper.TraceWorkItemStarted(

Просмотреть файл

@ -145,6 +145,11 @@ namespace DurableTask.Netherite
/// </summary>
public bool UseAlternateObjectStore { get; set; } = false;
/// <summary>
/// Whether to keep an in-memory set of all instance ids in memory. This is required for supporting paged queries.
/// </summary>
public bool KeepInstanceIdsInMemory = true;
/// <summary>
/// Forces steps to pe persisted before applying their effects, disabling all pipelining.
/// </summary>

Просмотреть файл

@ -48,7 +48,7 @@ namespace DurableTask.Netherite
public static string GetWorkItemId(uint partition, long activityId) => $"{partition:D2}A{activityId}";
public override void OnFirstInitialization()
public override void OnFirstInitialization(Partition partition)
{
this.Pending = new Dictionary<long, ActivityInfo>();
// Backlog queue for local tasks

Просмотреть файл

@ -83,5 +83,12 @@ namespace DurableTask.Netherite
}
}
}
public override void Process(RecoveryCompleted evt, EffectTracker tracker)
{
// throw away fragments for which we have already gone past the dedup position
evt.ReceivePositions = this.LastProcessed;
tracker.Add(TrackedObjectKey.Reassembly);
}
}
}

Просмотреть файл

@ -33,7 +33,7 @@ namespace DurableTask.Netherite
if (!effects.IsReplaying)
{
effects.EventTraceHelper.TraceEventProcessingWarning($"Dropped query {kvp.Value.EventIdString} because it has timed out");
effects.EventTraceHelper.TraceEventProcessingWarning($"Dropped query {kvp.Value.EventIdString} during recovery because it has timed out");
}
}
@ -108,6 +108,10 @@ namespace DurableTask.Netherite
this.request = clientRequest;
}
public override string ContinuationToken => this.request.ContinuationToken;
public override int PageSize => this.request.PageSize;
protected override void ExtraTraceInformation(StringBuilder s)
{
s.Append(':');
@ -118,7 +122,7 @@ namespace DurableTask.Netherite
public override Netherite.InstanceQuery InstanceQuery => this.request.InstanceQuery;
public override async Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> result, Partition partition)
public override async Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> result, Partition partition, DateTime attempt)
{
partition.Assert(this.request.Phase == ClientRequestEventWithQuery.ProcessingPhase.Query, "wrong phase in QueriesState.OnQueryCompleteAsync");
@ -126,18 +130,28 @@ namespace DurableTask.Netherite
try
{
await this.request.OnQueryCompleteAsync(result, partition);
await this.request.OnQueryCompleteAsync(result, partition, attempt);
retry = false;
}
catch (FASTER.core.FasterException exception) when (this.request.PreviousAttempts < 3 && exception.Message.StartsWith("Iterator address is less than log BeginAddress"))
{
partition.EventTraceHelper.TraceEventProcessingWarning($"retrying query {this.request.EventId} after internal error, PreviousAttempts={this.request.PreviousAttempts}");
partition.EventTraceHelper.TraceEventProcessingWarning($"retrying query {this.request.EventId} attempt {attempt} after internal error, PreviousAttempts={this.request.PreviousAttempts}");
retry = true;
}
catch (OperationCanceledException)
{
partition.EventTraceHelper.TraceEventProcessingWarning($"canceling query {this.request.EventId} attempt {attempt}");
retry = true;
}
catch (Exception) when (partition.ErrorHandler.IsTerminated)
{
partition.EventTraceHelper.TraceEventProcessingWarning($"abandoning query {this.request.EventId} attempt {attempt} as partition is shutting down");
retry = false;
}
catch (Exception exception) when (!Utils.IsFatal(exception) && !partition.ErrorHandler.IsTerminated)
{
// unhandled exceptions terminate the query
partition.EventTraceHelper.TraceEventProcessingWarning($"abandoning query {this.request.EventId} after internal error, PreviousAttempts={this.request.PreviousAttempts}");
partition.EventTraceHelper.TraceEventProcessingWarning($"abandoning query {this.request.EventId} attempt {attempt} after internal error: {exception}");
retry = false;
}

Просмотреть файл

@ -6,6 +6,7 @@ namespace DurableTask.Netherite
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Serialization;
[DataContract]
@ -21,13 +22,47 @@ namespace DurableTask.Netherite
return $"Reassembly ({this.Fragments.Count} pending)";
}
public override void Process(RecoveryCompleted evt, EffectTracker effects)
{
bool IsExpired(List<PartitionEventFragment> list)
{
var fragment = list.First();
if (fragment.Timeout.HasValue)
{
return fragment.Timeout.Value < evt.Timestamp;
}
else if (fragment.DedupPosition.HasValue)
{
evt.ReceivePositions.TryGetValue(fragment.DedupPosition.Value.Item1, out (long, int) lastProcessed);
return lastProcessed.CompareTo((fragment.DedupPosition.Value.Item2, fragment.DedupPosition.Value.Item3)) >= 0;
}
else
{
return false;
}
}
var expired = this.Fragments.Where(kvp => IsExpired(kvp.Value)).ToList();
foreach (var kvp in expired)
{
this.Fragments.Remove(kvp.Key);
if (!effects.IsReplaying)
{
effects.EventTraceHelper.TraceEventProcessingDetail($"Dropped {kvp.Value.Count()} expired fragments for id={kvp.Value.First().OriginalEventId} during recovery");
}
}
}
public override void Process(PartitionEventFragment evt, EffectTracker effects)
{
// stores fragments until the last one is received
var originalEventString = evt.OriginalEventId.ToString();
if (evt.IsLast)
{
evt.ReassembledEvent = FragmentationAndReassembly.Reassemble<PartitionEvent>(this.Fragments[originalEventString], evt);
evt.ReassembledEvent = FragmentationAndReassembly.Reassemble<PartitionEvent>(this.Fragments[originalEventString], evt, effects.Partition);
effects.EventDetailTracer?.TraceEventProcessingDetail($"Reassembled {evt.ReassembledEvent}");
@ -70,6 +105,6 @@ namespace DurableTask.Netherite
list.Add(evt);
}
}
}
}
}

Просмотреть файл

@ -18,9 +18,15 @@ namespace DurableTask.Netherite
[DataMember]
public long InstanceCount { get; set; }
[DataMember]
public SortedSet<string> InstanceIds { get; set; }
[IgnoreDataMember]
public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Stats);
[IgnoreDataMember]
public bool HasInstanceIds => this.InstanceIds != null;
public override string ToString()
{
return $"Stats (InstanceCount={this.InstanceCount})";
@ -29,11 +35,41 @@ namespace DurableTask.Netherite
public override void UpdateLoadInfo(PartitionLoadInfo info)
{
info.Instances = this.InstanceCount;
this.Partition.Assert(!this.HasInstanceIds || this.InstanceCount == this.InstanceIds.Count, "instance count does not match index size");
}
public override void OnFirstInitialization(Partition partition)
{
// indexing the keys
if (partition.Settings.KeepInstanceIdsInMemory)
{
this.InstanceIds = new SortedSet<string>();
}
}
public override void Process(RecoveryCompleted evt, EffectTracker effects)
{
if (this.HasInstanceIds && !evt.KeepInstanceIdsInMemory)
{
this.InstanceIds = null; // remove the index
}
else if (!this.HasInstanceIds && evt.KeepInstanceIdsInMemory && !effects.IsReplaying)
{
// TODO: kick off a background task to rebuild the index
}
}
public override void Process(CreationRequestReceived evt, EffectTracker effects)
{
this.InstanceCount++;
if (this.InstanceIds != null)
{
lock (this.InstanceIds)
{
this.InstanceIds.Add(evt.InstanceId);
}
}
}
public override void Process(BatchProcessed evt, EffectTracker effects)
@ -46,17 +82,91 @@ namespace DurableTask.Netherite
{
this.InstanceCount--;
}
if (this.InstanceIds != null)
{
lock (this)
{
if (!evt.DeleteInstance)
{
this.InstanceIds.Add(evt.InstanceId);
}
else
{
this.InstanceIds.Remove(evt.InstanceId);
}
}
}
}
public override void Process(PurgeBatchIssued evt, EffectTracker effects)
{
this.InstanceCount -= evt.Purged.Count;
if (this.InstanceIds != null)
{
lock (this)
{
foreach (var key in evt.Purged)
{
this.InstanceIds.Remove(key);
}
}
}
}
public override void Process(DeletionRequestReceived evt, EffectTracker effects)
{
this.InstanceCount--;
if (this.InstanceIds != null)
{
lock (this.InstanceIds)
{
this.InstanceIds.Remove(evt.InstanceId);
}
}
}
// called by query
public IEnumerator<string> GetEnumerator(string prefix, string from)
{
int pageSize = 500;
Func<string, bool> predicate =
string.IsNullOrEmpty(prefix) ? ((s) => true) : ((s) => s.StartsWith(prefix));
while (true)
{
var chunk = GetChunk();
foreach (var s in chunk)
{
yield return s;
}
if (chunk.Count < 500)
{
yield break;
}
from = chunk[chunk.Count - 1];
}
List<string> GetChunk()
{
lock (this.InstanceIds)
{
if (string.IsNullOrEmpty(from))
{
return this.InstanceIds.Where(predicate).Take(pageSize).ToList();
}
else
{
return this.InstanceIds.GetViewBetween(from, this.InstanceIds.Max).Where(s => s != from).Where(predicate).Take(pageSize).ToList();
}
}
}
}
}
}

Просмотреть файл

@ -191,7 +191,7 @@ namespace DurableTask.Netherite.Faster
internal void CheckForHangs(object _)
{
DateTime threshold = DateTime.UtcNow - this.limit;
DateTime threshold = DateTime.UtcNow - (Debugger.IsAttached ? TimeSpan.FromMinutes(30) : this.limit);
foreach(var kvp in this.pendingReadWriteOperations)
{

Просмотреть файл

@ -51,6 +51,8 @@ namespace DurableTask.Netherite.Faster
public IDevice HybridLogDevice { get; private set; }
public IDevice ObjectLogDevice { get; private set; }
public DateTime IncarnationTimestamp { get; private set; }
IDevice[] PsfLogDevices;
internal CheckpointInfo[] PsfCheckpointInfos { get; }
int PsfGroupCount => this.PsfCheckpointInfos.Length;
@ -137,7 +139,7 @@ namespace DurableTask.Netherite.Faster
(useSeparatePageBlobStorage ? 35 // 32 GB
: 32), // 4 GB
PreallocateLog = false,
ReadFlags = ReadFlags.CopyReadsToTail,
ReadFlags = ReadFlags.None,
ReadCacheSettings = null, // no read cache
MemorySizeBits = memorybits,
};
@ -567,6 +569,7 @@ namespace DurableTask.Netherite.Faster
this.TraceHelper.LeaseAcquired();
}
this.IncarnationTimestamp = DateTime.UtcNow;
this.leaseTimer = newLeaseTimer;
this.LeaseMaintenanceLoopTask = Task.Run(() => this.MaintenanceLoopAsync());
return;

Просмотреть файл

@ -4,10 +4,12 @@
namespace DurableTask.Netherite.Faster
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Channels;
@ -30,7 +32,7 @@ namespace DurableTask.Netherite.Faster
readonly MemoryTracker.CacheTracker cacheTracker;
readonly LogSettings storelogsettings;
readonly Stopwatch compactionStopwatch;
readonly Dictionary<(PartitionReadEvent,Key), double> pendingReads;
readonly Dictionary<(PartitionReadEvent, Key), double> pendingReads;
readonly List<IDisposable> sessionsToDisposeOnShutdown;
TrackedObject[] singletons;
@ -38,6 +40,35 @@ namespace DurableTask.Netherite.Faster
ClientSession<Key, Value, EffectTracker, Output, object, IFunctions<Key, Value, EffectTracker, Output, object>> mainSession;
const int queryParallelism = 100;
readonly ClientSession<Key, Value, EffectTracker, Output, object, IFunctions<Key, Value, EffectTracker, Output, object>>[] querySessions
= new ClientSession<Key, Value, EffectTracker, Output, object, IFunctions<Key, Value, EffectTracker, Output, object>>[queryParallelism];
static readonly SemaphoreSlim availableQuerySessions = new SemaphoreSlim(queryParallelism);
readonly ConcurrentBag<int> idleQuerySessions = new ConcurrentBag<int>(Enumerable.Range(0, queryParallelism));
async ValueTask<FasterKV<Key, Value>.ReadAsyncResult<EffectTracker, Output, object>> ReadOnQuerySessionAsync(string instanceId, CancellationToken cancellationToken)
{
await availableQuerySessions.WaitAsync();
try
{
bool success = this.idleQuerySessions.TryTake(out var session);
this.partition.Assert(success, "available sessions must be larger than or equal to semaphore count");
try
{
var result = await this.querySessions[session].ReadAsync(TrackedObjectKey.Instance(instanceId), token: cancellationToken).ConfigureAwait(false);
return result;
}
finally
{
this.idleQuerySessions.Add(session);
}
}
finally
{
availableQuerySessions.Release();
}
}
double nextHangCheck;
const int HangCheckPeriod = 30000;
const int ReadRetryAfter = 20000;
@ -56,8 +87,8 @@ namespace DurableTask.Netherite.Faster
partition.ErrorHandler.Token.ThrowIfCancellationRequested();
this.storelogsettings = blobManager.GetDefaultStoreLogSettings(
partition.Settings.UseSeparatePageBlobStorage,
memoryTracker.MaxCacheSize,
partition.Settings.UseSeparatePageBlobStorage,
memoryTracker.MaxCacheSize,
partition.Settings.FasterTuningParameters);
this.fht = new FasterKV<Key, Value>(
@ -114,6 +145,19 @@ namespace DurableTask.Netherite.Faster
// can happen during shutdown
}
this.TraceHelper.FasterProgress("Disposing Query Sessions");
foreach (var s in this.querySessions)
{
try
{
s?.Dispose();
}
catch (OperationCanceledException)
{
// can happen during shutdown
}
}
this.TraceHelper.FasterProgress("Disposing FasterKV");
this.fht.Dispose();
@ -142,7 +186,7 @@ namespace DurableTask.Netherite.Faster
ClientSession<Key, Value, EffectTracker, Output, object, IFunctions<Key, Value, EffectTracker, Output, object>> CreateASession(string id, bool isScan)
{
var functions = new Functions(this.partition, this, this.cacheTracker, isScan);
return this.fht.NewSession(functions, id);
return this.fht.NewSession(functions, id, readFlags: (isScan ? ReadFlags.None : ReadFlags.CopyReadsToTail));
}
public IDisposable TrackTemporarySession(ClientSession<Key, Value, EffectTracker, Output, object, IFunctions<Key, Value, EffectTracker, Output, object>> session)
@ -168,14 +212,17 @@ namespace DurableTask.Netherite.Faster
}
}
string RandomSuffix() => Guid.NewGuid().ToString().Substring(0, 5);
public LogAccessor<Key, Value> Log => this.fht?.Log;
public override void InitMainSession()
{
this.singletons = new TrackedObject[TrackedObjectKey.NumberSingletonTypes];
this.mainSession = this.CreateASession($"main-{this.RandomSuffix()}", false);
string suffix = DateTime.UtcNow.ToString("O");
this.mainSession = this.CreateASession($"main-{suffix}", false);
for (int i = 0; i < this.querySessions.Length; i++)
{
this.querySessions[i] = this.CreateASession($"query{i:D2}-{suffix}", true);
}
this.cacheTracker.MeasureCacheSize(true);
this.CheckInvariants();
}
@ -200,7 +247,11 @@ namespace DurableTask.Netherite.Faster
// recover Faster
this.blobManager.TraceHelper.FasterProgress($"Recovering FasterKV");
await this.fht.RecoverAsync(this.partition.Settings.FasterTuningParameters?.NumPagesToPreload ?? 1, true, -1, this.terminationToken);
this.mainSession = this.CreateASession($"main-{this.RandomSuffix()}", false);
this.mainSession = this.CreateASession($"main-{this.blobManager.IncarnationTimestamp:o}", false);
for (int i = 0; i < this.querySessions.Length; i++)
{
this.querySessions[i] = this.CreateASession($"query{i:D2}-{this.blobManager.IncarnationTimestamp:o}", true);
}
this.cacheTracker.MeasureCacheSize(true);
this.CheckInvariants();
@ -290,7 +341,7 @@ namespace DurableTask.Netherite.Faster
{
Task<Task> tasktask = new Task<Task>(() => asyncAction());
var thread = TrackedThreads.MakeTrackedThread(RunTask, name);
void RunTask() {
try
{
@ -406,18 +457,18 @@ namespace DurableTask.Netherite.Faster
return this.fht.Log.BeginAddress + compactionAreaSize;
}
else
{
{
this.TraceHelper.FasterCompactionProgress(
FasterTraceHelper.CompactionProgress.Skipped,
"",
"",
this.Log.BeginAddress,
this.Log.SafeReadOnlyAddress,
this.Log.TailAddress,
this.Log.TailAddress,
minimalLogSize,
compactionAreaSize,
this.GetElapsedCompactionMilliseconds());
return null;
return null;
}
}
@ -425,7 +476,7 @@ namespace DurableTask.Netherite.Faster
public override async Task<long> RunCompactionAsync(long target)
{
string id = this.RandomSuffix(); // for tracing purposes
string id = DateTime.UtcNow.ToString("O"); // for tracing purposes
await maxCompactionThreads.WaitAsync();
try
{
@ -442,7 +493,7 @@ namespace DurableTask.Netherite.Faster
this.GetElapsedCompactionMilliseconds());
var tcs = new TaskCompletionSource<long>(TaskCreationOptions.RunContinuationsAsynchronously);
var thread = TrackedThreads.MakeTrackedThread(RunCompaction, $"Compaction.{id}");
var thread = TrackedThreads.MakeTrackedThread(RunCompaction, $"Compaction.{id}");
thread.Start();
return await tcs.Task;
@ -459,8 +510,8 @@ namespace DurableTask.Netherite.Faster
this.TraceHelper.FasterCompactionProgress(
FasterTraceHelper.CompactionProgress.Completed,
id,
compactedUntil,
id,
compactedUntil,
this.Log.SafeReadOnlyAddress,
this.Log.TailAddress,
this.MinimalLogSize,
@ -493,8 +544,31 @@ namespace DurableTask.Netherite.Faster
{
try
{
var orchestrationStates = this.ScanOrchestrationStates(effectTracker, queryEvent);
await effectTracker.ProcessQueryResultAsync(queryEvent, orchestrationStates);
this.terminationToken.ThrowIfCancellationRequested();
DateTime attempt = DateTime.UtcNow;
IAsyncEnumerable<OrchestrationState> orchestrationStates;
var stats = (StatsState)this.singletons[(int)TrackedObjectKey.Stats.ObjectType];
if (stats.HasInstanceIds)
{
orchestrationStates = this.QueryEnumeratedStates(
effectTracker,
queryEvent,
stats.GetEnumerator(queryEvent.InstanceQuery.InstanceIdPrefix, queryEvent.ContinuationToken),
queryEvent.PageSize,
TimeSpan.FromSeconds(15),
attempt);
}
else
{
orchestrationStates = this.ScanOrchestrationStates(effectTracker, queryEvent, attempt);
}
// process the stream of results, and any exceptions or cancellations
await effectTracker.ProcessQueryResultAsync(queryEvent, orchestrationStates, attempt);
}
catch (Exception exception)
when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception))
@ -523,7 +597,7 @@ namespace DurableTask.Netherite.Faster
if (stopwatch.ElapsedMilliseconds - lastReport >= elapsedMillisecondsThreshold)
{
this.blobManager.TraceHelper.FasterProgress(
$"FasterKV PrefetchSession {sessionId} elapsed={stopwatch.Elapsed.TotalSeconds:F2}s issued={numberIssued} pending={maxConcurrency-prefetchSemaphore.CurrentCount} hits={numberHits} misses={numberMisses}");
$"FasterKV PrefetchSession {sessionId} elapsed={stopwatch.Elapsed.TotalSeconds:F2}s issued={numberIssued} pending={maxConcurrency - prefetchSemaphore.CurrentCount} hits={numberHits} misses={numberMisses}");
lastReport = stopwatch.ElapsedMilliseconds;
}
}
@ -531,7 +605,7 @@ namespace DurableTask.Netherite.Faster
try
{
// these are disposed after the prefetch thread is done
var prefetchSession = this.CreateASession($"prefetch-{this.RandomSuffix()}", false);
var prefetchSession = this.CreateASession($"prefetch-{DateTime.UtcNow:O}", false);
using (this.TrackTemporarySession(prefetchSession))
{
@ -792,18 +866,201 @@ namespace DurableTask.Netherite.Faster
return default;
}
IAsyncEnumerable<OrchestrationState> ScanOrchestrationStates(
async IAsyncEnumerable<OrchestrationState> QueryEnumeratedStates(
EffectTracker effectTracker,
PartitionQueryEvent queryEvent)
PartitionQueryEvent queryEvent,
IEnumerator<string> enumerator,
int pageSize,
TimeSpan pageTime,
DateTime attempt
)
{
var instanceQuery = queryEvent.InstanceQuery;
string queryId = queryEvent.EventIdString;
int? pageLimit = pageSize > 0 ? pageSize : null;
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration from {queryEvent.ContinuationToken} with pageLimit={(pageLimit.HasValue ? pageLimit.ToString() : "none")} pageTime={pageTime}");
Stopwatch stopwatch = Stopwatch.StartNew();
var channel = Channel.CreateBounded<(bool last, ValueTask<FasterKV<Key, Value>.ReadAsyncResult<EffectTracker, Output, object>> responseTask)>(200);
using var leftToFill = new SemaphoreSlim(pageLimit.HasValue ? pageLimit.Value : 100);
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(this.partition.ErrorHandler.Token);
var cancellationToken = cancellationTokenSource.Token;
Task readIssueLoop = Task.Run(ReadIssueLoop);
async Task ReadIssueLoop()
{
try
{
while (enumerator.MoveNext())
{
if (!string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix)
&& !enumerator.Current.StartsWith(instanceQuery.InstanceIdPrefix))
{
// the instance does not match the prefix
continue;
}
while (!await leftToFill.WaitAsync(2000, cancellationToken))
{
if (stopwatch.Elapsed > pageTime)
{
// stop issuing more reads and just finish and return what we have so far
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration finished because of time limit");
channel.Writer.Complete();
return;
}
}
await channel.Writer.WaitToWriteAsync(cancellationToken).ConfigureAwait(false);
var readTask = this.ReadOnQuerySessionAsync(enumerator.Current, cancellationToken);
await channel.Writer.WriteAsync((false, readTask), cancellationToken).ConfigureAwait(false);
}
await channel.Writer.WriteAsync((true, default), cancellationToken).ConfigureAwait(false); // marks end of index
channel.Writer.Complete();
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration finished because it reached end");
}
catch (OperationCanceledException)
{
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration cancelled");
channel.Writer.TryComplete();
}
catch (Exception exception) when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception))
{
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration cancelled due to partition termination");
channel.Writer.TryComplete();
}
catch (Exception e)
{
this.partition.EventTraceHelper.TraceEventProcessingWarning($"query {queryId} attempt {attempt:o} enumeration failed with exception {e}");
channel.Writer.TryComplete();
}
}
long scanned = 0;
long found = 0;
long matched = 0;
long lastReport;
string position = queryEvent.ContinuationToken ?? "";
void ReportProgress(string status)
{
this.partition.EventTraceHelper.TraceEventProcessingDetail(
$"query {queryId} attempt {attempt:o} {status} position={position} elapsed={stopwatch.Elapsed.TotalSeconds:F2}s scanned={scanned} found={found} matched={matched}");
lastReport = stopwatch.ElapsedMilliseconds;
}
ReportProgress("start");
while (await channel.Reader.WaitToReadAsync(this.partition.ErrorHandler.Token).ConfigureAwait(false))
{
while (channel.Reader.TryRead(out var item))
{
if (item.last)
{
ReportProgress("completed");
yield return null;
goto done;
}
if (stopwatch.ElapsedMilliseconds - lastReport > 5000)
{
ReportProgress("underway");
}
OrchestrationState orchestrationState = null;
try
{
var response = await item.responseTask.ConfigureAwait(false);
(Status status, Output output) = response.Complete();
scanned++;
if (status.NotFound)
{
// because we are running concurrently, the index can be out of sync with the actual store
leftToFill.Release();
continue;
}
this.partition.Assert(status.Found, "FASTER did not complete the read");
var instanceState = (InstanceState)output.Read(this, queryId);
found++;
//this.partition.EventDetailTracer?.TraceEventProcessingDetail($"found instance {enumerator.Current}");
// reading the orchestrationState may race with updating the orchestration state
// but it is benign because the OrchestrationState object is immutable
orchestrationState = instanceState?.OrchestrationState;
}
catch (OperationCanceledException)
{
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} cancelled");
goto done;
}
catch (Exception exception) when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception))
{
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} cancelled due to partition termination");
cancellationTokenSource.Cancel();
goto done;
}
catch (Exception e)
{
this.partition.EventTraceHelper.TraceEventProcessingWarning($"query {queryId} attempt {attempt:o} enumeration failed with exception {e}");
cancellationTokenSource.Cancel();
goto done;
}
if (orchestrationState != null && instanceQuery.Matches(orchestrationState))
{
matched++;
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"match instance {enumerator.Current}");
yield return orchestrationState;
position = orchestrationState.OrchestrationInstance.InstanceId;
if (pageLimit.HasValue)
{
if (matched >= pageLimit.Value)
{
cancellationTokenSource.Cancel();
break;
}
}
else
{
leftToFill.Release();
}
}
else
{
leftToFill.Release();
}
}
}
ReportProgress("completed-page");
done:
cancellationTokenSource.Cancel();
await readIssueLoop;
yield break;
}
IAsyncEnumerable<OrchestrationState> ScanOrchestrationStates(
EffectTracker effectTracker,
PartitionQueryEvent queryEvent,
DateTime attempt)
{
var instanceQuery = queryEvent.InstanceQuery;
string queryId = queryEvent.EventIdString;
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"starting query {queryId}");
// we use a separate thread to iterate, since Faster can iterate synchronously only at the moment
// and we don't want it to block thread pool worker threads
var channel = Channel.CreateBounded<OrchestrationState>(500);
var scanThread = TrackedThreads.MakeTrackedThread(RunScan, $"QueryScan-{queryId}");
var scanThread = TrackedThreads.MakeTrackedThread(RunScan, $"QueryScan-{queryId}-{attempt:o}");
scanThread.Start();
// read from channel until the channel is completed, or an exception is encountered
@ -814,7 +1071,7 @@ namespace DurableTask.Netherite.Faster
try
{
using var _ = EventTraceContext.MakeContext(0, queryId);
var session = this.CreateASession($"scan-{queryId}-{this.RandomSuffix()}", true);
var session = this.CreateASession($"scan-{queryId}-{attempt:o}", true);
using (this.TrackTemporarySession(session))
{
// get the unique set of keys appearing in the log and emit them
@ -826,10 +1083,10 @@ namespace DurableTask.Netherite.Faster
long deserialized = 0;
long matched = 0;
long lastReport;
void ReportProgress()
void ReportProgress(string status)
{
this.partition.EventDetailTracer?.TraceEventProcessingDetail(
$"query {queryId} scan position={iter1.CurrentAddress} elapsed={stopwatch.Elapsed.TotalSeconds:F2}s scanned={scanned} deserialized={deserialized} matched={matched}");
this.partition.EventTraceHelper.TraceEventProcessingDetail(
$"query {queryId} attempt {attempt:o} scan {status} position={iter1.CurrentAddress} elapsed={stopwatch.Elapsed.TotalSeconds:F2}s scanned={scanned} deserialized={deserialized} matched={matched}");
lastReport = stopwatch.ElapsedMilliseconds;
if (queryEvent.TimeoutUtc.HasValue && DateTime.UtcNow > queryEvent.TimeoutUtc.Value)
@ -838,13 +1095,13 @@ namespace DurableTask.Netherite.Faster
}
}
ReportProgress();
ReportProgress("starting");
while (iter1.GetNext(out RecordInfo recordInfo, out Key key, out Value val) && !recordInfo.Tombstone)
{
if (stopwatch.ElapsedMilliseconds - lastReport > 5000)
{
ReportProgress();
ReportProgress("underway");
}
if (key.Val.ObjectType == TrackedObjectKey.TrackedObjectType.Instance)
@ -880,7 +1137,7 @@ namespace DurableTask.Netherite.Faster
{
matched++;
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"match instance {key.Val.InstanceId}");
//this.partition.EventDetailTracer?.TraceEventProcessingDetail($"match instance {key.Val.InstanceId}");
var task = channel.Writer.WriteAsync(orchestrationState);
@ -893,26 +1150,31 @@ namespace DurableTask.Netherite.Faster
}
}
ReportProgress();
ReportProgress("completed");
var task1 = channel.Writer.WriteAsync(null);
if (!task1.IsCompleted)
{
task1.AsTask().Wait();
}
channel.Writer.Complete();
}
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"finished query {queryId}");
}
catch (Exception exception)
when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception))
{
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"cancelled query {queryId} due to partition termination");
this.partition.EventTraceHelper.TraceEventProcessingWarning($"cancelled query {queryId} attempt {attempt:o} scan due to partition termination");
channel.Writer.TryComplete(new OperationCanceledException("Partition was terminated.", exception, this.terminationToken));
}
catch (TimeoutException e)
{
this.partition.EventTraceHelper.TraceEventProcessingWarning($"query {queryId} timed out");
this.partition.EventTraceHelper.TraceEventProcessingWarning($"query {queryId} attempt {attempt:o} scan timed out");
channel.Writer.TryComplete(e);
}
catch (Exception e)
{
this.partition.EventTraceHelper.TraceEventProcessingWarning($"query {queryId} failed with exception {e}");
this.partition.EventTraceHelper.TraceEventProcessingWarning($"query {queryId} attempt {attempt:o} scan failed with exception {e}");
channel.Writer.TryComplete(e);
}
}
@ -931,7 +1193,7 @@ namespace DurableTask.Netherite.Faster
emitItem(key, singleton);
}
var session = this.CreateASession($"emitCurrentState-{this.RandomSuffix()}", true);
var session = this.CreateASession($"emitCurrentState-{DateTime.UtcNow:O}", true);
using (this.TrackTemporarySession(session))
{
// iterate histories
@ -1000,7 +1262,6 @@ namespace DurableTask.Netherite.Faster
{
try
{
long trackedSizeBefore = 0;
long totalSize = 0;
Dictionary<TrackedObjectKey, List<(long delta, long address, string desc)>> perKey = null;

Просмотреть файл

@ -76,7 +76,7 @@ namespace DurableTask.Netherite.Faster
return cloudBlobContainer.GetBlockBlobReference("taskhubparameters.json");
}
this.traceHelper.TraceProgress("Creating LoadMonitor Service");
this.traceHelper.TraceProgress("Creating LoadPublisher Service");
if (!string.IsNullOrEmpty(settings.LoadInformationAzureTableName))
{
this.LoadPublisher = new AzureTableLoadPublisher(settings.TableStorageConnection, settings.LoadInformationAzureTableName, settings.HubName);

Просмотреть файл

@ -193,7 +193,7 @@ namespace DurableTask.Netherite.Faster
}
// restart pending actitivities, timers, work items etc.
this.storeWorker.RestartThingsAtEndOfRecovery(inputQueueFingerprint);
this.storeWorker.RestartThingsAtEndOfRecovery(inputQueueFingerprint, this.blobManager.IncarnationTimestamp);
this.TraceHelper.FasterProgress("Recovery complete");
}
@ -209,7 +209,7 @@ namespace DurableTask.Netherite.Faster
internal void CheckForStuckWorkers(object _)
{
TimeSpan limit = TimeSpan.FromMinutes(1);
TimeSpan limit = Debugger.IsAttached ? TimeSpan.FromMinutes(30) : TimeSpan.FromMinutes(1);
// check if any of the workers got stuck in a processing loop
Check("StoreWorker", this.storeWorker.ProcessingBatchSince);

Просмотреть файл

@ -85,7 +85,7 @@ namespace DurableTask.Netherite.Faster
foreach (var key in TrackedObjectKey.GetSingletons())
{
var target = await this.store.CreateAsync(key);
target.OnFirstInitialization();
target.OnFirstInitialization(this.partition);
}
this.lastCheckpointedCommitLogPosition = this.CommitLogPosition;
@ -582,7 +582,7 @@ namespace DurableTask.Netherite.Faster
this.traceHelper.FasterLogReplayed(this.CommitLogPosition, this.InputQueuePosition, this.numberEventsSinceLastCheckpoint, this.CommitLogPosition - startPosition, this.store.StoreStats.Get(), stopwatch.ElapsedMilliseconds);
}
public void RestartThingsAtEndOfRecovery(string inputQueueFingerprint)
public void RestartThingsAtEndOfRecovery(string inputQueueFingerprint, DateTime incarnationTimestamp)
{
bool queueChange = (this.InputQueueFingerprint != inputQueueFingerprint);
@ -599,8 +599,9 @@ namespace DurableTask.Netherite.Faster
{
PartitionId = this.partition.PartitionId,
RecoveredPosition = this.CommitLogPosition,
Timestamp = DateTime.UtcNow,
Timestamp = incarnationTimestamp,
WorkerId = this.partition.Settings.WorkerId,
KeepInstanceIdsInMemory = this.partition.Settings.KeepInstanceIdsInMemory,
ChangedFingerprint = queueChange ? inputQueueFingerprint : null,
};

Просмотреть файл

@ -64,7 +64,7 @@ namespace DurableTask.Netherite
if (trackedObject.Key.IsSingleton)
{
trackedObject.OnFirstInitialization();
trackedObject.OnFirstInitialization(partition);
}
}
@ -91,15 +91,20 @@ namespace DurableTask.Netherite
return result;
}
IList<OrchestrationState> QueryOrchestrationStates(InstanceQuery query)
IList<OrchestrationState> QueryOrchestrationStates(InstanceQuery query, int pageSize, string continuationToken)
{
return this.trackedObjects
.Values
.Select(trackedObject => trackedObject as InstanceState)
.Select(instanceState => instanceState?.OrchestrationState)
.Where(orchestrationState => orchestrationState != null
.Where(orchestrationState =>
orchestrationState != null
&& orchestrationState.OrchestrationInstance.InstanceId.CompareTo(continuationToken) > 0
&& (query == null || query.Matches(orchestrationState)))
.OrderBy(orchestrationState => orchestrationState.OrchestrationInstance.InstanceId)
.Select(orchestrationState => orchestrationState.ClearFieldsImmutably(!query.FetchInput, false))
.Append(null)
.Take(pageSize == 0 ? int.MaxValue : pageSize)
.ToList();
}
@ -140,8 +145,8 @@ namespace DurableTask.Netherite
break;
case PartitionQueryEvent queryEvent:
var instances = this.QueryOrchestrationStates(queryEvent.InstanceQuery);
var backgroundTask = Task.Run(() => this.effects.ProcessQueryResultAsync(queryEvent, instances.ToAsyncEnumerable()));
var instances = this.QueryOrchestrationStates(queryEvent.InstanceQuery, queryEvent.PageSize, queryEvent.ContinuationToken ?? "");
var backgroundTask = Task.Run(() => this.effects.ProcessQueryResultAsync(queryEvent, instances.ToAsyncEnumerable(), DateTime.UtcNow));
break;
default:

Просмотреть файл

@ -17,6 +17,8 @@ namespace DurableTask.Netherite
readonly LogLevel logLevelLimit;
readonly string tracePrefix;
public LogLevel LogLevelLimit => this.logLevelLimit;
public ClientTraceHelper(ILoggerFactory loggerFactory, LogLevel logLevelLimit, string storageAccountName, string taskHubName, Guid clientId)
{
this.logger = loggerFactory.CreateLogger($"{NetheriteOrchestrationService.LoggerCategoryName}.Client");
@ -87,6 +89,22 @@ namespace DurableTask.Netherite
}
}
public void TraceQueryProgress(string clientQueryId, string queryId, uint partitionId, TimeSpan elapsed, int pageSize, int count, string continuationToken)
{
if (this.logLevelLimit <= LogLevel.Debug)
{
continuationToken = continuationToken ?? "null";
if (this.logger.IsEnabled(LogLevel.Debug))
{
this.logger.LogDebug("{client} Query {clientQueryId} received response {queryId} from partition {partitionId:D2} elapsed={elapsedSeconds:F2}s pageSize={pageSize} count={count} continuationToken={continuationToken} ", this.tracePrefix, clientQueryId, queryId, partitionId, elapsed.TotalSeconds, pageSize, count, continuationToken);
}
if (EtwSource.Log.IsEnabled())
{
EtwSource.Log.ClientQueryProgress(this.account, this.taskHub, this.clientId, clientQueryId, queryId, partitionId, elapsed.TotalSeconds, pageSize, count, continuationToken, TraceUtils.AppName, TraceUtils.ExtensionVersion);
}
}
}
public void TraceSend(PartitionEvent @event)
{
if (this.logLevelLimit <= LogLevel.Debug)
@ -102,17 +120,19 @@ namespace DurableTask.Netherite
}
}
public void TraceReceive(ClientEvent @event)
public enum ResponseType { Fragment, Partial, Obsolete, Response };
public void TraceReceive(ClientEvent @event, ResponseType status)
{
if (this.logLevelLimit <= LogLevel.Debug)
{
if (this.logger.IsEnabled(LogLevel.Debug))
{
this.logger.LogDebug("{client} Processing event {eventId}: {event}", this.tracePrefix, @event.EventIdString, @event);
this.logger.LogDebug("{client} Processing event id={eventId} {status}: {event}", this.tracePrefix, @event.EventIdString, status, @event);
}
if (EtwSource.Log.IsEnabled())
{
EtwSource.Log.ClientReceivedEvent(this.account, this.taskHub, this.clientId, @event.EventIdString, @event.ToString(), TraceUtils.AppName, TraceUtils.ExtensionVersion);
EtwSource.Log.ClientReceivedEvent(this.account, this.taskHub, this.clientId, @event.EventIdString, status.ToString(), @event.ToString(), TraceUtils.AppName, TraceUtils.ExtensionVersion);
}
}
}

Просмотреть файл

@ -134,6 +134,13 @@ namespace DurableTask.Netherite
this.WriteEvent(217, Account, TaskHub, ClientId, PartitionEventId, PartitionId, AppName, ExtensionVersion);
}
[Event(218, Level = EventLevel.Verbose, Version = 2)]
public void ClientQueryProgress(string Account, string TaskHub, Guid ClientId, string ClientQueryId, string QueryId, uint PartitionId, double ElapsedSeconds, int PageSize, int Count, string ContinuationToken, string AppName, string ExtensionVersion)
{
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(218, Account, TaskHub, ClientId, ClientQueryId, QueryId, PartitionId, ElapsedSeconds, PageSize, Count, ContinuationToken, AppName, ExtensionVersion);
}
// ----- specific events relating to DurableTask concepts (TaskMessage, OrchestrationWorkItem, Instance)
[Event(220, Level = EventLevel.Verbose, Version = 1)]
@ -229,11 +236,11 @@ namespace DurableTask.Netherite
this.WriteEvent(242, Account, TaskHub, PartitionId, CommitLogPosition, PartitionEventId, Details, AppName, ExtensionVersion);
}
[Event(243, Level = EventLevel.Verbose, Version = 1)]
public void ClientReceivedEvent(string Account, string TaskHub, Guid ClientId, string PartitionEventId, string EventInfo, string AppName, string ExtensionVersion)
[Event(243, Level = EventLevel.Verbose, Version = 2)]
public void ClientReceivedEvent(string Account, string TaskHub, Guid ClientId, string PartitionEventId, string status, string EventInfo, string AppName, string ExtensionVersion)
{
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(243, Account, TaskHub, ClientId, PartitionEventId, EventInfo, AppName, ExtensionVersion);
this.WriteEvent(243, Account, TaskHub, ClientId, PartitionEventId, status, EventInfo, AppName, ExtensionVersion);
}
[Event(244, Level = EventLevel.Verbose, Version = 1)]

Просмотреть файл

@ -22,15 +22,11 @@ namespace DurableTask.Netherite.EventHubsTransport
readonly string eventHubName;
readonly string eventHubPartition;
readonly TimeSpan backoff = TimeSpan.FromSeconds(5);
const int maxFragmentSize = 500 * 1024; // account for very non-optimal serialization of event
int maxMessageSize = 900 * 1024; // we keep this slightly below the official limit since we have observed exceptions
int maxFragmentSize => this.maxMessageSize / 2; // we keep this lower than maxMessageSize because of serialization overhead
readonly MemoryStream stream = new MemoryStream(); // reused for all packets
readonly Stopwatch stopwatch = new Stopwatch();
// we manually set the max message size to leave extra room as
// we have observed exceptions in practice otherwise.
readonly BatchOptions batchOptions = new BatchOptions() { MaxMessageSize = 900 * 1024 };
public EventHubsSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, CancellationToken.None, traceHelper)
{
@ -53,10 +49,12 @@ namespace DurableTask.Netherite.EventHubsTransport
var sentSuccessfully = -1;
var maybeSent = -1;
Exception senderException = null;
EventDataBatch CreateBatch() => this.sender.CreateBatch(new BatchOptions() { MaxMessageSize = maxMessageSize });
try
{
var batch = this.sender.CreateBatch(this.batchOptions);
var batch = CreateBatch();
async Task SendBatch(int lastPosition)
{
@ -79,7 +77,7 @@ namespace DurableTask.Netherite.EventHubsTransport
int length = (int)(this.stream.Position - startPos);
var arraySegment = new ArraySegment<byte>(this.stream.GetBuffer(), (int)startPos, length);
var eventData = new EventData(arraySegment);
bool tooBig = length > maxFragmentSize;
bool tooBig = length > this.maxFragmentSize;
if (!tooBig && batch.TryAdd(eventData))
{
@ -94,14 +92,14 @@ namespace DurableTask.Netherite.EventHubsTransport
await SendBatch(i - 1);
// create a fresh batch
batch = this.sender.CreateBatch(this.batchOptions);
batch = CreateBatch();
}
if (tooBig)
{
// the message is too big. Break it into fragments, and send each individually.
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} fragmenting large event ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString);
var fragments = FragmentationAndReassembly.Fragment(arraySegment, evt, maxFragmentSize);
var fragments = FragmentationAndReassembly.Fragment(arraySegment, evt, this.maxFragmentSize);
maybeSent = i;
for (int k = 0; k < fragments.Count; k++)
{
@ -135,6 +133,12 @@ namespace DurableTask.Netherite.EventHubsTransport
this.stream.Seek(0, SeekOrigin.Begin);
}
}
catch(Microsoft.Azure.EventHubs.MessageSizeExceededException)
{
this.maxMessageSize = 200 * 1024;
this.traceHelper.LogWarning("EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send due to message size, reducing to {maxMessageSize}kB",
this.eventHubName, this.eventHubPartition, this.maxMessageSize / 1024);
}
catch (Exception e)
{
this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition);

Просмотреть файл

@ -360,15 +360,15 @@ namespace DurableTask.Netherite.EventHubsTransport
{
try
{
this.traceHelper.LogDebug("Client{clientId}.ch{index} establishing connection", Client.GetShortId(this.ClientId), index);
this.traceHelper.LogDebug("Client.{clientId}.ch{index} establishing connection", Client.GetShortId(this.ClientId), index);
// receive a dummy packet to establish connection
// (the packet, if any, cannot be for this receiver because it is fresh)
await receiver.ReceiveAsync(1, TimeSpan.FromMilliseconds(1));
this.traceHelper.LogDebug("Client{clientId}.ch{index} connection established", Client.GetShortId(this.ClientId), index);
this.traceHelper.LogDebug("Client.{clientId}.ch{index} connection established", Client.GetShortId(this.ClientId), index);
}
catch (Exception exception)
{
this.traceHelper.LogError("Client{clientId}.ch{index} could not connect: {exception}", Client.GetShortId(this.ClientId), index, exception);
this.traceHelper.LogError("Client.{clientId}.ch{index} could not connect: {exception}", Client.GetShortId(this.ClientId), index, exception);
throw;
}
}
@ -403,7 +403,7 @@ namespace DurableTask.Netherite.EventHubsTransport
}
// if we lose access to storage temporarily, we back off, but don't quit
this.traceHelper.LogError("Client{clientId}.ch{index} backing off for {backoffDelay} after error in receive loop: {exception}", Client.GetShortId(this.ClientId), index, backoffDelay, exception);
this.traceHelper.LogError("Client.{clientId}.ch{index} backing off for {backoffDelay} after error in receive loop: {exception}", Client.GetShortId(this.ClientId), index, backoffDelay, exception);
await Task.Delay(backoffDelay);
@ -419,17 +419,18 @@ namespace DurableTask.Netherite.EventHubsTransport
try
{
this.traceHelper.LogDebug("Client{clientId}.ch{index} received packet #{seqno} ({size} bytes)", Client.GetShortId(this.ClientId), index, ed.SystemProperties.SequenceNumber, ed.Body.Count);
this.traceHelper.LogDebug("Client.{clientId}.ch{index} received packet #{seqno} ({size} bytes)", Client.GetShortId(this.ClientId), index, ed.SystemProperties.SequenceNumber, ed.Body.Count);
Packet.Deserialize(ed.Body, out clientEvent, taskHubGuid);
clientEvent.ReceiveChannel = index;
if (clientEvent != null && clientEvent.ClientId == this.ClientId)
{
this.traceHelper.LogTrace("Client{clientId}.ch{index} receiving event {evt} id={eventId}]", Client.GetShortId(this.ClientId), index, clientEvent, clientEvent.EventIdString);
this.traceHelper.LogTrace("Client.{clientId}.ch{index} receiving event {evt} id={eventId}]", Client.GetShortId(this.ClientId), index, clientEvent, clientEvent.EventIdString);
await channelWriter.WriteAsync(clientEvent, this.shutdownSource.Token);
}
}
catch (Exception)
{
this.traceHelper.LogError("Client{clientId}.ch{index} could not deserialize packet #{seqno} ({size} bytes)", Client.GetShortId(this.ClientId), index, ed.SystemProperties.SequenceNumber, ed.Body.Count);
this.traceHelper.LogError("Client.{clientId}.ch{index} could not deserialize packet #{seqno} ({size} bytes)", Client.GetShortId(this.ClientId), index, ed.SystemProperties.SequenceNumber, ed.Body.Count);
}
}
}
@ -445,11 +446,11 @@ namespace DurableTask.Netherite.EventHubsTransport
}
catch (Exception exception)
{
this.traceHelper.LogError("Client{clientId}.ch{index} event processing exception: {exception}", Client.GetShortId(this.ClientId), index, exception);
this.traceHelper.LogError("Client.{clientId}.ch{index} event processing exception: {exception}", Client.GetShortId(this.ClientId), index, exception);
}
finally
{
this.traceHelper.LogInformation("Client{clientId}.ch{index} event processing terminated", Client.GetShortId(this.ClientId), index);
this.traceHelper.LogInformation("Client.{clientId}.ch{index} event processing terminated", Client.GetShortId(this.ClientId), index);
}
}
@ -469,11 +470,11 @@ namespace DurableTask.Netherite.EventHubsTransport
}
catch(Exception exception)
{
this.traceHelper.LogError("Client{clientId} event processing exception: {exception}", Client.GetShortId(this.ClientId), exception);
this.traceHelper.LogError("Client.{clientId} event processing exception: {exception}", Client.GetShortId(this.ClientId), exception);
}
finally
{
this.traceHelper.LogInformation("Client{clientId} event processing terminated", Client.GetShortId(this.ClientId));
this.traceHelper.LogInformation("Client.{clientId} event processing terminated", Client.GetShortId(this.ClientId));
}
}
}

Просмотреть файл

@ -24,24 +24,24 @@ namespace DurableTask.Netherite.SingleHostTransport
}
protected override Task Process(IList<ClientEvent> batch)
{
try
{
foreach (var evt in batch)
{
foreach (var evt in batch)
try
{
this.Client.Process(evt);
DurabilityListeners.ConfirmDurable(evt);
}
}
catch (System.Threading.Tasks.TaskCanceledException)
{
// this is normal during shutdown
}
catch (Exception e)
{
this.Client.ReportTransportError(nameof(ClientQueue), e);
}
catch (System.Threading.Tasks.TaskCanceledException)
{
// this is normal during shutdown
}
catch (Exception e)
{
this.Client.ReportTransportError(nameof(ClientQueue), e);
}
}
return Task.CompletedTask;
}
}

Просмотреть файл

@ -82,7 +82,7 @@ namespace DurableTask.Netherite.SingleHostTransport
{
if (!this.isShuttingDown && this.testHooks?.FaultInjectionActive != true)
{
this.testHooks.Error("MemoryTransport", "Unexpected partition termination");
this.testHooks?.Error("MemoryTransport", "Unexpected partition termination");
}
this.Notify();
};

Просмотреть файл

@ -101,6 +101,12 @@ namespace DurableTask.Netherite
return true; // empirically observed
}
// Empirically observed transient cancellation exceptions that are not application initiated
if (exception is OperationCanceledException && !token.IsCancellationRequested)
{
return true;
}
return false;
}

Просмотреть файл

@ -20,6 +20,8 @@ namespace DurableTask.Netherite
byte[] Bytes { get; }
bool IsLast { get; }
int Fragment { get; }
}
public static List<IEventFragment> Fragment(ArraySegment<byte> segment, Event original, int maxFragmentSize)
@ -52,6 +54,8 @@ namespace DurableTask.Netherite
{
PartitionId = partitionEvent.PartitionId,
OriginalEventId = original.EventId,
Timeout = (partitionEvent as ClientRequestEvent)?.TimeoutUtc,
DedupPosition = (partitionEvent as PartitionMessageEvent)?.DedupPositionForFragments,
Bytes = new ArraySegment<byte>(segment.Array, offset, portion).ToArray(),
Fragment = count++,
IsLast = (portion == length),
@ -72,13 +76,16 @@ namespace DurableTask.Netherite
return evt;
}
public static TEvent Reassemble<TEvent>(IEnumerable<IEventFragment> earlierFragments, IEventFragment lastFragment) where TEvent: Event
public static TEvent Reassemble<TEvent>(IEnumerable<IEventFragment> earlierFragments, IEventFragment lastFragment, Partition partition) where TEvent: Event
{
using (var stream = new MemoryStream())
{
int position = 0;
foreach (var x in earlierFragments)
{
partition.Assert(position == x.Fragment, "bad fragment sequence: position");
stream.Write(x.Bytes, 0, x.Bytes.Length);
position++;
}
stream.Write(lastFragment.Bytes, 0, lastFragment.Bytes.Length);
stream.Seek(0, SeekOrigin.Begin);

Просмотреть файл

@ -41,6 +41,7 @@ namespace DurableTask.Netherite.Tests
LoadMonitorLogLevelLimit = LogLevel.Trace,
PartitionCount = 6,
ThrowExceptionOnInvalidDedupeStatus = true,
KeepInstanceIdsInMemory = true,
TakeStateCheckpointWhenStoppingPartition = true, // set to false for testing recovery from log
UseAlternateObjectStore = false, // set to true to bypass FasterKV; default is false
IdleCheckpointFrequencyMs = 1000000000, // set this low for testing frequent checkpointing

Просмотреть файл

@ -19,6 +19,7 @@ namespace PerformanceTests
using System.Threading;
using System.Net.Http;
using DurableTask.Core.Stats;
using System.Diagnostics;
/// <summary>
/// operations for starting, awaiting, counting, or purging large numbers of orchestration instances
@ -184,7 +185,7 @@ namespace PerformanceTests
int completed = 0;
int pending = 0;
int running = 0;
int other = 0;
int notfound = 0;
long earliestStart = long.MaxValue;
long latestUpdate = 0;
@ -192,8 +193,7 @@ namespace PerformanceTests
object lockForUpdate = new object();
var stopwatch = new System.Diagnostics.Stopwatch();
stopwatch.Start();
var stopwatch = Stopwatch.StartNew();
var tasks = new List<Task<bool>>();
@ -207,7 +207,7 @@ namespace PerformanceTests
{
if (status == null)
{
other++;
notfound++;
}
else
{
@ -229,12 +229,15 @@ namespace PerformanceTests
}
else
{
other++;
notfound++;
}
}
}
});
stopwatch.Stop();
double querySeconds = stopwatch.ElapsedMilliseconds / 1000.0;
double elapsedSeconds = 0;
if (gotTimeRange)
@ -247,7 +250,8 @@ namespace PerformanceTests
completed,
running,
pending,
other,
notfound,
querySeconds,
elapsedSeconds,
};
@ -276,6 +280,10 @@ namespace PerformanceTests
long earliestStart = long.MaxValue;
long latestUpdate = 0;
var stopwatch = Stopwatch.StartNew();
var receivedInstanceIds = new HashSet<string>();
do
{
OrchestrationStatusQueryResult result = await client.ListInstancesAsync(queryCondition, CancellationToken.None);
@ -300,12 +308,21 @@ namespace PerformanceTests
other++;
}
bool isFresh = receivedInstanceIds.Add(status.InstanceId);
if (!isFresh)
{
throw new InvalidDataException($"received duplicate instance id: {status.InstanceId}");
}
earliestStart = Math.Min(earliestStart, status.CreatedTime.Ticks);
latestUpdate = Math.Max(latestUpdate, status.LastUpdatedTime.Ticks);
}
} while (queryCondition.ContinuationToken != null);
stopwatch.Stop();
double querySeconds = stopwatch.ElapsedMilliseconds / 1000.0;
double elapsedSeconds = 0;
if (completed + pending + running + other > 0)
@ -319,6 +336,7 @@ namespace PerformanceTests
running,
pending,
other,
querySeconds,
elapsedSeconds,
};
@ -333,38 +351,63 @@ namespace PerformanceTests
public static async Task<IActionResult> Purge(HttpRequest req, IDurableClient client, ILogger log, string prefix)
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
int numberOrchestrations = int.Parse(requestBody);
DateTime deadline = DateTime.UtcNow + TimeSpan.FromMinutes(5);
// wait for the specified number of orchestration instances to complete
try
if (string.Compare(requestBody.Trim(), "all", true) == 0)
{
log.LogWarning($"Purging {numberOrchestrations} orchestration instances...");
var stopwatch = Stopwatch.StartNew();
var all = Enum.GetValues<DurableTask.Core.OrchestrationStatus>().ToArray();
PurgeHistoryResult result = await client.PurgeInstanceHistoryAsync(DateTime.MinValue, null, all);
int purged = result.InstancesDeleted;
var stopwatch = new System.Diagnostics.Stopwatch();
stopwatch.Start();
stopwatch.Stop();
double querySec = stopwatch.ElapsedMilliseconds / 1000.0;
var tasks = new List<Task<bool>>();
int deleted = 0;
// start all the orchestrations
await Enumerable.Range(0, numberOrchestrations).ParallelForEachAsync(200, true, async (iteration) =>
var resultObject = new
{
var orchestrationInstanceId = InstanceId(prefix, iteration);
var response = await client.PurgeInstanceHistoryAsync(orchestrationInstanceId);
Interlocked.Add(ref deleted, response.InstancesDeleted);
});
purged,
querySec,
throughput = purged > 0 ? (purged / querySec).ToString("F2") : "n/a",
};
return new OkObjectResult(
deleted == numberOrchestrations
? $"all {numberOrchestrations} orchestration instances purged.\n"
: $"only {deleted}/{numberOrchestrations} orchestration instances purged.\n");
return new OkObjectResult($"{JsonConvert.SerializeObject(resultObject)}\n");
}
catch (Exception e)
else
{
return new ObjectResult(new { error = e.ToString() });
int numberOrchestrations = int.Parse(requestBody);
DateTime deadline = DateTime.UtcNow + TimeSpan.FromMinutes(5);
// wait for the specified number of orchestration instances to complete
try
{
log.LogWarning($"Purging {numberOrchestrations} orchestration instances...");
var stopwatch = new System.Diagnostics.Stopwatch();
stopwatch.Start();
var tasks = new List<Task<bool>>();
int deleted = 0;
// start all the orchestrations
await Enumerable.Range(0, numberOrchestrations).ParallelForEachAsync(200, true, async (iteration) =>
{
var orchestrationInstanceId = InstanceId(prefix, iteration);
var response = await client.PurgeInstanceHistoryAsync(orchestrationInstanceId);
Interlocked.Add(ref deleted, response.InstancesDeleted);
});
return new OkObjectResult(
deleted == numberOrchestrations
? $"all {numberOrchestrations} orchestration instances purged.\n"
: $"only {deleted}/{numberOrchestrations} orchestration instances purged.\n");
}
catch (Exception e)
{
return new ObjectResult(new { error = e.ToString() });
}
}
}

Просмотреть файл

@ -32,7 +32,7 @@ namespace PerformanceTests
ILogger log)
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
log.LogInformation("C# HTTP trigger function processed a request.");
log.LogWarning("Received ping {requestBody}", requestBody);
var is64bit = Environment.Is64BitProcess;
try

Просмотреть файл

@ -23,17 +23,14 @@ namespace PerformanceTests
public static class Queries
{
[FunctionName(nameof(PagedQuery))]
public static async Task<JsonResult> PagedQuery(
public static async Task<IActionResult> PagedQuery(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "pagedquery")] HttpRequest req,
[DurableClient] IDurableClient client,
ILogger log)
{
try
{
var queryCondition = new OrchestrationStatusQueryCondition()
{
PageSize = 100,
};
var queryCondition = new OrchestrationStatusQueryCondition();
try
{
@ -104,6 +101,8 @@ namespace PerformanceTests
int inputchars = 0;
int pages = 0;
var receivedInstanceIds = new HashSet<string>();
log.LogWarning($"Querying orchestration instances...");
var stopwatch = Stopwatch.StartNew();
@ -127,6 +126,12 @@ namespace PerformanceTests
{
inputchars += status.Input.ToString().Length;
}
bool isFresh = receivedInstanceIds.Add(status.InstanceId);
if (!isFresh)
{
throw new InvalidDataException($"received duplicate instance id: {status.InstanceId}");
}
}
} while (queryCondition.ContinuationToken != null);
@ -134,7 +139,7 @@ namespace PerformanceTests
stopwatch.Stop();
double querySec = stopwatch.ElapsedMilliseconds / 1000.0;
return new JsonResult(new
var resultObject = new
{
records,
completed,
@ -142,7 +147,9 @@ namespace PerformanceTests
pages,
querySec,
throughput = records > 0 ? (records/querySec).ToString("F2") : "n/a",
});
};
return new OkObjectResult($"{JsonConvert.SerializeObject(resultObject)}\n");
}
catch (Exception e)
{

Просмотреть файл

@ -16,7 +16,7 @@
</ItemGroup>
<ItemGroup>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="local.settings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>

Просмотреть файл

@ -95,6 +95,9 @@
// options: "Local", "Static", "Locavore"
"ActivityScheduler": "Locavore",
// controls whether the instance ids are kept in memory. If false, paged queries are not supported.
"KeepInstanceIdsInMemory": true,
// The log level limits below control the production of log events by the various components.
// it limits production, not just consumption, of the events, so it can be used to prevent overheads.
// "Debug" is a reasonable setting, as it allows troubleshooting without impacting perf too much.

Просмотреть файл

@ -0,0 +1,34 @@
#!/usr/bin/pwsh
param (
$Settings="./settings.ps1",
$Configuration="Release"
)
# read the settings and initialize the azure resources
. ./scripts/init.ps1 -Settings $Settings
# build the code
Write-Host Building $Configuration Configuration...
dotnet build -c $Configuration
# enter the directory with the binaries
Push-Location -Path bin/$Configuration/net6.0
# update the host.json so the app runs locally in client mode
$HostConfigurationFile = "host.json"
Write-Host Updating host.json...
$hostconf = (Get-Content "host.json" | ConvertFrom-Json -Depth 32)
$hostconf.extensions.durableTask.storageProvider.PartitionManagement = "ClientOnly"
$hostconf | ConvertTo-Json -depth 32 | set-content "host.json"
# look up the two connection strings and assign them to the respective environment variables
Write-Host Setting environment variables...
$Env:AzureWebJobsStorage = (az storage account show-connection-string --name $storageName --resource-group $groupName | ConvertFrom-Json).connectionString
$Env:EventHubsConnection = (az eventhubs namespace authorization-rule keys list --resource-group $groupName --namespace-name $namespaceName --name RootManageSharedAccessKey | ConvertFrom-Json).primaryConnectionString
# start the function app locally
Write-Host Starting the function app...
func start --no-build
Pop-Location