fix frequent query timeouts by correctly stopping page collection when time is about to run out (#202)
This commit is contained in:
Родитель
a292d0fddb
Коммит
6ea05fe748
|
@ -222,7 +222,7 @@ namespace DurableTask.Netherite
|
|||
}
|
||||
}
|
||||
|
||||
public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsyncEnumerable<OrchestrationState> instances, DateTime attempt)
|
||||
public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsyncEnumerable<(string, OrchestrationState)> instances, DateTime attempt)
|
||||
{
|
||||
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
|
||||
this.Assert(!this.IsReplaying, "query events are never part of the replay");
|
||||
|
|
|
@ -39,7 +39,7 @@ namespace DurableTask.Netherite
|
|||
}
|
||||
}
|
||||
|
||||
public abstract Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> result, Partition partition, DateTime attempt);
|
||||
public abstract Task OnQueryCompleteAsync(IAsyncEnumerable<(string,OrchestrationState)> result, Partition partition, DateTime attempt);
|
||||
|
||||
public sealed override void DetermineEffects(EffectTracker effects)
|
||||
{
|
||||
|
|
|
@ -15,7 +15,7 @@ namespace DurableTask.Netherite
|
|||
{
|
||||
const int batchsize = 11;
|
||||
|
||||
public async override Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> instances, Partition partition, DateTime attempt)
|
||||
public async override Task OnQueryCompleteAsync(IAsyncEnumerable<(string,OrchestrationState)> instances, Partition partition, DateTime attempt)
|
||||
{
|
||||
int totalcount = 0;
|
||||
string continuationToken = this.ContinuationToken ?? "";
|
||||
|
@ -30,12 +30,12 @@ namespace DurableTask.Netherite
|
|||
|
||||
using var memoryStream = new MemoryStream();
|
||||
|
||||
await foreach (var orchestrationState in instances)
|
||||
await foreach (var (position, instance) in instances)
|
||||
{
|
||||
// a null is used to indicate that we have read the last instance
|
||||
if (orchestrationState == null)
|
||||
if (instance == null)
|
||||
{
|
||||
continuationToken = null; // indicates completion
|
||||
continuationToken = position;
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -52,8 +52,8 @@ namespace DurableTask.Netherite
|
|||
};
|
||||
}
|
||||
|
||||
response.OrchestrationStates.Add(orchestrationState);
|
||||
continuationToken = orchestrationState.OrchestrationInstance.InstanceId;
|
||||
response.OrchestrationStates.Add(instance);
|
||||
continuationToken = position;
|
||||
totalcount++;
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ namespace DurableTask.Netherite
|
|||
trackedObject.Process(this, effects);
|
||||
}
|
||||
|
||||
public async override Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> instances, Partition partition, DateTime attempt)
|
||||
public async override Task OnQueryCompleteAsync(IAsyncEnumerable<(string,OrchestrationState)> instances, Partition partition, DateTime attempt)
|
||||
{
|
||||
int batchCount = 0;
|
||||
string continuationToken = null;
|
||||
|
@ -50,11 +50,11 @@ namespace DurableTask.Netherite
|
|||
await batch.WhenProcessed.Task;
|
||||
}
|
||||
|
||||
await foreach (var orchestrationState in instances)
|
||||
await foreach (var (position, instance) in instances)
|
||||
{
|
||||
if (orchestrationState != null)
|
||||
if (instance != null)
|
||||
{
|
||||
string instanceId = orchestrationState.OrchestrationInstance.InstanceId;
|
||||
string instanceId = instance.OrchestrationInstance.InstanceId;
|
||||
|
||||
batch.InstanceIds.Add(instanceId);
|
||||
|
||||
|
@ -64,11 +64,11 @@ namespace DurableTask.Netherite
|
|||
batch = makeNewBatchObject();
|
||||
}
|
||||
|
||||
continuationToken = instanceId;
|
||||
continuationToken = position;
|
||||
}
|
||||
else
|
||||
{
|
||||
continuationToken = null;
|
||||
continuationToken = position;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -39,6 +39,6 @@ namespace DurableTask.Netherite
|
|||
/// <param name="exceptionTask">A task that throws an exception if the enumeration fails</param>
|
||||
/// <param name="partition">The partition</param>
|
||||
/// <param name="attempt">The timestamp for this query attempt</param>
|
||||
public abstract Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> result, Partition partition, DateTime attempt);
|
||||
public abstract Task OnQueryCompleteAsync(IAsyncEnumerable<(string,OrchestrationState)> result, Partition partition, DateTime attempt);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -693,6 +693,7 @@ namespace DurableTask.Netherite
|
|||
{
|
||||
result = await this.RunUnpagedPartitionQueries(
|
||||
clientQueryId,
|
||||
partitionPositions,
|
||||
(uint partitionId) =>
|
||||
{
|
||||
var request = requestCreator();
|
||||
|
@ -729,6 +730,7 @@ namespace DurableTask.Netherite
|
|||
|
||||
async Task<TResult> RunUnpagedPartitionQueries<TRequest, TResponse, TResult>(
|
||||
string clientQueryId,
|
||||
string[] partitionPositions,
|
||||
Func<uint, TRequest> requestCreator,
|
||||
Func<IEnumerable<TResponse>, TResult> responseAggregator,
|
||||
CancellationToken cancellationToken)
|
||||
|
@ -738,15 +740,25 @@ namespace DurableTask.Netherite
|
|||
async Task<TResponse> QueryPartition(uint partitionId)
|
||||
{
|
||||
var stopwatch = Stopwatch.StartNew();
|
||||
var request = requestCreator(partitionId);
|
||||
var response = (TResponse) await this.PerformRequestWithTimeoutAndCancellation(cancellationToken, request, false).ConfigureAwait(false);
|
||||
this.traceHelper.TraceQueryProgress(clientQueryId, request.EventIdString, partitionId, stopwatch.Elapsed, request.PageSize, response.Count, response.ContinuationToken);
|
||||
return response;
|
||||
if (partitionPositions[partitionId] != null)
|
||||
{
|
||||
var request = requestCreator(partitionId);
|
||||
request.ContinuationToken = partitionPositions[partitionId];
|
||||
var response = (TResponse)await this.PerformRequestWithTimeoutAndCancellation(cancellationToken, request, false).ConfigureAwait(false);
|
||||
partitionPositions[partitionId] = response.ContinuationToken;
|
||||
this.traceHelper.TraceQueryProgress(clientQueryId, request.EventIdString, partitionId, stopwatch.Elapsed, request.PageSize, response.Count, response.ContinuationToken);
|
||||
return response;
|
||||
}
|
||||
else
|
||||
{
|
||||
// we have already reached the end of this partition
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
var tasks = Enumerable.Range(0, (int) this.host.NumberPartitions).Select(i => QueryPartition((uint) i)).ToList();
|
||||
ClientEvent[] responses = await Task.WhenAll(tasks).ConfigureAwait(false);
|
||||
return responseAggregator(responses.Cast<TResponse>());
|
||||
return responseAggregator(responses.Where(r => r != null).Cast<TResponse>());
|
||||
}
|
||||
|
||||
public async Task<TResult> RunPagedPartitionQueries<TRequest, TResponse, TResult>(
|
||||
|
|
|
@ -122,7 +122,7 @@ namespace DurableTask.Netherite
|
|||
|
||||
public override Netherite.InstanceQuery InstanceQuery => this.request.InstanceQuery;
|
||||
|
||||
public override async Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> result, Partition partition, DateTime attempt)
|
||||
public override async Task OnQueryCompleteAsync(IAsyncEnumerable<(string, OrchestrationState)> result, Partition partition, DateTime attempt)
|
||||
{
|
||||
partition.Assert(this.request.Phase == ClientRequestEventWithQuery.ProcessingPhase.Query, "wrong phase in QueriesState.OnQueryCompleteAsync");
|
||||
|
||||
|
|
|
@ -548,18 +548,21 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
DateTime attempt = DateTime.UtcNow;
|
||||
|
||||
IAsyncEnumerable<OrchestrationState> orchestrationStates;
|
||||
|
||||
IAsyncEnumerable<(string, OrchestrationState)> orchestrationStates;
|
||||
|
||||
var stats = (StatsState)this.singletons[(int)TrackedObjectKey.Stats.ObjectType];
|
||||
|
||||
if (stats.HasInstanceIds)
|
||||
{
|
||||
TimeSpan timeBudget = queryEvent.TimeoutUtc.HasValue ? (queryEvent.TimeoutUtc.Value - attempt) - TimeSpan.FromSeconds(10) : TimeSpan.FromSeconds(15);
|
||||
|
||||
orchestrationStates = this.QueryEnumeratedStates(
|
||||
effectTracker,
|
||||
queryEvent,
|
||||
stats.GetEnumerator(queryEvent.InstanceQuery.InstanceIdPrefix, queryEvent.ContinuationToken),
|
||||
queryEvent.PageSize,
|
||||
TimeSpan.FromSeconds(15),
|
||||
timeBudget,
|
||||
attempt);
|
||||
}
|
||||
else
|
||||
|
@ -866,19 +869,19 @@ namespace DurableTask.Netherite.Faster
|
|||
return default;
|
||||
}
|
||||
|
||||
async IAsyncEnumerable<OrchestrationState> QueryEnumeratedStates(
|
||||
async IAsyncEnumerable<(string,OrchestrationState)> QueryEnumeratedStates(
|
||||
EffectTracker effectTracker,
|
||||
PartitionQueryEvent queryEvent,
|
||||
IEnumerator<string> enumerator,
|
||||
int pageSize,
|
||||
TimeSpan pageTime,
|
||||
TimeSpan timeBudget,
|
||||
DateTime attempt
|
||||
)
|
||||
{
|
||||
var instanceQuery = queryEvent.InstanceQuery;
|
||||
string queryId = queryEvent.EventIdString;
|
||||
int? pageLimit = pageSize > 0 ? pageSize : null;
|
||||
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration from {queryEvent.ContinuationToken} with pageLimit={(pageLimit.HasValue ? pageLimit.ToString() : "none")} pageTime={pageTime}");
|
||||
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration from {queryEvent.ContinuationToken} with pageLimit={(pageLimit.HasValue ? pageLimit.ToString() : "none")} timeBudget={timeBudget}");
|
||||
Stopwatch stopwatch = Stopwatch.StartNew();
|
||||
|
||||
var channel = Channel.CreateBounded<(bool last, ValueTask<FasterKV<Key, Value>.ReadAsyncResult<EffectTracker, Output, object>> responseTask)>(200);
|
||||
|
@ -900,17 +903,7 @@ namespace DurableTask.Netherite.Faster
|
|||
continue;
|
||||
}
|
||||
|
||||
while (!await leftToFill.WaitAsync(2000, cancellationToken))
|
||||
{
|
||||
if (stopwatch.Elapsed > pageTime)
|
||||
{
|
||||
// stop issuing more reads and just finish and return what we have so far
|
||||
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration finished because of time limit");
|
||||
channel.Writer.Complete();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
await leftToFill.WaitAsync(cancellationToken);
|
||||
await channel.Writer.WaitToWriteAsync(cancellationToken).ConfigureAwait(false);
|
||||
var readTask = this.ReadOnQuerySessionAsync(enumerator.Current, cancellationToken);
|
||||
await channel.Writer.WriteAsync((false, readTask), cancellationToken).ConfigureAwait(false);
|
||||
|
@ -959,10 +952,17 @@ namespace DurableTask.Netherite.Faster
|
|||
if (item.last)
|
||||
{
|
||||
ReportProgress("completed");
|
||||
yield return null;
|
||||
yield return (null, null);
|
||||
goto done;
|
||||
}
|
||||
|
||||
if (stopwatch.Elapsed > timeBudget)
|
||||
{
|
||||
// stop querying and just return what we have so far
|
||||
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration finished because of time limit");
|
||||
goto pageDone;
|
||||
}
|
||||
|
||||
if (stopwatch.ElapsedMilliseconds - lastReport > 5000)
|
||||
{
|
||||
ReportProgress("underway");
|
||||
|
@ -996,6 +996,10 @@ namespace DurableTask.Netherite.Faster
|
|||
// reading the orchestrationState may race with updating the orchestration state
|
||||
// but it is benign because the OrchestrationState object is immutable
|
||||
orchestrationState = instanceState?.OrchestrationState;
|
||||
|
||||
position = instanceState.InstanceId;
|
||||
|
||||
this.partition.Assert(orchestrationState == null || orchestrationState.OrchestrationInstance.InstanceId == instanceState.InstanceId, "wrong instance id");
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
|
@ -1019,8 +1023,7 @@ namespace DurableTask.Netherite.Faster
|
|||
{
|
||||
matched++;
|
||||
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"match instance {enumerator.Current}");
|
||||
yield return orchestrationState;
|
||||
position = orchestrationState.OrchestrationInstance.InstanceId;
|
||||
yield return (position, orchestrationState);
|
||||
|
||||
if (pageLimit.HasValue)
|
||||
{
|
||||
|
@ -1041,6 +1044,9 @@ namespace DurableTask.Netherite.Faster
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
pageDone:
|
||||
yield return (position, null);
|
||||
ReportProgress("completed-page");
|
||||
|
||||
done:
|
||||
|
@ -1049,7 +1055,7 @@ namespace DurableTask.Netherite.Faster
|
|||
yield break;
|
||||
}
|
||||
|
||||
IAsyncEnumerable<OrchestrationState> ScanOrchestrationStates(
|
||||
IAsyncEnumerable<(string,OrchestrationState)> ScanOrchestrationStates(
|
||||
EffectTracker effectTracker,
|
||||
PartitionQueryEvent queryEvent,
|
||||
DateTime attempt)
|
||||
|
@ -1059,7 +1065,7 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
// we use a separate thread to iterate, since Faster can iterate synchronously only at the moment
|
||||
// and we don't want it to block thread pool worker threads
|
||||
var channel = Channel.CreateBounded<OrchestrationState>(500);
|
||||
var channel = Channel.CreateBounded<(string,OrchestrationState)>(500);
|
||||
var scanThread = TrackedThreads.MakeTrackedThread(RunScan, $"QueryScan-{queryId}-{attempt:o}");
|
||||
scanThread.Start();
|
||||
|
||||
|
@ -1071,6 +1077,7 @@ namespace DurableTask.Netherite.Faster
|
|||
try
|
||||
{
|
||||
using var _ = EventTraceContext.MakeContext(0, queryId);
|
||||
string startAt = queryEvent.ContinuationToken ?? "";
|
||||
var session = this.CreateASession($"scan-{queryId}-{attempt:o}", true);
|
||||
using (this.TrackTemporarySession(session))
|
||||
{
|
||||
|
@ -1103,7 +1110,7 @@ namespace DurableTask.Netherite.Faster
|
|||
{
|
||||
ReportProgress("underway");
|
||||
}
|
||||
|
||||
|
||||
if (key.Val.ObjectType == TrackedObjectKey.TrackedObjectType.Instance)
|
||||
{
|
||||
scanned++;
|
||||
|
@ -1131,15 +1138,16 @@ namespace DurableTask.Netherite.Faster
|
|||
// reading the orchestrationState may race with updating the orchestration state
|
||||
// but it is benign because the OrchestrationState object is immutable
|
||||
var orchestrationState = instanceState?.OrchestrationState;
|
||||
|
||||
string instanceId = orchestrationState.OrchestrationInstance.InstanceId;
|
||||
if (orchestrationState != null
|
||||
&& startAt.CompareTo(instanceId) < 0
|
||||
&& instanceQuery.Matches(orchestrationState))
|
||||
{
|
||||
matched++;
|
||||
|
||||
//this.partition.EventDetailTracer?.TraceEventProcessingDetail($"match instance {key.Val.InstanceId}");
|
||||
|
||||
var task = channel.Writer.WriteAsync(orchestrationState);
|
||||
var task = channel.Writer.WriteAsync((instanceId, orchestrationState));
|
||||
|
||||
if (!task.IsCompleted)
|
||||
{
|
||||
|
@ -1152,7 +1160,7 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
ReportProgress("completed");
|
||||
|
||||
var task1 = channel.Writer.WriteAsync(null);
|
||||
var task1 = channel.Writer.WriteAsync((null, null));
|
||||
if (!task1.IsCompleted)
|
||||
{
|
||||
task1.AsTask().Wait();
|
||||
|
|
|
@ -91,21 +91,38 @@ namespace DurableTask.Netherite
|
|||
return result;
|
||||
}
|
||||
|
||||
IList<OrchestrationState> QueryOrchestrationStates(InstanceQuery query, int pageSize, string continuationToken)
|
||||
IEnumerable<(string, OrchestrationState)> QueryOrchestrationStates(InstanceQuery query, int pageSize, string continuationToken)
|
||||
{
|
||||
return this.trackedObjects
|
||||
var instances = this.trackedObjects
|
||||
.Values
|
||||
.Select(trackedObject => trackedObject as InstanceState)
|
||||
.Select(instanceState => instanceState?.OrchestrationState)
|
||||
.Where(orchestrationState =>
|
||||
.Where(orchestrationState =>
|
||||
orchestrationState != null
|
||||
&& orchestrationState.OrchestrationInstance.InstanceId.CompareTo(continuationToken) > 0
|
||||
&& orchestrationState.OrchestrationInstance.InstanceId.CompareTo(continuationToken) > 0
|
||||
&& (query == null || query.Matches(orchestrationState)))
|
||||
.OrderBy(orchestrationState => orchestrationState.OrchestrationInstance.InstanceId)
|
||||
.Select(orchestrationState => orchestrationState.ClearFieldsImmutably(!query.FetchInput, false))
|
||||
.Append(null)
|
||||
.Take(pageSize == 0 ? int.MaxValue : pageSize)
|
||||
.ToList();
|
||||
.Take(pageSize == 0 ? int.MaxValue : pageSize);
|
||||
|
||||
string last = "";
|
||||
|
||||
foreach (OrchestrationState instance in instances)
|
||||
{
|
||||
if (instance != null)
|
||||
{
|
||||
last = instance.OrchestrationInstance.InstanceId;
|
||||
yield return (last, instance);
|
||||
}
|
||||
else
|
||||
{
|
||||
yield return (null, null);
|
||||
yield break;
|
||||
}
|
||||
}
|
||||
|
||||
yield return (last, null);
|
||||
}
|
||||
|
||||
protected override async Task Process(IList<PartitionEvent> batch)
|
||||
|
@ -146,6 +163,7 @@ namespace DurableTask.Netherite
|
|||
|
||||
case PartitionQueryEvent queryEvent:
|
||||
var instances = this.QueryOrchestrationStates(queryEvent.InstanceQuery, queryEvent.PageSize, queryEvent.ContinuationToken ?? "");
|
||||
|
||||
var backgroundTask = Task.Run(() => this.effects.ProcessQueryResultAsync(queryEvent, instances.ToAsyncEnumerable(), DateTime.UtcNow));
|
||||
break;
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ namespace PerformanceTests
|
|||
try
|
||||
{
|
||||
var queryCondition = new OrchestrationStatusQueryCondition();
|
||||
bool keepGoingUntilDone = true;
|
||||
|
||||
try
|
||||
{
|
||||
|
@ -66,6 +67,13 @@ namespace PerformanceTests
|
|||
queryCondition.PageSize = int.Parse(val);
|
||||
}
|
||||
}
|
||||
{
|
||||
if (parameters.TryGetValue("keepGoingUntilDone", out string val))
|
||||
{
|
||||
parameters.Remove("keepGoingUntilDone");
|
||||
keepGoingUntilDone = bool.Parse(val);
|
||||
}
|
||||
}
|
||||
{
|
||||
if (parameters.TryGetValue("instanceIdPrefix", out string val))
|
||||
{
|
||||
|
@ -73,6 +81,13 @@ namespace PerformanceTests
|
|||
queryCondition.InstanceIdPrefix = val;
|
||||
}
|
||||
}
|
||||
{
|
||||
if (parameters.TryGetValue("continuationToken", out string val))
|
||||
{
|
||||
parameters.Remove("continuationToken");
|
||||
queryCondition.ContinuationToken = val;
|
||||
}
|
||||
}
|
||||
{
|
||||
if (parameters.TryGetValue("showInput", out string val))
|
||||
{
|
||||
|
@ -134,10 +149,11 @@ namespace PerformanceTests
|
|||
}
|
||||
}
|
||||
|
||||
} while (queryCondition.ContinuationToken != null);
|
||||
} while (keepGoingUntilDone && queryCondition.ContinuationToken != null);
|
||||
|
||||
stopwatch.Stop();
|
||||
double querySec = stopwatch.ElapsedMilliseconds / 1000.0;
|
||||
string continuationToken = queryCondition.ContinuationToken;
|
||||
|
||||
var resultObject = new
|
||||
{
|
||||
|
@ -146,6 +162,7 @@ namespace PerformanceTests
|
|||
inputchars,
|
||||
pages,
|
||||
querySec,
|
||||
continuationToken,
|
||||
throughput = records > 0 ? (records/querySec).ToString("F2") : "n/a",
|
||||
};
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче