Merge branch 'main' of https://github.com/microsoft/durabletask-netherite into main
This commit is contained in:
Коммит
94edddc01d
|
@ -33,6 +33,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventProducer", "test\Event
|
|||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventConsumer", "test\EventConsumer\EventConsumer.csproj", "{1D16A6FD-944C-49A1-8727-6236861F437A}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ScalingTests", "ScalingTests\ScalingTests.csproj", "{2F4D331C-62E4-47E8-852E-163166944DF8}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
|
@ -71,6 +73,10 @@ Global
|
|||
{1D16A6FD-944C-49A1-8727-6236861F437A}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{1D16A6FD-944C-49A1-8727-6236861F437A}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{1D16A6FD-944C-49A1-8727-6236861F437A}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{2F4D331C-62E4-47E8-852E-163166944DF8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{2F4D331C-62E4-47E8-852E-163166944DF8}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{2F4D331C-62E4-47E8-852E-163166944DF8}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{2F4D331C-62E4-47E8-852E-163166944DF8}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
|
@ -84,6 +90,7 @@ Global
|
|||
{654DA6B4-2E2F-4386-BB9F-7CE5A13998DE} = {AB958467-9236-402E-833C-B8DE4841AB9F}
|
||||
{5F2A1A00-13A2-40C5-A1D6-A3518E4645E8} = {4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}
|
||||
{1D16A6FD-944C-49A1-8727-6236861F437A} = {4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}
|
||||
{2F4D331C-62E4-47E8-852E-163166944DF8} = {4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}
|
||||
EndGlobalSection
|
||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||
SolutionGuid = {238A9613-5411-41CF-BDEC-168CCD5C03FB}
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
namespace ScalingTests
|
||||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
/// <summary>
/// Logger provider used by the scaling test tool. Every logger it creates writes the
/// formatted message directly to the console: Trace/Debug/Information go to standard
/// output, Warning/Error/Critical go to standard error.
/// </summary>
public class ConsoleLoggerProvider : ILoggerProvider
{
    /// <summary>
    /// Creates a console logger for the given category.
    /// </summary>
    /// <param name="categoryName">The category name handed to the new logger.</param>
    /// <returns>A new <see cref="Logger"/> instance.</returns>
    public ILogger CreateLogger(string categoryName)
        => new Logger(categoryName);

    /// <summary>
    /// Does nothing; the provider holds no resources.
    /// </summary>
    public void Dispose()
    { }

    /// <summary>
    /// The logger implementation handed out by <see cref="ConsoleLoggerProvider"/>.
    /// </summary>
    public class Logger : ILogger
    {
        // Stored at construction; not currently included in the console output.
        readonly string categoryName;

        /// <summary>
        /// Creates a logger for the given category.
        /// </summary>
        /// <param name="categoryName">The category name of this logger.</param>
        public Logger(string categoryName)
        {
            this.categoryName = categoryName;
        }

        /// <summary>
        /// Scopes are not supported; a shared no-op disposable is returned.
        /// </summary>
        public IDisposable BeginScope<TState>(TState state) => NoopDisposable.Instance;

        /// <summary>
        /// All writable levels are enabled. <see cref="LogLevel.None"/> is a filter value,
        /// not a level messages are written at, so it is reported as disabled.
        /// </summary>
        public bool IsEnabled(LogLevel logLevel) => LogLevel.Trace <= logLevel && logLevel < LogLevel.None;

        /// <summary>
        /// Formats the entry and writes it to the console.
        /// </summary>
        /// <param name="logLevel">Severity; selects stdout versus stderr.</param>
        /// <param name="eventId">Not used by this logger.</param>
        /// <param name="state">The entry to be formatted.</param>
        /// <param name="exception">Optional exception; printed for Warning and above.</param>
        /// <param name="formatter">Produces the message text from state and exception.</param>
        public void Log<TState>(LogLevel logLevel, Microsoft.Extensions.Logging.EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
        {
            if (!this.IsEnabled(logLevel))
            {
                // nothing would be written, so skip the formatting work as well
                return;
            }

            string formattedString = formatter(state, exception);

            switch (logLevel)
            {
                case LogLevel.Information:
                case LogLevel.Debug:
                case LogLevel.Trace:
                    Console.WriteLine(formattedString);
                    break;

                case LogLevel.Warning:
                case LogLevel.Error:
                case LogLevel.Critical:
                    Console.Error.WriteLine(formattedString);
                    if (exception != null)
                    {
                        Console.Error.WriteLine(exception.ToString());
                    }
                    break;
            }
        }

        /// <summary>
        /// A disposable that does nothing, returned from <see cref="BeginScope{TState}"/>.
        /// </summary>
        class NoopDisposable : IDisposable
        {
            // readonly: the shared instance must not be replaceable
            public static readonly NoopDisposable Instance = new NoopDisposable();

            public void Dispose()
            { }
        }
    }
}
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
namespace ScalingTests
|
||||
{
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using DurableTask.Netherite;
|
||||
using DurableTask.Netherite.Scaling;
|
||||
using Dynamitey;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Newtonsoft.Json;
|
||||
|
||||
/// <summary>
/// Interactive console tool: starts a Netherite orchestration service in client-only mode,
/// then repeatedly collects scaling metrics and prints the scale recommendation for a
/// worker count chosen by key press.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point. Connection settings are resolved from environment variables.
    /// </summary>
    /// <param name="args">Command line arguments (not used).</param>
    static async Task Main(string[] args)
    {
        var settings = new NetheriteOrchestrationServiceSettings()
        {
            HubName = "perftests",
            PartitionManagement = PartitionManagementOptions.ClientOnly,

            LogLevelLimit = LogLevel.Trace,
            EventLogLevelLimit = LogLevel.Trace,
            StorageLogLevelLimit = LogLevel.Trace,
            TransportLogLevelLimit = LogLevel.Trace,
            WorkItemLogLevelLimit = LogLevel.Trace,
        };
        settings.Validate((string connectionName) => Environment.GetEnvironmentVariable(connectionName));
        var loggerFactory = new LoggerFactory();
        loggerFactory.AddProvider(new ConsoleLoggerProvider());
        var logger = loggerFactory.CreateLogger("Main");

        logger.LogInformation("Starting OrchestrationService...");
        var service = new NetheriteOrchestrationService(settings, loggerFactory);
        await service.StartAsync();

        try
        {
            if (service.TryGetScalingMonitor(out ScalingMonitor scalingMonitor))
            {
                while (true)
                {
                    logger.LogInformation("Hit a number to run scale decision for that workercount, or 'q' to exit");
                    ConsoleKeyInfo keyInfo = Console.ReadKey();
                    int workerCount;

                    if (keyInfo.KeyChar == 'q')
                    {
                        break;
                    }
                    else if (!int.TryParse($"{keyInfo.KeyChar}", out workerCount))
                    {
                        // any key that is not a digit is treated as a worker count of 1
                        workerCount = 1;
                    }

                    logger.LogInformation("--------- Collecting Metrics...");
                    var metrics = await scalingMonitor.CollectMetrics();

                    // JSON is passed as an argument, not as the message template, so its braces are never parsed as placeholders
                    logger.LogInformation("{metrics}", JsonConvert.SerializeObject(metrics, Formatting.Indented));
                    logger.LogInformation("--------- Making scale decision for worker count {workerCount}...", workerCount);
                    var decision = scalingMonitor.GetScaleRecommendation(workerCount, metrics);
                    logger.LogInformation("{decision}", JsonConvert.SerializeObject(decision, Formatting.Indented));
                }
            }
            else
            {
                logger.LogError("failed to create scaling monitor.");
            }
        }
        finally
        {
            // shut down, even if the interactive loop failed, so the started service is not left running
            logger.LogInformation("OrchestrationService stopping...");
            await service.StopAsync();
            logger.LogInformation("Done");
        }
    }
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>netcoreapp3.1</TargetFramework>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\test\DurableTask.Netherite.Tests\DurableTask.Netherite.Tests.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
|
@ -36,8 +36,8 @@
|
|||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.4.0" />
|
||||
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.5.1" />
|
||||
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.4.3" />
|
||||
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.5.2" />
|
||||
<PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="1.0.0" />
|
||||
</ItemGroup>
|
||||
|
||||
|
|
|
@ -5,12 +5,17 @@ namespace DurableTask.Netherite.AzureFunctions
|
|||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using DurableTask.Core;
|
||||
using DurableTask.Core.Common;
|
||||
using DurableTask.Netherite;
|
||||
using DurableTask.Netherite.Scaling;
|
||||
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
|
||||
using Microsoft.Azure.WebJobs.Host.Scale;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
|
||||
|
@ -97,5 +102,125 @@ namespace DurableTask.Netherite.AzureFunctions
|
|||
ContinuationToken = result.ContinuationToken,
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
/// Tries to create a scale monitor, backed by the scaling monitor of the orchestration service.
/// </summary>
/// <param name="functionId">Not used by this implementation.</param>
/// <param name="functionName">Not used by this implementation.</param>
/// <param name="hubName">Not used by this implementation.</param>
/// <param name="storageConnectionString">Not used by this implementation.</param>
/// <param name="scaleMonitor">The created scale monitor, or null if none is available.</param>
/// <returns>true if a scale monitor was created, false otherwise.</returns>
public override bool TryGetScaleMonitor(
    string functionId,
    string functionName,
    string hubName,
    string storageConnectionString,
    out IScaleMonitor scaleMonitor)
{
    // bail out first if the service cannot provide a scaling monitor
    if (!this.Service.TryGetScalingMonitor(out var monitor))
    {
        scaleMonitor = null;
        return false;
    }

    scaleMonitor = new ScaleMonitor(monitor);
    return true;
}
|
||||
|
||||
/// <summary>
/// Adapts a Netherite <see cref="ScalingMonitor"/> to the scale monitor interface:
/// metrics are collected through the scaling monitor, and its scale recommendation
/// is translated into a scale vote.
/// </summary>
class ScaleMonitor : IScaleMonitor<ScaleMonitor.NetheriteScaleMetrics>
{
    readonly ScalingMonitor scalingMonitor;
    readonly ScaleMonitorDescriptor descriptor;

    /// <summary>
    /// Creates a scale monitor wrapping the given scaling monitor.
    /// </summary>
    /// <param name="scalingMonitor">The Netherite scaling monitor to delegate to.</param>
    public ScaleMonitor(ScalingMonitor scalingMonitor)
    {
        this.scalingMonitor = scalingMonitor;

        // lower-case with the invariant culture so the id does not depend on the host's culture settings
        this.descriptor = new ScaleMonitorDescriptor($"DurableTaskTrigger-Netherite-{this.scalingMonitor.TaskHubName}".ToLowerInvariant());
    }

    /// <summary>
    /// The descriptor identifying this monitor; its id is derived from the task hub name.
    /// </summary>
    public ScaleMonitorDescriptor Descriptor => this.descriptor;

    /// <summary>
    /// Scale metrics carrying the metrics collected by the Netherite scaling monitor.
    /// </summary>
    public class NetheriteScaleMetrics : ScaleMetrics
    {
        public ScalingMonitor.Metrics Metrics { get; set; }
    }

    /// <summary>
    /// Non-generic interface entry point; forwards to <see cref="GetMetricsAsync"/>.
    /// </summary>
    async Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
    {
        return await this.GetMetricsAsync();
    }

    /// <summary>
    /// Collects the current metrics and logs how long the collection took.
    /// </summary>
    /// <returns>The collected metrics.</returns>
    public async Task<NetheriteScaleMetrics> GetMetricsAsync()
    {
        try
        {
            Stopwatch sw = new Stopwatch();
            sw.Start();
            var metrics = await this.scalingMonitor.CollectMetrics();
            sw.Stop();

            this.scalingMonitor.Logger.LogInformation(
                "Collected scale info for {partitionCount} partitions in {latencyMs:F2}ms.",
                metrics.LoadInformation.Count, sw.Elapsed.TotalMilliseconds);

            return new NetheriteScaleMetrics()
            {
                Metrics = metrics
            };
        }
        catch (Exception e) when (!Utils.IsFatal(e))
        {
            // log for diagnosis, then let the caller see the original failure
            this.scalingMonitor.Logger.LogError("IScaleMonitor.GetMetricsAsync() failed: {exception}", e);
            throw;
        }
    }

    /// <summary>
    /// Non-generic interface entry point; casts the metrics and forwards to the shared implementation.
    /// </summary>
    ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
    {
        return this.GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast<NetheriteScaleMetrics>().ToArray());
    }

    /// <summary>
    /// Computes the scale vote for the given context.
    /// </summary>
    /// <param name="context">The worker count and the metrics history.</param>
    /// <returns>The scale status containing the vote.</returns>
    public ScaleStatus GetScaleStatus(ScaleStatusContext<NetheriteScaleMetrics> context)
    {
        return this.GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray());
    }

    /// <summary>
    /// Shared implementation: asks the scaling monitor for a recommendation based on the
    /// most recent metrics sample and maps it to a scale vote.
    /// </summary>
    /// <param name="workerCount">The current number of workers.</param>
    /// <param name="metrics">The metrics history; may be null or empty.</param>
    /// <returns>The scale status containing the vote.</returns>
    ScaleStatus GetScaleStatusCore(int workerCount, NetheriteScaleMetrics[] metrics)
    {
        try
        {
            ScaleRecommendation recommendation;

            // both callers pass null when the context carries no metrics, so null is treated like empty
            if (metrics == null || metrics.Length == 0)
            {
                recommendation = new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: "missing metrics");
            }
            else
            {
                // only the last sample in the history is used
                recommendation = this.scalingMonitor.GetScaleRecommendation(workerCount, metrics[^1].Metrics);
            }

            ScaleStatus scaleStatus = new ScaleStatus();

            switch (recommendation.Action)
            {
                case ScaleAction.AddWorker:
                    scaleStatus.Vote = ScaleVote.ScaleOut;
                    break;
                case ScaleAction.RemoveWorker:
                    scaleStatus.Vote = ScaleVote.ScaleIn;
                    break;
                default:
                    scaleStatus.Vote = ScaleVote.None;
                    break;
            }

            this.scalingMonitor.Logger.LogInformation(
                "Autoscaler recommends: {scaleRecommendation} because: {reason}",
                scaleStatus.Vote.ToString(), recommendation.Reason);

            return scaleStatus;
        }
        catch (Exception e) when (!Utils.IsFatal(e))
        {
            // log for diagnosis, then let the caller see the original failure
            this.scalingMonitor.Logger.LogError("IScaleMonitor.GetScaleStatus() failed: {exception}", e);
            throw;
        }
    }
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,8 @@ namespace DurableTask.Netherite.AzureFunctions
|
|||
|
||||
internal static BlobLogger BlobLogger { get; set; }
|
||||
|
||||
public string Name => "Netherite";
|
||||
|
||||
// Called by the Azure Functions runtime dependency injection infrastructure
|
||||
public NetheriteProviderFactory(
|
||||
IOptions<DurableTaskOptions> extensionOptions,
|
||||
|
|
|
@ -41,13 +41,13 @@
|
|||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.3.0-beta.4" />
|
||||
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.3.1" />
|
||||
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.8" />
|
||||
<PackageReference Include="Microsoft.Azure.EventHubs.Processor" Version="4.3.1" />
|
||||
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.2.2" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.2.0" />
|
||||
<PackageReference Include="Microsoft.FASTER.Core" Version="1.8.0" />
|
||||
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.5.1" />
|
||||
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.5.2" />
|
||||
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
|
||||
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.*" PrivateAssets="All" />
|
||||
<PackageReference Include="System.Threading.Channels" Version="5.0.0" />
|
||||
|
|
|
@ -132,6 +132,31 @@ namespace DurableTask.Netherite
|
|||
DurableTask.Core.Tracing.DefaultEventSource.Log.IsTraceEnabled);
|
||||
}
|
||||
|
||||
/// <summary>
/// Tries to create a scaling monitor that can be used for autoscaling.
/// </summary>
/// <param name="monitor">The newly created scaling monitor, or null if none is available.</param>
/// <returns>true if autoscaling is supported, false otherwise</returns>
public bool TryGetScalingMonitor(out ScalingMonitor monitor)
{
    // NOTE(review): a monitor is created when EITHER the storage is Faster OR the transport is
    // EventHubs; confirm that requiring only one of the two (rather than both) is intended.
    if (this.configuredStorage != TransportConnectionString.StorageChoices.Faster
        && this.configuredTransport != TransportConnectionString.TransportChoices.EventHubs)
    {
        monitor = null;
        return false;
    }

    var scalingLogger = this.LoggerFactory.CreateLogger($"{LoggerCategoryName}.Scaling");

    monitor = new ScalingMonitor(
        this.Settings.ResolvedStorageConnectionString,
        this.Settings.ResolvedTransportConnectionString,
        this.Settings.LoadInformationAzureTableName,
        this.Settings.HubName,
        scalingLogger);

    return true;
}
|
||||
|
||||
/******************************/
|
||||
// storage provider
|
||||
/******************************/
|
||||
|
|
|
@ -165,7 +165,7 @@ namespace DurableTask.Netherite
|
|||
this.Assert(this.ErrorHandler.IsTerminated);
|
||||
|
||||
// tell the load publisher to send all buffered info
|
||||
this.LoadPublisher?.Flush();
|
||||
await this.LoadPublisher?.FlushAsync();
|
||||
|
||||
this.TraceHelper.TraceProgress("Stopped partition");
|
||||
}
|
||||
|
|
|
@ -86,10 +86,27 @@ namespace DurableTask.Netherite
|
|||
/// <summary>
/// Records the number of activities (pending, locally backlogged, and queued remotes) in the
/// load info and, if there are any, marks the latency trend according to the longest time an
/// activity in the local backlog or the remote queue has been waiting.
/// </summary>
/// <param name="info">The load info to update.</param>
public override void UpdateLoadInfo(PartitionLoadInfo info)
{
    info.Activities = this.Pending.Count + this.LocalBacklog.Count + this.QueuedRemotes.Count;

    if (info.Activities > 0)
    {
        // read the clock once so every queued activity is measured against the same instant
        DateTime now = DateTime.UtcNow;

        // pending activities are counted above but do not contribute to the queue latency;
        // DefaultIfEmpty yields 0 when both queues are empty
        var maxLatencyInQueue = Enumerable.Concat(this.LocalBacklog, this.QueuedRemotes)
            .Select(a => (long)(now - a.IssueTime).TotalMilliseconds)
            .DefaultIfEmpty()
            .Max();

        if (maxLatencyInQueue < 100)
        {
            // under 100 ms
            info.MarkActive();
        }
        else if (maxLatencyInQueue < 1000)
        {
            // between 100 ms and 1 s
            info.MarkMediumLatency();
        }
        else
        {
            // 1 s or more
            info.MarkHighLatency();
        }
    }
}
|
||||
|
||||
public override string ToString()
|
||||
|
|
|
@ -65,10 +65,27 @@ namespace DurableTask.Netherite
|
|||
}
|
||||
|
||||
/// <summary>
/// If there are sessions, adds their count to the work items of the load info and marks the
/// latency trend according to the longest wait time of any session's current batch.
/// </summary>
/// <param name="info">The load info to update.</param>
public override void UpdateLoadInfo(PartitionLoadInfo info)
{
    if (this.Sessions.Count > 0)
    {
        info.WorkItems += this.Sessions.Count;

        double now = this.Partition.CurrentTimeMs;

        // sessions without a current batch contribute a wait time of 0
        var maxLatencyInQueue = (long)this.Sessions.Values
            .Select(session => session.CurrentBatch?.WaitTimeMs(now) ?? 0)
            .DefaultIfEmpty()
            .Max();

        if (maxLatencyInQueue < 100)
        {
            // under 100 ms
            info.MarkActive();
        }
        else if (maxLatencyInQueue < 1000)
        {
            // between 100 ms and 1 s
            info.MarkMediumLatency();
        }
        else
        {
            // 1 s or more
            info.MarkHighLatency();
        }
    }
}
|
||||
|
||||
public override string ToString()
|
||||
|
|
|
@ -76,7 +76,7 @@ namespace DurableTask.Netherite.Scaling
|
|||
|
||||
public async Task<Dictionary<uint, PartitionLoadInfo>> QueryAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var query = new TableQuery<PartitionInfoEntity>().Where(TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, this.taskHubName));
|
||||
var query = new TableQuery<PartitionInfoEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, this.taskHubName));
|
||||
TableContinuationToken continuationToken = null;
|
||||
Dictionary<uint, PartitionLoadInfo> result = new Dictionary<uint, PartitionLoadInfo>();
|
||||
do
|
||||
|
@ -94,8 +94,6 @@ namespace DurableTask.Netherite.Scaling
|
|||
Outbox = e.Outbox,
|
||||
InputQueuePosition = e.InputQueuePosition,
|
||||
CommitLogPosition = e.CommitLogPosition,
|
||||
ActivityLatencyMs = e.ActivityLatencyMs,
|
||||
WorkItemLatencyMs = e.WorkItemLatencyMs,
|
||||
WorkerId = e.WorkerId,
|
||||
LatencyTrend = e.LatencyTrend,
|
||||
MissRate = e.MissRate,
|
||||
|
@ -116,8 +114,6 @@ namespace DurableTask.Netherite.Scaling
|
|||
public DateTime? NextTimer { get; set; }
|
||||
public long InputQueuePosition { get; set; }
|
||||
public long CommitLogPosition { get; set; }
|
||||
public long ActivityLatencyMs { get; set; }
|
||||
public long WorkItemLatencyMs { get; set; }
|
||||
public string WorkerId { get; set; }
|
||||
public string LatencyTrend { get; set; }
|
||||
public double MissRate { get; set; }
|
||||
|
@ -145,8 +141,6 @@ namespace DurableTask.Netherite.Scaling
|
|||
this.Outbox = info.Outbox;
|
||||
this.InputQueuePosition = info.InputQueuePosition;
|
||||
this.CommitLogPosition = info.CommitLogPosition;
|
||||
this.ActivityLatencyMs = info.ActivityLatencyMs;
|
||||
this.WorkItemLatencyMs = info.WorkItemLatencyMs;
|
||||
this.WorkerId = info.WorkerId;
|
||||
this.LatencyTrend = info.LatencyTrend;
|
||||
this.MissRate = info.MissRate;
|
||||
|
|
|
@ -15,7 +15,7 @@ namespace DurableTask.Netherite.Scaling
|
|||
readonly ILogger logger;
|
||||
|
||||
// we are pushing the aggregated load information on a somewhat slower interval
|
||||
public static TimeSpan AggregatePublishInterval = TimeSpan.FromSeconds(15);
|
||||
public static TimeSpan AggregatePublishInterval = TimeSpan.FromSeconds(2);
|
||||
readonly CancellationTokenSource cancelWait = new CancellationTokenSource();
|
||||
|
||||
public LoadPublisher(ILoadMonitorService service, CancellationToken token, ILogger logger) : base(nameof(LoadPublisher), false, int.MaxValue, token)
|
||||
|
@ -25,14 +25,13 @@ namespace DurableTask.Netherite.Scaling
|
|||
this.cancelWait = new CancellationTokenSource();
|
||||
}
|
||||
|
||||
public void Flush()
|
||||
public Task FlushAsync()
|
||||
{
|
||||
this.cancelWait.Cancel(); // so that we don't have to wait the whole delay
|
||||
return this.WaitForCompletionAsync();
|
||||
}
|
||||
|
||||
protected override async Task Process(IList<(uint, PartitionLoadInfo)> batch)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (batch.Count != 0)
|
||||
{
|
||||
|
@ -51,10 +50,10 @@ namespace DurableTask.Netherite.Scaling
|
|||
{
|
||||
// o.k. during shutdown
|
||||
}
|
||||
catch
|
||||
catch (Exception exception)
|
||||
{
|
||||
// we swallow exceptions so we can tolerate temporary Azure storage errors
|
||||
// TODO log
|
||||
this.logger.LogWarning("LoadPublisher failed: {exception}", exception);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -66,11 +65,6 @@ namespace DurableTask.Netherite.Scaling
|
|||
{
|
||||
}
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
this.logger.LogWarning("Could not publish load {exception}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -5,6 +5,7 @@ namespace DurableTask.Netherite.Scaling
|
|||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
|
||||
/// <summary>
|
||||
|
@ -42,6 +43,40 @@ namespace DurableTask.Netherite.Scaling
|
|||
/// </summary>
|
||||
public DateTime? Wakeup { get; set; }
|
||||
|
||||
/// <summary>
/// Determines whether this partition has pending work. The checks run in a fixed order
/// (activities, work items, requests, outbox, then a timer due within the next 10 seconds)
/// and the first one that applies is reported.
/// </summary>
/// <returns>a description of some of the work, or null if no work</returns>
public string IsBusy()
{
    string description = null;

    if (this.Activities > 0)
    {
        description = $"has {this.Activities} activities pending";
    }
    else if (this.WorkItems > 0)
    {
        description = $"has {this.WorkItems} work items pending";
    }
    else if (this.Requests > 0)
    {
        description = $"has {this.Requests} requests pending";
    }
    else if (this.Outbox > 0)
    {
        description = $"has {this.Outbox} unsent messages";
    }
    else if (this.Wakeup is DateTime wakeup && wakeup < DateTime.UtcNow + TimeSpan.FromSeconds(10))
    {
        // a timer that fires within the next 10 seconds counts as pending work
        description = $"has timer waking up at {wakeup}";
    }

    return description;
}
|
||||
|
||||
/// <summary>
|
||||
/// The input queue position of this partition, which is the next expected EventHubs sequence number.
|
||||
/// </summary>
|
||||
|
@ -52,16 +87,6 @@ namespace DurableTask.Netherite.Scaling
|
|||
/// </summary>
|
||||
public long CommitLogPosition { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The latency of the activity queue.
|
||||
/// </summary>
|
||||
public long ActivityLatencyMs { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The latency of the work item queue.
|
||||
/// </summary>
|
||||
public long WorkItemLatencyMs { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The worker id of the host that is currently running this partition.
|
||||
/// </summary>
|
||||
|
@ -107,5 +132,77 @@ namespace DurableTask.Netherite.Scaling
|
|||
/// </summary>
|
||||
public static int LatencyTrendLength = 5;
|
||||
|
||||
/// <summary>
/// Whether a latency trend indicates the partition has been idle for a long time, i.e. the
/// trend has the full length <see cref="LatencyTrendLength"/> and every sample in it is idle.
/// </summary>
/// <param name="latencyTrend">The latency trend string to inspect; must not be null.</param>
/// <returns>true if the trend is full and consists only of idle samples.</returns>
/// <exception cref="ArgumentNullException">The trend is null.</exception>
public static bool IsLongIdle(string latencyTrend)
{
    if (latencyTrend == null)
    {
        throw new ArgumentNullException(nameof(latencyTrend));
    }

    // Length is the direct property; no need to go through LINQ Count() for a string
    return latencyTrend.Length == LatencyTrendLength && latencyTrend.All(c => c == Idle);
}
|
||||
|
||||
/// <summary>
/// Creates the initial load info for a partition: both positions are zero and the latency
/// trend consists of a single idle sample.
/// </summary>
/// <param name="workerId">The worker id to record in the load info.</param>
/// <returns>The initial load info.</returns>
public static PartitionLoadInfo FirstFrame(string workerId)
    => new PartitionLoadInfo
    {
        InputQueuePosition = 0,
        CommitLogPosition = 0,
        WorkerId = workerId,
        LatencyTrend = Idle.ToString(),
    };
|
||||
|
||||
/// <summary>
/// Creates the load info for the next measuring interval. Positions, worker id and latency
/// trend are carried over; all other members start from their defaults. A new idle sample is
/// appended to the trend, dropping the oldest sample if the trend already has full length.
/// </summary>
/// <returns>The load info for the next interval.</returns>
public PartitionLoadInfo NextFrame()
{
    string history = this.LatencyTrend;

    if (history.Length == PartitionLoadInfo.LatencyTrendLength)
    {
        // the window is full: drop the oldest sample to make room for the new one
        history = history[1..PartitionLoadInfo.LatencyTrendLength];
    }

    return new PartitionLoadInfo()
    {
        InputQueuePosition = this.InputQueuePosition,
        CommitLogPosition = this.CommitLogPosition,
        WorkerId = this.WorkerId,
        LatencyTrend = $"{history}{Idle}",
    };
}
|
||||
|
||||
/// <summary>
/// Marks the current (last) sample of the latency trend as low latency, unless it already
/// records something other than idle.
/// </summary>
public void MarkActive()
{
    char last = this.LatencyTrend[^1];
    if (last == Idle)
    {
        this.ReplaceCurrentSample(LowLatency);
    }
}

/// <summary>
/// Marks the current (last) sample of the latency trend as medium latency, if it is
/// currently idle or low latency.
/// </summary>
public void MarkMediumLatency()
{
    char last = this.LatencyTrend[^1];
    if (last == Idle || last == LowLatency)
    {
        this.ReplaceCurrentSample(MediumLatency);
    }
}

/// <summary>
/// Marks the current (last) sample of the latency trend as high latency, if it is
/// currently idle, low latency, or medium latency.
/// </summary>
public void MarkHighLatency()
{
    char last = this.LatencyTrend[^1];
    if (last == Idle || last == LowLatency || last == MediumLatency)
    {
        this.ReplaceCurrentSample(HighLatency);
    }
}

// Overwrites the last character of the latency trend with the given sample.
void ReplaceCurrentSample(char sample)
{
    this.LatencyTrend = $"{this.LatencyTrend[0..^1]}{sample}";
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,8 @@
|
|||
namespace DurableTask.Netherite.Scaling
|
||||
{
|
||||
using System;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Converters;
|
||||
|
||||
/// <summary>
|
||||
/// Represents a scale recommendation for the task hub given the current performance metrics.
|
||||
|
@ -26,6 +28,7 @@ namespace DurableTask.Netherite.Scaling
|
|||
/// <summary>
|
||||
/// Gets the recommended scale action for the current task hub.
|
||||
/// </summary>
|
||||
[JsonConverter(typeof(StringEnumConverter))]
|
||||
public ScaleAction Action { get; }
|
||||
|
||||
/// <summary>
|
||||
|
|
|
@ -11,6 +11,7 @@ namespace DurableTask.Netherite.Scaling
|
|||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using DurableTask.Netherite.EventHubs;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
/// <summary>
|
||||
/// Monitors the performance of the Netherite backend and makes scaling decisions.
|
||||
|
@ -25,6 +26,18 @@ namespace DurableTask.Netherite.Scaling
|
|||
|
||||
readonly AzureLoadMonitorTable table;
|
||||
|
||||
Metrics? CachedMetrics;
|
||||
|
||||
/// <summary>
|
||||
/// The name of the taskhub.
|
||||
/// </summary>
|
||||
public string TaskHubName => this.taskHubName;
|
||||
|
||||
/// <summary>
|
||||
/// A logger for scaling events.
|
||||
/// </summary>
|
||||
public ILogger Logger { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Creates an instance of the scaling monitor, with the given parameters.
|
||||
/// </summary>
|
||||
|
@ -32,39 +45,86 @@ namespace DurableTask.Netherite.Scaling
|
|||
/// <param name="eventHubsConnectionString">The connection string for the transport layer.</param>
|
||||
/// <param name="partitionLoadTableName">The name of the storage table with the partition load information.</param>
|
||||
/// <param name="taskHubName">The name of the taskhub.</param>
|
||||
public ScalingMonitor(string storageConnectionString, string eventHubsConnectionString, string partitionLoadTableName, string taskHubName)
|
||||
public ScalingMonitor(
|
||||
string storageConnectionString,
|
||||
string eventHubsConnectionString,
|
||||
string partitionLoadTableName,
|
||||
string taskHubName,
|
||||
ILogger logger)
|
||||
{
|
||||
this.storageConnectionString = storageConnectionString;
|
||||
this.eventHubsConnectionString = eventHubsConnectionString;
|
||||
this.partitionLoadTableName = partitionLoadTableName;
|
||||
this.taskHubName = taskHubName;
|
||||
this.Logger = logger;
|
||||
|
||||
TransportConnectionString.Parse(eventHubsConnectionString, out _, out this.configuredTransport);
|
||||
|
||||
this.table = new AzureLoadMonitorTable(storageConnectionString, partitionLoadTableName, taskHubName);
|
||||
}
|
||||
|
||||
/// <summary>
/// The data gathered before a scaling decision is made.
/// </summary>
public struct Metrics
{
    /// <summary>
    /// The load information per partition, keyed by partition id.
    /// </summary>
    public Dictionary<uint, PartitionLoadInfo> LoadInformation { get; set; }

    /// <summary>
    /// Describes why the taskhub is not idle; null if it is idle.
    /// </summary>
    public string Busy { get; set; }

    /// <summary>
    /// True when no busy reason is recorded (i.e. <see cref="Busy"/> is null or empty).
    /// </summary>
    public bool TaskHubIsIdle
    {
        get { return string.IsNullOrEmpty(this.Busy); }
    }

    /// <summary>
    /// When these metrics were collected.
    /// </summary>
    public DateTime Timestamp;
}
|
||||
|
||||
/// <summary>
/// Gathers the metrics needed for a scaling decision. A result collected less than
/// 1.5 seconds ago is returned from the cache instead of querying again.
/// </summary>
/// <returns>The collected metrics.</returns>
public async Task<Metrics> CollectMetrics()
{
    DateTime collectionTime = DateTime.UtcNow;

    if (this.CachedMetrics is Metrics cached
        && collectionTime - cached.Timestamp < TimeSpan.FromSeconds(1.5))
    {
        // still fresh enough: reuse the previous result
        return cached;
    }

    var partitionLoad = await this.table.QueryAsync(CancellationToken.None).ConfigureAwait(false);
    var busyReason = await this.TaskHubIsIdleAsync(partitionLoad).ConfigureAwait(false);

    // the timestamp is the time at which collection started, not when the queries finished
    var collected = new Metrics()
    {
        LoadInformation = partitionLoad,
        Busy = busyReason,
        Timestamp = collectionTime,
    };

    this.CachedMetrics = collected;
    return collected;
}
|
||||
|
||||
/// <summary>
|
||||
/// Makes a scale recommendation.
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task<ScaleRecommendation> GetScaleRecommendation(int workerCount)
|
||||
public ScaleRecommendation GetScaleRecommendation(int workerCount, Metrics metrics)
|
||||
{
|
||||
Dictionary<uint, PartitionLoadInfo> loadInformation = await this.table.QueryAsync(CancellationToken.None).ConfigureAwait(false);
|
||||
|
||||
bool taskHubIsIdle = await this.TaskHubIsIdleAsync(loadInformation).ConfigureAwait(false);
|
||||
|
||||
if (workerCount == 0 && !taskHubIsIdle)
|
||||
if (workerCount == 0 && !metrics.TaskHubIsIdle)
|
||||
{
|
||||
return new ScaleRecommendation(ScaleAction.AddWorker, keepWorkersAlive: true, reason: "First worker");
|
||||
return new ScaleRecommendation(ScaleAction.AddWorker, keepWorkersAlive: true, reason: metrics.Busy);
|
||||
}
|
||||
|
||||
if (loadInformation.Values.Any(partitionLoadInfo => partitionLoadInfo.LatencyTrend.Length < PartitionLoadInfo.LatencyTrendLength))
|
||||
{
|
||||
return new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: !taskHubIsIdle, reason: "Not enough samples");
|
||||
}
|
||||
|
||||
if (taskHubIsIdle)
|
||||
if (metrics.TaskHubIsIdle)
|
||||
{
|
||||
return new ScaleRecommendation(
|
||||
scaleAction: workerCount > 0 ? ScaleAction.RemoveWorker : ScaleAction.None,
|
||||
|
@ -72,23 +132,23 @@ namespace DurableTask.Netherite.Scaling
|
|||
reason: "Task hub is idle");
|
||||
}
|
||||
|
||||
int numberOfSlowPartitions = loadInformation.Values.Count(info => info.LatencyTrend.Last() == PartitionLoadInfo.HighLatency);
|
||||
int numberOfSlowPartitions = metrics.LoadInformation.Values.Count(info => info.LatencyTrend.Length > 1 && info.LatencyTrend.Last() == PartitionLoadInfo.HighLatency);
|
||||
|
||||
if (workerCount < numberOfSlowPartitions)
|
||||
{
|
||||
// Some partitions are busy, so scale out until workerCount == partitionCount.
|
||||
var partition = loadInformation.First(kvp => kvp.Value.LatencyTrend.Last() == PartitionLoadInfo.HighLatency);
|
||||
// scale up to the number of busy partitions
|
||||
var partition = metrics.LoadInformation.First(kvp => kvp.Value.LatencyTrend.Last() == PartitionLoadInfo.HighLatency);
|
||||
return new ScaleRecommendation(
|
||||
ScaleAction.AddWorker,
|
||||
keepWorkersAlive: true,
|
||||
reason: $"High latency in partition {partition.Key}: {partition.Value.LatencyTrend}");
|
||||
}
|
||||
|
||||
int numberOfNonIdlePartitions = loadInformation.Values.Count(info => info.LatencyTrend.Any(c => c != PartitionLoadInfo.Idle));
|
||||
int numberOfNonIdlePartitions = metrics.LoadInformation.Values.Count(info => ! PartitionLoadInfo.IsLongIdle(info.LatencyTrend));
|
||||
|
||||
if (workerCount > numberOfNonIdlePartitions)
|
||||
{
|
||||
// If the work item queues are idle, scale down to the number of non-idle control queues.
|
||||
// scale down to the number of non-idle partitions.
|
||||
return new ScaleRecommendation(
|
||||
ScaleAction.RemoveWorker,
|
||||
keepWorkersAlive: true,
|
||||
|
@ -100,10 +160,11 @@ namespace DurableTask.Netherite.Scaling
|
|||
// We also want to avoid scaling in unnecessarily when we've reached optimal scale-out. To balance these
|
||||
// goals, we check for low latencies and vote to scale down 10% of the time when we see this. The thought is
|
||||
// that it's a slow scale-in that will get automatically corrected once latencies start increasing again.
|
||||
if (workerCount > 1 && (new Random()).Next(10) == 0)
|
||||
if (workerCount > 1 && (new Random()).Next(8) == 0)
|
||||
{
|
||||
bool allPartitionsAreFast = !loadInformation.Values.Any(
|
||||
info => info.LatencyTrend.Any(c => c == PartitionLoadInfo.MediumLatency || c == PartitionLoadInfo.HighLatency));
|
||||
bool allPartitionsAreFast = !metrics.LoadInformation.Values.Any(
|
||||
info => info.LatencyTrend.Length == PartitionLoadInfo.LatencyTrendLength
|
||||
&& info.LatencyTrend.Any(c => c == PartitionLoadInfo.MediumLatency || c == PartitionLoadInfo.HighLatency));
|
||||
|
||||
if (allPartitionsAreFast)
|
||||
{
|
||||
|
@ -119,19 +180,15 @@ namespace DurableTask.Netherite.Scaling
|
|||
return new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: $"Partition latencies are healthy");
|
||||
}
|
||||
|
||||
async Task<bool> TaskHubIsIdleAsync(Dictionary<uint, PartitionLoadInfo> loadInformation)
|
||||
public async Task<string> TaskHubIsIdleAsync(Dictionary<uint, PartitionLoadInfo> loadInformation)
|
||||
{
|
||||
// first, check if any of the partitions have queued work or are scheduled to wake up
|
||||
foreach (var p in loadInformation.Values)
|
||||
foreach (var kvp in loadInformation)
|
||||
{
|
||||
if (p.Activities > 0 || p.WorkItems > 0 || p.Requests > 0 || p.Outbox > 0)
|
||||
string busy = kvp.Value.IsBusy();
|
||||
if (!string.IsNullOrEmpty(busy))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (p.Wakeup.HasValue && p.Wakeup.Value < DateTime.UtcNow + TimeSpan.FromSeconds(10))
|
||||
{
|
||||
return false;
|
||||
return $"P{kvp.Key:D2} {busy}";
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,30 +197,36 @@ namespace DurableTask.Netherite.Scaling
|
|||
|
||||
long[] positions;
|
||||
|
||||
switch (this.configuredTransport)
|
||||
if (this.configuredTransport == TransportConnectionString.TransportChoices.EventHubs)
|
||||
{
|
||||
case TransportConnectionString.TransportChoices.EventHubs:
|
||||
positions = await EventHubs.EventHubsConnections.GetQueuePositionsAsync(this.eventHubsConnectionString, EventHubsTransport.PartitionHubs).ConfigureAwait(false);
|
||||
break;
|
||||
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
|
||||
for (uint i = 0; i < positions.Length; i++)
|
||||
{
|
||||
if (!loadInformation.TryGetValue(i, out var loadInfo))
|
||||
{
|
||||
return false;
|
||||
return $"P{i:D2} has no load information published yet";
|
||||
}
|
||||
if (positions[i] > loadInfo.InputQueuePosition)
|
||||
{
|
||||
return false;
|
||||
return $"P{i:D2} has input queue position {loadInfo.InputQueuePosition} which is {positions[i] - loadInfo.InputQueuePosition} behind latest position {positions[i]}";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// finally, check if we have waited long enough
|
||||
foreach (var kvp in loadInformation)
|
||||
{
|
||||
string latencyTrend = kvp.Value.LatencyTrend;
|
||||
|
||||
if (!PartitionLoadInfo.IsLongIdle(latencyTrend))
|
||||
{
|
||||
return $"P{kvp.Key:D2} had some activity recently, latency trend is {latencyTrend}";
|
||||
}
|
||||
}
|
||||
|
||||
// we have concluded that there are no pending work items, timers, or unprocessed input queue entries
|
||||
return true;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -170,9 +170,6 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
this.TraceHelper.FasterProgress("Recovery complete");
|
||||
}
|
||||
|
||||
var ignoredTask = this.IdleLoop();
|
||||
|
||||
return this.storeWorker.InputQueuePosition;
|
||||
}
|
||||
|
||||
|
@ -182,7 +179,6 @@ namespace DurableTask.Netherite.Faster
|
|||
this.logWorker.StartProcessing();
|
||||
}
|
||||
|
||||
|
||||
public async Task CleanShutdown(bool takeFinalCheckpoint)
|
||||
{
|
||||
this.TraceHelper.FasterProgress("Stopping workers");
|
||||
|
@ -216,24 +212,6 @@ namespace DurableTask.Netherite.Faster
|
|||
this.logWorker.SubmitInternalEvent(evt);
|
||||
}
|
||||
|
||||
async Task IdleLoop()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
await Task.Delay(StoreWorker.IdlingPeriod, this.terminationToken).ConfigureAwait(false);
|
||||
|
||||
//await this.TestStorageLatency();
|
||||
|
||||
if (this.terminationToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
// periodically bump the store worker so it can check if enough time has elapsed for doing a checkpoint or a load publish
|
||||
this.storeWorker.Notify();
|
||||
}
|
||||
}
|
||||
|
||||
public Task Prefetch(IEnumerable<TrackedObjectKey> keys)
|
||||
{
|
||||
return this.storeWorker.RunPrefetchSession(keys.ToAsyncEnumerable<TrackedObjectKey>());
|
||||
|
|
|
@ -36,12 +36,11 @@ namespace DurableTask.Netherite.Faster
|
|||
long timeOfNextCheckpoint;
|
||||
|
||||
// periodic load publishing
|
||||
long lastPublishedCommitLogPosition = 0;
|
||||
long lastPublishedInputQueuePosition = 0;
|
||||
string lastPublishedLatencyTrend = "";
|
||||
DateTime lastPublishedTime = DateTime.MinValue;
|
||||
public static TimeSpan PublishInterval = TimeSpan.FromSeconds(10);
|
||||
public static TimeSpan IdlingPeriod = TimeSpan.FromSeconds(10.5);
|
||||
PartitionLoadInfo loadInfo;
|
||||
DateTime lastPublished;
|
||||
string lastPublishedLatencyTrend;
|
||||
public static TimeSpan PublishInterval = TimeSpan.FromSeconds(8);
|
||||
public static TimeSpan PokePeriod = TimeSpan.FromSeconds(3); // allows storeworker to checkpoint and publish load even while idle
|
||||
|
||||
|
||||
public StoreWorker(TrackedObjectStore store, Partition partition, FasterTraceHelper traceHelper, BlobManager blobManager, CancellationToken cancellationToken)
|
||||
|
@ -54,7 +53,10 @@ namespace DurableTask.Netherite.Faster
|
|||
this.traceHelper = traceHelper;
|
||||
this.blobManager = blobManager;
|
||||
this.random = new Random();
|
||||
//this.Tracer = (string message) => this.traceHelper.FasterProgress($"{this.Name} {message}");
|
||||
|
||||
this.loadInfo = PartitionLoadInfo.FirstFrame(this.partition.Settings.WorkerId);
|
||||
this.lastPublished = DateTime.MinValue;
|
||||
this.lastPublishedLatencyTrend = "";
|
||||
|
||||
// construct an effect tracker that we use to apply effects to the store
|
||||
this.effectTracker = new EffectTracker(
|
||||
|
@ -88,6 +90,7 @@ namespace DurableTask.Netherite.Faster
|
|||
// Resumes this worker and then launches the background poke loop.
public void StartProcessing()
{
    this.Resume();

    // Fire-and-forget: the returned task is intentionally not awaited or stored.
    _ = this.PokeLoop();
}
|
||||
|
||||
public void SetCheckpointPositionsAfterRecovery(long commitLogPosition, long inputQueuePosition)
|
||||
|
@ -156,94 +159,51 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
async Task PublishPartitionLoad()
|
||||
{
|
||||
var info = new PartitionLoadInfo()
|
||||
{
|
||||
CommitLogPosition = this.CommitLogPosition,
|
||||
InputQueuePosition = this.InputQueuePosition,
|
||||
WorkerId = this.partition.Settings.WorkerId,
|
||||
LatencyTrend = this.lastPublishedLatencyTrend,
|
||||
MissRate = this.store.StoreStats.GetMissRate(),
|
||||
};
|
||||
foreach (var k in TrackedObjectKey.GetSingletons())
|
||||
{
|
||||
(await this.store.ReadAsync(k, this.effectTracker).ConfigureAwait(false))?.UpdateLoadInfo(info);
|
||||
(await this.store.ReadAsync(k, this.effectTracker).ConfigureAwait(false)).UpdateLoadInfo(this.loadInfo);
|
||||
}
|
||||
|
||||
this.UpdateLatencyTrend(info);
|
||||
this.loadInfo.MissRate = this.store.StoreStats.GetMissRate();
|
||||
|
||||
// to avoid unnecessary traffic for statically provisioned deployments,
|
||||
// suppress load publishing if the state is not changing
|
||||
if (info.CommitLogPosition == this.lastPublishedCommitLogPosition
|
||||
&& info.InputQueuePosition == this.lastPublishedInputQueuePosition
|
||||
&& info.LatencyTrend == this.lastPublishedLatencyTrend)
|
||||
if (this.loadInfo.IsBusy() != null)
|
||||
{
|
||||
return;
|
||||
this.loadInfo.MarkActive();
|
||||
}
|
||||
|
||||
// we suppress the load publishing if a partition is long time idle and positions are unchanged
|
||||
bool publish = false;
|
||||
|
||||
if (this.loadInfo.CommitLogPosition < this.CommitLogPosition)
|
||||
{
|
||||
this.loadInfo.CommitLogPosition = this.CommitLogPosition;
|
||||
publish = true;
|
||||
}
|
||||
if (this.loadInfo.InputQueuePosition < this.InputQueuePosition)
|
||||
{
|
||||
this.loadInfo.InputQueuePosition = this.InputQueuePosition;
|
||||
publish = true;
|
||||
}
|
||||
if (!PartitionLoadInfo.IsLongIdle(this.loadInfo.LatencyTrend) || this.loadInfo.LatencyTrend != this.lastPublishedLatencyTrend)
|
||||
{
|
||||
publish = true;
|
||||
}
|
||||
|
||||
if (publish)
|
||||
{
|
||||
// take the current load info and put the next frame in its place
|
||||
var loadInfoToPublish = this.loadInfo;
|
||||
this.loadInfo = loadInfoToPublish.NextFrame();
|
||||
this.lastPublished = DateTime.UtcNow;
|
||||
this.lastPublishedLatencyTrend = loadInfoToPublish.LatencyTrend;
|
||||
|
||||
this.partition.TraceHelper.TracePartitionLoad(loadInfoToPublish);
|
||||
|
||||
// to avoid publishing not-yet committed state, publish
|
||||
// only after the current log is persisted.
|
||||
var task = this.LogWorker.WaitForCompletionAsync()
|
||||
.ContinueWith((t) => this.partition.LoadPublisher?.Submit((this.partition.PartitionId, info)));
|
||||
|
||||
this.lastPublishedCommitLogPosition = this.CommitLogPosition;
|
||||
this.lastPublishedInputQueuePosition = this.InputQueuePosition;
|
||||
this.lastPublishedLatencyTrend = info.LatencyTrend;
|
||||
this.lastPublishedTime = DateTime.UtcNow;
|
||||
|
||||
this.partition.TraceHelper.TracePartitionLoad(info);
|
||||
|
||||
// trace top load
|
||||
// this.partition.TraceHelper.TraceProgress($"LockMonitor top {LockMonitor.TopN}: {LockMonitor.Instance.Report()}");
|
||||
// LockMonitor.Instance.Reset();
|
||||
.ContinueWith((t) => this.partition.LoadPublisher?.Submit((this.partition.PartitionId, loadInfoToPublish)));
|
||||
}
|
||||
|
||||
void UpdateLatencyTrend(PartitionLoadInfo info)
|
||||
{
|
||||
int activityLatencyCategory;
|
||||
|
||||
if (info.Activities == 0)
|
||||
{
|
||||
activityLatencyCategory = 0;
|
||||
}
|
||||
else if (info.ActivityLatencyMs < 100)
|
||||
{
|
||||
activityLatencyCategory = 1;
|
||||
}
|
||||
else if (info.ActivityLatencyMs < 1000)
|
||||
{
|
||||
activityLatencyCategory = 2;
|
||||
}
|
||||
else
|
||||
{
|
||||
activityLatencyCategory = 3;
|
||||
}
|
||||
|
||||
int workItemLatencyCategory;
|
||||
|
||||
if (info.WorkItems == 0)
|
||||
{
|
||||
workItemLatencyCategory = 0;
|
||||
}
|
||||
else if (info.WorkItemLatencyMs < 100)
|
||||
{
|
||||
workItemLatencyCategory = 1;
|
||||
}
|
||||
else if (info.WorkItemLatencyMs < 1000)
|
||||
{
|
||||
workItemLatencyCategory = 2;
|
||||
}
|
||||
else
|
||||
{
|
||||
workItemLatencyCategory = 3;
|
||||
}
|
||||
|
||||
if (info.LatencyTrend.Length == PartitionLoadInfo.LatencyTrendLength)
|
||||
{
|
||||
info.LatencyTrend = info.LatencyTrend.Substring(1, PartitionLoadInfo.LatencyTrendLength - 1);
|
||||
}
|
||||
|
||||
info.LatencyTrend = info.LatencyTrend
|
||||
+ PartitionLoadInfo.LatencyCategories[Math.Max(activityLatencyCategory, workItemLatencyCategory)];
|
||||
}
|
||||
|
||||
enum CheckpointTrigger
|
||||
|
@ -336,6 +296,9 @@ namespace DurableTask.Netherite.Faster
|
|||
this.partition.Assert(partitionEvent.NextInputQueuePosition > this.InputQueuePosition);
|
||||
this.InputQueuePosition = partitionEvent.NextInputQueuePosition;
|
||||
}
|
||||
|
||||
// since we are processing actual events, our latency category is at least "low"
|
||||
this.loadInfo.MarkActive();
|
||||
}
|
||||
|
||||
if (this.isShuttingDown || this.cancellationToken.IsCancellationRequested)
|
||||
|
@ -379,7 +342,7 @@ namespace DurableTask.Netherite.Faster
|
|||
}
|
||||
}
|
||||
|
||||
if (this.lastPublishedTime + PublishInterval < DateTime.UtcNow)
|
||||
if (this.lastPublished + PublishInterval < DateTime.UtcNow)
|
||||
{
|
||||
await this.PublishPartitionLoad().ConfigureAwait(false);
|
||||
}
|
||||
|
@ -521,5 +484,21 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
this.numberEventsSinceLastCheckpoint++;
|
||||
}
|
||||
|
||||
// Background loop that calls Notify() once every PokePeriod so the worker
// gets a chance to checkpoint and publish load even when no events arrive.
// The loop returns once cancellation has been requested or the worker is
// shutting down. Note that Task.Delay is given the cancellation token, so a
// cancellation that fires during the wait surfaces as an exception from the
// await rather than through the check below.
async Task PokeLoop()
{
    for (;;)
    {
        await Task.Delay(StoreWorker.PokePeriod, this.cancellationToken).ConfigureAwait(false);

        bool keepRunning = !this.cancellationToken.IsCancellationRequested && !this.isShuttingDown;

        if (!keepRunning)
        {
            return;
        }

        // wake the worker so it can check whether a checkpoint or load publish is due
        this.Notify();
    }
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -228,10 +228,10 @@ namespace DurableTask.Netherite
|
|||
}
|
||||
|
||||
[Event(246, Level = EventLevel.Informational, Version = 1)]
|
||||
public void PartitionLoadPublished(string Account, string TaskHub, int PartitionId, int WorkItems, int Activities, int Timers, int Requests, int Outbox, string NextTimer, long ActivityLatencyMs, long WorkItemLatencyMs, string WorkerId, string LatencyTrend, double MissRate, long InputQueuePosition, long CommitLogPosition, string ExtensionVersion)
|
||||
public void PartitionLoadPublished(string Account, string TaskHub, int PartitionId, int WorkItems, int Activities, int Timers, int Requests, int Outbox, string NextTimer, string WorkerId, string LatencyTrend, double MissRate, long InputQueuePosition, long CommitLogPosition, string ExtensionVersion)
|
||||
{
|
||||
SetCurrentThreadActivityId(serviceInstanceId);
|
||||
this.WriteEvent(246, Account, TaskHub, PartitionId, WorkItems, Activities, Timers, Requests, Outbox, NextTimer, ActivityLatencyMs, WorkItemLatencyMs, WorkerId, LatencyTrend, MissRate, InputQueuePosition, CommitLogPosition, ExtensionVersion);
|
||||
this.WriteEvent(246, Account, TaskHub, PartitionId, WorkItems, Activities, Timers, Requests, Outbox, NextTimer, WorkerId, LatencyTrend, MissRate, InputQueuePosition, CommitLogPosition, ExtensionVersion);
|
||||
}
|
||||
|
||||
// ----- Faster Storage
|
||||
|
|
|
@ -43,12 +43,12 @@ namespace DurableTask.Netherite
|
|||
{
|
||||
if (this.logLevelLimit <= LogLevel.Information)
|
||||
{
|
||||
this.logger.LogInformation("Part{partition:D2} Publishing LoadInfo WorkItems={workItems} Activities={activities} Timers={timers} Requests={requests} Outbox={outbox} Wakeup={wakeup} ActivityLatencyMs={activityLatencyMs} WorkItemLatencyMs={workItemLatencyMs} WorkerId={workerId} LatencyTrend={latencyTrend} MissRate={missRate} InputQueuePosition={inputQueuePosition} CommitLogPosition={commitLogPosition}",
|
||||
this.partitionId, info.WorkItems, info.Activities, info.Timers, info.Requests, info.Outbox, info.Wakeup, info.ActivityLatencyMs, info.WorkItemLatencyMs, info.WorkerId, info.LatencyTrend, info.MissRate, info.InputQueuePosition, info.CommitLogPosition);
|
||||
this.logger.LogInformation("Part{partition:D2} Publishing LoadInfo WorkItems={workItems} Activities={activities} Timers={timers} Requests={requests} Outbox={outbox} Wakeup={wakeup} WorkerId={workerId} LatencyTrend={latencyTrend} MissRate={missRate} InputQueuePosition={inputQueuePosition} CommitLogPosition={commitLogPosition}",
|
||||
this.partitionId, info.WorkItems, info.Activities, info.Timers, info.Requests, info.Outbox, info.Wakeup, info.WorkerId, info.LatencyTrend, info.MissRate, info.InputQueuePosition, info.CommitLogPosition);
|
||||
|
||||
if (EtwSource.Log.IsEnabled())
|
||||
{
|
||||
EtwSource.Log.PartitionLoadPublished(this.account, this.taskHub, this.partitionId, info.WorkItems, info.Activities, info.Timers, info.Requests, info.Outbox, info.Wakeup?.ToString("o") ?? "", info.ActivityLatencyMs, info.WorkItemLatencyMs, info.WorkerId, info.LatencyTrend, info.MissRate, info.InputQueuePosition, info.CommitLogPosition, TraceUtils.ExtensionVersion);
|
||||
EtwSource.Log.PartitionLoadPublished(this.account, this.taskHub, this.partitionId, info.WorkItems, info.Activities, info.Timers, info.Requests, info.Outbox, info.Wakeup?.ToString("o") ?? "", info.WorkerId, info.LatencyTrend, info.MissRate, info.InputQueuePosition, info.CommitLogPosition, TraceUtils.ExtensionVersion);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\DurableTask.Netherite.AzureFunctions\DurableTask.Netherite.AzureFunctions.csproj" />
|
||||
<ProjectReference Include="..\DurableTask.Netherite.Tests\DurableTask.Netherite.Tests.csproj" />
|
||||
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.5.1" />
|
||||
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.5.2" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
<AzureFunctionsVersion>v3</AzureFunctionsVersion>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.4.0" />
|
||||
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.4.3" />
|
||||
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="4.2.0" />
|
||||
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="4.0.3" />
|
||||
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="3.0.1" />
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
"rollbackEntityOperationsOnExceptions": "true",
|
||||
"UseGracefulShutdown": "true",
|
||||
"storageProvider": {
|
||||
"type": "Netherite",
|
||||
"StorageConnectionName": "AzureWebJobsStorage",
|
||||
"EventHubsConnectionName": "EventHubsConnection",
|
||||
"partitionCount": 12,
|
||||
|
|
|
@ -4,8 +4,8 @@
|
|||
<AzureFunctionsVersion>v3</AzureFunctionsVersion>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.4.0" />
|
||||
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.5.1" />
|
||||
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.4.3" />
|
||||
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.5.2" />
|
||||
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="3.0.1" />
|
||||
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="3.0.8" />
|
||||
</ItemGroup>
|
||||
|
|
|
@ -58,6 +58,8 @@
|
|||
|
||||
"storageProvider": {
|
||||
|
||||
"type": "Netherite",
|
||||
|
||||
"StorageConnectionName": "AzureWebJobsStorage",
|
||||
"EventHubsConnectionName": "EventHubsConnection",
|
||||
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
#!/usr/bin/pwsh
|
||||
param (
|
||||
$Plan="EP2",
|
||||
$NumNodes="4",
|
||||
$MinNodes="1",
|
||||
$MaxNodes="20",
|
||||
$Configuration="Release"
|
||||
)
|
||||
|
||||
|
@ -30,9 +31,17 @@ else
|
|||
Write-Host "Function app already exists."
|
||||
}
|
||||
|
||||
Write-Host "Configuring Scale=$NumNodes..."
|
||||
az functionapp plan update -g $groupName -n $functionAppName --max-burst $numNodes --number-of-workers $numNodes --min-instances $numNodes
|
||||
az resource update -n $functionAppName/config/web -g $groupName --set properties.minimumElasticInstanceCount=$numNodes --resource-type Microsoft.Web/sites
|
||||
Write-Host "Configuring Scale=$MinNodes-$MaxNodes"
|
||||
az functionapp plan update -g $groupName -n $functionAppName --max-burst $MaxNodes --number-of-workers $MinNodes --min-instances $MinNodes
|
||||
az resource update -n $functionAppName/config/web -g $groupName --set properties.minimumElasticInstanceCount=$MinNodes --resource-type Microsoft.Web/sites
|
||||
# Runtime scale monitoring is only useful when the plan is allowed to scale.
# When the node count is pinned (MinNodes equals MaxNodes) switch it off;
# otherwise switch it on.
# The condition must read $MinNodes (the script parameter). The previous
# spelling, $MinNode, is an undefined variable that evaluates to $null, so the
# comparison was false for any non-empty MaxNodes and monitoring was enabled.
if ($MinNodes -eq $MaxNodes)
{
    az resource update -n $functionAppName/config/web -g $groupName --set properties.functionsRuntimeScaleMonitoringEnabled=0 --resource-type Microsoft.Web/sites
}
else
{
    az resource update -n $functionAppName/config/web -g $groupName --set properties.functionsRuntimeScaleMonitoringEnabled=1 --resource-type Microsoft.Web/sites
}
|
||||
|
||||
Write-Host "Publishing Code to Function App..."
|
||||
func azure functionapp publish $functionAppName
|
||||
|
|
Загрузка…
Ссылка в новой задаче