Minor tracing changes and refactoring (#116)

* use explicit interface implementation for IOrchestrationService, and add some EventHubs tracing

* remove ConfigureAwait(false) calls and make tracing for received events more visible

* update the EventHubs initial offset provider (start from the beginning of the stream when the start position is 0)

* two more tracing changes
This commit is contained in:
Sebastian Burckhardt 2022-01-25 07:27:20 -08:00 коммит произвёл GitHub
Родитель 079eceb208
Коммит a9e6dbca23
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 98 добавлений и 61 удалений

Просмотреть файл

@ -243,45 +243,47 @@ namespace DurableTask.Netherite
/******************************/
/// <inheritdoc />
public async Task CreateAsync() => await ((IOrchestrationService)this).CreateAsync(true).ConfigureAwait(false);
async Task IOrchestrationService.CreateAsync() => await ((IOrchestrationService)this).CreateAsync(true);
/// <inheritdoc />
public async Task CreateAsync(bool recreateInstanceStore)
async Task IOrchestrationService.CreateAsync(bool recreateInstanceStore)
{
if (await this.taskHub.ExistsAsync().ConfigureAwait(false))
if (await this.taskHub.ExistsAsync())
{
if (recreateInstanceStore)
{
await this.taskHub.DeleteAsync().ConfigureAwait(false);
await this.taskHub.CreateIfNotExistsAsync().ConfigureAwait(false);
this.TraceHelper.TraceProgress("Creating");
await this.taskHub.DeleteAsync();
await this.taskHub.CreateIfNotExistsAsync();
}
}
else
{
await this.taskHub.CreateIfNotExistsAsync().ConfigureAwait(false);
await this.taskHub.CreateIfNotExistsAsync();
}
if (!(this.LoadMonitorService is null))
await this.LoadMonitorService.CreateIfNotExistsAsync(CancellationToken.None).ConfigureAwait(false);
await this.LoadMonitorService.CreateIfNotExistsAsync(CancellationToken.None);
}
/// <inheritdoc />
public async Task CreateIfNotExistsAsync() => await ((IOrchestrationService)this).CreateAsync(false).ConfigureAwait(false);
async Task IOrchestrationService.CreateIfNotExistsAsync() => await ((IOrchestrationService)this).CreateAsync(false);
/// <inheritdoc />
public async Task DeleteAsync()
async Task IOrchestrationService.DeleteAsync()
{
await this.taskHub.DeleteAsync().ConfigureAwait(false);
await this.taskHub.DeleteAsync();
if (!(this.LoadMonitorService is null))
await this.LoadMonitorService.DeleteIfExistsAsync(CancellationToken.None).ConfigureAwait(false);
await this.LoadMonitorService.DeleteIfExistsAsync(CancellationToken.None);
}
/// <inheritdoc />
public async Task DeleteAsync(bool deleteInstanceStore) => await this.DeleteAsync().ConfigureAwait(false);
async Task IOrchestrationService.DeleteAsync(bool deleteInstanceStore) => await ((IOrchestrationService)this).DeleteAsync();
/// <inheritdoc />
public async Task StartAsync()
async Task IOrchestrationService.StartAsync()
{
try
{
@ -343,7 +345,7 @@ namespace DurableTask.Netherite
}
/// <inheritdoc />
public async Task StopAsync(bool quickly)
async Task IOrchestrationService.StopAsync(bool quickly)
{
try
{
@ -375,7 +377,7 @@ namespace DurableTask.Netherite
}
/// <inheritdoc />
public Task StopAsync() => ((IOrchestrationService)this).StopAsync(false);
Task IOrchestrationService.StopAsync() => ((IOrchestrationService)this).StopAsync(false);
/// <inheritdoc/>
public void Dispose() => this.taskHub.StopAsync();

Просмотреть файл

@ -142,7 +142,7 @@ namespace DurableTask.Netherite.EventHubs
{
try
{
await Task.Delay(-1, prior.ErrorHandler.Token).ConfigureAwait(false);
await Task.Delay(-1, prior.ErrorHandler.Token);
}
catch (OperationCanceledException)
{
@ -153,7 +153,7 @@ namespace DurableTask.Netherite.EventHubs
// we are now becoming the current incarnation
this.currentIncarnation = prior.Next;
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} is restarting partition (incarnation {incarnation}) soon", this.eventHubName, this.eventHubPartition, c.Incarnation);
await Task.Delay(TimeSpan.FromSeconds(12), this.eventProcessorShutdown.Token).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromSeconds(12), this.eventProcessorShutdown.Token);
}
}
@ -182,7 +182,7 @@ namespace DurableTask.Netherite.EventHubs
// start this partition (which may include waiting for the lease to become available)
c.Partition = this.host.AddPartition(this.partitionId, this.sender);
c.NextPacketToReceive = await c.Partition.CreateOrRestoreAsync(c.ErrorHandler, this.parameters.StartPositions[this.partitionId]).ConfigureAwait(false);
c.NextPacketToReceive = await c.Partition.CreateOrRestoreAsync(c.ErrorHandler, this.parameters.StartPositions[this.partitionId]);
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} started partition (incarnation {incarnation}), next expected packet is #{nextSeqno}", this.eventHubName, this.eventHubPartition, c.Incarnation, c.NextPacketToReceive);
@ -241,7 +241,7 @@ namespace DurableTask.Netherite.EventHubs
else
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} stopping partition (incarnation: {incarnation}, quickly: {quickly})", this.eventHubName, this.eventHubPartition, current.Incarnation, quickly);
await current.Partition.StopAsync(quickly).ConfigureAwait(false);
await current.Partition.StopAsync(quickly);
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} stopped partition (incarnation {incarnation})", this.eventHubName, this.eventHubPartition, current.Incarnation);
}
@ -269,7 +269,7 @@ namespace DurableTask.Netherite.EventHubs
if (reason != CloseReason.LeaseLost)
{
await this.SaveEventHubsReceiverCheckpoint(context, 0).ConfigureAwait(false);
await this.SaveEventHubsReceiverCheckpoint(context, 0);
}
await this.IdempotentShutdown("CloseAsync", reason == CloseReason.LeaseLost);
@ -290,7 +290,7 @@ namespace DurableTask.Netherite.EventHubs
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} is checkpointing receive position through #{seqno}", this.eventHubName, this.eventHubPartition, checkpoint.SequenceNumber);
try
{
await context.CheckpointAsync(checkpoint).ConfigureAwait(false);
await context.CheckpointAsync(checkpoint);
this.lastCheckpointedOffset = long.Parse(checkpoint.Offset);
}
catch (Exception e) when (!Utils.IsFatal(e))
@ -334,7 +334,7 @@ namespace DurableTask.Netherite.EventHubs
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> packets)
{
this.traceHelper.LogTrace("EventHubsProcessor {eventHubName}/{eventHubPartition} is receiving #{seqno}", this.eventHubName, this.eventHubPartition, packets.First().SystemProperties.SequenceNumber);
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} is receiving events starting with #{seqno}", this.eventHubName, this.eventHubPartition, packets.First().SystemProperties.SequenceNumber);
if (!this.lastCheckpointedOffset.HasValue)
{
@ -343,11 +343,11 @@ namespace DurableTask.Netherite.EventHubs
this.lastCheckpointedOffset = first == null ? null : long.Parse(first.SystemProperties.Offset);
}
PartitionIncarnation current = await this.currentIncarnation.ConfigureAwait(false);
PartitionIncarnation current = await this.currentIncarnation;
while (current != null && current.ErrorHandler.IsTerminated)
{
current = await current.Next.ConfigureAwait(false);
current = await current.Next;
}
@ -434,7 +434,7 @@ namespace DurableTask.Netherite.EventHubs
current.Partition.SubmitEvents(batch);
}
await this.SaveEventHubsReceiverCheckpoint(context, 600000).ConfigureAwait(false);
await this.SaveEventHubsReceiverCheckpoint(context, 600000);
// can use this for testing: terminates partition after every one packet received, but
// that packet is then processed once the partition recovers, so in the end there is progress

Просмотреть файл

@ -82,7 +82,7 @@ namespace DurableTask.Netherite.EventHubs
// try load the taskhub parameters
try
{
var jsonText = await this.taskhubParameters.DownloadTextAsync().ConfigureAwait(false);
var jsonText = await this.taskhubParameters.DownloadTextAsync();
return JsonConvert.DeserializeObject<TaskhubParameters>(jsonText);
}
catch (StorageException ex) when (ex.RequestInformation.HttpStatusCode == 404)
@ -93,15 +93,23 @@ namespace DurableTask.Netherite.EventHubs
async Task<bool> ExistsAsync()
{
var parameters = await this.TryLoadExistingTaskhubAsync().ConfigureAwait(false);
var parameters = await this.TryLoadExistingTaskhubAsync();
return (parameters != null && parameters.TaskhubName == this.settings.HubName);
}
async Task<bool> CreateIfNotExistsAsync()
{
await this.cloudBlobContainer.CreateIfNotExistsAsync().ConfigureAwait(false);
bool containerCreated = await this.cloudBlobContainer.CreateIfNotExistsAsync();
if (containerCreated)
{
this.traceHelper.LogInformation("Created new blob container at {container}", this.cloudBlobContainer.Uri);
}
else
{
this.traceHelper.LogInformation("Using existing blob container at {container}", this.cloudBlobContainer.Uri);
}
// ensure the task hubs exist, creating them if necessary
// ensure the event hubs exist, creating them if necessary
var tasks = new List<Task>();
tasks.Add(EventHubsUtil.EnsureEventHubExistsAsync(this.settings.ResolvedTransportConnectionString, PartitionHubs[0], this.settings.PartitionCount));
if (ActivityScheduling.RequiresLoadMonitor(this.settings.ActivityScheduler))
@ -118,6 +126,8 @@ namespace DurableTask.Netherite.EventHubs
(long[] startPositions, DateTime[] creationTimestamps, string namespaceEndpoint)
= await EventHubsConnections.GetPartitionInfo(this.settings.ResolvedTransportConnectionString, EventHubsTransport.PartitionHubs);
this.traceHelper.LogInformation("Confirmed eventhubs positions=[{positions}] endpoint={endpoint}", string.Join(",", startPositions.Select(x => $"#{x}")), namespaceEndpoint);
var taskHubParameters = new TaskhubParameters()
{
TaskhubName = this.settings.HubName,
@ -142,11 +152,14 @@ namespace DurableTask.Netherite.EventHubs
new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.None });
var noOverwrite = AccessCondition.GenerateIfNoneMatchCondition("*");
await this.taskhubParameters.UploadTextAsync(jsonText, null, noOverwrite, null, null).ConfigureAwait(false);
await this.taskhubParameters.UploadTextAsync(jsonText, null, noOverwrite, null, null);
this.traceHelper.LogInformation("Created new taskhub");
}
catch(StorageException e) when (BlobUtils.BlobAlreadyExists(e))
catch (StorageException e) when (BlobUtils.BlobAlreadyExists(e))
{
// taskhub already exists, possibly because a different node created it faster
this.traceHelper.LogInformation("Confirmed existing taskhub");
return false;
}
@ -156,13 +169,13 @@ namespace DurableTask.Netherite.EventHubs
async Task DeleteAsync()
{
if (await this.taskhubParameters.ExistsAsync().ConfigureAwait(false))
if (await this.taskhubParameters.ExistsAsync())
{
await BlobUtils.ForceDeleteAsync(this.taskhubParameters).ConfigureAwait(false);
await BlobUtils.ForceDeleteAsync(this.taskhubParameters);
}
// todo delete consumption checkpoints
await this.host.StorageProvider.DeleteAllPartitionStatesAsync().ConfigureAwait(false);
await this.host.StorageProvider.DeleteAllPartitionStatesAsync();
}
async Task StartAsync()
@ -170,7 +183,7 @@ namespace DurableTask.Netherite.EventHubs
this.shutdownSource = new CancellationTokenSource();
// load the taskhub parameters
var jsonText = await this.taskhubParameters.DownloadTextAsync().ConfigureAwait(false);
var jsonText = await this.taskhubParameters.DownloadTextAsync();
this.parameters = JsonConvert.DeserializeObject<TaskhubParameters>(jsonText);
this.taskhubGuid = this.parameters.TaskhubGuid.ToByteArray();
@ -230,11 +243,11 @@ namespace DurableTask.Netherite.EventHubs
{
if (ActivityScheduling.RequiresLoadMonitor(this.settings.ActivityScheduler))
{
await Task.WhenAll(StartPartitionHost(), StartLoadMonitorHost()).ConfigureAwait(false);
await Task.WhenAll(StartPartitionHost(), StartLoadMonitorHost());
}
else
{
await StartPartitionHost().ConfigureAwait(false);
await StartPartitionHost();
}
}
@ -259,14 +272,26 @@ namespace DurableTask.Netherite.EventHubs
var processorOptions = new EventProcessorOptions()
{
InitialOffsetProvider = (s) => EventPosition.FromSequenceNumber(this.parameters.StartPositions[int.Parse(s)] - 1),
InitialOffsetProvider = (s) => {
var pos = this.parameters.StartPositions[int.Parse(s)];
if (pos > 0)
{
return EventPosition.FromSequenceNumber(pos - 1, inclusive: false);
}
else
{
return EventPosition.FromStart();
}
},
MaxBatchSize = 300,
PrefetchCount = 500,
};
await this.eventProcessorHost.RegisterEventProcessorFactoryAsync(
new PartitionEventProcessorFactory(this),
processorOptions).ConfigureAwait(false);
processorOptions);
this.traceHelper.LogInformation($"Partition Host started");
}
else
{
@ -313,7 +338,7 @@ namespace DurableTask.Netherite.EventHubs
await this.loadMonitorHost.RegisterEventProcessorFactoryAsync(
new LoadMonitorEventProcessorFactory(this),
processorOptions).ConfigureAwait(false);
processorOptions);
}
}
@ -368,7 +393,7 @@ namespace DurableTask.Netherite.EventHubs
this.shutdownSource.Cancel(); // initiates shutdown of client and of all partitions
this.traceHelper.LogDebug("Stopping client");
await this.client.StopAsync().ConfigureAwait(false);
await this.client.StopAsync();
if (this.settings.PartitionManagement != PartitionManagementOptions.ClientOnly)
{
@ -377,12 +402,12 @@ namespace DurableTask.Netherite.EventHubs
this.traceHelper.LogDebug("Stopping partition and loadmonitor hosts");
await Task.WhenAll(
this.StopPartitionHost(),
this.loadMonitorHost.UnregisterEventProcessorAsync()).ConfigureAwait(false);
this.loadMonitorHost.UnregisterEventProcessorAsync());
}
else
{
this.traceHelper.LogDebug("Stopping partition host");
await this.eventProcessorHost.UnregisterEventProcessorAsync().ConfigureAwait(false);
await this.eventProcessorHost.UnregisterEventProcessorAsync();
}
}
@ -390,7 +415,7 @@ namespace DurableTask.Netherite.EventHubs
await this.clientProcessTask;
this.traceHelper.LogDebug("Closing connections");
await this.connections.StopAsync().ConfigureAwait(false);
await this.connections.StopAsync();
this.traceHelper.LogInformation("EventHubsBackend shutdown completed");
}
@ -448,6 +473,7 @@ namespace DurableTask.Netherite.EventHubs
// receive a dummy packet to establish connection
// (the packet, if any, cannot be for this receiver because it is fresh)
await receiver.ReceiveAsync(1, TimeSpan.FromMilliseconds(1));
this.traceHelper.LogDebug("Client{clientId}.ch{index} connection established", Client.GetShortId(this.ClientId), index);
}
catch (Exception exception)
{

Просмотреть файл

@ -62,8 +62,10 @@ namespace DurableTask.Netherite.Tests
{
// start the service
var service = new NetheriteOrchestrationService(settings, this.loggerFactory);
await service.CreateAsync();
await service.StartAsync();
var orchestrationService = (IOrchestrationService)service;
var orchestrationServiceClient = (IOrchestrationServiceClient)service;
await orchestrationService.CreateAsync();
await orchestrationService.StartAsync();
var host = (TransportAbstraction.IHost)service;
Assert.Equal(1u, service.NumberPartitions);
var worker = new TaskHubWorker(service);
@ -76,22 +78,24 @@ namespace DurableTask.Netherite.Tests
await client.WaitForOrchestrationAsync(instance, TimeSpan.FromSeconds(20));
// stop the service
await service.StopAsync();
await orchestrationService.StopAsync();
}
{
// start the service
var service = new NetheriteOrchestrationService(settings, this.loggerFactory);
await service.CreateAsync();
await service.StartAsync();
var orchestrationService = (IOrchestrationService)service;
var orchestrationServiceClient = (IOrchestrationServiceClient)service;
await orchestrationService.CreateAsync();
await orchestrationService.StartAsync();
var host = (TransportAbstraction.IHost)service;
Assert.Equal(1u, service.NumberPartitions);
var client = new TaskHubClient(service);
var client = new TaskHubClient(orchestrationServiceClient);
var orchestrationState = await client.GetOrchestrationStateAsync("0");
Assert.Equal(OrchestrationStatus.Completed, orchestrationState.OrchestrationStatus);
// stop the service
await service.StopAsync();
await orchestrationService.StopAsync();
}
}
@ -140,8 +144,9 @@ namespace DurableTask.Netherite.Tests
// start the service
var service = new NetheriteOrchestrationService(settings, this.loggerFactory);
await service.CreateAsync();
await service.StartAsync();
var orchestrationService = (IOrchestrationService)service;
await orchestrationService.CreateAsync();
await orchestrationService.StartAsync();
var host = (TransportAbstraction.IHost)service;
Assert.Equal(1u, service.NumberPartitions);
var worker = new TaskHubWorker(service);
@ -242,7 +247,7 @@ namespace DurableTask.Netherite.Tests
}
// shut down the service
await service.StopAsync();
await orchestrationService.StopAsync();
}
}
}

Просмотреть файл

@ -254,14 +254,15 @@ namespace DurableTask.Netherite.Tests
Trace.WriteLine("Starting the orchestration service...");
var settings = TestConstants.GetNetheriteOrchestrationServiceSettings();
var service = new NetheriteOrchestrationService(settings, this.loggerFactory);
await service.CreateAsync(true);
await service.StartAsync();
var orchestrationService = (IOrchestrationService)service;
await orchestrationService.CreateAsync(true);
await orchestrationService.StartAsync();
Trace.WriteLine("Orchestration service is started.");
var _ = await ((IOrchestrationServiceQueryClient)service).GetOrchestrationStateAsync();
Trace.WriteLine("shutting down the orchestration service...");
await service.StopAsync();
await orchestrationService.StopAsync();
Trace.WriteLine("Orchestration service is shut down.");
}
}

Просмотреть файл

@ -25,13 +25,14 @@ namespace DurableTask.Netherite.Tests
public TestOrchestrationHost(NetheriteOrchestrationServiceSettings settings, ILoggerFactory loggerFactory)
{
this.orchestrationService = new Netherite.NetheriteOrchestrationService(settings, loggerFactory);
var orchestrationService = (IOrchestrationService)this.orchestrationService;
if (TestConstants.DeleteStorageBeforeRunningTests)
{
this.orchestrationService.DeleteAsync().GetAwaiter().GetResult();
orchestrationService.DeleteAsync().GetAwaiter().GetResult();
}
this.orchestrationService.CreateAsync(false).GetAwaiter().GetResult();
orchestrationService.CreateAsync(false).GetAwaiter().GetResult();
this.settings = settings;

Просмотреть файл

@ -5,6 +5,7 @@ namespace ScalingTests
{
using System;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Netherite;
using DurableTask.Netherite.Scaling;
using Microsoft.Extensions.Logging;
@ -31,7 +32,8 @@ namespace ScalingTests
logger.LogInformation("Starting OrchestrationService...");
var service = new NetheriteOrchestrationService(settings, loggerFactory);
await service.StartAsync();
var orchestrationService = (IOrchestrationService)service;
await orchestrationService.StartAsync();
if (service.TryGetScalingMonitor(out ScalingMonitor scalingMonitor))
{
@ -65,7 +67,7 @@ namespace ScalingTests
// shut down
logger.LogInformation("OchestrationService stopping...");
await service.StopAsync();
await orchestrationService.StopAsync();
logger.LogInformation("Done");
}
}