implement taskhub storage format descriptor. (#35)
This commit is contained in:
Родитель
c2c974a1c2
Коммит
ae3d25c1c2
|
@ -47,7 +47,12 @@ namespace DurableTask.Netherite
|
|||
public DateTime Timestamp { get; set; }
|
||||
|
||||
[DataMember]
|
||||
public bool IsPersisted { get; set; }
|
||||
public PersistFirstStatus PersistFirst { get; set; }
|
||||
|
||||
public enum PersistFirstStatus { NotRequired, Required, Done };
|
||||
|
||||
[DataMember]
|
||||
public int PackPartitionTaskMessages { get; set; }
|
||||
|
||||
[IgnoreDataMember]
|
||||
public OrchestrationWorkItem WorkItemForReuse { get; set; }
|
||||
|
@ -56,7 +61,7 @@ namespace DurableTask.Netherite
|
|||
public string WorkItemId => SessionsState.GetWorkItemId(this.PartitionId, this.SessionId, this.BatchStartPosition);
|
||||
|
||||
[IgnoreDataMember]
|
||||
public override EventId EventId => EventId.MakePartitionInternalEventId(this.IsPersisted ? this.WorkItemId + "P" : this.WorkItemId);
|
||||
public override EventId EventId => EventId.MakePartitionInternalEventId(this.PersistFirst == PersistFirstStatus.Done ? this.WorkItemId + "P" : this.WorkItemId);
|
||||
|
||||
[IgnoreDataMember]
|
||||
public override IEnumerable<(TaskMessage, string)> TracedTaskMessages
|
||||
|
|
|
@ -264,6 +264,11 @@ namespace DurableTask.Netherite
|
|||
|
||||
await this.taskHub.StartAsync().ConfigureAwait(false);
|
||||
|
||||
if (this.Settings.PartitionCount != this.NumberPartitions)
|
||||
{
|
||||
this.Logger.LogWarning("NetheriteOrchestrationService is ignoring configuration setting partitionCount={specifiedPartitions} because existing TaskHub has {actualPartitions} partitions", this.Settings.PartitionCount, this.NumberPartitions);
|
||||
}
|
||||
|
||||
System.Diagnostics.Debug.Assert(this.client != null, "Backend should have added client");
|
||||
}
|
||||
catch (Exception e) when (!Utils.IsFatal(e))
|
||||
|
@ -272,6 +277,11 @@ namespace DurableTask.Netherite
|
|||
string message = $"NetheriteOrchestrationService failed to start: {e.Message}";
|
||||
EtwSource.Log.OrchestrationServiceError(this.StorageAccountName, message, e.ToString(), this.Settings.HubName, this.Settings.WorkerId, TraceUtils.AppName, TraceUtils.ExtensionVersion);
|
||||
this.Logger.LogError("NetheriteOrchestrationService failed to start: {exception}", e);
|
||||
|
||||
this.serviceShutdownSource.Cancel();
|
||||
this.serviceShutdownSource.Dispose();
|
||||
this.serviceShutdownSource = null;
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
@ -660,6 +670,8 @@ namespace DurableTask.Netherite
|
|||
BatchLength = messageBatch.BatchLength,
|
||||
NewEvents = (List<HistoryEvent>)newOrchestrationRuntimeState.NewEvents,
|
||||
WorkItemForReuse = cacheWorkItemForReuse ? orchestrationWorkItem : null,
|
||||
PackPartitionTaskMessages = partition.Settings.PackPartitionTaskMessages,
|
||||
PersistFirst = partition.Settings.PersistStepsFirst ? BatchProcessed.PersistFirstStatus.Required : BatchProcessed.PersistFirstStatus.NotRequired,
|
||||
State = state,
|
||||
ActivityMessages = (List<TaskMessage>)outboundMessages,
|
||||
LocalMessages = localMessages,
|
||||
|
|
|
@ -65,17 +65,17 @@ namespace DurableTask.Netherite
|
|||
|
||||
if (!effects.IsReplaying)
|
||||
{
|
||||
if (!this.Partition.Settings.PersistStepsFirst)
|
||||
if (evt is BatchProcessed batchProcessedEvt
|
||||
&& batchProcessedEvt.PersistFirst == BatchProcessed.PersistFirstStatus.Done)
|
||||
{
|
||||
// we must not send messages until this step has been persisted
|
||||
evt.OutboxBatch = batch;
|
||||
DurabilityListeners.Register(evt, this);
|
||||
}
|
||||
else
|
||||
{
|
||||
// we can send the messages now
|
||||
// in this special case the event is actually already persisted so we can send right away
|
||||
this.Send(batch);
|
||||
return;
|
||||
}
|
||||
|
||||
// register for a durability notification, at which point we will send the batch
|
||||
evt.OutboxBatch = batch;
|
||||
DurabilityListeners.Register(evt, this);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -184,7 +184,7 @@ namespace DurableTask.Netherite
|
|||
outmessage.SubPosition = ++subPosition;
|
||||
}
|
||||
|
||||
if (effects.Partition.Settings.PackPartitionTaskMessages > 1)
|
||||
if (evt.PackPartitionTaskMessages > 1)
|
||||
{
|
||||
// pack multiple TaskMessages for the same destination into a single TaskMessagesReceived event
|
||||
var sorted = new Dictionary<uint, TaskMessagesReceived>();
|
||||
|
@ -202,7 +202,7 @@ namespace DurableTask.Netherite
|
|||
AddMessage(outmessage, message);
|
||||
|
||||
// send the message if we have reached the pack limit
|
||||
if (outmessage.NumberMessages >= effects.Partition.Settings.PackPartitionTaskMessages)
|
||||
if (outmessage.NumberMessages >= evt.PackPartitionTaskMessages)
|
||||
{
|
||||
batch.OutgoingMessages.Add(outmessage);
|
||||
sorted.Remove(destination);
|
||||
|
|
|
@ -258,16 +258,16 @@ namespace DurableTask.Netherite
|
|||
public void ConfirmDurable(Event evt)
|
||||
{
|
||||
var evtCopy = (BatchProcessed) ((BatchProcessed) evt).Clone();
|
||||
evtCopy.IsPersisted = true;
|
||||
evtCopy.PersistFirst = BatchProcessed.PersistFirstStatus.Done;
|
||||
this.Partition.SubmitInternalEvent(evtCopy);
|
||||
}
|
||||
|
||||
public void Process(BatchProcessed evt, EffectTracker effects)
|
||||
{
|
||||
// if speculation is disabled,
|
||||
if (effects.Partition.Settings.PersistStepsFirst)
|
||||
if (evt.PersistFirst != BatchProcessed.PersistFirstStatus.NotRequired)
|
||||
{
|
||||
if (!evt.IsPersisted)
|
||||
if (evt.PersistFirst == BatchProcessed.PersistFirstStatus.Required)
|
||||
{
|
||||
// we do not process this event right away
|
||||
// but persist it first, and then submit it again, before processing it.
|
||||
|
|
|
@ -5,11 +5,13 @@ namespace DurableTask.Netherite.Faster
|
|||
{
|
||||
using DurableTask.Core.Common;
|
||||
using FASTER.core;
|
||||
using ImpromptuInterface;
|
||||
using Microsoft.Azure.Storage;
|
||||
using Microsoft.Azure.Storage.Blob;
|
||||
using Microsoft.Azure.Storage.RetryPolicies;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
|
@ -102,6 +104,44 @@ namespace DurableTask.Netherite.Faster
|
|||
20, // 1MB
|
||||
};
|
||||
|
||||
const int StorageFormatVersion = 1;
|
||||
|
||||
public static string GetStorageFormat(NetheriteOrchestrationServiceSettings settings)
|
||||
{
|
||||
return JsonConvert.SerializeObject(new
|
||||
{
|
||||
UseAlternateObjectStore = settings.UseAlternateObjectStore,
|
||||
UsePSFQueries = settings.UsePSFQueries,
|
||||
FormatVersion = StorageFormatVersion,
|
||||
},
|
||||
Formatting.None);
|
||||
}
|
||||
|
||||
public static void CheckStorageFormat(string format, NetheriteOrchestrationServiceSettings settings)
|
||||
{
|
||||
try
|
||||
{
|
||||
JObject json = JsonConvert.DeserializeObject<JObject>(format);
|
||||
|
||||
if ((bool)json["UseAlternateObjectStore"] != settings.UseAlternateObjectStore)
|
||||
{
|
||||
throw new InvalidOperationException("The Netherite configuration setting 'UseAlternateObjectStore' is incompatible with the existing taskhub.");
|
||||
}
|
||||
if ((bool)json["UsePSFQueries"] != settings.UsePSFQueries)
|
||||
{
|
||||
throw new InvalidOperationException("The Netherite configuration setting 'UsePSFQueries' is incompatible with the existing taskhub.");
|
||||
}
|
||||
if ((int)json["FormatVersion"] != StorageFormatVersion)
|
||||
{
|
||||
throw new InvalidOperationException("The current storage format version is incompatible with the existing taskhub.");
|
||||
}
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
throw new InvalidOperationException("The taskhub has an incompatible storage format", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
//TODO figure out what is supposed to go here
|
||||
|
|
|
@ -13,6 +13,7 @@ namespace DurableTask.Netherite.EventHubs
|
|||
using Microsoft.Azure.Storage;
|
||||
using Microsoft.Azure.Storage.Blob;
|
||||
using Newtonsoft.Json;
|
||||
using DurableTask.Netherite.Faster;
|
||||
|
||||
/// <summary>
|
||||
/// The EventHubs transport implementation.
|
||||
|
@ -107,6 +108,7 @@ namespace DurableTask.Netherite.EventHubs
|
|||
TaskhubName = this.settings.HubName,
|
||||
TaskhubGuid = Guid.NewGuid(),
|
||||
CreationTimestamp = DateTime.UtcNow,
|
||||
StorageFormat = BlobManager.GetStorageFormat(this.settings),
|
||||
PartitionHubs = EventHubsTransport.PartitionHubs,
|
||||
ClientHubs = EventHubsTransport.ClientHubs,
|
||||
PartitionConsumerGroup = EventHubsTransport.PartitionConsumerGroup,
|
||||
|
@ -161,8 +163,11 @@ namespace DurableTask.Netherite.EventHubs
|
|||
throw new InvalidOperationException($"The specified taskhub name does not match the task hub name in {this.taskhubParameters.Name}");
|
||||
}
|
||||
|
||||
this.host.NumberPartitions = (uint)this.parameters.StartPositions.Length;
|
||||
// check that the storage format is supported
|
||||
BlobManager.CheckStorageFormat(this.parameters.StorageFormat, this.settings);
|
||||
|
||||
this.host.NumberPartitions = (uint)this.parameters.StartPositions.Length;
|
||||
|
||||
this.connections = new EventHubsConnections(this.settings.ResolvedTransportConnectionString, this.parameters.PartitionHubs, this.parameters.ClientHubs)
|
||||
{
|
||||
Host = host,
|
||||
|
|
|
@ -22,6 +22,9 @@ namespace DurableTask.Netherite.EventHubs
|
|||
[DataMember]
|
||||
public DateTime CreationTimestamp { get; set; }
|
||||
|
||||
[DataMember]
|
||||
public string StorageFormat { get; set; }
|
||||
|
||||
[DataMember]
|
||||
public string[] PartitionHubs { get; set; }
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче