Removing Event Hubs extension
This commit is contained in:
Родитель
f4de5841de
Коммит
d4b3cc247c
|
@ -20,7 +20,6 @@ $projects =
|
|||
"src\Microsoft.Azure.WebJobs.Host\WebJobs.Host.Sources.csproj",
|
||||
"src\Microsoft.Azure.WebJobs.Logging\WebJobs.Logging.csproj",
|
||||
"src\Microsoft.Azure.WebJobs.Logging.ApplicationInsights\WebJobs.Logging.ApplicationInsights.csproj",
|
||||
"src\Microsoft.Azure.WebJobs.Extensions.EventHubs\WebJobs.Extensions.EventHubs.csproj",
|
||||
"src\Microsoft.Azure.WebJobs.Extensions.Storage\WebJobs.Extensions.Storage.csproj",
|
||||
"src\Microsoft.Azure.WebJobs.Host.Storage\WebJobs.Host.Storage.csproj",
|
||||
"test\Microsoft.Azure.WebJobs.Host.TestCommon\WebJobs.Host.TestCommon.csproj"
|
||||
|
|
13
WebJobs.sln
13
WebJobs.sln
|
@ -33,8 +33,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{3B089351
|
|||
build\PublicKey.snk = build\PublicKey.snk
|
||||
EndProjectSection
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Extensions.EventHubs.UnitTests", "test\Microsoft.Azure.WebJobs.Extensions.EventHubs.UnitTests\WebJobs.Extensions.EventHubs.UnitTests.csproj", "{27F0E6AC-505E-4BEC-81CA-8DF777DEA9C7}"
|
||||
EndProject
|
||||
Project("{D954291E-2A0B-460D-934E-DC6B0785DB48}") = "WebJobs.Shared", "src\Microsoft.Azure.WebJobs.Shared\WebJobs.Shared.shproj", "{ADD036F5-2170-4B05-9E0A-C2ED0A08A929}"
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Extensions.Storage", "src\Microsoft.Azure.WebJobs.Extensions.Storage\WebJobs.Extensions.Storage.csproj", "{A9733406-267C-4A53-AB07-D3A834E22153}"
|
||||
|
@ -43,8 +41,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Host.Storage", "src
|
|||
EndProject
|
||||
Project("{D954291E-2A0B-460D-934E-DC6B0785DB48}") = "WebJobs.Shared.Storage", "src\Microsoft.Azure.WebJobs.Shared.Storage\WebJobs.Shared.Storage.shproj", "{6BED7F8A-A199-4D9D-85D1-6856EE3292C6}"
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Extensions.EventHubs", "src\Microsoft.Azure.WebJobs.Extensions.EventHubs\WebJobs.Extensions.EventHubs.csproj", "{8498FA6B-3843-44A4-A351-E35711B7FFDF}"
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Extensions.Storage.UnitTests", "test\Microsoft.Azure.Webjobs.Extensions.Storage.UnitTests\WebJobs.Extensions.Storage.UnitTests.csproj", "{0CC5741F-ACDA-4DB8-9C17-074E8896F244}"
|
||||
EndProject
|
||||
Project("{D954291E-2A0B-460D-934E-DC6B0785DB48}") = "WebJobs.Protocols", "src\Microsoft.Azure.WebJobs.Protocols\WebJobs.Protocols.shproj", "{6FCD0852-6019-4CD5-9B7E-0DE021A72BD7}"
|
||||
|
@ -106,10 +102,6 @@ Global
|
|||
{340AB554-5482-4B3D-B65F-46DFF5AF1684}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{340AB554-5482-4B3D-B65F-46DFF5AF1684}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{340AB554-5482-4B3D-B65F-46DFF5AF1684}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{27F0E6AC-505E-4BEC-81CA-8DF777DEA9C7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{27F0E6AC-505E-4BEC-81CA-8DF777DEA9C7}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{27F0E6AC-505E-4BEC-81CA-8DF777DEA9C7}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{27F0E6AC-505E-4BEC-81CA-8DF777DEA9C7}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{A9733406-267C-4A53-AB07-D3A834E22153}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{A9733406-267C-4A53-AB07-D3A834E22153}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{A9733406-267C-4A53-AB07-D3A834E22153}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
|
@ -118,10 +110,6 @@ Global
|
|||
{DED33098-FE99-436C-96CC-B59A30BEF027}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{DED33098-FE99-436C-96CC-B59A30BEF027}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{DED33098-FE99-436C-96CC-B59A30BEF027}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{8498FA6B-3843-44A4-A351-E35711B7FFDF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{8498FA6B-3843-44A4-A351-E35711B7FFDF}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{8498FA6B-3843-44A4-A351-E35711B7FFDF}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{8498FA6B-3843-44A4-A351-E35711B7FFDF}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{0CC5741F-ACDA-4DB8-9C17-074E8896F244}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{0CC5741F-ACDA-4DB8-9C17-074E8896F244}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{0CC5741F-ACDA-4DB8-9C17-074E8896F244}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
|
@ -145,7 +133,6 @@ Global
|
|||
{C6B834AB-7B6A-47AE-A7C3-C102B0C861FF} = {639967B0-0544-4C52-94AC-9A3D25E33256}
|
||||
{C8EAAE01-E8CF-4131-9D4B-F0FDF00DA4BE} = {639967B0-0544-4C52-94AC-9A3D25E33256}
|
||||
{340AB554-5482-4B3D-B65F-46DFF5AF1684} = {639967B0-0544-4C52-94AC-9A3D25E33256}
|
||||
{27F0E6AC-505E-4BEC-81CA-8DF777DEA9C7} = {639967B0-0544-4C52-94AC-9A3D25E33256}
|
||||
{0CC5741F-ACDA-4DB8-9C17-074E8896F244} = {639967B0-0544-4C52-94AC-9A3D25E33256}
|
||||
{337B79EB-A3CB-4CE0-A7F2-DD5E638AC882} = {639967B0-0544-4C52-94AC-9A3D25E33256}
|
||||
{C5E1A8E8-711F-4377-A8BD-7DB58E6C580D} = {639967B0-0544-4C52-94AC-9A3D25E33256}
|
||||
|
|
|
@ -50,10 +50,6 @@ test_script:
|
|||
|
||||
$success = $success -and $?
|
||||
|
||||
dotnet test .\test\Microsoft.Azure.WebJobs.Extensions.EventHubs.UnitTests\ -v q --no-build
|
||||
|
||||
$success = $success -and $?
|
||||
|
||||
dotnet test .\test\Microsoft.Azure.Webjobs.Extensions.Storage.UnitTests\ -v q --no-build
|
||||
|
||||
$success = $success -and $?
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="3.0.5" />
|
||||
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="3.0.6" />
|
||||
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="2.1.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" />
|
||||
|
@ -24,7 +25,6 @@
|
|||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Extensions.EventHubs\WebJobs.Extensions.EventHubs.csproj" />
|
||||
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Extensions.Storage\WebJobs.Extensions.Storage.csproj" />
|
||||
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Host.Storage\WebJobs.Host.Storage.csproj" />
|
||||
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Host\WebJobs.Host.csproj" />
|
||||
|
|
|
@ -1,152 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.EventHubs.Processor;
|
||||
using Microsoft.Azure.WebJobs.Description;
|
||||
using Microsoft.Azure.WebJobs.Host.Bindings;
|
||||
using Microsoft.Azure.WebJobs.Host.Config;
|
||||
using Microsoft.Azure.WebJobs.Host.Configuration;
|
||||
using Microsoft.Azure.WebJobs.Logging;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Newtonsoft.Json;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.EventHubs
|
||||
{
|
||||
[Extension("EventHubs", configurationSection: "EventHubs")]
|
||||
internal class EventHubExtensionConfigProvider : IExtensionConfigProvider
|
||||
{
|
||||
public IConfiguration _config;
|
||||
private readonly IOptions<EventHubOptions> _options;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly IConverterManager _converterManager;
|
||||
private readonly INameResolver _nameResolver;
|
||||
private readonly IWebJobsExtensionConfiguration<EventHubExtensionConfigProvider> _configuration;
|
||||
|
||||
public EventHubExtensionConfigProvider(IConfiguration config, IOptions<EventHubOptions> options, ILoggerFactory loggerFactory,
|
||||
IConverterManager converterManager, INameResolver nameResolver, IWebJobsExtensionConfiguration<EventHubExtensionConfigProvider> configuration)
|
||||
{
|
||||
_config = config;
|
||||
_options = options;
|
||||
_loggerFactory = loggerFactory;
|
||||
_converterManager = converterManager;
|
||||
_nameResolver = nameResolver;
|
||||
_configuration = configuration;
|
||||
}
|
||||
|
||||
internal Action<ExceptionReceivedEventArgs> ExceptionHandler { get; set; }
|
||||
|
||||
private void ExceptionReceivedHandler(ExceptionReceivedEventArgs args)
|
||||
{
|
||||
ExceptionHandler?.Invoke(args);
|
||||
}
|
||||
|
||||
public void Initialize(ExtensionConfigContext context)
|
||||
{
|
||||
if (context == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(context));
|
||||
}
|
||||
|
||||
_options.Value.EventProcessorOptions.SetExceptionHandler(ExceptionReceivedHandler);
|
||||
_configuration.ConfigurationSection.Bind(_options);
|
||||
|
||||
context
|
||||
.AddConverter<string, EventData>(ConvertString2EventData)
|
||||
.AddConverter<EventData, string>(ConvertEventData2String)
|
||||
.AddConverter<byte[], EventData>(ConvertBytes2EventData)
|
||||
.AddConverter<EventData, byte[]>(ConvertEventData2Bytes)
|
||||
.AddOpenConverter<OpenType.Poco, EventData>(ConvertPocoToEventData);
|
||||
|
||||
// register our trigger binding provider
|
||||
var triggerBindingProvider = new EventHubTriggerAttributeBindingProvider(_config, _nameResolver, _converterManager, _options, _loggerFactory);
|
||||
context.AddBindingRule<EventHubTriggerAttribute>()
|
||||
.BindToTrigger(triggerBindingProvider);
|
||||
|
||||
// register our binding provider
|
||||
context.AddBindingRule<EventHubAttribute>()
|
||||
.BindToCollector(BuildFromAttribute);
|
||||
|
||||
context.AddBindingRule<EventHubAttribute>()
|
||||
.BindToInput(attribute =>
|
||||
{
|
||||
return _options.Value.GetEventHubClient(attribute.EventHubName, attribute.Connection);
|
||||
});
|
||||
|
||||
ExceptionHandler = (e =>
|
||||
{
|
||||
LogExceptionReceivedEvent(e, _loggerFactory);
|
||||
});
|
||||
}
|
||||
|
||||
internal static void LogExceptionReceivedEvent(ExceptionReceivedEventArgs e, ILoggerFactory loggerFactory)
|
||||
{
|
||||
try
|
||||
{
|
||||
var logger = loggerFactory?.CreateLogger(LogCategories.Executor);
|
||||
string message = $"EventProcessorHost error (Action={e.Action}, HostName={e.Hostname}, PartitionId={e.PartitionId})";
|
||||
|
||||
var logLevel = GetLogLevel(e.Exception);
|
||||
logger?.Log(logLevel, 0, message, e.Exception, (s, ex) => message);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// best effort logging
|
||||
}
|
||||
}
|
||||
|
||||
private static LogLevel GetLogLevel(Exception ex)
|
||||
{
|
||||
if (ex is ReceiverDisconnectedException ||
|
||||
ex is LeaseLostException)
|
||||
{
|
||||
// For EventProcessorHost these exceptions can happen as part
|
||||
// of normal partition balancing across instances, so we want to
|
||||
// trace them, but not treat them as errors.
|
||||
return LogLevel.Information;
|
||||
}
|
||||
|
||||
var ehex = ex as EventHubsException;
|
||||
if (!(ex is OperationCanceledException) && (ehex == null || !ehex.IsTransient))
|
||||
{
|
||||
// any non-transient exceptions or unknown exception types
|
||||
// we want to log as errors
|
||||
return LogLevel.Error;
|
||||
}
|
||||
else
|
||||
{
|
||||
// transient messaging errors we log as info so we have a record
|
||||
// of them, but we don't treat them as actual errors
|
||||
return LogLevel.Information;
|
||||
}
|
||||
}
|
||||
|
||||
private IAsyncCollector<EventData> BuildFromAttribute(EventHubAttribute attribute)
|
||||
{
|
||||
EventHubClient client = _options.Value.GetEventHubClient(attribute.EventHubName, attribute.Connection);
|
||||
return new EventHubAsyncCollector(client);
|
||||
}
|
||||
|
||||
private static string ConvertEventData2String(EventData x)
|
||||
=> Encoding.UTF8.GetString(ConvertEventData2Bytes(x));
|
||||
|
||||
private static EventData ConvertBytes2EventData(byte[] input)
|
||||
=> new EventData(input);
|
||||
|
||||
private static byte[] ConvertEventData2Bytes(EventData input)
|
||||
=> input.Body.Array;
|
||||
|
||||
private static EventData ConvertString2EventData(string input)
|
||||
=> ConvertBytes2EventData(Encoding.UTF8.GetBytes(input));
|
||||
|
||||
private static Task<object> ConvertPocoToEventData(object arg, Attribute attrResolved, ValueBindingContext context)
|
||||
{
|
||||
return Task.FromResult<object>(ConvertString2EventData(JsonConvert.SerializeObject(arg)));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,407 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Globalization;
|
||||
using System.Text;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.EventHubs.Processor;
|
||||
using Microsoft.Azure.WebJobs.Hosting;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.EventHubs
|
||||
{
|
||||
public class EventHubOptions : IOptionsFormatter
|
||||
{
|
||||
// Event Hub Names are case-insensitive.
|
||||
// The same path can have multiple connection strings with different permissions (sending and receiving),
|
||||
// so we track senders and receivers separately and infer which one to use based on the EventHub (sender) vs. EventHubTrigger (receiver) attribute.
|
||||
// Connection strings may also encapsulate different endpoints.
|
||||
|
||||
// The client cache must be thread safe because clients are accessed/added on the function
|
||||
private readonly ConcurrentDictionary<string, EventHubClient> _clients = new ConcurrentDictionary<string, EventHubClient>(StringComparer.OrdinalIgnoreCase);
|
||||
private readonly Dictionary<string, ReceiverCreds> _receiverCreds = new Dictionary<string, ReceiverCreds>(StringComparer.OrdinalIgnoreCase);
|
||||
private readonly Dictionary<string, EventProcessorHost> _explicitlyProvidedHosts = new Dictionary<string, EventProcessorHost>(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
/// <summary>
|
||||
/// Name of the blob container that the EventHostProcessor instances uses to coordinate load balancing listening on an event hub.
|
||||
/// Each event hub gets its own blob prefix within the container.
|
||||
/// </summary>
|
||||
public const string LeaseContainerName = "azure-webjobs-eventhub";
|
||||
private int _batchCheckpointFrequency = 1;
|
||||
|
||||
public EventHubOptions()
|
||||
{
|
||||
EventProcessorOptions = EventProcessorOptions.DefaultOptions;
|
||||
PartitionManagerOptions = new PartitionManagerOptions();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the number of batches to process before creating an EventHub cursor checkpoint. Default 1.
|
||||
/// </summary>
|
||||
public int BatchCheckpointFrequency
|
||||
{
|
||||
get
|
||||
{
|
||||
return _batchCheckpointFrequency;
|
||||
}
|
||||
|
||||
set
|
||||
{
|
||||
if (value <= 0)
|
||||
{
|
||||
throw new InvalidOperationException("Batch checkpoint frequency must be larger than 0.");
|
||||
}
|
||||
_batchCheckpointFrequency = value;
|
||||
}
|
||||
}
|
||||
|
||||
public EventProcessorOptions EventProcessorOptions { get; }
|
||||
|
||||
public PartitionManagerOptions PartitionManagerOptions { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Add an existing client for sending messages to an event hub. Infer the eventHub name from client.path
|
||||
/// </summary>
|
||||
/// <param name="client"></param>
|
||||
public void AddEventHubClient(EventHubClient client)
|
||||
{
|
||||
if (client == null)
|
||||
{
|
||||
throw new ArgumentNullException("client");
|
||||
}
|
||||
string eventHubName = client.EventHubName;
|
||||
AddEventHubClient(eventHubName, client);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add an existing client for sending messages to an event hub. Infer the eventHub name from client.path
|
||||
/// </summary>
|
||||
/// <param name="eventHubName">name of the event hub</param>
|
||||
/// <param name="client"></param>
|
||||
public void AddEventHubClient(string eventHubName, EventHubClient client)
|
||||
{
|
||||
if (eventHubName == null)
|
||||
{
|
||||
throw new ArgumentNullException("eventHubName");
|
||||
}
|
||||
if (client == null)
|
||||
{
|
||||
throw new ArgumentNullException("client");
|
||||
}
|
||||
|
||||
_clients[eventHubName] = client;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a connection for sending messages to an event hub. Connect via the connection string.
|
||||
/// </summary>
|
||||
/// <param name="eventHubName">name of the event hub. </param>
|
||||
/// <param name="sendConnectionString">connection string for sending messages. If this includes an EntityPath, it takes precedence over the eventHubName parameter.</param>
|
||||
public void AddSender(string eventHubName, string sendConnectionString)
|
||||
{
|
||||
if (eventHubName == null)
|
||||
{
|
||||
throw new ArgumentNullException("eventHubName");
|
||||
}
|
||||
if (sendConnectionString == null)
|
||||
{
|
||||
throw new ArgumentNullException("sendConnectionString");
|
||||
}
|
||||
|
||||
EventHubsConnectionStringBuilder sb = new EventHubsConnectionStringBuilder(sendConnectionString);
|
||||
if (string.IsNullOrWhiteSpace(sb.EntityPath))
|
||||
{
|
||||
sb.EntityPath = eventHubName;
|
||||
}
|
||||
|
||||
var client = EventHubClient.CreateFromConnectionString(sb.ToString());
|
||||
AddEventHubClient(eventHubName, client);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a connection for listening on events from an event hub.
|
||||
/// </summary>
|
||||
/// <param name="eventHubName">Name of the event hub</param>
|
||||
/// <param name="listener">initialized listener object</param>
|
||||
/// <remarks>The EventProcessorHost type is from the ServiceBus SDK.
|
||||
/// Allow callers to bind to EventHubConfiguration without needing to have a direct assembly reference to the ServiceBus SDK.
|
||||
/// The compiler needs to resolve all types in all overloads, so give methods that use the ServiceBus SDK types unique non-overloaded names
|
||||
/// to avoid eager compiler resolution.
|
||||
/// </remarks>
|
||||
public void AddEventProcessorHost(string eventHubName, EventProcessorHost listener)
|
||||
{
|
||||
if (eventHubName == null)
|
||||
{
|
||||
throw new ArgumentNullException("eventHubName");
|
||||
}
|
||||
if (listener == null)
|
||||
{
|
||||
throw new ArgumentNullException("listener");
|
||||
}
|
||||
|
||||
_explicitlyProvidedHosts[eventHubName] = listener;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a connection for listening on events from an event hub. Connect via the connection string and use the SDK's built-in storage account.
|
||||
/// </summary>
|
||||
/// <param name="eventHubName">name of the event hub</param>
|
||||
/// <param name="receiverConnectionString">connection string for receiving messages. This can encapsulate other service bus properties like the namespace and endpoints.</param>
|
||||
public void AddReceiver(string eventHubName, string receiverConnectionString)
|
||||
{
|
||||
if (eventHubName == null)
|
||||
{
|
||||
throw new ArgumentNullException("eventHubName");
|
||||
}
|
||||
if (receiverConnectionString == null)
|
||||
{
|
||||
throw new ArgumentNullException("receiverConnectionString");
|
||||
}
|
||||
|
||||
this._receiverCreds[eventHubName] = new ReceiverCreds
|
||||
{
|
||||
EventHubConnectionString = receiverConnectionString
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a connection for listening on events from an event hub. Connect via the connection string and use the supplied storage account
|
||||
/// </summary>
|
||||
/// <param name="eventHubName">name of the event hub</param>
|
||||
/// <param name="receiverConnectionString">connection string for receiving messages</param>
|
||||
/// <param name="storageConnectionString">storage connection string that the EventProcessorHost client will use to coordinate multiple listener instances. </param>
|
||||
public void AddReceiver(string eventHubName, string receiverConnectionString, string storageConnectionString)
|
||||
{
|
||||
if (eventHubName == null)
|
||||
{
|
||||
throw new ArgumentNullException("eventHubName");
|
||||
}
|
||||
if (receiverConnectionString == null)
|
||||
{
|
||||
throw new ArgumentNullException("receiverConnectionString");
|
||||
}
|
||||
if (storageConnectionString == null)
|
||||
{
|
||||
throw new ArgumentNullException("storageConnectionString");
|
||||
}
|
||||
|
||||
this._receiverCreds[eventHubName] = new ReceiverCreds
|
||||
{
|
||||
EventHubConnectionString = receiverConnectionString,
|
||||
StorageConnectionString = storageConnectionString
|
||||
};
|
||||
}
|
||||
|
||||
internal EventHubClient GetEventHubClient(string eventHubName, string connection)
|
||||
{
|
||||
EventHubClient client;
|
||||
if (_clients.TryGetValue(eventHubName, out client))
|
||||
{
|
||||
return client;
|
||||
}
|
||||
else if (!string.IsNullOrWhiteSpace(connection))
|
||||
{
|
||||
return _clients.GetOrAdd(eventHubName, key =>
|
||||
{
|
||||
AddSender(key, connection);
|
||||
return _clients[key];
|
||||
});
|
||||
}
|
||||
throw new InvalidOperationException("No event hub sender named " + eventHubName);
|
||||
}
|
||||
|
||||
// Lookup a listener for receiving events given the name provided in the [EventHubTrigger] attribute.
|
||||
internal EventProcessorHost GetEventProcessorHost(IConfiguration config, string eventHubName, string consumerGroup)
|
||||
{
|
||||
ReceiverCreds creds;
|
||||
if (this._receiverCreds.TryGetValue(eventHubName, out creds))
|
||||
{
|
||||
// Common case. Create a new EventProcessorHost instance to listen.
|
||||
string eventProcessorHostName = Guid.NewGuid().ToString();
|
||||
|
||||
if (consumerGroup == null)
|
||||
{
|
||||
consumerGroup = PartitionReceiver.DefaultConsumerGroupName;
|
||||
}
|
||||
var storageConnectionString = creds.StorageConnectionString;
|
||||
if (storageConnectionString == null)
|
||||
{
|
||||
string defaultStorageString = config.GetWebJobsConnectionString(ConnectionStringNames.Storage);
|
||||
storageConnectionString = defaultStorageString;
|
||||
}
|
||||
|
||||
// If the connection string provides a hub name, that takes precedence.
|
||||
// Note that connection strings *can't* specify a consumerGroup, so must always be passed in.
|
||||
string actualPath = eventHubName;
|
||||
EventHubsConnectionStringBuilder sb = new EventHubsConnectionStringBuilder(creds.EventHubConnectionString);
|
||||
if (sb.EntityPath != null)
|
||||
{
|
||||
actualPath = sb.EntityPath;
|
||||
sb.EntityPath = null; // need to remove to use with EventProcessorHost
|
||||
}
|
||||
|
||||
var @namespace = GetEventHubNamespace(sb);
|
||||
var blobPrefix = GetBlobPrefix(actualPath, @namespace);
|
||||
|
||||
// Use blob prefix support available in EPH starting in 2.2.6
|
||||
EventProcessorHost host = new EventProcessorHost(
|
||||
hostName: eventProcessorHostName,
|
||||
eventHubPath: actualPath,
|
||||
consumerGroupName: consumerGroup,
|
||||
eventHubConnectionString: sb.ToString(),
|
||||
storageConnectionString: storageConnectionString,
|
||||
leaseContainerName: LeaseContainerName,
|
||||
storageBlobPrefix: blobPrefix);
|
||||
|
||||
host.PartitionManagerOptions = PartitionManagerOptions;
|
||||
|
||||
return host;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Rare case: a power-user caller specifically provided an event processor host to use.
|
||||
EventProcessorHost host;
|
||||
if (_explicitlyProvidedHosts.TryGetValue(eventHubName, out host))
|
||||
{
|
||||
return host;
|
||||
}
|
||||
}
|
||||
throw new InvalidOperationException("No event hub receiver named " + eventHubName);
|
||||
}
|
||||
|
||||
private static string EscapeStorageCharacter(char character)
|
||||
{
|
||||
var ordinalValue = (ushort)character;
|
||||
if (ordinalValue < 0x100)
|
||||
{
|
||||
return string.Format(CultureInfo.InvariantCulture, ":{0:X2}", ordinalValue);
|
||||
}
|
||||
else
|
||||
{
|
||||
return string.Format(CultureInfo.InvariantCulture, "::{0:X4}", ordinalValue);
|
||||
}
|
||||
}
|
||||
|
||||
// Escape a blob path.
|
||||
// For diagnostics, we want human-readble strings that resemble the input.
|
||||
// Inputs are most commonly alphanumeric with a fex extra chars (dash, underscore, dot).
|
||||
// Escape character is a ':', which is also escaped.
|
||||
// Blob names are case sensitive; whereas input is case insensitive, so normalize to lower.
|
||||
private static string EscapeBlobPath(string path)
|
||||
{
|
||||
StringBuilder sb = new StringBuilder(path.Length);
|
||||
foreach (char c in path)
|
||||
{
|
||||
if (c >= 'a' && c <= 'z')
|
||||
{
|
||||
sb.Append(c);
|
||||
}
|
||||
else if (c == '-' || c == '_' || c == '.')
|
||||
{
|
||||
// Potentially common carahcters.
|
||||
sb.Append(c);
|
||||
}
|
||||
else if (c >= 'A' && c <= 'Z')
|
||||
{
|
||||
sb.Append((char)(c - 'A' + 'a')); // ToLower
|
||||
}
|
||||
else if (c >= '0' && c <= '9')
|
||||
{
|
||||
sb.Append(c);
|
||||
}
|
||||
else
|
||||
{
|
||||
sb.Append(EscapeStorageCharacter(c));
|
||||
}
|
||||
}
|
||||
|
||||
return sb.ToString();
|
||||
}
|
||||
|
||||
private static string GetEventHubNamespace(EventHubsConnectionStringBuilder connectionString)
|
||||
{
|
||||
// EventHubs only have 1 endpoint.
|
||||
var url = connectionString.Endpoint;
|
||||
var @namespace = url.Host;
|
||||
return @namespace;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the blob prefix used with EventProcessorHost for a given event hub.
|
||||
/// </summary>
|
||||
/// <param name="eventHubName">the event hub path</param>
|
||||
/// <param name="serviceBusNamespace">the event hub's service bus namespace.</param>
|
||||
/// <returns>a blob prefix path that can be passed to EventProcessorHost.</returns>
|
||||
/// <remarks>
|
||||
/// An event hub is defined by it's path and namespace. The namespace is extracted from the connection string.
|
||||
/// This must be an injective one-to-one function because:
|
||||
/// 1. multiple machines listening on the same event hub must use the same blob prefix. This means it must be deterministic.
|
||||
/// 2. different event hubs must not resolve to the same path.
|
||||
/// </remarks>
|
||||
public static string GetBlobPrefix(string eventHubName, string serviceBusNamespace)
|
||||
{
|
||||
if (eventHubName == null)
|
||||
{
|
||||
throw new ArgumentNullException("eventHubName");
|
||||
}
|
||||
if (serviceBusNamespace == null)
|
||||
{
|
||||
throw new ArgumentNullException("serviceBusNamespace");
|
||||
}
|
||||
|
||||
string key = EscapeBlobPath(serviceBusNamespace) + "/" + EscapeBlobPath(eventHubName) + "/";
|
||||
return key;
|
||||
}
|
||||
|
||||
public string Format()
|
||||
{
|
||||
JObject eventProcessorOptions = null;
|
||||
if (EventProcessorOptions != null)
|
||||
{
|
||||
eventProcessorOptions = new JObject
|
||||
{
|
||||
{ nameof(EventProcessorOptions.EnableReceiverRuntimeMetric), EventProcessorOptions.EnableReceiverRuntimeMetric },
|
||||
{ nameof(EventProcessorOptions.InvokeProcessorAfterReceiveTimeout), EventProcessorOptions.InvokeProcessorAfterReceiveTimeout },
|
||||
{ nameof(EventProcessorOptions.MaxBatchSize), EventProcessorOptions.MaxBatchSize },
|
||||
{ nameof(EventProcessorOptions.PrefetchCount), EventProcessorOptions.PrefetchCount },
|
||||
{ nameof(EventProcessorOptions.ReceiveTimeout), EventProcessorOptions.ReceiveTimeout }
|
||||
};
|
||||
}
|
||||
|
||||
JObject partitionManagerOptions = null;
|
||||
if (PartitionManagerOptions != null)
|
||||
{
|
||||
partitionManagerOptions = new JObject
|
||||
{
|
||||
{ nameof(PartitionManagerOptions.LeaseDuration), PartitionManagerOptions.LeaseDuration },
|
||||
{ nameof(PartitionManagerOptions.RenewInterval), PartitionManagerOptions.RenewInterval },
|
||||
};
|
||||
}
|
||||
|
||||
JObject options = new JObject
|
||||
{
|
||||
{ nameof(BatchCheckpointFrequency), BatchCheckpointFrequency },
|
||||
{ nameof(EventProcessorOptions), eventProcessorOptions },
|
||||
{ nameof(PartitionManagerOptions), partitionManagerOptions }
|
||||
};
|
||||
|
||||
return options.ToString(Formatting.Indented);
|
||||
}
|
||||
|
||||
// Hold credentials for a given eventHub name.
|
||||
// Multiple consumer groups (and multiple listeners) on the same hub can share the same credentials.
|
||||
private class ReceiverCreds
|
||||
{
|
||||
// Required.
|
||||
public string EventHubConnectionString { get; set; }
|
||||
|
||||
// Optional. If not found, use the stroage from JobHostConfiguration
|
||||
public string StorageConnectionString { get; set; }
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,49 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using Microsoft.Azure.EventHubs.Processor;
|
||||
using Microsoft.Azure.WebJobs;
|
||||
using Microsoft.Azure.WebJobs.EventHubs;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace Microsoft.Extensions.Hosting
|
||||
{
|
||||
public static class EventHubWebJobsBuilderExtensions
|
||||
{
|
||||
public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder)
|
||||
{
|
||||
if (builder == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(builder));
|
||||
}
|
||||
|
||||
builder.AddEventHubs(p => {});
|
||||
|
||||
return builder;
|
||||
}
|
||||
|
||||
public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder, Action<EventHubOptions> configure)
|
||||
{
|
||||
if (builder == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(builder));
|
||||
}
|
||||
|
||||
if (configure == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(configure));
|
||||
}
|
||||
|
||||
builder.AddExtension<EventHubExtensionConfigProvider>()
|
||||
.BindOptions<EventHubOptions>();
|
||||
|
||||
builder.Services.Configure<EventHubOptions>(options =>
|
||||
{
|
||||
configure(options);
|
||||
});
|
||||
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,37 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using Microsoft.Azure.WebJobs.Description;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs
|
||||
{
|
||||
/// <summary>
|
||||
/// Setup an 'output' binding to an EventHub. This can be any output type compatible with an IAsyncCollector.
|
||||
/// </summary>
|
||||
[AttributeUsage(AttributeTargets.Parameter | AttributeTargets.ReturnValue)]
|
||||
[Binding]
|
||||
public sealed class EventHubAttribute : Attribute
|
||||
{
|
||||
/// <summary>
|
||||
/// Initialize a new instance of the <see cref="EventHubAttribute"/>
|
||||
/// </summary>
|
||||
/// <param name="eventHubName">Name of the event hub as resolved against the <see cref="EventHubConfiguration"/> </param>
|
||||
public EventHubAttribute(string eventHubName)
|
||||
{
|
||||
EventHubName = eventHubName;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The name of the event hub. This is resolved against the <see cref="EventHubConfiguration"/>
|
||||
/// </summary>
|
||||
[AutoResolve]
|
||||
public string EventHubName { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the optional connection string name that contains the Event Hub connection string. If missing, tries to use a registered event hub sender.
|
||||
/// </summary>
|
||||
[ConnectionString]
|
||||
public string Connection { get; set; }
|
||||
}
|
||||
}
|
|
@ -1,40 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using Microsoft.Azure.WebJobs.Description;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs
|
||||
{
|
||||
/// <summary>
|
||||
/// Setup an 'trigger' on a parameter to listen on events from an event hub.
|
||||
/// </summary>
|
||||
[AttributeUsage(AttributeTargets.Parameter)]
|
||||
[Binding]
|
||||
public sealed class EventHubTriggerAttribute : Attribute
|
||||
{
|
||||
/// <summary>
|
||||
/// Create an instance of this attribute.
|
||||
/// </summary>
|
||||
/// <param name="eventHubName">Event hub to listen on for messages. </param>
|
||||
public EventHubTriggerAttribute(string eventHubName)
|
||||
{
|
||||
EventHubName = eventHubName;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Name of the event hub.
|
||||
/// </summary>
|
||||
public string EventHubName { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Optional Name of the consumer group. If missing, then use the default name, "$Default"
|
||||
/// </summary>
|
||||
public string ConsumerGroup { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the optional app setting name that contains the Event Hub connection string. If missing, tries to use a registered event hub receiver.
|
||||
/// </summary>
|
||||
public string Connection { get; set; }
|
||||
}
|
||||
}
|
|
@ -1,19 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using Microsoft.Azure.WebJobs.EventHubs;
|
||||
using Microsoft.Azure.WebJobs.Hosting;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
|
||||
[assembly: WebJobsStartup(typeof(EventHubsWebJobsStartup))]
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.EventHubs
|
||||
{
|
||||
public class EventHubsWebJobsStartup : IWebJobsStartup
|
||||
{
|
||||
public void Configure(IWebJobsBuilder builder)
|
||||
{
|
||||
builder.AddEventHubs();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,23 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System.Collections.Generic;
|
||||
using static Microsoft.Azure.EventHubs.EventData;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.EventHubs
|
||||
{
|
||||
static internal class SystemPropertiesCollectionExtensions
|
||||
{
|
||||
internal static IDictionary<string, object> ToDictionary(this SystemPropertiesCollection collection)
|
||||
{
|
||||
IDictionary<string, object> modifiedDictionary = new Dictionary<string, object>(collection);
|
||||
|
||||
// Following is needed to maintain structure of bindingdata: https://github.com/Azure/azure-webjobs-sdk/pull/1849
|
||||
modifiedDictionary["SequenceNumber"] = collection.SequenceNumber;
|
||||
modifiedDictionary["Offset"] = collection.Offset;
|
||||
modifiedDictionary["PartitionKey"] = collection.PartitionKey;
|
||||
modifiedDictionary["EnqueuedTimeUtc"] = collection.EnqueuedTimeUtc;
|
||||
return modifiedDictionary;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,233 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.EventHubs.Processor;
|
||||
using Microsoft.Azure.WebJobs.Host.Executors;
|
||||
using Microsoft.Azure.WebJobs.Host.Listeners;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.EventHubs
|
||||
{
|
||||
internal sealed class EventHubListener : IListener, IEventProcessorFactory
|
||||
{
|
||||
private readonly ITriggeredFunctionExecutor _executor;
|
||||
private readonly EventProcessorHost _eventProcessorHost;
|
||||
private readonly bool _singleDispatch;
|
||||
private readonly EventHubOptions _options;
|
||||
private readonly ILogger _logger;
|
||||
private bool _started;
|
||||
|
||||
public EventHubListener(ITriggeredFunctionExecutor executor, EventProcessorHost eventProcessorHost, bool singleDispatch, EventHubOptions options, ILogger logger)
|
||||
{
|
||||
_executor = executor;
|
||||
_eventProcessorHost = eventProcessorHost;
|
||||
_singleDispatch = singleDispatch;
|
||||
_options = options;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
void IListener.Cancel()
|
||||
{
|
||||
StopAsync(CancellationToken.None).Wait();
|
||||
}
|
||||
|
||||
void IDisposable.Dispose()
|
||||
{
|
||||
}
|
||||
|
||||
public async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _eventProcessorHost.RegisterEventProcessorFactoryAsync(this, _options.EventProcessorOptions);
|
||||
_started = true;
|
||||
}
|
||||
|
||||
public async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_started)
|
||||
{
|
||||
await _eventProcessorHost.UnregisterEventProcessorAsync();
|
||||
}
|
||||
_started = false;
|
||||
}
|
||||
|
||||
IEventProcessor IEventProcessorFactory.CreateEventProcessor(PartitionContext context)
|
||||
{
|
||||
return new EventProcessor(_options, _executor, _logger, _singleDispatch);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Wrapper for un-mockable checkpoint APIs to aid in unit testing
|
||||
/// </summary>
|
||||
internal interface ICheckpointer
|
||||
{
|
||||
Task CheckpointAsync(PartitionContext context);
|
||||
}
|
||||
|
||||
// We get a new instance each time Start() is called.
|
||||
// We'll get a listener per partition - so they can potentialy run in parallel even on a single machine.
|
||||
internal class EventProcessor : IEventProcessor, IDisposable, ICheckpointer
|
||||
{
|
||||
private readonly ITriggeredFunctionExecutor _executor;
|
||||
private readonly bool _singleDispatch;
|
||||
private readonly ILogger _logger;
|
||||
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
|
||||
private readonly ICheckpointer _checkpointer;
|
||||
private readonly int _batchCheckpointFrequency;
|
||||
private int _batchCounter = 0;
|
||||
private bool _disposed = false;
|
||||
|
||||
public EventProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, ICheckpointer checkpointer = null)
|
||||
{
|
||||
_checkpointer = checkpointer ?? this;
|
||||
_executor = executor;
|
||||
_singleDispatch = singleDispatch;
|
||||
_batchCheckpointFrequency = options.BatchCheckpointFrequency;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public Task CloseAsync(PartitionContext context, CloseReason reason)
|
||||
{
|
||||
// signal cancellation for any in progress executions
|
||||
_cts.Cancel();
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task OpenAsync(PartitionContext context)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task ProcessErrorAsync(PartitionContext context, Exception error)
|
||||
{
|
||||
string errorDetails = $"Partition Id: '{context.PartitionId}', Owner: '{context.Owner}', EventHubPath: '{context.EventHubPath}'";
|
||||
|
||||
if (error is ReceiverDisconnectedException ||
|
||||
error is LeaseLostException)
|
||||
{
|
||||
// For EventProcessorHost these exceptions can happen as part
|
||||
// of normal partition balancing across instances, so we want to
|
||||
// trace them, but not treat them as errors.
|
||||
_logger.LogInformation($"An Event Hub exception of type '{error.GetType().Name}' was thrown from {errorDetails}. This exception type is typically a result of Event Hub processor rebalancing and can be safely ignored.");
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogError(error, $"Error processing event from {errorDetails}");
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
|
||||
{
|
||||
var triggerInput = new EventHubTriggerInput
|
||||
{
|
||||
Events = messages.ToArray(),
|
||||
PartitionContext = context
|
||||
};
|
||||
|
||||
if (_singleDispatch)
|
||||
{
|
||||
// Single dispatch
|
||||
int eventCount = triggerInput.Events.Length;
|
||||
List<Task> invocationTasks = new List<Task>();
|
||||
for (int i = 0; i < eventCount; i++)
|
||||
{
|
||||
if (_cts.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
var input = new TriggeredFunctionData
|
||||
{
|
||||
TriggerValue = triggerInput.GetSingleEventTriggerInput(i)
|
||||
};
|
||||
Task task = _executor.TryExecuteAsync(input, _cts.Token);
|
||||
invocationTasks.Add(task);
|
||||
}
|
||||
|
||||
// Drain the whole batch before taking more work
|
||||
if (invocationTasks.Count > 0)
|
||||
{
|
||||
await Task.WhenAll(invocationTasks);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Batch dispatch
|
||||
var input = new TriggeredFunctionData
|
||||
{
|
||||
TriggerValue = triggerInput
|
||||
};
|
||||
|
||||
await _executor.TryExecuteAsync(input, _cts.Token);
|
||||
}
|
||||
// Dispose all messages to help with memory pressure. If this is missed, the finalizer thread will still get them.
|
||||
bool hasEvents = false;
|
||||
foreach (var message in messages)
|
||||
{
|
||||
hasEvents = true;
|
||||
message.Dispose();
|
||||
}
|
||||
|
||||
// Checkpoint if we procesed any events.
|
||||
// Don't checkpoint if no events. This can reset the sequence counter to 0.
|
||||
// Note: we intentionally checkpoint the batch regardless of function
|
||||
// success/failure. EventHub doesn't support any sort "poison event" model,
|
||||
// so that is the responsibility of the user's function currently. E.g.
|
||||
// the function should have try/catch handling around all event processing
|
||||
// code, and capture/log/persist failed events, since they won't be retried.
|
||||
if (hasEvents)
|
||||
{
|
||||
await CheckpointAsync(context);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task CheckpointAsync(PartitionContext context)
|
||||
{
|
||||
if (_batchCheckpointFrequency == 1)
|
||||
{
|
||||
await _checkpointer.CheckpointAsync(context);
|
||||
}
|
||||
else
|
||||
{
|
||||
// only checkpoint every N batches
|
||||
if (++_batchCounter >= _batchCheckpointFrequency)
|
||||
{
|
||||
_batchCounter = 0;
|
||||
await _checkpointer.CheckpointAsync(context);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected virtual void Dispose(bool disposing)
|
||||
{
|
||||
if (!_disposed)
|
||||
{
|
||||
if (disposing)
|
||||
{
|
||||
_cts.Dispose();
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Dispose(true);
|
||||
}
|
||||
|
||||
async Task ICheckpointer.CheckpointAsync(PartitionContext context)
|
||||
{
|
||||
await context.CheckpointAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System.Runtime.CompilerServices;
|
||||
|
||||
[assembly: InternalsVisibleTo("WebJobs.Extensions.EventHubs.UnitTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b5fc90e7027f67871e773a8fde8938c81dd402ba65b9201d60593e96c492651e889cc13f1415ebb53fac1131ae0bd333c5ee6021672d9718ea31a8aebd0da0072f25d87dba6fc90ffd598ed4da35e44c398c454307e8e33b8426143daec9f596836f97c8f74750e5975c64e2189f45def46b2a2b1247adc3652bf5c308055da9")]
|
||||
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")]
|
|
@ -1,185 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.EventHubs
|
||||
{
|
||||
/// <summary>
|
||||
/// Core object to send events to EventHub.
|
||||
/// Any user parameter that sends EventHub events will eventually get bound to this object.
|
||||
/// This will queue events and send in batches, also keeping under the 256kb event hub limit per batch.
|
||||
/// </summary>
|
||||
internal class EventHubAsyncCollector : IAsyncCollector<EventData>
|
||||
{
|
||||
private readonly EventHubClient _client;
|
||||
|
||||
private readonly Dictionary<string, PartitionCollector> _partitions = new Dictionary<string, PartitionCollector>();
|
||||
|
||||
private const int BatchSize = 100;
|
||||
|
||||
// Suggested to use 240k instead of 256k to leave padding room for headers.
|
||||
private const int MaxByteSize = 240 * 1024;
|
||||
|
||||
/// <summary>
|
||||
/// Create a sender around the given client.
|
||||
/// </summary>
|
||||
/// <param name="client"></param>
|
||||
public EventHubAsyncCollector(EventHubClient client)
|
||||
{
|
||||
if (client == null)
|
||||
{
|
||||
throw new ArgumentNullException("client");
|
||||
}
|
||||
_client = client;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add an event.
|
||||
/// </summary>
|
||||
/// <param name="item">The event to add</param>
|
||||
/// <param name="cancellationToken">a cancellation token. </param>
|
||||
/// <returns></returns>
|
||||
public async Task AddAsync(EventData item, CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
if (item == null)
|
||||
{
|
||||
throw new ArgumentNullException("item");
|
||||
}
|
||||
|
||||
string key = item.SystemProperties?.PartitionKey ?? string.Empty;
|
||||
|
||||
PartitionCollector partition;
|
||||
lock (_partitions)
|
||||
{
|
||||
if (!_partitions.TryGetValue(key, out partition))
|
||||
{
|
||||
partition = new PartitionCollector(this);
|
||||
_partitions[key] = partition;
|
||||
}
|
||||
}
|
||||
await partition.AddAsync(item, cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// synchronously flush events that have been queued up via AddAsync.
|
||||
/// </summary>
|
||||
/// <param name="cancellationToken">a cancellation token</param>
|
||||
public async Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
PartitionCollector partition;
|
||||
lock (_partitions)
|
||||
{
|
||||
if (_partitions.Count == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
var kv = _partitions.First();
|
||||
partition = kv.Value;
|
||||
_partitions.Remove(kv.Key);
|
||||
}
|
||||
|
||||
await partition.FlushAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Send the batch of events. All items in the batch will have the same partition key.
|
||||
/// </summary>
|
||||
/// <param name="batch">the set of events to send</param>
|
||||
protected virtual async Task SendBatchAsync(IEnumerable<EventData> batch)
|
||||
{
|
||||
await _client.SendAsync(batch);
|
||||
}
|
||||
|
||||
// A per-partition sender
|
||||
private class PartitionCollector : IAsyncCollector<EventData>
|
||||
{
|
||||
private readonly EventHubAsyncCollector _parent;
|
||||
|
||||
private List<EventData> _list = new List<EventData>();
|
||||
|
||||
// total size of bytes in _list that we'll be sending in this batch.
|
||||
private int _currentByteSize = 0;
|
||||
|
||||
public PartitionCollector(EventHubAsyncCollector parent)
|
||||
{
|
||||
this._parent = parent;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add an event.
|
||||
/// </summary>
|
||||
/// <param name="item">The event to add</param>
|
||||
/// <param name="cancellationToken">a cancellation token. </param>
|
||||
/// <returns></returns>
|
||||
public async Task AddAsync(EventData item, CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
if (item == null)
|
||||
{
|
||||
throw new ArgumentNullException("item");
|
||||
}
|
||||
|
||||
while (true)
|
||||
{
|
||||
lock (_list)
|
||||
{
|
||||
var size = (int)item.Body.Count;
|
||||
|
||||
if (size > MaxByteSize)
|
||||
{
|
||||
// Single event is too large to add.
|
||||
string msg = string.Format("Event is too large. Event is approximately {0}b and max size is {1}b", size, MaxByteSize);
|
||||
throw new InvalidOperationException(msg);
|
||||
}
|
||||
|
||||
bool flush = (_currentByteSize + size > MaxByteSize) || (_list.Count >= BatchSize);
|
||||
if (!flush)
|
||||
{
|
||||
_list.Add(item);
|
||||
_currentByteSize += size;
|
||||
return;
|
||||
}
|
||||
// We should flush.
|
||||
// Release the lock, flush, and then loop around and try again.
|
||||
}
|
||||
|
||||
await this.FlushAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// synchronously flush events that have been queued up via AddAsync.
|
||||
/// </summary>
|
||||
/// <param name="cancellationToken">a cancellation token</param>
|
||||
public async Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
EventData[] batch = null;
|
||||
lock (_list)
|
||||
{
|
||||
batch = _list.ToArray();
|
||||
_list.Clear();
|
||||
_currentByteSize = 0;
|
||||
}
|
||||
|
||||
if (batch.Length > 0)
|
||||
{
|
||||
await _parent.SendBatchAsync(batch);
|
||||
|
||||
// Dispose all messages to help with memory pressure. If this is missed, the finalizer thread will still get them.
|
||||
foreach (var msg in batch)
|
||||
{
|
||||
msg.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,82 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Reflection;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.WebJobs.Host;
|
||||
using Microsoft.Azure.WebJobs.Host.Bindings;
|
||||
using Microsoft.Azure.WebJobs.Host.Listeners;
|
||||
using Microsoft.Azure.WebJobs.Host.Triggers;
|
||||
using Microsoft.Azure.WebJobs.Logging;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.EventHubs
|
||||
{
|
||||
internal class EventHubTriggerAttributeBindingProvider : ITriggerBindingProvider
|
||||
{
|
||||
private readonly INameResolver _nameResolver;
|
||||
private readonly ILogger _logger;
|
||||
private readonly IConfiguration _config;
|
||||
private readonly IOptions<EventHubOptions> _options;
|
||||
private readonly IConverterManager _converterManager;
|
||||
|
||||
public EventHubTriggerAttributeBindingProvider(
|
||||
IConfiguration configuration,
|
||||
INameResolver nameResolver,
|
||||
IConverterManager converterManager,
|
||||
IOptions<EventHubOptions> options,
|
||||
ILoggerFactory loggerFactory)
|
||||
{
|
||||
_config = configuration;
|
||||
_nameResolver = nameResolver;
|
||||
_converterManager = converterManager;
|
||||
_options = options;
|
||||
_logger = loggerFactory?.CreateLogger(LogCategories.CreateTriggerCategory("EventHub"));
|
||||
}
|
||||
|
||||
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
|
||||
public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
|
||||
{
|
||||
if (context == null)
|
||||
{
|
||||
throw new ArgumentNullException("context");
|
||||
}
|
||||
|
||||
ParameterInfo parameter = context.Parameter;
|
||||
EventHubTriggerAttribute attribute = parameter.GetCustomAttribute<EventHubTriggerAttribute>(inherit: false);
|
||||
|
||||
if (attribute == null)
|
||||
{
|
||||
return Task.FromResult<ITriggerBinding>(null);
|
||||
}
|
||||
|
||||
string resolvedEventHubName = _nameResolver.ResolveWholeString(attribute.EventHubName);
|
||||
|
||||
string consumerGroup = attribute.ConsumerGroup ?? PartitionReceiver.DefaultConsumerGroupName;
|
||||
string resolvedConsumerGroup = _nameResolver.ResolveWholeString(consumerGroup);
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(attribute.Connection))
|
||||
{
|
||||
attribute.Connection = _nameResolver.ResolveWholeString(attribute.Connection);
|
||||
var connectionString = _config.GetConnectionStringOrSetting(attribute.Connection);
|
||||
_options.Value.AddReceiver(resolvedEventHubName, connectionString);
|
||||
}
|
||||
|
||||
var eventHostListener = _options.Value.GetEventProcessorHost(_config, resolvedEventHubName, resolvedConsumerGroup);
|
||||
|
||||
Func<ListenerFactoryContext, bool, Task<IListener>> createListener =
|
||||
(factoryContext, singleDispatch) =>
|
||||
{
|
||||
IListener listener = new EventHubListener(factoryContext.Executor, eventHostListener, singleDispatch, _options.Value, _logger);
|
||||
return Task.FromResult(listener);
|
||||
};
|
||||
|
||||
ITriggerBinding binding = BindingFactory.GetTriggerBinding(new EventHubTriggerBindingStrategy(), parameter, _converterManager, createListener);
|
||||
return Task.FromResult<ITriggerBinding>(binding);
|
||||
}
|
||||
} // end class
|
||||
}
|
|
@ -1,56 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.EventHubs.Processor;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.EventHubs
|
||||
{
|
||||
// The core object we get when an EventHub is triggered.
|
||||
// This gets converted to the user type (EventData, string, poco, etc)
|
||||
internal sealed class EventHubTriggerInput
|
||||
{
|
||||
// If != -1, then only process a single event in this batch.
|
||||
private int _selector = -1;
|
||||
|
||||
internal EventData[] Events { get; set; }
|
||||
|
||||
internal PartitionContext PartitionContext { get; set; }
|
||||
|
||||
public bool IsSingleDispatch
|
||||
{
|
||||
get
|
||||
{
|
||||
return _selector != -1;
|
||||
}
|
||||
}
|
||||
|
||||
public static EventHubTriggerInput New(EventData eventData)
|
||||
{
|
||||
return new EventHubTriggerInput
|
||||
{
|
||||
PartitionContext = null,
|
||||
Events = new EventData[]
|
||||
{
|
||||
eventData
|
||||
},
|
||||
_selector = 0,
|
||||
};
|
||||
}
|
||||
|
||||
public EventHubTriggerInput GetSingleEventTriggerInput(int idx)
|
||||
{
|
||||
return new EventHubTriggerInput
|
||||
{
|
||||
Events = this.Events,
|
||||
PartitionContext = this.PartitionContext,
|
||||
_selector = idx
|
||||
};
|
||||
}
|
||||
|
||||
public EventData GetSingleEventData()
|
||||
{
|
||||
return this.Events[this._selector];
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,143 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.EventHubs.Processor;
|
||||
using Microsoft.Azure.WebJobs.Host.Bindings;
|
||||
using Microsoft.Azure.WebJobs.Host.Triggers;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.EventHubs
|
||||
{
|
||||
// Binding strategy for an event hub triggers.
|
||||
internal class EventHubTriggerBindingStrategy : ITriggerBindingStrategy<EventData, EventHubTriggerInput>
|
||||
{
|
||||
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
|
||||
public EventHubTriggerInput ConvertFromString(string input)
|
||||
{
|
||||
byte[] bytes = Encoding.UTF8.GetBytes(input);
|
||||
EventData eventData = new EventData(bytes);
|
||||
|
||||
// Return a single event. Doesn't support multiple dispatch
|
||||
return EventHubTriggerInput.New(eventData);
|
||||
}
|
||||
|
||||
// Single instance: Core --> EventData
|
||||
public EventData BindSingle(EventHubTriggerInput value, ValueBindingContext context)
|
||||
{
|
||||
if (value == null)
|
||||
{
|
||||
throw new ArgumentNullException("value");
|
||||
}
|
||||
return value.GetSingleEventData();
|
||||
}
|
||||
|
||||
public EventData[] BindMultiple(EventHubTriggerInput value, ValueBindingContext context)
|
||||
{
|
||||
if (value == null)
|
||||
{
|
||||
throw new ArgumentNullException("value");
|
||||
}
|
||||
return value.Events;
|
||||
}
|
||||
|
||||
public Dictionary<string, Type> GetBindingContract(bool isSingleDispatch = true)
|
||||
{
|
||||
var contract = new Dictionary<string, Type>(StringComparer.OrdinalIgnoreCase);
|
||||
contract.Add("PartitionContext", typeof(PartitionContext));
|
||||
|
||||
AddBindingContractMember(contract, "PartitionKey", typeof(string), isSingleDispatch);
|
||||
AddBindingContractMember(contract, "Offset", typeof(string), isSingleDispatch);
|
||||
AddBindingContractMember(contract, "SequenceNumber", typeof(long), isSingleDispatch);
|
||||
AddBindingContractMember(contract, "EnqueuedTimeUtc", typeof(DateTime), isSingleDispatch);
|
||||
AddBindingContractMember(contract, "Properties", typeof(IDictionary<string, object>), isSingleDispatch);
|
||||
AddBindingContractMember(contract, "SystemProperties", typeof(IDictionary<string, object>), isSingleDispatch);
|
||||
|
||||
return contract;
|
||||
}
|
||||
|
||||
private static void AddBindingContractMember(Dictionary<string, Type> contract, string name, Type type, bool isSingleDispatch)
|
||||
{
|
||||
if (!isSingleDispatch)
|
||||
{
|
||||
name += "Array";
|
||||
}
|
||||
contract.Add(name, isSingleDispatch ? type : type.MakeArrayType());
|
||||
}
|
||||
|
||||
public Dictionary<string, object> GetBindingData(EventHubTriggerInput value)
|
||||
{
|
||||
if (value == null)
|
||||
{
|
||||
throw new ArgumentNullException("value");
|
||||
}
|
||||
|
||||
var bindingData = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
|
||||
SafeAddValue(() => bindingData.Add(nameof(value.PartitionContext), value.PartitionContext));
|
||||
|
||||
if (value.IsSingleDispatch)
|
||||
{
|
||||
AddBindingData(bindingData, value.GetSingleEventData());
|
||||
}
|
||||
else
|
||||
{
|
||||
AddBindingData(bindingData, value.Events);
|
||||
}
|
||||
|
||||
return bindingData;
|
||||
}
|
||||
|
||||
internal static void AddBindingData(Dictionary<string, object> bindingData, EventData[] events)
|
||||
{
|
||||
int length = events.Length;
|
||||
var partitionKeys = new string[length];
|
||||
var offsets = new string[length];
|
||||
var sequenceNumbers = new long[length];
|
||||
var enqueuedTimesUtc = new DateTime[length];
|
||||
var properties = new IDictionary<string, object>[length];
|
||||
var systemProperties = new IDictionary<string, object>[length];
|
||||
|
||||
SafeAddValue(() => bindingData.Add("PartitionKeyArray", partitionKeys));
|
||||
SafeAddValue(() => bindingData.Add("OffsetArray", offsets));
|
||||
SafeAddValue(() => bindingData.Add("SequenceNumberArray", sequenceNumbers));
|
||||
SafeAddValue(() => bindingData.Add("EnqueuedTimeUtcArray", enqueuedTimesUtc));
|
||||
SafeAddValue(() => bindingData.Add("PropertiesArray", properties));
|
||||
SafeAddValue(() => bindingData.Add("SystemPropertiesArray", systemProperties));
|
||||
|
||||
for (int i = 0; i < events.Length; i++)
|
||||
{
|
||||
partitionKeys[i] = events[i].SystemProperties?.PartitionKey;
|
||||
offsets[i] = events[i].SystemProperties?.Offset;
|
||||
sequenceNumbers[i] = events[i].SystemProperties?.SequenceNumber ?? 0;
|
||||
enqueuedTimesUtc[i] = events[i].SystemProperties?.EnqueuedTimeUtc ?? DateTime.MinValue;
|
||||
properties[i] = events[i].Properties;
|
||||
systemProperties[i] = events[i].SystemProperties?.ToDictionary();
|
||||
}
|
||||
}
|
||||
|
||||
private static void AddBindingData(Dictionary<string, object> bindingData, EventData eventData)
|
||||
{
|
||||
SafeAddValue(() => bindingData.Add(nameof(eventData.SystemProperties.PartitionKey), eventData.SystemProperties?.PartitionKey));
|
||||
SafeAddValue(() => bindingData.Add(nameof(eventData.SystemProperties.Offset), eventData.SystemProperties?.Offset));
|
||||
SafeAddValue(() => bindingData.Add(nameof(eventData.SystemProperties.SequenceNumber), eventData.SystemProperties?.SequenceNumber ?? 0));
|
||||
SafeAddValue(() => bindingData.Add(nameof(eventData.SystemProperties.EnqueuedTimeUtc), eventData.SystemProperties?.EnqueuedTimeUtc ?? DateTime.MinValue));
|
||||
SafeAddValue(() => bindingData.Add(nameof(eventData.Properties), eventData.Properties));
|
||||
SafeAddValue(() => bindingData.Add(nameof(eventData.SystemProperties), eventData.SystemProperties?.ToDictionary()));
|
||||
}
|
||||
|
||||
private static void SafeAddValue(Action addValue)
|
||||
{
|
||||
try
|
||||
{
|
||||
addValue();
|
||||
}
|
||||
catch
|
||||
{
|
||||
// some message propery getters can throw, based on the
|
||||
// state of the message
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,36 +0,0 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<Import Project="..\..\build\common.props" />
|
||||
|
||||
<PropertyGroup>
|
||||
<Version>$(ExtensionsEventHubsVersion)</Version>
|
||||
<TargetFramework>netstandard2.0</TargetFramework>
|
||||
<AssemblyName>Microsoft.Azure.WebJobs.EventHubs</AssemblyName>
|
||||
<RootNamespace>Microsoft.Azure.WebJobs.EventHubs</RootNamespace>
|
||||
<PackageId>Microsoft.Azure.WebJobs.Extensions.EventHubs</PackageId>
|
||||
<Description>Microsoft Azure WebJobs SDK EventHubs Extension</Description>
|
||||
</PropertyGroup>
|
||||
|
||||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
|
||||
<StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
<WarningsAsErrors />
|
||||
</PropertyGroup>
|
||||
|
||||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
|
||||
<StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
<WarningsAsErrors />
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Azure.EventHubs.Processor" Version="2.1.0" />
|
||||
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
</PackageReference>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Microsoft.Azure.WebJobs.Host\WebJobs.Host.csproj" />
|
||||
<ProjectReference Include="..\Microsoft.Azure.WebJobs\WebJobs.csproj" />
|
||||
</ItemGroup>
|
||||
</Project>
|
|
@ -1,268 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.WebJobs.Host.TestCommon;
|
||||
using Xunit;
|
||||
|
||||
using static Microsoft.Azure.EventHubs.EventData;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests
|
||||
{
|
||||
public class EventHubAsyncCollectorTests
|
||||
{
|
||||
[Fact]
|
||||
public void NullArgumentCheck()
|
||||
{
|
||||
Assert.Throws<ArgumentNullException>(() => new EventHubAsyncCollector(null));
|
||||
}
|
||||
|
||||
public EventData CreateEvent(byte[] body, string partitionKey)
|
||||
{
|
||||
var data = new EventData(body);
|
||||
IDictionary<string, object> sysProps = TestHelpers.New<SystemPropertiesCollection>();
|
||||
sysProps["x-opt-partition-key"] = partitionKey;
|
||||
TestHelpers.SetField(data, "SystemProperties", sysProps);
|
||||
return data;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SendMultiplePartitions()
|
||||
{
|
||||
var collector = new TestEventHubAsyncCollector();
|
||||
|
||||
await collector.AddAsync(this.CreateEvent(new byte[] { 1 }, "pk1"));
|
||||
await collector.AddAsync(CreateEvent(new byte[] { 2 }, "pk2"));
|
||||
|
||||
// Not physically sent yet since we haven't flushed
|
||||
Assert.Empty(collector.SentEvents);
|
||||
|
||||
await collector.FlushAsync();
|
||||
|
||||
// Partitions aren't flushed in a specific order.
|
||||
Assert.Equal(2, collector.SentEvents.Count);
|
||||
var items = collector.SentEvents.ToArray();
|
||||
|
||||
var item0 = items[0];
|
||||
var item1 = items[1];
|
||||
Assert.Equal(3, item0[0] + item1[0]); // either order.
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task NotSentUntilFlushed()
|
||||
{
|
||||
var collector = new TestEventHubAsyncCollector();
|
||||
|
||||
await collector.FlushAsync(); // should be nop.
|
||||
|
||||
var payload = new byte[] { 1, 2, 3 };
|
||||
var e1 = new EventData(payload);
|
||||
await collector.AddAsync(e1);
|
||||
|
||||
// Not physically sent yet since we haven't flushed
|
||||
Assert.Empty(collector.SentEvents);
|
||||
|
||||
await collector.FlushAsync();
|
||||
Assert.Single(collector.SentEvents);
|
||||
Assert.Equal(payload, collector.SentEvents[0]);
|
||||
}
|
||||
|
||||
// If we send enough events, that will eventually tip over and flush.
|
||||
[Fact]
|
||||
public async Task FlushAfterLotsOfSmallEvents()
|
||||
{
|
||||
var collector = new TestEventHubAsyncCollector();
|
||||
|
||||
// Sending a bunch of little events
|
||||
for (int i = 0; i < 150; i++)
|
||||
{
|
||||
var e1 = new EventData(new byte[] { 1, 2, 3 });
|
||||
await collector.AddAsync(e1);
|
||||
}
|
||||
|
||||
Assert.True(collector.SentEvents.Count > 0);
|
||||
}
|
||||
|
||||
// If we send enough events, that will eventually tip over and flush.
|
||||
[Fact]
|
||||
public async Task FlushAfterSizeThreshold()
|
||||
{
|
||||
var collector = new TestEventHubAsyncCollector();
|
||||
|
||||
// Trip the 256k EventHub limit.
|
||||
for (int i = 0; i < 10; i++)
|
||||
{
|
||||
var e1 = new EventData(new byte[10 * 1024]);
|
||||
await collector.AddAsync(e1);
|
||||
}
|
||||
|
||||
// Not yet
|
||||
Assert.Empty(collector.SentEvents);
|
||||
|
||||
// This will push it over the theshold
|
||||
for (int i = 0; i < 20; i++)
|
||||
{
|
||||
var e1 = new EventData(new byte[10 * 1024]);
|
||||
await collector.AddAsync(e1);
|
||||
}
|
||||
|
||||
Assert.True(collector.SentEvents.Count > 0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CantSentGiantEvent()
|
||||
{
|
||||
var collector = new TestEventHubAsyncCollector();
|
||||
|
||||
// event hub max is 256k payload.
|
||||
var hugePayload = new byte[300 * 1024];
|
||||
var e1 = new EventData(hugePayload);
|
||||
|
||||
try
|
||||
{
|
||||
await collector.AddAsync(e1);
|
||||
Assert.False(true);
|
||||
}
|
||||
catch (InvalidOperationException e)
|
||||
{
|
||||
// Exact error message (and serialization byte size) is subject to change.
|
||||
Assert.Contains("Event is too large", e.Message);
|
||||
}
|
||||
|
||||
// Verify we didn't queue anything
|
||||
await collector.FlushAsync();
|
||||
Assert.Empty(collector.SentEvents);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CantSendNullEvent()
|
||||
{
|
||||
var collector = new TestEventHubAsyncCollector();
|
||||
|
||||
await Assert.ThrowsAsync<ArgumentNullException>(
|
||||
async () => await collector.AddAsync(null));
|
||||
}
|
||||
|
||||
// Send lots of events from multiple threads and ensure that all events are precisely accounted for.
|
||||
[Fact]
|
||||
public async Task SendLotsOfEvents()
|
||||
{
|
||||
var collector = new TestEventHubAsyncCollector();
|
||||
|
||||
int numEvents = 1000;
|
||||
int numThreads = 10;
|
||||
|
||||
HashSet<string> expected = new HashSet<string>();
|
||||
|
||||
// Send from different physical threads.
|
||||
Thread[] threads = new Thread[numThreads];
|
||||
for (int iThread = 0; iThread < numThreads; iThread++)
|
||||
{
|
||||
var x = iThread;
|
||||
threads[x] = new Thread(
|
||||
() =>
|
||||
{
|
||||
for (int i = 0; i < numEvents; i++)
|
||||
{
|
||||
var idx = (x * numEvents) + i;
|
||||
var payloadStr = idx.ToString();
|
||||
var payload = Encoding.UTF8.GetBytes(payloadStr);
|
||||
lock (expected)
|
||||
{
|
||||
expected.Add(payloadStr);
|
||||
}
|
||||
collector.AddAsync(new EventData(payload)).Wait();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
foreach (var thread in threads)
|
||||
{
|
||||
thread.Start();
|
||||
}
|
||||
|
||||
foreach (var thread in threads)
|
||||
{
|
||||
thread.Join();
|
||||
}
|
||||
|
||||
// Add more events to trip flushing of the original batch without calling Flush()
|
||||
const string ignore = "ignore";
|
||||
byte[] ignoreBytes = Encoding.UTF8.GetBytes(ignore);
|
||||
for (int i = 0; i < 100; i++)
|
||||
{
|
||||
await collector.AddAsync(new EventData(ignoreBytes));
|
||||
}
|
||||
|
||||
// Verify that every event we sent is accounted for; and that there are no duplicates.
|
||||
int count = 0;
|
||||
foreach (var payloadBytes in collector.SentEvents)
|
||||
{
|
||||
count++;
|
||||
var payloadStr = Encoding.UTF8.GetString(payloadBytes);
|
||||
if (payloadStr == ignore)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!expected.Remove(payloadStr))
|
||||
{
|
||||
// Already removed!
|
||||
Assert.False(true, "event payload occured multiple times");
|
||||
}
|
||||
}
|
||||
|
||||
Assert.Empty(expected); // Some events where missed.
|
||||
}
|
||||
|
||||
internal class TestEventHubAsyncCollector : EventHubAsyncCollector
|
||||
{
|
||||
private static EventHubClient testClient = EventHubClient.CreateFromConnectionString(FakeConnectionString1);
|
||||
|
||||
// EventData is disposed after sending. So track raw bytes; not the actual EventData.
|
||||
private List<byte[]> sentEvents = new List<byte[]>();
|
||||
|
||||
// A fake connection string for event hubs. This just needs to parse. It won't actually get invoked.
|
||||
private const string FakeConnectionString = "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey;EntityPath=path2";
|
||||
|
||||
public TestEventHubAsyncCollector()
|
||||
: base(TestClient)
|
||||
{
|
||||
}
|
||||
|
||||
public static EventHubClient TestClient { get => testClient; set => testClient = value; }
|
||||
|
||||
public static string FakeConnectionString1 => FakeConnectionString;
|
||||
|
||||
public List<byte[]> SentEvents { get => sentEvents; set => sentEvents = value; }
|
||||
|
||||
protected override Task SendBatchAsync(IEnumerable<EventData> batch)
|
||||
{
|
||||
// Assert they all have the same partition key (could be null)
|
||||
var partitionKey = batch.First().SystemProperties?.PartitionKey;
|
||||
foreach (var e in batch)
|
||||
{
|
||||
Assert.Equal(partitionKey, e.SystemProperties?.PartitionKey);
|
||||
}
|
||||
|
||||
lock (SentEvents)
|
||||
{
|
||||
foreach (var e in batch)
|
||||
{
|
||||
var payloadBytes = e.Body.Array;
|
||||
Assert.NotNull(payloadBytes);
|
||||
SentEvents.Add(payloadBytes);
|
||||
}
|
||||
}
|
||||
|
||||
return Task.FromResult(0);
|
||||
}
|
||||
}
|
||||
} // end class
|
||||
}
|
|
@ -1,206 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Reflection;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.EventHubs.Processor;
|
||||
using Microsoft.Azure.WebJobs.Host;
|
||||
using Microsoft.Azure.WebJobs.Host.TestCommon;
|
||||
using Microsoft.Azure.WebJobs.Host.Triggers;
|
||||
using Microsoft.Azure.WebJobs.Logging;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using Xunit;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests
|
||||
{
|
||||
public class EventHubConfigurationTests
|
||||
{
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly TestLoggerProvider _loggerProvider;
|
||||
|
||||
public EventHubConfigurationTests()
|
||||
{
|
||||
_loggerFactory = new LoggerFactory();
|
||||
|
||||
_loggerProvider = new TestLoggerProvider();
|
||||
_loggerFactory.AddProvider(_loggerProvider);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ConfigureOptions_AppliesValuesCorrectly()
|
||||
{
|
||||
EventHubOptions options = CreateOptions();
|
||||
|
||||
Assert.Equal(123, options.EventProcessorOptions.MaxBatchSize);
|
||||
Assert.Equal(TimeSpan.FromSeconds(33), options.EventProcessorOptions.ReceiveTimeout);
|
||||
Assert.Equal(true, options.EventProcessorOptions.EnableReceiverRuntimeMetric);
|
||||
Assert.Equal(123, options.EventProcessorOptions.PrefetchCount);
|
||||
Assert.Equal(true, options.EventProcessorOptions.InvokeProcessorAfterReceiveTimeout);
|
||||
Assert.Equal(5, options.BatchCheckpointFrequency);
|
||||
Assert.Equal(31, options.PartitionManagerOptions.LeaseDuration.TotalSeconds);
|
||||
Assert.Equal(21, options.PartitionManagerOptions.RenewInterval.TotalSeconds);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ConfigureOptions_Format_Returns_Expected()
|
||||
{
|
||||
EventHubOptions options = CreateOptions();
|
||||
|
||||
string format = options.Format();
|
||||
JObject iObj = JObject.Parse(format);
|
||||
EventHubOptions result = iObj.ToObject<EventHubOptions>();
|
||||
|
||||
Assert.Equal(result.BatchCheckpointFrequency, 5);
|
||||
Assert.Equal(result.EventProcessorOptions.EnableReceiverRuntimeMetric, true);
|
||||
Assert.Equal(result.EventProcessorOptions.InvokeProcessorAfterReceiveTimeout, true);
|
||||
Assert.Equal(result.EventProcessorOptions.MaxBatchSize, 123);
|
||||
Assert.Equal(result.EventProcessorOptions.PrefetchCount, 123);
|
||||
Assert.Equal(result.EventProcessorOptions.ReceiveTimeout, TimeSpan.FromSeconds(33));
|
||||
Assert.Equal(result.PartitionManagerOptions.LeaseDuration, TimeSpan.FromSeconds(31));
|
||||
Assert.Equal(result.PartitionManagerOptions.RenewInterval, TimeSpan.FromSeconds(21));
|
||||
}
|
||||
|
||||
private EventHubOptions CreateOptions()
|
||||
{
|
||||
string extensionPath = "AzureWebJobs:Extensions:EventHubs";
|
||||
var values = new Dictionary<string, string>
|
||||
{
|
||||
{ $"{extensionPath}:EventProcessorOptions:MaxBatchSize", "123" },
|
||||
{ $"{extensionPath}:EventProcessorOptions:ReceiveTimeout", "00:00:33" },
|
||||
{ $"{extensionPath}:EventProcessorOptions:EnableReceiverRuntimeMetric", "true" },
|
||||
{ $"{extensionPath}:EventProcessorOptions:PrefetchCount", "123" },
|
||||
{ $"{extensionPath}:EventProcessorOptions:InvokeProcessorAfterReceiveTimeout", "true" },
|
||||
{ $"{extensionPath}:BatchCheckpointFrequency", "5" },
|
||||
{ $"{extensionPath}:PartitionManagerOptions:LeaseDuration", "00:00:31" },
|
||||
{ $"{extensionPath}:PartitionManagerOptions:RenewInterval", "00:00:21" }
|
||||
};
|
||||
|
||||
return TestHelpers.GetConfiguredOptions<EventHubOptions>(b =>
|
||||
{
|
||||
b.AddEventHubs();
|
||||
}, values);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Initialize_PerformsExpectedRegistrations()
|
||||
{
|
||||
var host = new HostBuilder()
|
||||
.ConfigureDefaultTestHost(builder =>
|
||||
{
|
||||
builder.AddEventHubs();
|
||||
})
|
||||
.ConfigureServices(c =>
|
||||
{
|
||||
c.AddSingleton<INameResolver>(new RandomNameResolver());
|
||||
})
|
||||
.Build();
|
||||
|
||||
IExtensionRegistry extensions = host.Services.GetService<IExtensionRegistry>();
|
||||
|
||||
// ensure the EventHubTriggerAttributeBindingProvider was registered
|
||||
var triggerBindingProviders = extensions.GetExtensions<ITriggerBindingProvider>().ToArray();
|
||||
EventHubTriggerAttributeBindingProvider triggerBindingProvider = triggerBindingProviders.OfType<EventHubTriggerAttributeBindingProvider>().Single();
|
||||
Assert.NotNull(triggerBindingProvider);
|
||||
|
||||
// ensure the EventProcessorOptions ExceptionReceived event is wired up
|
||||
var options = host.Services.GetService<IOptions<EventHubOptions>>().Value;
|
||||
var eventProcessorOptions = options.EventProcessorOptions;
|
||||
var ex = new EventHubsException(false, "Kaboom!");
|
||||
var ctor = typeof(ExceptionReceivedEventArgs).GetConstructors(BindingFlags.NonPublic | BindingFlags.Instance).Single();
|
||||
var args = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "TestAction" });
|
||||
var handler = (Action<ExceptionReceivedEventArgs>)eventProcessorOptions.GetType().GetField("exceptionHandler", BindingFlags.Instance | BindingFlags.NonPublic).GetValue(eventProcessorOptions);
|
||||
handler.Method.Invoke(handler.Target, new object[] { args });
|
||||
|
||||
string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)";
|
||||
var logMessage = host.GetTestLoggerProvider().GetAllLogMessages().Single();
|
||||
Assert.Equal(LogLevel.Error, logMessage.Level);
|
||||
Assert.Equal(expectedMessage, logMessage.FormattedMessage);
|
||||
Assert.Same(ex, logMessage.Exception);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void LogExceptionReceivedEvent_NonTransientEvent_LoggedAsError()
|
||||
{
|
||||
var ex = new EventHubsException(false);
|
||||
Assert.False(ex.IsTransient);
|
||||
var ctor = typeof(ExceptionReceivedEventArgs).GetConstructors(BindingFlags.NonPublic | BindingFlags.Instance).Single();
|
||||
var e = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "TestAction" });
|
||||
EventHubExtensionConfigProvider.LogExceptionReceivedEvent(e, _loggerFactory);
|
||||
|
||||
string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)";
|
||||
var logMessage = _loggerProvider.GetAllLogMessages().Single();
|
||||
Assert.Equal(LogLevel.Error, logMessage.Level);
|
||||
Assert.Same(ex, logMessage.Exception);
|
||||
Assert.Equal(expectedMessage, logMessage.FormattedMessage);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void LogExceptionReceivedEvent_TransientEvent_LoggedAsVerbose()
|
||||
{
|
||||
var ex = new EventHubsException(true);
|
||||
Assert.True(ex.IsTransient);
|
||||
var ctor = typeof(ExceptionReceivedEventArgs).GetConstructors(BindingFlags.NonPublic | BindingFlags.Instance).Single();
|
||||
var e = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "TestAction" });
|
||||
EventHubExtensionConfigProvider.LogExceptionReceivedEvent(e, _loggerFactory);
|
||||
|
||||
string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)";
|
||||
var logMessage = _loggerProvider.GetAllLogMessages().Single();
|
||||
Assert.Equal(LogLevel.Information, logMessage.Level);
|
||||
Assert.Same(ex, logMessage.Exception);
|
||||
Assert.Equal(expectedMessage, logMessage.FormattedMessage);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void LogExceptionReceivedEvent_OperationCanceledException_LoggedAsVerbose()
|
||||
{
|
||||
var ex = new OperationCanceledException("Testing");
|
||||
var ctor = typeof(ExceptionReceivedEventArgs).GetConstructors(BindingFlags.NonPublic | BindingFlags.Instance).Single();
|
||||
var e = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "TestAction" });
|
||||
EventHubExtensionConfigProvider.LogExceptionReceivedEvent(e, _loggerFactory);
|
||||
|
||||
string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)";
|
||||
var logMessage = _loggerProvider.GetAllLogMessages().Single();
|
||||
Assert.Equal(LogLevel.Information, logMessage.Level);
|
||||
Assert.Same(ex, logMessage.Exception);
|
||||
Assert.Equal(expectedMessage, logMessage.FormattedMessage);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void LogExceptionReceivedEvent_NonMessagingException_LoggedAsError()
|
||||
{
|
||||
var ex = new MissingMethodException("What method??");
|
||||
var ctor = typeof(ExceptionReceivedEventArgs).GetConstructors(BindingFlags.NonPublic | BindingFlags.Instance).Single();
|
||||
var e = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "TestAction" });
|
||||
EventHubExtensionConfigProvider.LogExceptionReceivedEvent(e, _loggerFactory);
|
||||
|
||||
string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)";
|
||||
var logMessage = _loggerProvider.GetAllLogMessages().Single();
|
||||
Assert.Equal(LogLevel.Error, logMessage.Level);
|
||||
Assert.Same(ex, logMessage.Exception);
|
||||
Assert.Equal(expectedMessage, logMessage.FormattedMessage);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void LogExceptionReceivedEvent_PartitionExceptions_LoggedAsInfo()
|
||||
{
|
||||
var ctor = typeof(ReceiverDisconnectedException).GetConstructor(BindingFlags.NonPublic | BindingFlags.Instance, null, new Type[] { typeof(string) }, null);
|
||||
var ex = (ReceiverDisconnectedException)ctor.Invoke(new object[] { "New receiver with higher epoch of '30402' is created hence current receiver with epoch '30402' is getting disconnected." });
|
||||
ctor = typeof(ExceptionReceivedEventArgs).GetConstructors(BindingFlags.NonPublic | BindingFlags.Instance).Single();
|
||||
var e = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "TestAction" });
|
||||
EventHubExtensionConfigProvider.LogExceptionReceivedEvent(e, _loggerFactory);
|
||||
|
||||
string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)";
|
||||
var logMessage = _loggerProvider.GetAllLogMessages().Single();
|
||||
Assert.Equal(LogLevel.Information, logMessage.Level);
|
||||
Assert.Same(ex, logMessage.Exception);
|
||||
Assert.Equal(expectedMessage, logMessage.FormattedMessage);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,190 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Reflection;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.EventHubs.Processor;
|
||||
using Microsoft.Azure.WebJobs.Host.Executors;
|
||||
using Microsoft.Azure.WebJobs.Host.TestCommon;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Moq;
|
||||
using Xunit;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests
|
||||
{
|
||||
public class EventHubListenerTests
|
||||
{
|
||||
[Theory]
|
||||
[InlineData(1, 100)]
|
||||
[InlineData(4, 25)]
|
||||
[InlineData(8, 12)]
|
||||
[InlineData(32, 3)]
|
||||
[InlineData(128, 0)]
|
||||
public async Task ProcessEvents_SingleDispatch_CheckpointsCorrectly(int batchCheckpointFrequency, int expected)
|
||||
{
|
||||
var partitionContext = EventHubTests.GetPartitionContext();
|
||||
var checkpoints = 0;
|
||||
var options = new EventHubOptions
|
||||
{
|
||||
BatchCheckpointFrequency = batchCheckpointFrequency
|
||||
};
|
||||
var checkpointer = new Mock<EventHubListener.ICheckpointer>(MockBehavior.Strict);
|
||||
checkpointer.Setup(p => p.CheckpointAsync(partitionContext)).Callback<PartitionContext>(c =>
|
||||
{
|
||||
checkpoints++;
|
||||
}).Returns(Task.CompletedTask);
|
||||
var loggerMock = new Mock<ILogger>(MockBehavior.Strict);
|
||||
var executor = new Mock<ITriggeredFunctionExecutor>(MockBehavior.Strict);
|
||||
executor.Setup(p => p.TryExecuteAsync(It.IsAny<TriggeredFunctionData>(), It.IsAny<CancellationToken>())).ReturnsAsync(new FunctionResult(true));
|
||||
var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, true, checkpointer.Object);
|
||||
|
||||
for (int i = 0; i < 100; i++)
|
||||
{
|
||||
List<EventData> events = new List<EventData>() { new EventData(new byte[0]) };
|
||||
await eventProcessor.ProcessEventsAsync(partitionContext, events);
|
||||
}
|
||||
|
||||
Assert.Equal(expected, checkpoints);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(1, 100)]
|
||||
[InlineData(4, 25)]
|
||||
[InlineData(8, 12)]
|
||||
[InlineData(32, 3)]
|
||||
[InlineData(128, 0)]
|
||||
public async Task ProcessEvents_MultipleDispatch_CheckpointsCorrectly(int batchCheckpointFrequency, int expected)
|
||||
{
|
||||
var partitionContext = EventHubTests.GetPartitionContext();
|
||||
var options = new EventHubOptions
|
||||
{
|
||||
BatchCheckpointFrequency = batchCheckpointFrequency
|
||||
};
|
||||
var checkpointer = new Mock<EventHubListener.ICheckpointer>(MockBehavior.Strict);
|
||||
checkpointer.Setup(p => p.CheckpointAsync(partitionContext)).Returns(Task.CompletedTask);
|
||||
var loggerMock = new Mock<ILogger>(MockBehavior.Strict);
|
||||
var executor = new Mock<ITriggeredFunctionExecutor>(MockBehavior.Strict);
|
||||
executor.Setup(p => p.TryExecuteAsync(It.IsAny<TriggeredFunctionData>(), It.IsAny<CancellationToken>())).ReturnsAsync(new FunctionResult(true));
|
||||
var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, false, checkpointer.Object);
|
||||
|
||||
for (int i = 0; i < 100; i++)
|
||||
{
|
||||
List<EventData> events = new List<EventData>() { new EventData(new byte[0]), new EventData(new byte[0]), new EventData(new byte[0]) };
|
||||
await eventProcessor.ProcessEventsAsync(partitionContext, events);
|
||||
}
|
||||
|
||||
checkpointer.Verify(p => p.CheckpointAsync(partitionContext), Times.Exactly(expected));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Even if some events in a batch fail, we still checkpoint. Event error handling
|
||||
/// is the responsiblity of user function code.
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
[Fact]
|
||||
public async Task ProcessEvents_Failure_Checkpoints()
|
||||
{
|
||||
var partitionContext = EventHubTests.GetPartitionContext();
|
||||
var options = new EventHubOptions();
|
||||
var checkpointer = new Mock<EventHubListener.ICheckpointer>(MockBehavior.Strict);
|
||||
checkpointer.Setup(p => p.CheckpointAsync(partitionContext)).Returns(Task.CompletedTask);
|
||||
|
||||
List<EventData> events = new List<EventData>();
|
||||
List<FunctionResult> results = new List<FunctionResult>();
|
||||
for (int i = 0; i < 10; i++)
|
||||
{
|
||||
events.Add(new EventData(new byte[0]));
|
||||
var succeeded = i > 7 ? false : true;
|
||||
results.Add(new FunctionResult(succeeded));
|
||||
}
|
||||
|
||||
var executor = new Mock<ITriggeredFunctionExecutor>(MockBehavior.Strict);
|
||||
int execution = 0;
|
||||
executor.Setup(p => p.TryExecuteAsync(It.IsAny<TriggeredFunctionData>(), It.IsAny<CancellationToken>())).ReturnsAsync(() =>
|
||||
{
|
||||
var result = results[execution++];
|
||||
return result;
|
||||
});
|
||||
var loggerMock = new Mock<ILogger>(MockBehavior.Strict);
|
||||
var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, true, checkpointer.Object);
|
||||
|
||||
await eventProcessor.ProcessEventsAsync(partitionContext, events);
|
||||
|
||||
checkpointer.Verify(p => p.CheckpointAsync(partitionContext), Times.Once);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CloseAsync_Shutdown_DoesNotCheckpoint()
|
||||
{
|
||||
var partitionContext = EventHubTests.GetPartitionContext();
|
||||
var options = new EventHubOptions();
|
||||
var checkpointer = new Mock<EventHubListener.ICheckpointer>(MockBehavior.Strict);
|
||||
var executor = new Mock<ITriggeredFunctionExecutor>(MockBehavior.Strict);
|
||||
var loggerMock = new Mock<ILogger>(MockBehavior.Strict);
|
||||
var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, loggerMock.Object, true, checkpointer.Object);
|
||||
|
||||
await eventProcessor.CloseAsync(partitionContext, CloseReason.Shutdown);
|
||||
|
||||
checkpointer.Verify(p => p.CheckpointAsync(partitionContext), Times.Never);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ProcessErrorsAsync_LoggedAsError()
|
||||
{
|
||||
var partitionContext = EventHubTests.GetPartitionContext(partitionId: "123", eventHubPath: "abc", owner: "def");
|
||||
var options = new EventHubOptions();
|
||||
var checkpointer = new Mock<EventHubListener.ICheckpointer>(MockBehavior.Strict);
|
||||
var executor = new Mock<ITriggeredFunctionExecutor>(MockBehavior.Strict);
|
||||
var testLogger = new TestLogger("Test");
|
||||
var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, testLogger, true, checkpointer.Object);
|
||||
|
||||
var ex = new InvalidOperationException("My InvalidOperationException!");
|
||||
|
||||
await eventProcessor.ProcessErrorAsync(partitionContext, ex);
|
||||
var msg = testLogger.GetLogMessages().Single();
|
||||
Assert.Equal("Error processing event from Partition Id: '123', Owner: 'def', EventHubPath: 'abc'", msg.FormattedMessage);
|
||||
Assert.IsType<InvalidOperationException>(msg.Exception);
|
||||
Assert.Equal(LogLevel.Error, msg.Level);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ProcessErrorsAsync_RebalancingExceptions_LoggedAsInformation()
|
||||
{
|
||||
var partitionContext = EventHubTests.GetPartitionContext(partitionId: "123", eventHubPath: "abc", owner: "def");
|
||||
var options = new EventHubOptions();
|
||||
var checkpointer = new Mock<EventHubListener.ICheckpointer>(MockBehavior.Strict);
|
||||
var executor = new Mock<ITriggeredFunctionExecutor>(MockBehavior.Strict);
|
||||
var testLogger = new TestLogger("Test");
|
||||
var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, testLogger, true, checkpointer.Object);
|
||||
|
||||
// ctor is private
|
||||
var constructor = typeof(ReceiverDisconnectedException)
|
||||
.GetConstructor(BindingFlags.NonPublic | BindingFlags.Instance, null, new Type[] { typeof(string) }, null);
|
||||
ReceiverDisconnectedException disconnectedEx = (ReceiverDisconnectedException)constructor.Invoke(new[] { "My ReceiverDisconnectedException!" });
|
||||
|
||||
await eventProcessor.ProcessErrorAsync(partitionContext, disconnectedEx);
|
||||
var msg = testLogger.GetLogMessages().Single();
|
||||
Assert.Equal("An Event Hub exception of type 'ReceiverDisconnectedException' was thrown from Partition Id: '123', Owner: 'def', EventHubPath: 'abc'. This exception type is typically a result of Event Hub processor rebalancing and can be safely ignored.", msg.FormattedMessage);
|
||||
Assert.Null(msg.Exception);
|
||||
Assert.Equal(LogLevel.Information, msg.Level);
|
||||
|
||||
testLogger.ClearLogMessages();
|
||||
|
||||
// ctor is private
|
||||
constructor = typeof(LeaseLostException)
|
||||
.GetConstructor(BindingFlags.NonPublic | BindingFlags.Instance, null, new Type[] { typeof(string), typeof(Exception) }, null);
|
||||
LeaseLostException leaseLostEx = (LeaseLostException)constructor.Invoke(new object[] { "My LeaseLostException!", new Exception() });
|
||||
|
||||
await eventProcessor.ProcessErrorAsync(partitionContext, leaseLostEx);
|
||||
msg = testLogger.GetLogMessages().Single();
|
||||
Assert.Equal("An Event Hub exception of type 'LeaseLostException' was thrown from Partition Id: '123', Owner: 'def', EventHubPath: 'abc'. This exception type is typically a result of Event Hub processor rebalancing and can be safely ignored.", msg.FormattedMessage);
|
||||
Assert.Null(msg.Exception);
|
||||
Assert.Equal(LogLevel.Information, msg.Level);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,286 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Reflection;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.EventHubs.Processor;
|
||||
using Microsoft.Azure.WebJobs.Host;
|
||||
using Microsoft.Azure.WebJobs.Host.TestCommon;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Xunit;
|
||||
using static Microsoft.Azure.EventHubs.EventData;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests
|
||||
{
|
||||
public class EventHubTests
|
||||
{
|
||||
[Fact]
|
||||
public void GetStaticBindingContract_ReturnsExpectedValue()
|
||||
{
|
||||
var strategy = new EventHubTriggerBindingStrategy();
|
||||
var contract = strategy.GetBindingContract();
|
||||
|
||||
Assert.Equal(7, contract.Count);
|
||||
Assert.Equal(typeof(PartitionContext), contract["PartitionContext"]);
|
||||
Assert.Equal(typeof(string), contract["Offset"]);
|
||||
Assert.Equal(typeof(long), contract["SequenceNumber"]);
|
||||
Assert.Equal(typeof(DateTime), contract["EnqueuedTimeUtc"]);
|
||||
Assert.Equal(typeof(IDictionary<string, object>), contract["Properties"]);
|
||||
Assert.Equal(typeof(IDictionary<string, object>), contract["SystemProperties"]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetBindingContract_SingleDispatch_ReturnsExpectedValue()
|
||||
{
|
||||
var strategy = new EventHubTriggerBindingStrategy();
|
||||
var contract = strategy.GetBindingContract(true);
|
||||
|
||||
Assert.Equal(7, contract.Count);
|
||||
Assert.Equal(typeof(PartitionContext), contract["PartitionContext"]);
|
||||
Assert.Equal(typeof(string), contract["Offset"]);
|
||||
Assert.Equal(typeof(long), contract["SequenceNumber"]);
|
||||
Assert.Equal(typeof(DateTime), contract["EnqueuedTimeUtc"]);
|
||||
Assert.Equal(typeof(IDictionary<string, object>), contract["Properties"]);
|
||||
Assert.Equal(typeof(IDictionary<string, object>), contract["SystemProperties"]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetBindingContract_MultipleDispatch_ReturnsExpectedValue()
|
||||
{
|
||||
var strategy = new EventHubTriggerBindingStrategy();
|
||||
var contract = strategy.GetBindingContract(false);
|
||||
|
||||
Assert.Equal(7, contract.Count);
|
||||
Assert.Equal(typeof(PartitionContext), contract["PartitionContext"]);
|
||||
Assert.Equal(typeof(string[]), contract["PartitionKeyArray"]);
|
||||
Assert.Equal(typeof(string[]), contract["OffsetArray"]);
|
||||
Assert.Equal(typeof(long[]), contract["SequenceNumberArray"]);
|
||||
Assert.Equal(typeof(DateTime[]), contract["EnqueuedTimeUtcArray"]);
|
||||
Assert.Equal(typeof(IDictionary<string, object>[]), contract["PropertiesArray"]);
|
||||
Assert.Equal(typeof(IDictionary<string, object>[]), contract["SystemPropertiesArray"]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetBindingData_SingleDispatch_ReturnsExpectedValue()
|
||||
{
|
||||
var evt = new EventData(new byte[] { });
|
||||
IDictionary<string, object> sysProps = GetSystemProperties();
|
||||
|
||||
TestHelpers.SetField(evt, "SystemProperties", sysProps);
|
||||
|
||||
var input = EventHubTriggerInput.New(evt);
|
||||
input.PartitionContext = GetPartitionContext();
|
||||
|
||||
var strategy = new EventHubTriggerBindingStrategy();
|
||||
var bindingData = strategy.GetBindingData(input);
|
||||
|
||||
Assert.Equal(7, bindingData.Count);
|
||||
Assert.Same(input.PartitionContext, bindingData["PartitionContext"]);
|
||||
Assert.Equal(evt.SystemProperties.PartitionKey, bindingData["PartitionKey"]);
|
||||
Assert.Equal(evt.SystemProperties.Offset, bindingData["Offset"]);
|
||||
Assert.Equal(evt.SystemProperties.SequenceNumber, bindingData["SequenceNumber"]);
|
||||
Assert.Equal(evt.SystemProperties.EnqueuedTimeUtc, bindingData["EnqueuedTimeUtc"]);
|
||||
Assert.Same(evt.Properties, bindingData["Properties"]);
|
||||
IDictionary<string, object> bindingDataSysProps = bindingData["SystemProperties"] as Dictionary<string, object>;
|
||||
Assert.NotNull(bindingDataSysProps);
|
||||
Assert.Equal(bindingDataSysProps["PartitionKey"], bindingData["PartitionKey"]);
|
||||
Assert.Equal(bindingDataSysProps["Offset"], bindingData["Offset"]);
|
||||
Assert.Equal(bindingDataSysProps["SequenceNumber"], bindingData["SequenceNumber"]);
|
||||
Assert.Equal(bindingDataSysProps["EnqueuedTimeUtc"], bindingData["EnqueuedTimeUtc"]);
|
||||
Assert.Equal(bindingDataSysProps["iothub-connection-device-id"], "testDeviceId");
|
||||
Assert.Equal(bindingDataSysProps["iothub-enqueuedtime"], DateTime.MinValue);
|
||||
}
|
||||
|
||||
private static IDictionary<string, object> GetSystemProperties(string partitionKey = "TestKey")
|
||||
{
|
||||
long testSequence = 4294967296;
|
||||
IDictionary<string, object> sysProps = TestHelpers.New<SystemPropertiesCollection>();
|
||||
sysProps["x-opt-partition-key"] = partitionKey;
|
||||
sysProps["x-opt-offset"] = "TestOffset";
|
||||
sysProps["x-opt-enqueued-time"] = DateTime.MinValue;
|
||||
sysProps["x-opt-sequence-number"] = testSequence;
|
||||
sysProps["iothub-connection-device-id"] = "testDeviceId";
|
||||
sysProps["iothub-enqueuedtime"] = DateTime.MinValue;
|
||||
return sysProps;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetBindingData_MultipleDispatch_ReturnsExpectedValue()
|
||||
{
|
||||
|
||||
var events = new EventData[3]
|
||||
{
|
||||
new EventData(Encoding.UTF8.GetBytes("Event 1")),
|
||||
new EventData(Encoding.UTF8.GetBytes("Event 2")),
|
||||
new EventData(Encoding.UTF8.GetBytes("Event 3")),
|
||||
};
|
||||
|
||||
var count = 0;
|
||||
foreach (var evt in events)
|
||||
{
|
||||
IDictionary<string, object> sysProps = GetSystemProperties($"pk{count++}");
|
||||
TestHelpers.SetField(evt, "SystemProperties", sysProps);
|
||||
}
|
||||
|
||||
var input = new EventHubTriggerInput
|
||||
{
|
||||
Events = events,
|
||||
PartitionContext = GetPartitionContext(),
|
||||
};
|
||||
var strategy = new EventHubTriggerBindingStrategy();
|
||||
var bindingData = strategy.GetBindingData(input);
|
||||
|
||||
Assert.Equal(7, bindingData.Count);
|
||||
Assert.Same(input.PartitionContext, bindingData["PartitionContext"]);
|
||||
|
||||
// verify an array was created for each binding data type
|
||||
Assert.Equal(events.Length, ((string[])bindingData["PartitionKeyArray"]).Length);
|
||||
Assert.Equal(events.Length, ((string[])bindingData["OffsetArray"]).Length);
|
||||
Assert.Equal(events.Length, ((long[])bindingData["SequenceNumberArray"]).Length);
|
||||
Assert.Equal(events.Length, ((DateTime[])bindingData["EnqueuedTimeUtcArray"]).Length);
|
||||
Assert.Equal(events.Length, ((IDictionary<string, object>[])bindingData["PropertiesArray"]).Length);
|
||||
Assert.Equal(events.Length, ((IDictionary<string, object>[])bindingData["SystemPropertiesArray"]).Length);
|
||||
|
||||
Assert.Equal(events[0].SystemProperties.PartitionKey, ((string[])bindingData["PartitionKeyArray"])[0]);
|
||||
Assert.Equal(events[1].SystemProperties.PartitionKey, ((string[])bindingData["PartitionKeyArray"])[1]);
|
||||
Assert.Equal(events[2].SystemProperties.PartitionKey, ((string[])bindingData["PartitionKeyArray"])[2]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void TriggerStrategy()
|
||||
{
|
||||
string data = "123";
|
||||
|
||||
var strategy = new EventHubTriggerBindingStrategy();
|
||||
EventHubTriggerInput triggerInput = strategy.ConvertFromString(data);
|
||||
|
||||
var contract = strategy.GetBindingData(triggerInput);
|
||||
|
||||
EventData single = strategy.BindSingle(triggerInput, null);
|
||||
string body = Encoding.UTF8.GetString(single.Body.Array);
|
||||
|
||||
Assert.Equal(data, body);
|
||||
Assert.Null(contract["PartitionContext"]);
|
||||
Assert.Null(contract["partitioncontext"]); // case insensitive
|
||||
}
|
||||
|
||||
// Validate that if connection string has EntityPath, that takes precedence over the parameter.
|
||||
[Theory]
|
||||
[InlineData("k1", "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey")]
|
||||
[InlineData("path2", "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey;EntityPath=path2")]
|
||||
public void EntityPathInConnectionString(string expectedPathName, string connectionString)
|
||||
{
|
||||
EventHubOptions options = new EventHubOptions();
|
||||
|
||||
// Test sender
|
||||
options.AddSender("k1", connectionString);
|
||||
var client = options.GetEventHubClient("k1", null);
|
||||
Assert.Equal(expectedPathName, client.EventHubName);
|
||||
}
|
||||
|
||||
// Validate that if connection string has EntityPath, that takes precedence over the parameter.
|
||||
[Theory]
|
||||
[InlineData("k1", "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey")]
|
||||
[InlineData("path2", "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey;EntityPath=path2")]
|
||||
public void GetEventHubClient_AddsConnection(string expectedPathName, string connectionString)
|
||||
{
|
||||
EventHubOptions options = new EventHubOptions();
|
||||
var client = options.GetEventHubClient("k1", connectionString);
|
||||
Assert.Equal(expectedPathName, client.EventHubName);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("e", "n1", "n1/e/")]
|
||||
[InlineData("e--1", "host_.path.foo", "host_.path.foo/e--1/")]
|
||||
[InlineData("Ab", "Cd", "cd/ab/")]
|
||||
[InlineData("A=", "Cd", "cd/a:3D/")]
|
||||
[InlineData("A:", "Cd", "cd/a:3A/")]
|
||||
public void EventHubBlobPrefix(string eventHubName, string serviceBusNamespace, string expected)
|
||||
{
|
||||
string actual = EventHubOptions.GetBlobPrefix(eventHubName, serviceBusNamespace);
|
||||
Assert.Equal(expected, actual);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(1)]
|
||||
[InlineData(5)]
|
||||
[InlineData(200)]
|
||||
public void EventHubBatchCheckpointFrequency(int num)
|
||||
{
|
||||
var options = new EventHubOptions();
|
||||
options.BatchCheckpointFrequency = num;
|
||||
Assert.Equal(num, options.BatchCheckpointFrequency);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(-1)]
|
||||
[InlineData(0)]
|
||||
public void EventHubBatchCheckpointFrequency_Throws(int num)
|
||||
{
|
||||
var options = new EventHubOptions();
|
||||
Assert.Throws<InvalidOperationException>(() => options.BatchCheckpointFrequency = num);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void InitializeFromHostMetadata()
|
||||
{
|
||||
// TODO: It's tough to wire all this up without using a new host.
|
||||
IHost host = new HostBuilder()
|
||||
.ConfigureDefaultTestHost(builder =>
|
||||
{
|
||||
builder.AddEventHubs();
|
||||
})
|
||||
.ConfigureAppConfiguration(c =>
|
||||
{
|
||||
c.AddInMemoryCollection(new Dictionary<string, string>
|
||||
{
|
||||
{ "AzureWebJobs:extensions:EventHubs:EventProcessorOptions:MaxBatchSize", "100" },
|
||||
{ "AzureWebJobs:extensions:EventHubs:EventProcessorOptions:PrefetchCount", "200" },
|
||||
{ "AzureWebJobs:extensions:EventHubs:BatchCheckpointFrequency", "5" },
|
||||
{ "AzureWebJobs:extensions:EventHubs:PartitionManagerOptions:LeaseDuration", "00:00:31" },
|
||||
{ "AzureWebJobs:extensions:EventHubs:PartitionManagerOptions:RenewInterval", "00:00:21" }
|
||||
});
|
||||
})
|
||||
.Build();
|
||||
|
||||
// Force the ExtensionRegistryFactory to run, which will initialize the EventHubConfiguration.
|
||||
var extensionRegistry = host.Services.GetService<IExtensionRegistry>();
|
||||
var options = host.Services.GetService<IOptions<EventHubOptions>>().Value;
|
||||
|
||||
var eventProcessorOptions = options.EventProcessorOptions;
|
||||
Assert.Equal(100, eventProcessorOptions.MaxBatchSize);
|
||||
Assert.Equal(200, eventProcessorOptions.PrefetchCount);
|
||||
Assert.Equal(5, options.BatchCheckpointFrequency);
|
||||
Assert.Equal(31, options.PartitionManagerOptions.LeaseDuration.TotalSeconds);
|
||||
Assert.Equal(21, options.PartitionManagerOptions.RenewInterval.TotalSeconds);
|
||||
}
|
||||
|
||||
internal static PartitionContext GetPartitionContext(string partitionId = null, string eventHubPath = null,
|
||||
string consumerGroupName = null, string owner = null)
|
||||
{
|
||||
var constructor = typeof(PartitionContext).GetConstructor(
|
||||
BindingFlags.NonPublic | BindingFlags.Instance,
|
||||
null,
|
||||
new Type[] { typeof(EventProcessorHost), typeof(string), typeof(string), typeof(string), typeof(CancellationToken) },
|
||||
null);
|
||||
var context = (PartitionContext)constructor.Invoke(new object[] { null, partitionId, eventHubPath, consumerGroupName, null });
|
||||
|
||||
// Set a lease, which allows us to grab the "Owner"
|
||||
constructor = typeof(Lease).GetConstructor(BindingFlags.NonPublic | BindingFlags.Instance, null, new Type[] { }, null);
|
||||
var lease = (Lease)constructor.Invoke(new object[] { });
|
||||
lease.Owner = owner;
|
||||
|
||||
var leaseProperty = typeof(PartitionContext).GetProperty("Lease", BindingFlags.NonPublic | BindingFlags.Instance);
|
||||
leaseProperty.SetValue(context, lease);
|
||||
|
||||
return context;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System.Reflection;
|
||||
using Xunit;
|
||||
|
||||
[assembly: CollectionBehavior(CollectionBehavior.CollectionPerAssembly)]
|
|
@ -1,28 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using Microsoft.Azure.WebJobs.Host.TestCommon;
|
||||
using Xunit;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.Host.UnitTests
|
||||
{
|
||||
public class PublicSurfaceTests
|
||||
{
|
||||
[Fact]
|
||||
public void WebJobs_Extensions_EventHubs_VerifyPublicSurfaceArea()
|
||||
{
|
||||
var assembly = typeof(EventHubAttribute).Assembly;
|
||||
|
||||
var expected = new[]
|
||||
{
|
||||
"EventHubAttribute",
|
||||
"EventHubTriggerAttribute",
|
||||
"EventHubOptions",
|
||||
"EventHubWebJobsBuilderExtensions",
|
||||
"EventHubsWebJobsStartup"
|
||||
};
|
||||
|
||||
TestHelpers.AssertPublicTypes(expected, assembly);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,38 +0,0 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<Import Project="..\..\build\common.props" />
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>netcoreapp2.1</TargetFramework>
|
||||
<RootNamespace>Microsoft.Azure.WebJobs.EventHubs.UnitTests</RootNamespace>
|
||||
</PropertyGroup>
|
||||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
|
||||
<StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
<WarningsAsErrors />
|
||||
</PropertyGroup>
|
||||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
|
||||
<StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
<WarningsAsErrors />
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<AdditionalFiles Include="..\..\stylecop.json" Link="stylecop.json" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
</PackageReference>
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0" />
|
||||
<PackageReference Include="Moq" Version="4.7.145" />
|
||||
<PackageReference Include="xunit" Version="2.3.1" />
|
||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
|
||||
<DotNetCliToolReference Include="dotnet-xunit" Version="2.3.0-beta3-build3705" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Extensions.EventHubs\WebJobs.Extensions.EventHubs.csproj" />
|
||||
<ProjectReference Include="..\Microsoft.Azure.WebJobs.Host.TestCommon\WebJobs.Host.TestCommon.csproj" />
|
||||
</ItemGroup>
|
||||
</Project>
|
|
@ -1,215 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Reflection;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.WebJobs.EventHubs;
|
||||
using Microsoft.Azure.WebJobs.Host.TestCommon;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Xunit;
|
||||
|
||||
namespace Microsoft.Azure.WebJobs.Host.EndToEndTests
|
||||
{
|
||||
public class EventHubEndToEndTests
|
||||
{
|
||||
private const string TestHubName = "webjobstesthub";
|
||||
private const int Timeout = 30000;
|
||||
private static EventWaitHandle _eventWait;
|
||||
private static string _testId;
|
||||
private static List<string> _results;
|
||||
|
||||
public EventHubEndToEndTests()
|
||||
{
|
||||
_results = new List<string>();
|
||||
_testId = Guid.NewGuid().ToString();
|
||||
_eventWait = new ManualResetEvent(initialState: false);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task EventHub_SingleDispatch()
|
||||
{
|
||||
using (JobHost host = BuildHost<EventHubTestSingleDispatchJobs>())
|
||||
{
|
||||
var method = typeof(EventHubTestSingleDispatchJobs).GetMethod("SendEvent_TestHub", BindingFlags.Static | BindingFlags.Public);
|
||||
var id = Guid.NewGuid().ToString();
|
||||
await host.CallAsync(method, new { input = _testId });
|
||||
|
||||
bool result = _eventWait.WaitOne(Timeout);
|
||||
Assert.True(result);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task EventHub_MultipleDispatch()
|
||||
{
|
||||
using (JobHost host = BuildHost<EventHubTestMultipleDispatchJobs>())
|
||||
{
|
||||
// send some events BEFORE starting the host, to ensure
|
||||
// the events are received in batch
|
||||
var method = typeof(EventHubTestMultipleDispatchJobs).GetMethod("SendEvents_TestHub", BindingFlags.Static | BindingFlags.Public);
|
||||
int numEvents = 5;
|
||||
await host.CallAsync(method, new { numEvents = numEvents, input = _testId });
|
||||
|
||||
bool result = _eventWait.WaitOne(Timeout);
|
||||
Assert.True(result);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task EventHub_PartitionKey()
|
||||
{
|
||||
using (JobHost host = BuildHost<EventHubParitionKeyTestJobs>())
|
||||
{
|
||||
var method = typeof(EventHubParitionKeyTestJobs).GetMethod("SendEvents_TestHub", BindingFlags.Static | BindingFlags.Public);
|
||||
_eventWait = new ManualResetEvent(initialState: false);
|
||||
await host.CallAsync(method, new { input = _testId });
|
||||
|
||||
bool result = _eventWait.WaitOne(Timeout);
|
||||
|
||||
Assert.True(result);
|
||||
}
|
||||
}
|
||||
|
||||
public class EventHubTestSingleDispatchJobs
|
||||
{
|
||||
public static void SendEvent_TestHub(string input, [EventHub(TestHubName)] out EventData evt)
|
||||
{
|
||||
evt = new EventData(Encoding.UTF8.GetBytes(input));
|
||||
evt.Properties.Add("TestProp1", "value1");
|
||||
evt.Properties.Add("TestProp2", "value2");
|
||||
}
|
||||
|
||||
|
||||
public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] string evt,
|
||||
string partitionKey, DateTime enqueuedTimeUtc, IDictionary<string, object> properties,
|
||||
IDictionary<string, object> systemProperties)
|
||||
{
|
||||
// filter for the ID the current test is using
|
||||
if (evt == _testId)
|
||||
{
|
||||
Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30);
|
||||
|
||||
Assert.Equal("value1", properties["TestProp1"]);
|
||||
Assert.Equal("value2", properties["TestProp2"]);
|
||||
|
||||
_eventWait.Set();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public class EventHubTestMultipleDispatchJobs
|
||||
{
|
||||
|
||||
public static void SendEvents_TestHub(int numEvents, string input, [EventHub(TestHubName)] out EventData[] events)
|
||||
{
|
||||
events = new EventData[numEvents];
|
||||
for (int i = 0; i < numEvents; i++)
|
||||
{
|
||||
var evt = new EventData(Encoding.UTF8.GetBytes(input));
|
||||
evt.Properties.Add("TestIndex", i);
|
||||
evt.Properties.Add("TestProp1", "value1");
|
||||
evt.Properties.Add("TestProp2", "value2");
|
||||
events[i] = evt;
|
||||
}
|
||||
}
|
||||
|
||||
public static void ProcessMultipleEvents([EventHubTrigger(TestHubName)] string[] events,
|
||||
string[] partitionKeyArray, DateTime[] enqueuedTimeUtcArray, IDictionary<string, object>[] propertiesArray,
|
||||
IDictionary<string, object>[] systemPropertiesArray)
|
||||
{
|
||||
Assert.Equal(events.Length, partitionKeyArray.Length);
|
||||
Assert.Equal(events.Length, enqueuedTimeUtcArray.Length);
|
||||
Assert.Equal(events.Length, propertiesArray.Length);
|
||||
Assert.Equal(events.Length, systemPropertiesArray.Length);
|
||||
|
||||
for (int i = 0; i < events.Length; i++)
|
||||
{
|
||||
Assert.Equal(i, propertiesArray[i]["TestIndex"]);
|
||||
}
|
||||
|
||||
// filter for the ID the current test is using
|
||||
if (events[0] == _testId)
|
||||
{
|
||||
_results.AddRange(events);
|
||||
_eventWait.Set();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public class EventHubParitionKeyTestJobs
|
||||
{
|
||||
public static async Task SendEvents_TestHub(
|
||||
string input,
|
||||
[EventHub(TestHubName)] EventHubClient client)
|
||||
{
|
||||
List<EventData> list = new List<EventData>();
|
||||
EventData evt = new EventData(Encoding.UTF8.GetBytes(input));
|
||||
|
||||
// Send event without PK
|
||||
await client.SendAsync(evt);
|
||||
|
||||
// Send event with different PKs
|
||||
for (int i = 0; i < 5; i++)
|
||||
{
|
||||
evt = new EventData(Encoding.UTF8.GetBytes(input));
|
||||
await client.SendAsync(evt, "test_pk" + i);
|
||||
}
|
||||
}
|
||||
|
||||
public static void ProcessMultiplePartitionEvents([EventHubTrigger(TestHubName)] EventData[] events)
|
||||
{
|
||||
foreach (EventData eventData in events)
|
||||
{
|
||||
string message = Encoding.UTF8.GetString(eventData.Body);
|
||||
|
||||
// filter for the ID the current test is using
|
||||
if (message == _testId)
|
||||
{
|
||||
_results.Add(eventData.SystemProperties.PartitionKey);
|
||||
_results.Sort();
|
||||
|
||||
if (_results.Count == 6 && _results[5] == "test_pk4")
|
||||
{
|
||||
_eventWait.Set();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private JobHost BuildHost<T>()
|
||||
{
|
||||
JobHost jobHost = null;
|
||||
|
||||
var config = new ConfigurationBuilder()
|
||||
.AddEnvironmentVariables()
|
||||
.AddTestSettings()
|
||||
.Build();
|
||||
|
||||
string connection = config.GetConnectionStringOrSetting("AzureWebJobsTestHubConnection");
|
||||
Assert.True(!string.IsNullOrEmpty(connection), "Required test connection string is missing.");
|
||||
|
||||
var host = new HostBuilder()
|
||||
.ConfigureDefaultTestHost<T>(b =>
|
||||
{
|
||||
b.AddEventHubs(options =>
|
||||
{
|
||||
options.AddSender(TestHubName, connection);
|
||||
options.AddReceiver(TestHubName, connection);
|
||||
});
|
||||
})
|
||||
.Build();
|
||||
|
||||
jobHost = host.GetJobHost();
|
||||
jobHost.StartAsync().GetAwaiter().GetResult();
|
||||
|
||||
return jobHost;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -38,7 +38,6 @@
|
|||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Extensions.EventHubs\WebJobs.Extensions.EventHubs.csproj" />
|
||||
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Extensions.Storage\WebJobs.Extensions.Storage.csproj" />
|
||||
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Host.Storage\WebJobs.Host.Storage.csproj" />
|
||||
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Host\WebJobs.Host.csproj" />
|
||||
|
|
Загрузка…
Ссылка в новой задаче