зеркало из https://github.com/microsoft/BuildXL.git
Merged PR 718995: Migration to Azure.Messaging.EventHubs (integration fixed)
We want to start using SAS tokens but to do that we need to update to the new EventHub package (because Microsoft.Azure.EventHubs does not support SAS). Migration guide: https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md **Original PR**: https://dev.azure.com/mseng/Domino/_git/BuildXL.Internal/pullrequest/715628 **Issue**: Conflict between different versions of _System.Memory.Data_ (during integration with CloudBuild) **Logs**: https://cbtest.microsoft.com/stamp/SN_S1/getfile?path=\SA2PNPF00025749\d:\dbs\sh\cb_m\0517_105752\private\BuildEngine\Cache\CacheMissAnalysis\Logs\Retail\Amd64\build.log **Difference**: Only the last commit that changes the _System.Memory.Data_ version.
This commit is contained in:
Родитель
cb22c828b6
Коммит
e8cef0c55c
|
@ -28,7 +28,7 @@ export const kustoPackages = [
|
|||
@@public
|
||||
export function getSerializationPackages(includeNetStandard: boolean) : (Managed.ManagedNugetPackage | Managed.Assembly)[] {
|
||||
return [
|
||||
importFrom("System.Memory.Data").pkg,
|
||||
importFrom("System.Memory.Data").withQualifier({targetFramework: "netstandard2.0"}).pkg,
|
||||
...getSystemTextJson(includeNetStandard),
|
||||
...getSerializationPackagesWithoutNetStandard()
|
||||
];
|
||||
|
|
|
@ -7,7 +7,7 @@ namespace Distributed {
|
|||
|
||||
@@public
|
||||
export const eventHubPackages = [
|
||||
importFrom("Microsoft.Azure.EventHubs").pkg,
|
||||
importFrom("Azure.Messaging.EventHubs").pkg,
|
||||
importFrom("Azure.Identity").pkg,
|
||||
importFrom("Azure.Core").pkg,
|
||||
importFrom("Microsoft.Azure.Services.AppAuthentication").pkg,
|
||||
|
@ -19,7 +19,9 @@ namespace Distributed {
|
|||
importFrom("Microsoft.IdentityModel.Logging").pkg
|
||||
]
|
||||
: []),
|
||||
importFrom("Azure.Core.Amqp").pkg,
|
||||
importFrom("Microsoft.Azure.Amqp").pkg,
|
||||
importFrom("System.Memory.Data").withQualifier({targetFramework: "netstandard2.0"}).pkg,
|
||||
];
|
||||
|
||||
@@public
|
||||
|
|
|
@ -9,8 +9,11 @@ using BuildXL.Cache.ContentStore.Tracing.Internal;
|
|||
using BuildXL.Cache.ContentStore.Utils;
|
||||
using BuildXL.Cache.Host.Configuration;
|
||||
using BuildXL.Utilities.Core.Tasks;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.Services.AppAuthentication;
|
||||
using Azure.Messaging.EventHubs;
|
||||
using Azure.Messaging.EventHubs.Producer;
|
||||
using Azure.Messaging.EventHubs.Consumer;
|
||||
using System.Collections.Generic;
|
||||
using Azure.Identity;
|
||||
|
||||
namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
||||
{
|
||||
|
@ -21,13 +24,10 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
{
|
||||
private const string PartitionId = "0";
|
||||
|
||||
private EventHubClient _eventHubClient;
|
||||
private PartitionSender _partitionSender;
|
||||
private readonly EventHubContentLocationEventStoreConfiguration _configuration;
|
||||
|
||||
private readonly string _hostName = Guid.NewGuid().ToString();
|
||||
|
||||
private PartitionReceiver _partitionReceiver;
|
||||
private EventHubProducerClient _partitionSender;
|
||||
private PartitionReceiverWrapper _partitionReceiver;
|
||||
|
||||
/// <inheritdoc />
|
||||
protected override Tracer Tracer { get; } = new Tracer(nameof(AzureEventHubClient));
|
||||
|
@ -45,16 +45,15 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
|
||||
if (_partitionReceiver == null)
|
||||
{
|
||||
_partitionReceiver = _eventHubClient.CreateReceiver(
|
||||
_configuration.ConsumerGroupName,
|
||||
PartitionId,
|
||||
GetInitialOffset(context, sequencePoint),
|
||||
new ReceiverOptions()
|
||||
{
|
||||
Identifier = _hostName
|
||||
});
|
||||
|
||||
_partitionReceiver.SetReceiveHandler(processor);
|
||||
if (ManagedIdentityUriHelper.TryParseForManagedIdentity(_configuration.EventHubConnectionString, out string eventHubNamespace, out string eventHubName, out string managedIdentityId))
|
||||
{
|
||||
_partitionReceiver = new PartitionReceiverWrapper(_configuration.ConsumerGroupName, PartitionId, GetInitialOffset(context, sequencePoint), eventHubNamespace, eventHubName, new DefaultAzureCredential());
|
||||
}
|
||||
else
|
||||
{
|
||||
_partitionReceiver = new PartitionReceiverWrapper(_configuration.ConsumerGroupName, PartitionId, GetInitialOffset(context, sequencePoint), _configuration.EventHubConnectionString, _configuration.EventHubName);
|
||||
}
|
||||
_partitionReceiver.SetReceiveHandler(context, processor);
|
||||
}
|
||||
|
||||
return BoolResult.Success;
|
||||
|
@ -81,28 +80,15 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
|
||||
// Retry behavior in the Azure Event Hubs Client Library is controlled by the RetryPolicy property on the EventHubClient class.
|
||||
// The default policy retries with exponential backoff when Azure Event Hub returns a transient EventHubsException or an OperationCanceledException.
|
||||
if (ManagedIdentityUriHelper.TryParseForManagedIdentity(_configuration.EventHubConnectionString, out Uri eventHubNamespaceUri, out string eventHubName, out string managedIdentityId))
|
||||
if (ManagedIdentityUriHelper.TryParseForManagedIdentity(_configuration.EventHubConnectionString, out string eventHubNamespace, out string eventHubName, out string managedIdentityId))
|
||||
{
|
||||
// https://docs.microsoft.com/en-us/dotnet/api/overview/azure/service-to-service-authentication#connection-string-support
|
||||
var tokenProvider = new ManagedIdentityTokenProvider(new AzureServiceTokenProvider($"RunAs=App;AppId={managedIdentityId}"));
|
||||
_eventHubClient = EventHubClient.CreateWithTokenProvider(
|
||||
eventHubNamespaceUri,
|
||||
eventHubName,
|
||||
tokenProvider);
|
||||
_partitionSender = new EventHubProducerClient(eventHubNamespace, eventHubName, new DefaultAzureCredential());
|
||||
}
|
||||
else
|
||||
{
|
||||
var connectionStringBuilder =
|
||||
new EventHubsConnectionStringBuilder(_configuration.EventHubConnectionString)
|
||||
{
|
||||
EntityPath = _configuration.EventHubName,
|
||||
};
|
||||
|
||||
_eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
|
||||
_partitionSender = new EventHubProducerClient(_configuration.EventHubConnectionString, _configuration.EventHubName);
|
||||
}
|
||||
|
||||
_partitionSender = _eventHubClient.CreatePartitionSender(PartitionId);
|
||||
|
||||
return BoolResult.Success;
|
||||
}
|
||||
|
||||
|
@ -116,9 +102,9 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
await _partitionSender.CloseAsync();
|
||||
}
|
||||
|
||||
if (_eventHubClient != null)
|
||||
if (_partitionReceiver != null)
|
||||
{
|
||||
await _eventHubClient.CloseAsync();
|
||||
await _partitionReceiver.CloseAsync();
|
||||
}
|
||||
|
||||
return await base.ShutdownCoreAsync(context);
|
||||
|
@ -130,7 +116,13 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
context.Token.ThrowIfCancellationRequested();
|
||||
try
|
||||
{
|
||||
await _partitionSender.SendAsync(eventData);
|
||||
// Each element in a separate call because partial success is not possible
|
||||
var eventDataAsList = new List<EventData> {eventData};
|
||||
var sendEventOptions = new SendEventOptions
|
||||
{
|
||||
PartitionId = PartitionId
|
||||
};
|
||||
await _partitionSender.SendAsync(eventDataAsList, sendEventOptions);
|
||||
}
|
||||
catch (InvalidOperationException) when(context.Token.IsCancellationRequested || ShutdownStarted)
|
||||
{
|
||||
|
|
|
@ -18,7 +18,7 @@ using BuildXL.Utilities;
|
|||
using BuildXL.Utilities.Serialization;
|
||||
using BuildXL.Utilities.Core;
|
||||
using BuildXL.Utilities.Core.Tasks;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Azure.Messaging.EventHubs;
|
||||
using AbsolutePath = BuildXL.Cache.ContentStore.Interfaces.FileSystem.AbsolutePath;
|
||||
|
||||
namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
||||
|
@ -589,12 +589,10 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
{
|
||||
if (eventTimeUtc == null)
|
||||
{
|
||||
Contract.Assert(message.SystemProperties != null, "Either eventTimeUtc argument must be provided or message.SystemProperties must not be null. Did you forget to provide eventTimeUtc arguments in tests?");
|
||||
eventTimeUtc = message.SystemProperties.EnqueuedTimeUtc;
|
||||
eventTimeUtc = message.EnqueuedTime.UtcDateTime;
|
||||
}
|
||||
|
||||
var data = message.Body;
|
||||
return _reader.DeserializeSequence(data.AsMemory(), reader => ContentLocationEventData.Deserialize(reader, eventTimeUtc.Value));
|
||||
return _reader.DeserializeSequence(message.Body, reader => ContentLocationEventData.Deserialize(reader, eventTimeUtc.Value));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -607,13 +605,10 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
{
|
||||
if (eventTimeUtc == null)
|
||||
{
|
||||
Contract.Assert(
|
||||
message.SystemProperties != null,
|
||||
"Either eventTimeUtc argument must be provided or message.SystemProperties must not be null. Did you forget to provide eventTimeUtc arguments in tests?");
|
||||
eventTimeUtc = message.SystemProperties.EnqueuedTimeUtc;
|
||||
eventTimeUtc = message.EnqueuedTime.UtcDateTime;
|
||||
}
|
||||
|
||||
var dataReader = message.Body.AsSpan().AsReader();
|
||||
var dataReader = message.Body.Span.AsReader();
|
||||
var result = new List<ContentLocationEventData>();
|
||||
while (!dataReader.IsEnd)
|
||||
{
|
||||
|
@ -627,7 +622,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
{
|
||||
Tracer.Error(context, e, "Unexpected error deserializing events with span-based reader. Falling back to BxlReader.");
|
||||
// Printing at least some portion of the blob to figure out whats the issue is.
|
||||
Tracer.Debug(context, $"Non-deserializable blob. Size: {message.Body.Count}, Blob: {HexHelper.SpanToHex(message.Body.AsSpan(start: 0, length: 512))}");
|
||||
Tracer.Debug(context, $"Non-deserializable blob. Size: {message.Body.Length}, Blob: {message.Body.Span.SpanToHex(512)}");
|
||||
return DeserializeEventsLegacy(message, eventTimeUtc);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -619,7 +619,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
|
||||
/// <summary>
|
||||
/// Starts receiving events from the event store.
|
||||
/// NOTE: This may be called event if the event store is already processing events. It is the responsibility of the event store to handle this appropriately.
|
||||
/// NOTE: This may be called even if the event store is already processing events. It is the responsibility of the event store to handle this appropriately.
|
||||
/// </summary>
|
||||
public BoolResult StartProcessing(OperationContext context, EventSequencePoint sequencePoint)
|
||||
{
|
||||
|
|
|
@ -23,8 +23,10 @@ using BuildXL.Utilities.ParallelAlgorithms;
|
|||
using BuildXL.Utilities.Core.Tasks;
|
||||
using BuildXL.Utilities.Core.Tracing;
|
||||
using BuildXL.Utilities.Tracing;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Azure.Messaging.EventHubs;
|
||||
using static BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming.ContentLocationEventStoreCounters;
|
||||
using System.IO;
|
||||
using System.Net.Sockets;
|
||||
|
||||
#nullable enable
|
||||
|
||||
|
@ -188,8 +190,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
|
||||
Tracer.Info(
|
||||
context,
|
||||
$"{Tracer.Name}: Sending {eventNumber}/{events.Length} event. OpId={operationId}, Epoch='{_configuration.Epoch}', Size={eventData.Body.Count}.");
|
||||
counters[SentMessagesTotalSize].Add(eventData.Body.Count);
|
||||
$"{Tracer.Name}: Sending {eventNumber}/{events.Length} event. OpId={operationId}, Epoch='{_configuration.Epoch}', Size={eventData.Body.Length}.");
|
||||
counters[SentMessagesTotalSize].Add(eventData.Body.Length);
|
||||
eventData.Properties[OperationIdEventKey] = operationId.ToString();
|
||||
|
||||
// Even though event hub client has it's own built-in retry strategy, we have to wrap all the calls into a separate
|
||||
|
@ -200,7 +202,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
{
|
||||
await _eventHubClient.SendAsync(context, eventData);
|
||||
}
|
||||
catch (ServerBusyException exception)
|
||||
catch (EventHubsException exception) when (exception.Reason == EventHubsException.FailureReason.ServiceBusy)
|
||||
{
|
||||
// TODO: Verify that the HResult is 50002. Documentation shows that this should be the error code for throttling,
|
||||
// but documentation is done for Microsoft.ServiceBus.Messaging.ServerBusyException and not Microsoft.Azure.EventHubs.ServerBusyException
|
||||
|
@ -330,18 +332,18 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
|
||||
var sender = TryGetMessageSender(message) ?? "Unknown sender";
|
||||
|
||||
var eventTimeUtc = message.SystemProperties.EnqueuedTimeUtc;
|
||||
var eventTimeUtc = message.EnqueuedTime.UtcDateTime;
|
||||
var eventProcessingDelay = DateTime.UtcNow - eventTimeUtc;
|
||||
EventQueueDelays[input.EventQueueDelayIndex] = eventProcessingDelay; // Need to check if index is valid and has entry in list
|
||||
|
||||
// Creating nested context with operationId as a guid. This helps to correlate operations on a worker and a master machines.
|
||||
context = CreateNestedContext(context, operationId?.ToString());
|
||||
|
||||
Tracer.Info(context, $"{Tracer.Name}.ReceivedEvent: ProcessingDelay={eventProcessingDelay}, Sender={sender}, OpId={operationId}, SeqNo={message.SystemProperties.SequenceNumber}, EQT={eventTimeUtc}, Filter={eventFilter}, Size={message.Body.Count}.");
|
||||
Tracer.Info(context, $"{Tracer.Name}.ReceivedEvent: ProcessingDelay={eventProcessingDelay}, Sender={sender}, OpId={operationId}, SeqNo={message.SequenceNumber}, EQT={eventTimeUtc}, Filter={eventFilter}, Size={message.Body.Length}.");
|
||||
|
||||
Tracer.TrackMetric(context, EventProcessingDelayInSecondsMetricName, (long)eventProcessingDelay.TotalSeconds);
|
||||
|
||||
counters[ReceivedMessagesTotalSize].Add(message.Body.Count);
|
||||
counters[ReceivedMessagesTotalSize].Add(message.Body.Length);
|
||||
counters[ReceivedEventBatchCount].Increment();
|
||||
CacheActivityTracker.AddValue(CaSaaSActivityTrackingCounters.ProcessedEventHubMessages, value: 1);
|
||||
|
||||
|
@ -529,7 +531,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
{
|
||||
Context = context;
|
||||
Store = store;
|
||||
SequenceNumber = messages[messages.Count - 1].SystemProperties.SequenceNumber;
|
||||
SequenceNumber = messages[messages.Count - 1].SequenceNumber;
|
||||
_remainingMessageCount = messages.Count;
|
||||
store._pendingEventProcessingStates.Enqueue(this);
|
||||
}
|
||||
|
@ -596,27 +598,40 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
|
||||
private static class TransientEventHubErrorDetectionStrategy
|
||||
{
|
||||
public static bool IsRetryable(Exception exception)
|
||||
public static bool IsRetryable(Exception? exception)
|
||||
{
|
||||
if (exception is AggregateException ae)
|
||||
{
|
||||
return ae.InnerExceptions.All(e => IsRetryable(e));
|
||||
}
|
||||
|
||||
if (Microsoft.Azure.EventHubs.RetryPolicy.IsRetryableException(exception))
|
||||
if (exception is TimeoutException || (exception is EventHubsException eh && eh.Reason == EventHubsException.FailureReason.ServiceBusy))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
// IsRetryableException covers TaskCanceledException, EventHubException, OperationCanceledException and SocketException
|
||||
|
||||
// Need to cover some additional cases here.
|
||||
if (exception is TimeoutException || exception is ServerBusyException)
|
||||
if (exception is OperationCanceledException)
|
||||
{
|
||||
return true;
|
||||
exception = exception.InnerException;
|
||||
}
|
||||
|
||||
return false;
|
||||
switch (exception)
|
||||
{
|
||||
case null:
|
||||
return false;
|
||||
|
||||
case EventHubsException ex:
|
||||
return ex.IsTransient;
|
||||
|
||||
case TimeoutException _:
|
||||
case SocketException _:
|
||||
case IOException _:
|
||||
case UnauthorizedAccessException _:
|
||||
return true;
|
||||
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
using System;
|
||||
using System.Text.Json.Serialization;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Azure.Messaging.EventHubs.Consumer;
|
||||
|
||||
namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
||||
{
|
||||
|
|
|
@ -7,7 +7,7 @@ using BuildXL.Cache.ContentStore.Interfaces.Stores;
|
|||
using BuildXL.Cache.ContentStore.Tracing;
|
||||
using BuildXL.Cache.ContentStore.Tracing.Internal;
|
||||
using BuildXL.Cache.ContentStore.Utils;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Azure.Messaging.EventHubs;
|
||||
|
||||
namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
||||
{
|
||||
|
@ -58,4 +58,25 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
|||
return BoolResult.Success;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A handler interface for the receive operation for Azure.Messaging.EventHubs
|
||||
/// <summary>
|
||||
public interface IPartitionReceiveHandler
|
||||
{
|
||||
/// <summary>
|
||||
/// Gets or sets the maximum batch size.
|
||||
/// </summary>
|
||||
int MaxBatchSize { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Implement this method to specify the action to be performed on the received events.
|
||||
/// </summary>
|
||||
Task ProcessEventsAsync(System.Collections.Generic.IEnumerable<EventData> events);
|
||||
|
||||
/// <summary>
|
||||
/// Implement in order to handle exceptions that are thrown during receipt of events.
|
||||
/// </summary>
|
||||
Task ProcessErrorAsync(System.Exception error);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,186 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
using System.Threading.Tasks;
|
||||
using System.Threading;
|
||||
using Azure.Core;
|
||||
using Azure.Messaging.EventHubs.Consumer;
|
||||
using Azure.Messaging.EventHubs.Primitives;
|
||||
using System;
|
||||
using Azure.Messaging.EventHubs;
|
||||
using System.Collections.Generic;
|
||||
using BuildXL.Cache.ContentStore.Tracing;
|
||||
using BuildXL.Cache.ContentStore.Tracing.Internal;
|
||||
|
||||
namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
|
||||
{
|
||||
// Class based on https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventhub/Microsoft.Azure.EventHubs/src/Amqp/AmqpPartitionReceiver.cs (legacy Microsoft.Azure.EventHubs library)
|
||||
internal class PartitionReceiverWrapper : PartitionReceiver
|
||||
{
|
||||
private readonly object _receivePumpLock;
|
||||
private IPartitionReceiveHandler _receiveHandler;
|
||||
private Task _receivePumpTask;
|
||||
private CancellationTokenSource _receivePumpCancellationSource;
|
||||
private const int ReceiveHandlerDefaultBatchSize = 10;
|
||||
|
||||
protected Tracer Tracer { get; } = new Tracer(nameof(PartitionReceiverWrapper));
|
||||
|
||||
public PartitionReceiverWrapper(string consumerGroup,
|
||||
string partitionId,
|
||||
EventPosition eventPosition,
|
||||
string fullyQualifiedNamespace,
|
||||
string eventHubName,
|
||||
TokenCredential credential) : base(consumerGroup, partitionId, eventPosition, fullyQualifiedNamespace, eventHubName, credential)
|
||||
{
|
||||
_receivePumpLock = new object();
|
||||
}
|
||||
|
||||
public PartitionReceiverWrapper(string consumerGroup,
|
||||
string partitionId,
|
||||
EventPosition eventPosition,
|
||||
string connectionString,
|
||||
string eventHubName) : base(consumerGroup, partitionId, eventPosition, connectionString, eventHubName)
|
||||
{
|
||||
_receivePumpLock = new object();
|
||||
}
|
||||
|
||||
public void SetReceiveHandler(OperationContext context, IPartitionReceiveHandler newReceiveHandler)
|
||||
{
|
||||
lock (_receivePumpLock)
|
||||
{
|
||||
if (newReceiveHandler != null)
|
||||
{
|
||||
_receiveHandler = newReceiveHandler;
|
||||
|
||||
// We have a new receiveHandler, ensure pump is running.
|
||||
if (_receivePumpTask == null)
|
||||
{
|
||||
_receivePumpCancellationSource = new CancellationTokenSource();
|
||||
_receivePumpTask = ReceivePumpAsync(context, _receivePumpCancellationSource.Token);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public override async Task CloseAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
await ReceiveHandlerClose().ConfigureAwait(false);
|
||||
await base.CloseAsync();
|
||||
}
|
||||
|
||||
private Task ReceiveHandlerClose()
|
||||
{
|
||||
Task task = null;
|
||||
|
||||
lock (_receivePumpLock)
|
||||
{
|
||||
if (_receiveHandler != null)
|
||||
{
|
||||
if (_receivePumpTask != null)
|
||||
{
|
||||
task = _receivePumpTask;
|
||||
_receivePumpCancellationSource.Cancel();
|
||||
_receivePumpCancellationSource.Dispose();
|
||||
_receivePumpCancellationSource = null;
|
||||
_receivePumpTask = null;
|
||||
}
|
||||
|
||||
_receiveHandler = null;
|
||||
}
|
||||
}
|
||||
|
||||
return task ?? Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task ReceivePumpAsync(OperationContext context, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Loop until pump is shutdown or an error is hit.
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
IEnumerable<EventData> receivedEvents;
|
||||
|
||||
try
|
||||
{
|
||||
int batchSize;
|
||||
|
||||
lock (_receivePumpLock)
|
||||
{
|
||||
if (_receiveHandler == null)
|
||||
{
|
||||
// Pump has been shutdown, nothing more to do.
|
||||
return;
|
||||
}
|
||||
|
||||
batchSize = _receiveHandler.MaxBatchSize > 0 ? _receiveHandler.MaxBatchSize : ReceiveHandlerDefaultBatchSize;
|
||||
}
|
||||
|
||||
receivedEvents = await ReceiveBatchAsync(batchSize, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (EventHubsException e) when (e.Reason == EventHubsException.FailureReason.ConsumerDisconnected)
|
||||
{
|
||||
// ConsumerDisconnectedException is a special case where we know we cannot recover the pump.
|
||||
break;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Tracer.Error(context, e, "Error during ReceiveBatchAsync");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (receivedEvents != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
await ReceiveHandlerProcessEventsAsync(receivedEvents).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception userCodeError)
|
||||
{
|
||||
await ReceiveHandlerProcessErrorAsync(userCodeError).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// This should never throw
|
||||
Tracer.Error(context, ex, "EventHub ReceivePumpAsync failed");
|
||||
}
|
||||
}
|
||||
|
||||
// Encapsulates taking the receivePumpLock, checking this.receiveHandler for null,
|
||||
// calls this.receiveHandler.ProcessEventsAsync (starting this operation inside the ReceivePumpAsync).
|
||||
private Task ReceiveHandlerProcessEventsAsync(IEnumerable<EventData> eventDatas)
|
||||
{
|
||||
Task processEventsTask = null;
|
||||
|
||||
lock (_receivePumpLock)
|
||||
{
|
||||
if (_receiveHandler != null)
|
||||
{
|
||||
processEventsTask = _receiveHandler.ProcessEventsAsync(eventDatas);
|
||||
}
|
||||
}
|
||||
|
||||
return processEventsTask ?? Task.FromResult(0);
|
||||
}
|
||||
|
||||
// Encapsulates taking the receivePumpLock, checking this.receiveHandler for null,
|
||||
// calls this.receiveHandler.ProcessErrorAsync (starting this operation inside the ReceivePumpAsync).
|
||||
private Task ReceiveHandlerProcessErrorAsync(Exception error)
|
||||
{
|
||||
Task processErrorTask = null;
|
||||
lock (_receivePumpLock)
|
||||
{
|
||||
if (_receiveHandler != null)
|
||||
{
|
||||
processErrorTask = _receiveHandler.ProcessErrorAsync(error);
|
||||
}
|
||||
}
|
||||
|
||||
return processErrorTask ?? Task.FromResult(0);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
using System.Collections.Generic;
|
||||
using System;
|
||||
using Azure.Messaging.EventHubs;
|
||||
|
||||
namespace BuildXL.Cache.ContentStore.Distributed.NuCache.InMemory
|
||||
{
|
||||
/// <summary>
|
||||
/// This class allows us to create instances of Azure.Messaging.EventHubs.EventData for test purposes
|
||||
/// <summary>
|
||||
public class EventDataWrapper : EventData
|
||||
{
|
||||
public EventDataWrapper(
|
||||
BinaryData eventBody,
|
||||
IDictionary<string, object> properties,
|
||||
IReadOnlyDictionary<string, object> systemProperties,
|
||||
long sequenceNumber,
|
||||
long offset,
|
||||
DateTimeOffset enqueuedTime,
|
||||
string partitionKey) : base(eventBody: eventBody, properties: properties, systemProperties: systemProperties, sequenceNumber: sequenceNumber, offset: offset, enqueuedTime: enqueuedTime, partitionKey: partitionKey)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
|
@ -12,7 +12,8 @@ using BuildXL.Cache.ContentStore.Tracing;
|
|||
using BuildXL.Cache.ContentStore.Tracing.Internal;
|
||||
using BuildXL.Cache.ContentStore.Utils;
|
||||
using BuildXL.Utilities.Threading;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Azure.Messaging.EventHubs;
|
||||
using BuildXL.Cache.ContentStore.Distributed.NuCache.InMemory;
|
||||
|
||||
namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
||||
{
|
||||
|
@ -98,11 +99,6 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
/// </summary>
|
||||
public sealed class EventHub
|
||||
{
|
||||
// EventData system property names (copied from event hub codebase)
|
||||
private const string EnqueuedTimeUtcName = "x-opt-enqueued-time";
|
||||
private const string SequenceNumberName = "x-opt-sequence-number";
|
||||
|
||||
private readonly PropertyInfo _systemPropertiesPropertyInfo = typeof(EventData).GetProperty(nameof(EventData.SystemProperties));
|
||||
private readonly List<EventData> _eventStream = new List<EventData>();
|
||||
|
||||
internal IReadOnlyList<EventData> EventStream => _eventStream;
|
||||
|
@ -120,13 +116,18 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
{
|
||||
handler = OnEvent;
|
||||
|
||||
_eventStream.Add(eventData);
|
||||
// We need to modify some readonly values so we create a new instance
|
||||
var eventDataWrapper = new EventDataWrapper(
|
||||
eventBody: eventData.EventBody,
|
||||
properties: eventData.Properties,
|
||||
systemProperties: eventData.SystemProperties,
|
||||
sequenceNumber: (long)_eventStream.Count,
|
||||
offset: eventData.Offset,
|
||||
enqueuedTime: DateTimeOffset.Now,
|
||||
partitionKey: eventData.PartitionKey
|
||||
);
|
||||
|
||||
// HACK: Use reflect to set system properties property since its internal
|
||||
_systemPropertiesPropertyInfo.SetValue(eventData, Activator.CreateInstance(typeof(EventData.SystemPropertiesCollection), nonPublic: true));
|
||||
|
||||
eventData.SystemProperties[SequenceNumberName] = (long)_eventStream.Count;
|
||||
eventData.SystemProperties[EnqueuedTimeUtcName] = DateTime.UtcNow;
|
||||
_eventStream.Add(eventDataWrapper);
|
||||
}
|
||||
|
||||
handler?.Invoke(eventData);
|
||||
|
@ -153,11 +154,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
|
|||
{
|
||||
if (sequencePoint.SequenceNumber != null)
|
||||
{
|
||||
return eventData.SystemProperties.SequenceNumber < sequencePoint.SequenceNumber.Value;
|
||||
return eventData.SequenceNumber < sequencePoint.SequenceNumber.Value;
|
||||
}
|
||||
else
|
||||
{
|
||||
return eventData.SystemProperties.EnqueuedTimeUtc < sequencePoint.EventStartCursorTimeUtc.Value;
|
||||
return eventData.EnqueuedTime.UtcDateTime < sequencePoint.EventStartCursorTimeUtc.Value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,6 +63,7 @@ namespace DistributedTest {
|
|||
NetFx.System.Xml.Linq.dll
|
||||
),
|
||||
...addIf(BuildXLSdk.isFullFramework,
|
||||
importFrom("Azure.Core.Amqp").pkg,
|
||||
importFrom("Microsoft.Azure.Amqp").pkg,
|
||||
importFrom("Microsoft.Azure.Services.AppAuthentication").pkg,
|
||||
importFrom("Microsoft.IdentityModel.Clients.ActiveDirectory").pkg,
|
||||
|
|
|
@ -28,7 +28,7 @@ using System.Threading;
|
|||
using BuildXL.Cache.ContentStore.InterfacesTest.Results;
|
||||
using BuildXL.Cache.ContentStore.Utils;
|
||||
using BuildXL.Utilities.Serialization;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Azure.Messaging.EventHubs;
|
||||
using AbsolutePath = BuildXL.Cache.ContentStore.Interfaces.FileSystem.AbsolutePath;
|
||||
|
||||
namespace ContentStoreTest.Distributed.ContentLocation.NuCache
|
||||
|
@ -177,7 +177,7 @@ namespace ContentStoreTest.Distributed.ContentLocation.NuCache
|
|||
configuration.Hub.EventStream.Count.Should().BeGreaterThan(0);
|
||||
foreach (var rawEvent in configuration.Hub.EventStream)
|
||||
{
|
||||
rawEvent.Body.Count.Should().BeLessOrEqualTo(ContentLocationEventDataSerializer.MaxEventDataPayloadSize);
|
||||
rawEvent.Body.Length.Should().BeLessOrEqualTo(ContentLocationEventDataSerializer.MaxEventDataPayloadSize);
|
||||
}
|
||||
|
||||
if (canSplit)
|
||||
|
|
|
@ -31,7 +31,6 @@ using BuildXL.Utilities.Core;
|
|||
using BuildXL.Utilities.Collections;
|
||||
using ContentStoreTest.Test;
|
||||
using FluentAssertions;
|
||||
using Microsoft.Azure.Amqp.Framing;
|
||||
using Xunit;
|
||||
using Xunit.Abstractions;
|
||||
using AbsolutePath = BuildXL.Cache.ContentStore.Interfaces.FileSystem.AbsolutePath;
|
||||
|
@ -416,4 +415,4 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test
|
|||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -37,7 +37,6 @@ using BuildXL.Utilities.Collections;
|
|||
using ContentStoreTest.Distributed.Redis;
|
||||
using ContentStoreTest.Test;
|
||||
using FluentAssertions;
|
||||
using Microsoft.Azure.Amqp.Framing;
|
||||
using Xunit;
|
||||
using Xunit.Abstractions;
|
||||
using AbsolutePath = BuildXL.Cache.ContentStore.Interfaces.FileSystem.AbsolutePath;
|
||||
|
@ -387,4 +386,4 @@ namespace BuildXL.Cache.ContentStore.Distributed.Test
|
|||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -26,14 +26,14 @@ namespace BuildXL.Cache.Host.Configuration
|
|||
return eventHubNamespaceUri.AbsoluteUri + '?' + CreateQueryVariable(EventHubNameOption, eventHubName) + '&' + CreateQueryVariable(ManagedIdentityIdOption, managedIdentityId);
|
||||
}
|
||||
|
||||
public static bool TryParseForManagedIdentity(string uriString, [NotNullWhen(true)] out Uri? eventHubNamespaceUri, [NotNullWhen(true)] out string? eventHubName, [NotNullWhen(true)] out string? managedIdentityId)
|
||||
public static bool TryParseForManagedIdentity(string uriString, [NotNullWhen(true)] out string? eventHubNamespace, [NotNullWhen(true)] out string? eventHubName, [NotNullWhen(true)] out string? managedIdentityId)
|
||||
{
|
||||
if (Uri.TryCreate(uriString, UriKind.Absolute, out Uri? uri))
|
||||
{
|
||||
NameValueCollection? queryVariables = HttpUtility.ParseQueryString(uri.Query);
|
||||
if (queryVariables.Count > 0)
|
||||
{
|
||||
eventHubNamespaceUri = new Uri(uri.Scheme + Uri.SchemeDelimiter + uri.Host);
|
||||
eventHubNamespace = uri.Scheme + Uri.SchemeDelimiter + uri.Host;
|
||||
eventHubName = queryVariables[EventHubNameOption];
|
||||
managedIdentityId = queryVariables[ManagedIdentityIdOption];
|
||||
|
||||
|
@ -44,7 +44,7 @@ namespace BuildXL.Cache.Host.Configuration
|
|||
}
|
||||
}
|
||||
|
||||
eventHubNamespaceUri = null;
|
||||
eventHubNamespace = null;
|
||||
eventHubName = null;
|
||||
managedIdentityId = null;
|
||||
return false;
|
||||
|
|
|
@ -15,12 +15,12 @@ namespace BuildXL.Cache.Host.Test
|
|||
[Fact]
|
||||
public void RoundTrip()
|
||||
{
|
||||
Uri uri = new Uri("sb://www.bing.com");
|
||||
string uri = "sb://www.bing.com";
|
||||
string ehName = "eventHub";
|
||||
string identity = Guid.NewGuid().ToString();
|
||||
var newUriString = ManagedIdentityUriHelper.BuildString(uri, ehName, identity);
|
||||
var newUriString = ManagedIdentityUriHelper.BuildString(new Uri(uri), ehName, identity);
|
||||
|
||||
ManagedIdentityUriHelper.TryParseForManagedIdentity(newUriString, out Uri? eventHubNamespaceUri, out string? foundEventHubName, out string? foundManagedIdentityId)
|
||||
ManagedIdentityUriHelper.TryParseForManagedIdentity(newUriString, out string? eventHubNamespaceUri, out string? foundEventHubName, out string? foundManagedIdentityId)
|
||||
.Should().BeTrue();
|
||||
|
||||
eventHubNamespaceUri.Should().Be(uri);
|
||||
|
@ -37,12 +37,12 @@ namespace BuildXL.Cache.Host.Test
|
|||
|
||||
ManagedIdentityUriHelper.TryParseForManagedIdentity(
|
||||
$"{uri}/?name={eventHubName}&identity={identity}",
|
||||
out Uri? foundEventHubNamespaceUri,
|
||||
out string? foundEventHubNamespaceUri,
|
||||
out string? foundEventHubName,
|
||||
out string? foundManagedIdentityId)
|
||||
.Should().BeTrue();
|
||||
|
||||
foundEventHubNamespaceUri.Should().Be(uri);
|
||||
foundEventHubNamespaceUri.Should().Be(uri.ToLower());
|
||||
foundEventHubName.Should().Be(eventHubName);
|
||||
foundManagedIdentityId.Should().Be(identity);
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@ namespace Library {
|
|||
importFrom("Microsoft.Bcl.AsyncInterfaces").pkg,
|
||||
...importFrom("BuildXL.Cache.ContentStore").getAzureBlobStorageSdkPackages(true),
|
||||
importFrom("Microsoft.IdentityModel.Abstractions").pkg,
|
||||
importFrom("System.Memory.Data").pkg,
|
||||
importFrom("System.Memory.Data").withQualifier({targetFramework: "netstandard2.0"}).pkg,
|
||||
importFrom("NLog").pkg,
|
||||
importFrom("BuildXL.Cache.ContentStore").Hashing.dll,
|
||||
importFrom("BuildXL.Cache.ContentStore").Interfaces.dll,
|
||||
|
|
|
@ -364,7 +364,8 @@ namespace NugetPackages {
|
|||
buildXLUtilitiesCoreIdentity,
|
||||
buildXLNativeIdentity,
|
||||
|
||||
importFrom("Microsoft.Azure.EventHubs").withQualifier(net472packageQualifier).pkg,
|
||||
importFrom("Azure.Messaging.EventHubs").withQualifier(net472packageQualifier).pkg,
|
||||
importFrom("Azure.Core.Amqp").withQualifier(net472packageQualifier).pkg,
|
||||
importFrom("Microsoft.Azure.Amqp").withQualifier(net472packageQualifier).pkg,
|
||||
importFrom("System.Threading.Tasks.Dataflow").withQualifier(net472packageQualifier).pkg,
|
||||
...BuildXLSdk.withQualifier(net472packageQualifier).bclAsyncPackages,
|
||||
|
|
|
@ -64,6 +64,15 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"Component": {
|
||||
"Type": "NuGet",
|
||||
"NuGet": {
|
||||
"Name": "Azure.Core.Amqp",
|
||||
"Version": "1.3.0"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"Component": {
|
||||
"Type": "NuGet",
|
||||
|
@ -82,6 +91,15 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"Component": {
|
||||
"Type": "NuGet",
|
||||
"NuGet": {
|
||||
"Name": "Azure.Messaging.EventHubs",
|
||||
"Version": "5.9.0"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"Component": {
|
||||
"Type": "NuGet",
|
||||
|
@ -708,16 +726,7 @@
|
|||
"Type": "NuGet",
|
||||
"NuGet": {
|
||||
"Name": "Microsoft.Azure.Amqp",
|
||||
"Version": "2.5.10"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"Component": {
|
||||
"Type": "NuGet",
|
||||
"NuGet": {
|
||||
"Name": "Microsoft.Azure.EventHubs",
|
||||
"Version": "4.3.2"
|
||||
"Version": "2.6.1"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
@ -2976,7 +2985,7 @@
|
|||
"Type": "NuGet",
|
||||
"NuGet": {
|
||||
"Name": "System.Diagnostics.DiagnosticSource",
|
||||
"Version": "4.5.1"
|
||||
"Version": "4.6.0"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
@ -3318,7 +3327,7 @@
|
|||
"Type": "NuGet",
|
||||
"NuGet": {
|
||||
"Name": "System.Memory.Data",
|
||||
"Version": "6.0.0"
|
||||
"Version": "1.0.2"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
10
config.dsc
10
config.dsc
|
@ -83,7 +83,8 @@ config({
|
|||
|
||||
{ id: "Microsoft.NETFramework.ReferenceAssemblies.net472", version: "1.0.0" },
|
||||
|
||||
{ id: "System.Diagnostics.DiagnosticSource", version: "4.5.1" },
|
||||
{ id: "System.Diagnostics.DiagnosticSource", version: "4.6.0",
|
||||
dependentPackageIdsToSkip: ["System.Memory"] },
|
||||
{ id: "System.Diagnostics.DiagnosticSource", version: "4.0.0-beta-23516", alias: "System.Diagnostics.DiagnosticsSource.ForEventHub"},
|
||||
|
||||
// Roslyn
|
||||
|
@ -187,8 +188,9 @@ config({
|
|||
{ id: "JsonDiffPatch.Net", version: "2.1.0" },
|
||||
|
||||
// Event hubs
|
||||
{ id: "Microsoft.Azure.Amqp", version: "2.5.10" },
|
||||
{ id: "Microsoft.Azure.EventHubs", version: "4.3.2",
|
||||
{ id: "Microsoft.Azure.Amqp", version: "2.6.1" },
|
||||
{ id: "Azure.Core.Amqp", version: "1.3.0"},
|
||||
{ id: "Azure.Messaging.EventHubs", version: "5.9.0",
|
||||
dependentPackageIdsToSkip: ["System.Net.Http", "System.Reflection.TypeExtensions", "System.Runtime.Serialization.Primitives", "Newtonsoft.Json", "System.Diagnostics.DiagnosticSource"],
|
||||
},
|
||||
{ id: "Microsoft.Azure.KeyVault.Core", version: "1.0.0" },
|
||||
|
@ -209,7 +211,7 @@ config({
|
|||
{ id: "Microsoft.Identity.Client.Extensions.Msal", version: "2.18.4" },
|
||||
{ id: "Azure.Core", version: "1.31.0",
|
||||
dependentPackageIdsToSkip: ["System.Buffers", "System.Text.Encodings.Web", "System.Text.Json", "System.Memory", "System.Memory.Data", "System.Numerics.Vectors", "Microsoft.Bcl.AsyncInterfaces" ] },
|
||||
{ id: "System.Memory.Data", version: "6.0.0",
|
||||
{ id: "System.Memory.Data", version: "1.0.2",
|
||||
dependentPackageIdsToSkip: [ "System.Memory", "System.Text.Json" ] },
|
||||
|
||||
// Package sets
|
||||
|
|
Загрузка…
Ссылка в новой задаче