зеркало из https://github.com/Azure/iotedge.git
Refactor Modules (#845)
* Refactor to reuse commond logic. * create a new ModuleUtil project and fix some warnings in Leaf device * make sure all module use ShutdownHandler. * Add Load Gen and Message Analyzer in Windows images build * Update NuGet for loadgen * add a space in function sample name * remove module user from direct method receiver docker file
This commit is contained in:
Родитель
c2d5568b77
Коммит
587a9adc94
|
@ -167,6 +167,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DirectMethodSender", "edge-
|
|||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DirectMethodReceiver", "edge-modules\DirectMethodReceiver\DirectMethodReceiver.csproj", "{013D53C7-3AB5-41A4-9A8D-0F2C47238773}"
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.Devices.Edge.ModuleUtil", "edge-modules\ModuleLib\Microsoft.Azure.Devices.Edge.ModuleUtil.csproj", "{A58633ED-5302-41DF-A0F6-FDC48E5C6B04}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
CheckInBuild|Any CPU = CheckInBuild|Any CPU
|
||||
|
@ -527,6 +529,14 @@ Global
|
|||
{013D53C7-3AB5-41A4-9A8D-0F2C47238773}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{013D53C7-3AB5-41A4-9A8D-0F2C47238773}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{013D53C7-3AB5-41A4-9A8D-0F2C47238773}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{A58633ED-5302-41DF-A0F6-FDC48E5C6B04}.CheckInBuild|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{A58633ED-5302-41DF-A0F6-FDC48E5C6B04}.CheckInBuild|Any CPU.Build.0 = Debug|Any CPU
|
||||
{A58633ED-5302-41DF-A0F6-FDC48E5C6B04}.CodeCoverage|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{A58633ED-5302-41DF-A0F6-FDC48E5C6B04}.CodeCoverage|Any CPU.Build.0 = Debug|Any CPU
|
||||
{A58633ED-5302-41DF-A0F6-FDC48E5C6B04}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{A58633ED-5302-41DF-A0F6-FDC48E5C6B04}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{A58633ED-5302-41DF-A0F6-FDC48E5C6B04}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{A58633ED-5302-41DF-A0F6-FDC48E5C6B04}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
|
@ -593,6 +603,7 @@ Global
|
|||
{9BED8F14-63E9-4B4B-88F7-659D5E522CBD} = {63969606-14B2-4D9D-AB72-A5D60D22037C}
|
||||
{029174D7-588D-4D29-BF3D-4A414B070C39} = {578D5330-2F72-44C6-9DB5-C93B3F42C473}
|
||||
{013D53C7-3AB5-41A4-9A8D-0F2C47238773} = {578D5330-2F72-44C6-9DB5-C93B3F42C473}
|
||||
{A58633ED-5302-41DF-A0F6-FDC48E5C6B04} = {578D5330-2F72-44C6-9DB5-C93B3F42C473}
|
||||
EndGlobalSection
|
||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||
SolutionGuid = {D71830F5-3AF5-46B4-8A9E-1DCE4F2253AC}
|
||||
|
|
|
@ -73,7 +73,7 @@ jobs:
|
|||
# Functions Sample
|
||||
- template: templates/image-linux.yaml
|
||||
parameters:
|
||||
name: FunctionsSample
|
||||
name: Functions Sample
|
||||
imageName: azureiotedge-functions-filter
|
||||
project: EdgeHubTriggerCSharp
|
||||
|
||||
|
@ -145,10 +145,24 @@ jobs:
|
|||
imageName: azureiotedge-temperature-filter
|
||||
project: TemperatureFilter
|
||||
|
||||
# Load Gen
|
||||
- template: templates/image-windows.yaml
|
||||
parameters:
|
||||
name: Load Gen
|
||||
imageName: azureiotedge-load-gen
|
||||
project: load-gen
|
||||
|
||||
# Messages Analyzer
|
||||
- template: templates/image-windows.yaml
|
||||
parameters:
|
||||
name: Messages Analyzer
|
||||
imageName: azureiotedge-analyzer
|
||||
project: MessagesAnalyzer
|
||||
|
||||
# Functions Sample
|
||||
- template: templates/image-windows.yaml
|
||||
parameters:
|
||||
name: FunctionsSample
|
||||
name: Functions Sample
|
||||
imageName: azureiotedge-functions-filter
|
||||
project: EdgeHubTriggerCSharp
|
||||
arm32v7: 'false'
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
<PackageReference Include="Antlr4" Version="4.6.1-beta002" />
|
||||
<PackageReference Include="App.Metrics" Version="3.0.0-alpha-0780" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.1.1" />
|
||||
<PackageReference Include="System.Collections.Immutable" Version="1.3.1" />
|
||||
<PackageReference Include="System.Collections.Immutable" Version="1.5.0" />
|
||||
<PackageReference Include="System.Reflection.Extensions" Version="4.3.0" />
|
||||
<PackageReference Include="System.Reflection.TypeExtensions" Version="4.3.0" />
|
||||
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.7.0" />
|
||||
|
|
|
@ -38,6 +38,7 @@
|
|||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\edge-util\src\Microsoft.Azure.Devices.Edge.Util\Microsoft.Azure.Devices.Edge.Util.csproj" />
|
||||
<ProjectReference Include="..\ModuleLib\Microsoft.Azure.Devices.Edge.ModuleUtil.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
|
|
@ -7,9 +7,4 @@ WORKDIR /app
|
|||
|
||||
COPY $EXE_DIR/ ./
|
||||
|
||||
USER ContainerAdministrator
|
||||
# Add an unprivileged user account for running the module
|
||||
RUN net user /add moduleuser
|
||||
|
||||
USER moduleuser
|
||||
CMD ["dotnet", "DirectMethodReceiver.dll"]
|
|
@ -2,33 +2,25 @@
|
|||
namespace DirectMethodReceiver
|
||||
{
|
||||
using System;
|
||||
using System.Globalization;
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Runtime.Loader;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Devices.Client;
|
||||
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
|
||||
using Microsoft.Azure.Devices.Edge.Util;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ModuleLib;
|
||||
|
||||
class Program
|
||||
{
|
||||
public static int Main() => MainAsync().Result;
|
||||
static readonly ILogger Logger = ModuleUtil.CreateLogger("DirectMethodReceiver");
|
||||
|
||||
/// <summary>
|
||||
/// Handles cleanup operations when app is cancelled or unloads
|
||||
/// </summary>
|
||||
public static Task WhenCancelled(CancellationToken cancellationToken)
|
||||
{
|
||||
var tcs = new TaskCompletionSource<bool>();
|
||||
cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).SetResult(true), tcs);
|
||||
return tcs.Task;
|
||||
}
|
||||
public static int Main() => MainAsync().Result;
|
||||
|
||||
static async Task<int> MainAsync()
|
||||
{
|
||||
Console.WriteLine($"[{DateTime.UtcNow.ToString("MM/dd/yyyy hh:mm:ss.fff tt", CultureInfo.InvariantCulture)}] Main()");
|
||||
Logger.LogInformation("DirectMethodReceiver Main() started.");
|
||||
|
||||
IConfiguration configuration = new ConfigurationBuilder()
|
||||
.SetBasePath(Directory.GetCurrentDirectory())
|
||||
|
@ -37,45 +29,27 @@ namespace DirectMethodReceiver
|
|||
.Build();
|
||||
|
||||
TransportType transportType = configuration.GetValue("ClientTransportType", TransportType.Amqp_Tcp_Only);
|
||||
Console.WriteLine($"Using transport {transportType.ToString()}");
|
||||
ModuleClient moduleClient = await ModuleUtil.CreateModuleClientAsync(
|
||||
transportType,
|
||||
ModuleUtil.DefaultTimeoutErrorDetectionStrategy,
|
||||
ModuleUtil.DefaultTransientRetryStrategy,
|
||||
Logger);
|
||||
|
||||
await InitModuleClient(transportType);
|
||||
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler) = ShutdownHandler.Init(TimeSpan.FromSeconds(5), null);
|
||||
|
||||
// Wait until the app unloads or is cancelled
|
||||
var cts = new CancellationTokenSource();
|
||||
AssemblyLoadContext.Default.Unloading += (ctx) => cts.Cancel();
|
||||
Console.CancelKeyPress += (sender, cpe) => cts.Cancel();
|
||||
await WhenCancelled(cts.Token);
|
||||
await moduleClient.OpenAsync();
|
||||
await moduleClient.SetMethodHandlerAsync("HelloWorldMethod", HelloWorldMethodAsync, null, cts.Token);
|
||||
await cts.Token.WhenCanceled();
|
||||
|
||||
completed.Set();
|
||||
handler.ForEach(h => GC.KeepAlive(h));
|
||||
Logger.LogInformation("DirectMethodReceiver Main() finished.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static async Task InitModuleClient(TransportType transportType)
|
||||
static Task<MethodResponse> HelloWorldMethodAsync(MethodRequest methodRequest, object userContext)
|
||||
{
|
||||
ITransportSettings[] GetTransportSettings()
|
||||
{
|
||||
switch (transportType)
|
||||
{
|
||||
case TransportType.Mqtt:
|
||||
case TransportType.Mqtt_Tcp_Only:
|
||||
case TransportType.Mqtt_WebSocket_Only:
|
||||
return new ITransportSettings[] { new MqttTransportSettings(transportType) };
|
||||
default:
|
||||
return new ITransportSettings[] { new AmqpTransportSettings(transportType) };
|
||||
}
|
||||
}
|
||||
|
||||
ITransportSettings[] settings = GetTransportSettings();
|
||||
|
||||
ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
|
||||
await moduleClient.OpenAsync();
|
||||
await moduleClient.SetMethodHandlerAsync("HelloWorldMethod", HelloWorldMethod, null);
|
||||
|
||||
Console.WriteLine("Successfully initialized module client.");
|
||||
}
|
||||
|
||||
static Task<MethodResponse> HelloWorldMethod(MethodRequest methodRequest, object userContext)
|
||||
{
|
||||
Console.WriteLine("Received direct method call...");
|
||||
Logger.LogInformation("Received direct method call.");
|
||||
return Task.FromResult(new MethodResponse((int)HttpStatusCode.OK));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@
|
|||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\edge-util\src\Microsoft.Azure.Devices.Edge.Util\Microsoft.Azure.Devices.Edge.Util.csproj" />
|
||||
<ProjectReference Include="..\ModuleLib\Microsoft.Azure.Devices.Edge.ModuleUtil.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
{
|
||||
"EdgeHubConnectionString": "<YourConnectionString>",
|
||||
"TargetModuleId": "DirectMethodReceiver",
|
||||
"DMDelay": "00:00:05"
|
||||
"DirectMethodDelay": "00:00:05"
|
||||
}
|
|
@ -2,24 +2,26 @@
|
|||
namespace DirectMethodSender
|
||||
{
|
||||
using System;
|
||||
using System.Globalization;
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Devices.Client;
|
||||
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
|
||||
using Microsoft.Azure.Devices.Edge.Util;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ModuleLib;
|
||||
|
||||
class Program
|
||||
{
|
||||
static readonly ILogger Logger = ModuleUtil.CreateLogger("DirectMethodSender");
|
||||
|
||||
public static int Main() => MainAsync().Result;
|
||||
|
||||
static async Task<int> MainAsync()
|
||||
{
|
||||
Console.WriteLine($"[{DateTime.UtcNow.ToString("MM/dd/yyyy hh:mm:ss.fff tt", CultureInfo.InvariantCulture)}] Main()");
|
||||
Logger.LogInformation("DirectMethodSender Main() started.");
|
||||
|
||||
IConfiguration configuration = new ConfigurationBuilder()
|
||||
.SetBasePath(Directory.GetCurrentDirectory())
|
||||
|
@ -27,92 +29,57 @@ namespace DirectMethodSender
|
|||
.AddEnvironmentVariables()
|
||||
.Build();
|
||||
|
||||
TimeSpan dmDelay = configuration.GetValue("DMDelay", TimeSpan.FromSeconds(5));
|
||||
|
||||
string targetModuleId = configuration.GetValue("TargetModuleId", "DirectMethodReceiver");
|
||||
|
||||
// Get deviced id of this device, exposed as a system variable by the iot edge runtime
|
||||
TimeSpan dmDelay = configuration.GetValue("DirectMethodDelay", TimeSpan.FromSeconds(5));
|
||||
// Get device id of this device, exposed as a system variable by the iot edge runtime
|
||||
string targetDeviceId = configuration.GetValue<string>("IOTEDGE_DEVICEID");
|
||||
|
||||
string targetModuleId = configuration.GetValue("TargetModuleId", "DirectMethodReceiver");
|
||||
TransportType transportType = configuration.GetValue("ClientTransportType", TransportType.Amqp_Tcp_Only);
|
||||
Console.WriteLine($"Using transport {transportType.ToString()}");
|
||||
|
||||
ModuleClient moduleClient = await InitModuleClient(transportType);
|
||||
ModuleClient moduleClient = await ModuleUtil.CreateModuleClientAsync(
|
||||
transportType,
|
||||
ModuleUtil.DefaultTimeoutErrorDetectionStrategy,
|
||||
ModuleUtil.DefaultTransientRetryStrategy,
|
||||
Logger);
|
||||
|
||||
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler) = ShutdownHandler.Init(TimeSpan.FromSeconds(5), null);
|
||||
|
||||
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler)
|
||||
= ShutdownHandler.Init(TimeSpan.FromSeconds(5), null);
|
||||
Console.WriteLine($"Target device Id = [{targetDeviceId}], Target module Id = [{targetModuleId}]");
|
||||
await CallDirectMethod(moduleClient, dmDelay, targetDeviceId, targetModuleId, cts);
|
||||
await moduleClient.CloseAsync();
|
||||
|
||||
completed.Set();
|
||||
handler.ForEach(h => GC.KeepAlive(h));
|
||||
Logger.LogInformation("DirectMethodSender Main() finished.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static async Task<ModuleClient> InitModuleClient(TransportType transportType)
|
||||
{
|
||||
ITransportSettings[] GetTransportSettings()
|
||||
{
|
||||
switch (transportType)
|
||||
{
|
||||
case TransportType.Mqtt:
|
||||
case TransportType.Mqtt_Tcp_Only:
|
||||
case TransportType.Mqtt_WebSocket_Only:
|
||||
return new ITransportSettings[] { new MqttTransportSettings(transportType) };
|
||||
default:
|
||||
return new ITransportSettings[] { new AmqpTransportSettings(transportType) };
|
||||
}
|
||||
}
|
||||
|
||||
ITransportSettings[] settings = GetTransportSettings();
|
||||
|
||||
ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
|
||||
await moduleClient.OpenAsync();
|
||||
|
||||
Console.WriteLine("Successfully initialized module client.");
|
||||
return moduleClient;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Module behavior:
|
||||
/// Call HelloWorld Direct Method every 5 seconds.
|
||||
/// </summary>
|
||||
/// <param name="moduleClient"></param>
|
||||
/// <param name="dmDelay"></param>
|
||||
/// <param name="targetModuleId"></param>
|
||||
/// <param name="cts"></param>
|
||||
/// <param name="targetDeviceId"></param>
|
||||
/// <returns></returns>
|
||||
static async Task CallDirectMethod(
|
||||
ModuleClient moduleClient,
|
||||
TimeSpan dmDelay,
|
||||
string targetDeviceId,
|
||||
string targetModuleId,
|
||||
TimeSpan delay,
|
||||
string deviceId,
|
||||
string moduleId,
|
||||
CancellationTokenSource cts)
|
||||
{
|
||||
var request = new MethodRequest("HelloWorldMethod", Encoding.UTF8.GetBytes("{ \"Message\": \"Hello\" }"));
|
||||
|
||||
while (!cts.Token.IsCancellationRequested)
|
||||
{
|
||||
Console.WriteLine($"\t{DateTime.Now.ToLocalTime()}> Calling Direct Method on module.");
|
||||
|
||||
// Create the request
|
||||
var request = new MethodRequest("HelloWorldMethod", Encoding.UTF8.GetBytes("{ \"Message\": \"Hello\" }"));
|
||||
Logger.LogInformation($"Calling Direct Method on device [{deviceId}] module [{moduleId}].");
|
||||
|
||||
try
|
||||
{
|
||||
// Ignore Exception. Keep trying.
|
||||
MethodResponse response = await moduleClient.InvokeMethodAsync(targetDeviceId, targetModuleId, request);
|
||||
MethodResponse response = await moduleClient.InvokeMethodAsync(deviceId, moduleId, request);
|
||||
|
||||
if (response.Status == (int)HttpStatusCode.OK)
|
||||
{
|
||||
await moduleClient.SendEventAsync("AnyOutput", new Message(Encoding.UTF8.GetBytes("Method Call succeeded.")));
|
||||
await moduleClient.SendEventAsync("AnyOutput", new Message(Encoding.UTF8.GetBytes("Direct Method Call succeeded.")));
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Console.WriteLine(e);
|
||||
Logger.LogError($"Exception caught: {e}");
|
||||
}
|
||||
|
||||
await Task.Delay(dmDelay, cts.Token);
|
||||
await Task.Delay(delay, cts.Token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>netcoreapp2.1</TargetFramework>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.19.0" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\edge-util\src\Microsoft.Azure.Devices.Edge.Util\Microsoft.Azure.Devices.Edge.Util.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<AdditionalFiles Include="..\..\stylecop.json" Link="stylecop.json" />
|
||||
</ItemGroup>
|
||||
<PropertyGroup>
|
||||
<CodeAnalysisRuleSet>..\..\stylecop.ruleset</CodeAnalysisRuleSet>
|
||||
<RootNamespace>Microsoft.Azure.Devices.Edge.ModuleUtil</RootNamespace>
|
||||
<AssemblyName>Microsoft.Azure.Devices.Edge.ModuleUtil</AssemblyName>
|
||||
</PropertyGroup>
|
||||
<Import Project="..\..\stylecop.props" />
|
||||
</Project>
|
|
@ -0,0 +1,97 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
namespace ModuleLib
|
||||
{
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Devices.Client;
|
||||
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
|
||||
using Microsoft.Azure.Devices.Edge.Util;
|
||||
using Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Serilog;
|
||||
using Serilog.Core;
|
||||
using Serilog.Events;
|
||||
using ExponentialBackoff = Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling.ExponentialBackoff;
|
||||
using ILogger = Microsoft.Extensions.Logging.ILogger;
|
||||
|
||||
public static class ModuleUtil
|
||||
{
|
||||
public static readonly ITransientErrorDetectionStrategy DefaultTimeoutErrorDetectionStrategy =
|
||||
new DelegateErrorDetectionStrategy(ex => ex.HasTimeoutException());
|
||||
|
||||
public static readonly RetryStrategy DefaultTransientRetryStrategy =
|
||||
new ExponentialBackoff(
|
||||
5,
|
||||
TimeSpan.FromSeconds(2),
|
||||
TimeSpan.FromSeconds(60),
|
||||
TimeSpan.FromSeconds(4));
|
||||
|
||||
public static async Task<ModuleClient> CreateModuleClientAsync(
|
||||
TransportType transportType,
|
||||
ITransientErrorDetectionStrategy transientErrorDetectionStrategy = null,
|
||||
RetryStrategy retryStrategy = null,
|
||||
ILogger logger = null)
|
||||
{
|
||||
var retryPolicy = new RetryPolicy(transientErrorDetectionStrategy, retryStrategy);
|
||||
retryPolicy.Retrying += (_, args) =>
|
||||
{
|
||||
WriteLog(logger, LogLevel.Error, $"Retry {args.CurrentRetryCount} times to create module client and failed with exception:{Environment.NewLine}{args.LastException}");
|
||||
};
|
||||
|
||||
ModuleClient client = await retryPolicy.ExecuteAsync(() => InitializeModuleClientAsync(transportType, logger));
|
||||
return client;
|
||||
}
|
||||
|
||||
public static ILogger CreateLogger(string categoryName, LogEventLevel logEventLevel = LogEventLevel.Debug, string outputTemplate = "")
|
||||
{
|
||||
Preconditions.CheckNonWhiteSpace(categoryName, nameof(categoryName));
|
||||
|
||||
var levelSwitch = new LoggingLevelSwitch(logEventLevel);
|
||||
Log.Logger = new LoggerConfiguration()
|
||||
.MinimumLevel.ControlledBy(levelSwitch)
|
||||
.WriteTo.Console(outputTemplate: string.IsNullOrWhiteSpace(outputTemplate) ? "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff} {Level:u3}] {Message:lj}{NewLine}{Exception}" : outputTemplate)
|
||||
.CreateLogger();
|
||||
|
||||
return new LoggerFactory().AddSerilog().CreateLogger(categoryName);
|
||||
}
|
||||
|
||||
static async Task<ModuleClient> InitializeModuleClientAsync(TransportType transportType, ILogger logger)
|
||||
{
|
||||
ITransportSettings[] GetTransportSettings()
|
||||
{
|
||||
switch (transportType)
|
||||
{
|
||||
case TransportType.Mqtt:
|
||||
case TransportType.Mqtt_Tcp_Only:
|
||||
return new ITransportSettings[] { new MqttTransportSettings(TransportType.Mqtt_Tcp_Only) };
|
||||
case TransportType.Mqtt_WebSocket_Only:
|
||||
return new ITransportSettings[] { new MqttTransportSettings(TransportType.Mqtt_WebSocket_Only) };
|
||||
case TransportType.Amqp_WebSocket_Only:
|
||||
return new ITransportSettings[] { new AmqpTransportSettings(TransportType.Amqp_WebSocket_Only) };
|
||||
default:
|
||||
return new ITransportSettings[] { new AmqpTransportSettings(TransportType.Amqp_Tcp_Only) };
|
||||
}
|
||||
}
|
||||
|
||||
ITransportSettings[] settings = GetTransportSettings();
|
||||
WriteLog(logger, LogLevel.Information, $"Trying to initialize module client using transport type [{transportType}].");
|
||||
ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
|
||||
await moduleClient.OpenAsync();
|
||||
|
||||
WriteLog(logger, LogLevel.Information, $"Successfully initialized module client of transport type [{transportType}].");
|
||||
return moduleClient;
|
||||
}
|
||||
|
||||
static void WriteLog(ILogger logger, LogLevel logLevel, string message)
|
||||
{
|
||||
if (logger == null)
|
||||
{
|
||||
Console.WriteLine($"{logLevel}: {message}");
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.Log(logLevel, message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -38,6 +38,7 @@
|
|||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\edge-util\src\Microsoft.Azure.Devices.Edge.Util\Microsoft.Azure.Devices.Edge.Util.csproj" />
|
||||
<ProjectReference Include="..\ModuleLib\Microsoft.Azure.Devices.Edge.ModuleUtil.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
|
|
@ -2,52 +2,43 @@
|
|||
namespace SimulatedTemperatureSensor
|
||||
{
|
||||
using System;
|
||||
using System.Globalization;
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Devices.Client;
|
||||
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
|
||||
using Microsoft.Azure.Devices.Edge.Util;
|
||||
using Microsoft.Azure.Devices.Edge.Util.Concurrency;
|
||||
using Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling;
|
||||
using Microsoft.Azure.Devices.Shared;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ModuleLib;
|
||||
using Newtonsoft.Json;
|
||||
using ExponentialBackoff = Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling.ExponentialBackoff;
|
||||
|
||||
class Program
|
||||
{
|
||||
const int RetryCount = 5;
|
||||
const string MessageCountConfigKey = "MessageCount";
|
||||
const string SendDataConfigKey = "SendData";
|
||||
const string SendIntervalConfigKey = "SendInterval";
|
||||
|
||||
static readonly ITransientErrorDetectionStrategy TimeoutErrorDetectionStrategy = new DelegateErrorDetectionStrategy(ex => ex.HasTimeoutException());
|
||||
|
||||
static readonly RetryStrategy TransientRetryStrategy =
|
||||
new ExponentialBackoff(RetryCount, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(4));
|
||||
|
||||
static readonly Random Rnd = new Random();
|
||||
static readonly AtomicBoolean Reset = new AtomicBoolean(false);
|
||||
static readonly Guid BatchId = Guid.NewGuid();
|
||||
|
||||
static readonly AtomicBoolean Reset = new AtomicBoolean(false);
|
||||
static readonly Random Rnd = new Random();
|
||||
static TimeSpan messageDelay;
|
||||
static bool sendData = true;
|
||||
|
||||
public enum ControlCommandEnum
|
||||
{
|
||||
Reset = 0,
|
||||
Noop = 1
|
||||
NoOperation = 1
|
||||
}
|
||||
|
||||
public static int Main() => MainAsync().Result;
|
||||
|
||||
static async Task<int> MainAsync()
|
||||
{
|
||||
Console.WriteLine($"[{DateTime.UtcNow.ToString("MM/dd/yyyy hh:mm:ss.fff tt", CultureInfo.InvariantCulture)}] Main()");
|
||||
Console.WriteLine("SimulatedTemperatureSensor Main() started.");
|
||||
|
||||
IConfiguration configuration = new ConfigurationBuilder()
|
||||
.SetBasePath(Directory.GetCurrentDirectory())
|
||||
|
@ -57,8 +48,7 @@ namespace SimulatedTemperatureSensor
|
|||
|
||||
messageDelay = configuration.GetValue("MessageDelay", TimeSpan.FromSeconds(5));
|
||||
int messageCount = configuration.GetValue(MessageCountConfigKey, 500);
|
||||
bool sendForever = messageCount < 0;
|
||||
var sim = new SimulatorParameters
|
||||
var simulatorParameters = new SimulatorParameters
|
||||
{
|
||||
MachineTempMin = configuration.GetValue<double>("machineTempMin", 21),
|
||||
MachineTempMax = configuration.GetValue<double>("machineTempMax", 100),
|
||||
|
@ -68,27 +58,23 @@ namespace SimulatedTemperatureSensor
|
|||
HumidityPercent = configuration.GetValue("ambientHumidity", 25)
|
||||
};
|
||||
|
||||
string messagesToSendString = sendForever ? "unlimited" : messageCount.ToString();
|
||||
Console.WriteLine(
|
||||
$"Initializing simulated temperature sensor to send {messagesToSendString} messages, at an interval of {messageDelay.TotalSeconds} seconds.\n"
|
||||
$"Initializing simulated temperature sensor to send {(SendUnlimitedMessages(messageCount) ? "unlimited" : messageCount.ToString())} "
|
||||
+ $"messages, at an interval of {messageDelay.TotalSeconds} seconds.\n"
|
||||
+ $"To change this, set the environment variable {MessageCountConfigKey} to the number of messages that should be sent (set it to -1 to send unlimited messages).");
|
||||
|
||||
TransportType transportType = configuration.GetValue("ClientTransportType", TransportType.Amqp_Tcp_Only);
|
||||
Console.WriteLine($"Using transport {transportType.ToString()}");
|
||||
|
||||
var retryPolicy = new RetryPolicy(TimeoutErrorDetectionStrategy, TransientRetryStrategy);
|
||||
retryPolicy.Retrying += (_, args) =>
|
||||
{
|
||||
Console.WriteLine($"Creating ModuleClient failed with exception {args.LastException}");
|
||||
if (args.CurrentRetryCount < RetryCount)
|
||||
{
|
||||
Console.WriteLine("Retrying...");
|
||||
}
|
||||
};
|
||||
ModuleClient moduleClient = await retryPolicy.ExecuteAsync(() => InitModuleClient(transportType));
|
||||
ModuleClient moduleClient = await ModuleUtil.CreateModuleClientAsync(
|
||||
transportType,
|
||||
ModuleUtil.DefaultTimeoutErrorDetectionStrategy,
|
||||
ModuleUtil.DefaultTransientRetryStrategy);
|
||||
await moduleClient.OpenAsync();
|
||||
await moduleClient.SetMethodHandlerAsync("reset", ResetMethod, null);
|
||||
|
||||
ModuleClient userContext = moduleClient;
|
||||
Twin currentTwinProperties = await moduleClient.GetTwinAsync();
|
||||
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler) = ShutdownHandler.Init(TimeSpan.FromSeconds(5), null);
|
||||
|
||||
Twin currentTwinProperties = await moduleClient.GetTwinAsync(cts.Token);
|
||||
if (currentTwinProperties.Properties.Desired.Contains(SendIntervalConfigKey))
|
||||
{
|
||||
messageDelay = TimeSpan.FromSeconds((int)currentTwinProperties.Properties.Desired[SendIntervalConfigKey]);
|
||||
|
@ -103,42 +89,19 @@ namespace SimulatedTemperatureSensor
|
|||
}
|
||||
}
|
||||
|
||||
ModuleClient userContext = moduleClient;
|
||||
await moduleClient.SetDesiredPropertyUpdateCallbackAsync(OnDesiredPropertiesUpdated, userContext);
|
||||
await moduleClient.SetInputMessageHandlerAsync("control", ControlMessageHandle, userContext);
|
||||
|
||||
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler)
|
||||
= ShutdownHandler.Init(TimeSpan.FromSeconds(5), null);
|
||||
await SendEvents(moduleClient, sendForever, messageCount, sim, cts);
|
||||
await SendEvents(moduleClient, messageCount, simulatorParameters, cts);
|
||||
await cts.Token.WhenCanceled();
|
||||
|
||||
completed.Set();
|
||||
handler.ForEach(h => GC.KeepAlive(h));
|
||||
Console.WriteLine("SimulatedTemperatureSensor Main() finished.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static async Task<ModuleClient> InitModuleClient(TransportType transportType)
|
||||
{
|
||||
ITransportSettings[] GetTransportSettings()
|
||||
{
|
||||
switch (transportType)
|
||||
{
|
||||
case TransportType.Mqtt:
|
||||
case TransportType.Mqtt_Tcp_Only:
|
||||
case TransportType.Mqtt_WebSocket_Only:
|
||||
return new ITransportSettings[] { new MqttTransportSettings(transportType) };
|
||||
default:
|
||||
return new ITransportSettings[] { new AmqpTransportSettings(transportType) };
|
||||
}
|
||||
}
|
||||
|
||||
ITransportSettings[] settings = GetTransportSettings();
|
||||
|
||||
ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
|
||||
await moduleClient.OpenAsync();
|
||||
await moduleClient.SetMethodHandlerAsync("reset", ResetMethod, null);
|
||||
|
||||
Console.WriteLine("Successfully initialized module client.");
|
||||
return moduleClient;
|
||||
}
|
||||
static bool SendUnlimitedMessages(int maximumNumberOfMessages) => maximumNumberOfMessages < 0;
|
||||
|
||||
// Control Message expected to be:
|
||||
// {
|
||||
|
@ -162,10 +125,6 @@ namespace SimulatedTemperatureSensor
|
|||
Console.WriteLine("Resetting temperature sensor..");
|
||||
Reset.Set(true);
|
||||
}
|
||||
else
|
||||
{
|
||||
// NoOp
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (JsonSerializationException)
|
||||
|
@ -177,14 +136,10 @@ namespace SimulatedTemperatureSensor
|
|||
Console.WriteLine("Resetting temperature sensor..");
|
||||
Reset.Set(true);
|
||||
}
|
||||
else
|
||||
{
|
||||
// NoOp
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"Failed to deserialize control command with exception: [{ex.Message}]");
|
||||
Console.WriteLine($"Error: Failed to deserialize control command with exception: [{ex}]");
|
||||
}
|
||||
|
||||
return Task.FromResult(MessageResponse.Completed);
|
||||
|
@ -210,7 +165,6 @@ namespace SimulatedTemperatureSensor
|
|||
/// </summary>
|
||||
static async Task SendEvents(
|
||||
ModuleClient moduleClient,
|
||||
bool sendForever,
|
||||
int messageCount,
|
||||
SimulatorParameters sim,
|
||||
CancellationTokenSource cts)
|
||||
|
@ -219,7 +173,7 @@ namespace SimulatedTemperatureSensor
|
|||
double currentTemp = sim.MachineTempMin;
|
||||
double normal = (sim.MachinePressureMax - sim.MachinePressureMin) / (sim.MachineTempMax - sim.MachineTempMin);
|
||||
|
||||
while (!cts.Token.IsCancellationRequested && (sendForever || messageCount >= count))
|
||||
while (!cts.Token.IsCancellationRequested && (SendUnlimitedMessages(messageCount) || messageCount >= count))
|
||||
{
|
||||
if (Reset)
|
||||
{
|
||||
|
@ -291,24 +245,18 @@ namespace SimulatedTemperatureSensor
|
|||
sendData = desiredSendDataValue;
|
||||
}
|
||||
|
||||
ModuleClient moduleClient = (ModuleClient)userContext;
|
||||
TwinCollection patch = new TwinCollection($"{{ \"SendData\":{sendData.ToString().ToLower()}, \"SendInterval\": {messageDelay.TotalSeconds}}}");
|
||||
var moduleClient = (ModuleClient)userContext;
|
||||
var patch = new TwinCollection($"{{ \"SendData\":{sendData.ToString().ToLower()}, \"SendInterval\": {messageDelay.TotalSeconds}}}");
|
||||
await moduleClient.UpdateReportedPropertiesAsync(patch); // Just report back last desired property.
|
||||
}
|
||||
|
||||
static void CancelProgram(CancellationTokenSource cts)
|
||||
{
|
||||
Console.WriteLine("Termination requested, closing.");
|
||||
cts.Cancel();
|
||||
}
|
||||
|
||||
internal class ControlCommand
|
||||
class ControlCommand
|
||||
{
|
||||
[JsonProperty("command")]
|
||||
public ControlCommandEnum Command { get; set; }
|
||||
}
|
||||
|
||||
internal class SimulatorParameters
|
||||
class SimulatorParameters
|
||||
{
|
||||
public double MachineTempMin { get; set; }
|
||||
|
||||
|
|
|
@ -3,28 +3,24 @@ namespace TemperatureFilter
|
|||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Globalization;
|
||||
using System.IO;
|
||||
using System.Runtime.Loader;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Devices.Client;
|
||||
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
|
||||
using Microsoft.Azure.Devices.Edge.Util;
|
||||
using Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling;
|
||||
using Microsoft.Azure.Devices.Shared;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ModuleLib;
|
||||
using Newtonsoft.Json;
|
||||
using ExponentialBackoff = Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling.ExponentialBackoff;
|
||||
|
||||
class Program
|
||||
{
|
||||
const int RetryCount = 5;
|
||||
const string TemperatureThresholdKey = "TemperatureThreshold";
|
||||
const int DefaultTemperatureThreshold = 25;
|
||||
static readonly ITransientErrorDetectionStrategy TimeoutErrorDetectionStrategy = new DelegateErrorDetectionStrategy(ex => ex.HasTimeoutException());
|
||||
static readonly RetryStrategy TransientRetryStrategy = new ExponentialBackoff(RetryCount, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(4));
|
||||
|
||||
static readonly ILogger Logger = ModuleUtil.CreateLogger("TemperatureFilter");
|
||||
static int counter;
|
||||
|
||||
public static int Main() => MainAsync().Result;
|
||||
|
@ -41,7 +37,7 @@ namespace TemperatureFilter
|
|||
|
||||
static async Task<int> MainAsync()
|
||||
{
|
||||
Console.WriteLine($"[{DateTime.UtcNow.ToString("MM/dd/yyyy hh:mm:ss.fff tt", CultureInfo.InvariantCulture)}] Main()");
|
||||
Logger.LogInformation("TemperatureFilter Main() started.");
|
||||
|
||||
IConfiguration configuration = new ConfigurationBuilder()
|
||||
.SetBasePath(Directory.GetCurrentDirectory())
|
||||
|
@ -50,59 +46,28 @@ namespace TemperatureFilter
|
|||
.Build();
|
||||
|
||||
TransportType transportType = configuration.GetValue("ClientTransportType", TransportType.Amqp_Tcp_Only);
|
||||
Console.WriteLine($"Using transport {transportType.ToString()}");
|
||||
|
||||
var retryPolicy = new RetryPolicy(TimeoutErrorDetectionStrategy, TransientRetryStrategy);
|
||||
retryPolicy.Retrying += (_, args) =>
|
||||
{
|
||||
Console.WriteLine($"Creating ModuleClient failed with exception {args.LastException}");
|
||||
if (args.CurrentRetryCount < RetryCount)
|
||||
{
|
||||
Console.WriteLine("Retrying...");
|
||||
}
|
||||
};
|
||||
Tuple<ModuleClient, ModuleConfig> moduleclientAndConfig = await retryPolicy.ExecuteAsync(() => InitModuleClient(transportType));
|
||||
ModuleClient moduleClient = await ModuleUtil.CreateModuleClientAsync(
|
||||
transportType,
|
||||
ModuleUtil.DefaultTimeoutErrorDetectionStrategy,
|
||||
ModuleUtil.DefaultTransientRetryStrategy,
|
||||
Logger);
|
||||
|
||||
Tuple<ModuleClient, ModuleConfig> userContext = moduleclientAndConfig;
|
||||
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler) = ShutdownHandler.Init(TimeSpan.FromSeconds(5), null);
|
||||
|
||||
await moduleclientAndConfig.Item1.SetInputMessageHandlerAsync("input1", PrintAndFilterMessages, userContext);
|
||||
ModuleConfig moduleConfig = await GetConfigurationAsync(moduleClient);
|
||||
Logger.LogInformation($"Using TemperatureThreshold value of {moduleConfig.TemperatureThreshold}");
|
||||
|
||||
// Wait until the app unloads or is cancelled
|
||||
var cts = new CancellationTokenSource();
|
||||
AssemblyLoadContext.Default.Unloading += (ctx) => cts.Cancel();
|
||||
Console.CancelKeyPress += (sender, cpe) => cts.Cancel();
|
||||
await WhenCancelled(cts.Token);
|
||||
var userContext = Tuple.Create(moduleClient, moduleConfig);
|
||||
await moduleClient.SetInputMessageHandlerAsync("input1", PrintAndFilterMessages, userContext);
|
||||
|
||||
await cts.Token.WhenCanceled();
|
||||
completed.Set();
|
||||
handler.ForEach(h => GC.KeepAlive(h));
|
||||
Logger.LogInformation("TemperatureFilter Main() finished.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static async Task<Tuple<ModuleClient, ModuleConfig>> InitModuleClient(TransportType transportType)
|
||||
{
|
||||
ITransportSettings[] GetTransportSettings()
|
||||
{
|
||||
switch (transportType)
|
||||
{
|
||||
case TransportType.Mqtt:
|
||||
case TransportType.Mqtt_Tcp_Only:
|
||||
case TransportType.Mqtt_WebSocket_Only:
|
||||
return new ITransportSettings[] { new MqttTransportSettings(transportType) };
|
||||
default:
|
||||
return new ITransportSettings[] { new AmqpTransportSettings(transportType) };
|
||||
}
|
||||
}
|
||||
|
||||
ITransportSettings[] settings = GetTransportSettings();
|
||||
|
||||
ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
|
||||
await moduleClient.OpenAsync();
|
||||
Console.WriteLine("TemperatureFilter - Opened module client connection");
|
||||
|
||||
ModuleConfig moduleConfig = await GetConfiguration(moduleClient);
|
||||
Console.WriteLine($"Using TemperatureThreshold value of {moduleConfig.TemperatureThreshold}");
|
||||
|
||||
Console.WriteLine("Successfully initialized module client.");
|
||||
return new Tuple<ModuleClient, ModuleConfig>(moduleClient, moduleConfig);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// This method is called whenever the Filter module is sent a message from the EdgeHub.
|
||||
/// It filters the messages based on the temperature value in the body of the messages,
|
||||
|
@ -118,27 +83,23 @@ namespace TemperatureFilter
|
|||
var userContextValues = userContext as Tuple<ModuleClient, ModuleConfig>;
|
||||
if (userContextValues == null)
|
||||
{
|
||||
throw new InvalidOperationException(
|
||||
"UserContext doesn't contain " +
|
||||
"expected values");
|
||||
throw new InvalidOperationException("UserContext doesn't contain expected values");
|
||||
}
|
||||
|
||||
ModuleClient moduleClient = userContextValues.Item1;
|
||||
ModuleConfig moduleModuleConfig = userContextValues.Item2;
|
||||
ModuleConfig moduleConfig = userContextValues.Item2;
|
||||
|
||||
byte[] messageBytes = message.GetBytes();
|
||||
string messageString = Encoding.UTF8.GetString(messageBytes);
|
||||
Console.WriteLine($"Received message: {counterValue}, Body: [{messageString}]");
|
||||
Logger.LogInformation($"Received message: {counterValue}, Body: [{messageString}]");
|
||||
|
||||
// Get message body, containing the Temperature data
|
||||
var messageBody = JsonConvert.DeserializeObject<MessageBody>(messageString);
|
||||
|
||||
if (messageBody != null
|
||||
&& messageBody.Machine.Temperature > moduleModuleConfig.TemperatureThreshold)
|
||||
&& messageBody.Machine.Temperature > moduleConfig.TemperatureThreshold)
|
||||
{
|
||||
Console.WriteLine(
|
||||
$"Temperature {messageBody.Machine.Temperature} " +
|
||||
$"exceeds threshold {moduleModuleConfig.TemperatureThreshold}");
|
||||
Logger.LogInformation($"Temperature {messageBody.Machine.Temperature} exceeds threshold {moduleConfig.TemperatureThreshold}");
|
||||
var filteredMessage = new Message(messageBytes);
|
||||
foreach (KeyValuePair<string, string> prop in message.Properties)
|
||||
{
|
||||
|
@ -151,7 +112,7 @@ namespace TemperatureFilter
|
|||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Console.WriteLine($"Error in PrintAndFilterMessages - {e}");
|
||||
Logger.LogError($"Error in PrintAndFilterMessages: {e}");
|
||||
}
|
||||
|
||||
return MessageResponse.Completed;
|
||||
|
@ -160,7 +121,7 @@ namespace TemperatureFilter
|
|||
/// <summary>
|
||||
/// Get the configuration for the module (in this case the threshold temperature)s.
|
||||
/// </summary>
|
||||
static async Task<ModuleConfig> GetConfiguration(ModuleClient moduleClient)
|
||||
static async Task<ModuleConfig> GetConfigurationAsync(ModuleClient moduleClient)
|
||||
{
|
||||
// First try to get the config from the Module twin
|
||||
Twin twin = await moduleClient.GetTwinAsync();
|
||||
|
|
|
@ -35,6 +35,7 @@
|
|||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\edge-util\src\Microsoft.Azure.Devices.Edge.Util\Microsoft.Azure.Devices.Edge.Util.csproj" />
|
||||
<ProjectReference Include="..\ModuleLib\Microsoft.Azure.Devices.Edge.ModuleUtil.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
|
|
@ -6,87 +6,69 @@ namespace LoadGen
|
|||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Devices.Client;
|
||||
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
|
||||
using Microsoft.Azure.Devices.Edge.Util;
|
||||
using Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling;
|
||||
using Microsoft.Azure.Devices.Shared;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ModuleLib;
|
||||
using Newtonsoft.Json;
|
||||
using Serilog;
|
||||
using ExponentialBackoff = Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling.ExponentialBackoff;
|
||||
using ILogger = Microsoft.Extensions.Logging.ILogger;
|
||||
|
||||
class Program
|
||||
{
|
||||
const int RetryCount = 5;
|
||||
|
||||
static readonly ITransientErrorDetectionStrategy TimeoutErrorDetectionStrategy =
|
||||
new DelegateErrorDetectionStrategy(ex => ex.HasTimeoutException());
|
||||
|
||||
static readonly RetryStrategy TransientRetryStrategy =
|
||||
new ExponentialBackoff(
|
||||
RetryCount,
|
||||
TimeSpan.FromSeconds(2),
|
||||
TimeSpan.FromSeconds(60),
|
||||
TimeSpan.FromSeconds(4));
|
||||
static readonly ILogger Logger = ModuleUtil.CreateLogger("LoadGen");
|
||||
|
||||
static long messageIdCounter = 0;
|
||||
|
||||
static async Task Main()
|
||||
{
|
||||
ILogger logger = InitLogger().CreateLogger("loadgen");
|
||||
Log.Information($"Starting load gen with the following settings:\r\n{Settings.Current}");
|
||||
Logger.LogInformation($"Starting load gen with the following settings:\r\n{Settings.Current}");
|
||||
|
||||
try
|
||||
{
|
||||
var client = await GetModuleClientWithRetryAsync();
|
||||
ModuleClient moduleClient = await ModuleUtil.CreateModuleClientAsync(
|
||||
Settings.Current.TransportType,
|
||||
ModuleUtil.DefaultTimeoutErrorDetectionStrategy,
|
||||
ModuleUtil.DefaultTransientRetryStrategy,
|
||||
Logger);
|
||||
|
||||
using (var timers = new Timers())
|
||||
{
|
||||
Guid batchId = Guid.NewGuid();
|
||||
Logger.LogInformation($"Batch Id={batchId}");
|
||||
|
||||
// setup the message timer
|
||||
timers.Add(
|
||||
Settings.Current.MessageFrequency,
|
||||
Settings.Current.JitterFactor,
|
||||
() => GenerateMessageAsync(client, batchId));
|
||||
() => GenerateMessageAsync(moduleClient, batchId));
|
||||
|
||||
// setup the twin update timer
|
||||
timers.Add(
|
||||
Settings.Current.TwinUpdateFrequency,
|
||||
Settings.Current.JitterFactor,
|
||||
() => GenerateTwinUpdateAsync(client, batchId));
|
||||
() => GenerateTwinUpdateAsync(moduleClient, batchId));
|
||||
|
||||
timers.Start();
|
||||
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler) = ShutdownHandler.Init(TimeSpan.FromSeconds(5), logger);
|
||||
Log.Information("Load gen running.");
|
||||
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler) = ShutdownHandler.Init(TimeSpan.FromSeconds(5), Logger);
|
||||
Logger.LogInformation("Load gen running.");
|
||||
|
||||
await cts.Token.WhenCanceled();
|
||||
Log.Information("Stopping timers.");
|
||||
Logger.LogInformation("Stopping timers.");
|
||||
timers.Stop();
|
||||
Log.Information("Closing connection to Edge Hub.");
|
||||
await client.CloseAsync();
|
||||
Logger.LogInformation("Closing connection to Edge Hub.");
|
||||
await moduleClient.CloseAsync();
|
||||
|
||||
completed.Set();
|
||||
handler.ForEach(h => GC.KeepAlive(h));
|
||||
Log.Information("Load gen complete. Exiting.");
|
||||
Logger.LogInformation("Load Gen complete. Exiting.");
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Error($"Error occurred during load gen.\r\n{ex}");
|
||||
Logger.LogError($"Error occurred during load gen.\r\n{ex}");
|
||||
}
|
||||
}
|
||||
|
||||
static ILoggerFactory InitLogger()
|
||||
{
|
||||
Log.Logger = new LoggerConfiguration()
|
||||
.MinimumLevel.Debug()
|
||||
.WriteTo.Console(outputTemplate: "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff} {Level:u3}] {Message:lj}{NewLine}{Exception}")
|
||||
.CreateLogger();
|
||||
|
||||
return new LoggerFactory().AddSerilog();
|
||||
}
|
||||
|
||||
static async Task GenerateMessageAsync(ModuleClient client, Guid batchId)
|
||||
{
|
||||
var random = new Random();
|
||||
|
@ -101,21 +83,18 @@ namespace LoadGen
|
|||
random.NextBytes(data.Data);
|
||||
|
||||
// build message
|
||||
var messageBody = new
|
||||
{
|
||||
data = data.Data,
|
||||
};
|
||||
|
||||
var messageBody = new { data = data.Data };
|
||||
var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(messageBody)));
|
||||
sequenceNumber = Interlocked.Increment(ref messageIdCounter);
|
||||
message.Properties.Add("sequenceNumber", sequenceNumber.ToString());
|
||||
message.Properties.Add("batchId", batchId.ToString());
|
||||
|
||||
await client.SendEventAsync(Settings.Current.OutputName, message);
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.Error($"[GenerateMessageAsync] Sequence number {sequenceNumber}, BatchId: {batchId.ToString()}; {e}");
|
||||
Logger.LogError($"[GenerateMessageAsync] Sequence number {sequenceNumber}, BatchId: {batchId.ToString()};{Environment.NewLine}{e}");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -124,56 +103,15 @@ namespace LoadGen
|
|||
var twin = new TwinCollection();
|
||||
long sequenceNumber = messageIdCounter;
|
||||
twin["messagesSent"] = sequenceNumber;
|
||||
|
||||
try
|
||||
{
|
||||
await client.UpdateReportedPropertiesAsync(twin);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.Error($"[GenerateTwinUpdateAsync] Sequence number {sequenceNumber}, BatchId: {batchId.ToString()} {e}");
|
||||
Logger.LogError($"[GenerateTwinUpdateAsync] Sequence number {sequenceNumber}, BatchId: {batchId.ToString()};{Environment.NewLine}{e}");
|
||||
}
|
||||
}
|
||||
|
||||
static async Task<ModuleClient> GetModuleClientWithRetryAsync()
|
||||
{
|
||||
var retryPolicy = new RetryPolicy(TimeoutErrorDetectionStrategy, TransientRetryStrategy);
|
||||
retryPolicy.Retrying += (_, args) =>
|
||||
{
|
||||
Log.Error($"Creating ModuleClient failed with exception {args.LastException}");
|
||||
if (args.CurrentRetryCount < RetryCount)
|
||||
{
|
||||
Log.Information("Retrying...");
|
||||
}
|
||||
};
|
||||
ModuleClient client = await retryPolicy.ExecuteAsync(() => InitModuleClient(Settings.Current.TransportType));
|
||||
return client;
|
||||
}
|
||||
|
||||
static async Task<ModuleClient> InitModuleClient(TransportType transportType)
|
||||
{
|
||||
ITransportSettings[] GetTransportSettings()
|
||||
{
|
||||
switch (transportType)
|
||||
{
|
||||
case TransportType.Mqtt:
|
||||
case TransportType.Mqtt_Tcp_Only:
|
||||
return new ITransportSettings[] { new MqttTransportSettings(TransportType.Mqtt_Tcp_Only) };
|
||||
case TransportType.Mqtt_WebSocket_Only:
|
||||
return new ITransportSettings[] { new MqttTransportSettings(TransportType.Mqtt_WebSocket_Only) };
|
||||
case TransportType.Amqp_WebSocket_Only:
|
||||
return new ITransportSettings[] { new AmqpTransportSettings(TransportType.Amqp_WebSocket_Only) };
|
||||
default:
|
||||
return new ITransportSettings[] { new AmqpTransportSettings(TransportType.Amqp_Tcp_Only) };
|
||||
}
|
||||
}
|
||||
|
||||
ITransportSettings[] settings = GetTransportSettings();
|
||||
|
||||
ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
|
||||
await moduleClient.OpenAsync();
|
||||
|
||||
Log.Information("Successfully initialized module client.");
|
||||
return moduleClient;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ namespace LoadGen
|
|||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using System.Timers;
|
||||
|
||||
public class Timers : IDisposable
|
||||
|
@ -11,7 +12,7 @@ namespace LoadGen
|
|||
|
||||
bool disposedValue = false; // To detect redundant calls
|
||||
|
||||
public void Add(TimeSpan interval, double jitterFactor, Action callback)
|
||||
public void Add(TimeSpan interval, double jitterFactor, Func<Task> callback)
|
||||
{
|
||||
this.timerTasks.Add(new TimerTask(interval, jitterFactor, callback));
|
||||
}
|
||||
|
@ -59,7 +60,7 @@ namespace LoadGen
|
|||
|
||||
class TimerTask
|
||||
{
|
||||
public TimerTask(TimeSpan interval, double jitterFactor, Action callback)
|
||||
public TimerTask(TimeSpan interval, double jitterFactor, Func<Task> callback)
|
||||
{
|
||||
this.Callback = callback;
|
||||
this.Quit = false;
|
||||
|
@ -69,10 +70,10 @@ namespace LoadGen
|
|||
this.Timer.Enabled = false;
|
||||
|
||||
var random = new Random();
|
||||
this.Timer.Elapsed += (source, args) =>
|
||||
this.Timer.Elapsed += async (source, args) =>
|
||||
{
|
||||
// invoke callback
|
||||
this.Callback();
|
||||
await this.Callback();
|
||||
|
||||
// schedule next callback adding jitter if necessary
|
||||
if (this.Quit == false)
|
||||
|
@ -84,7 +85,7 @@ namespace LoadGen
|
|||
};
|
||||
}
|
||||
|
||||
public Action Callback { get; }
|
||||
public Func<Task> Callback { get; }
|
||||
|
||||
public Timer Timer { get; }
|
||||
|
||||
|
|
|
@ -20,14 +20,12 @@
|
|||
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.1.1" />
|
||||
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.1.1" />
|
||||
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.1.1" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.1" />
|
||||
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
|
||||
<PackageReference Include="Serilog" Version="2.7.1" />
|
||||
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\edge-util\src\Microsoft.Azure.Devices.Edge.Util\Microsoft.Azure.Devices.Edge.Util.csproj" />
|
||||
<ProjectReference Include="..\ModuleLib\Microsoft.Azure.Devices.Edge.ModuleUtil.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
|
|
@ -57,7 +57,7 @@ Defaults:
|
|||
[Option("-proto|--protocol", Description = @"Protocol the leaf device will use to communicate with the Edge device.
|
||||
Choices are Mqtt, MqttWs, Amqp, AmqpWs.
|
||||
If protocol is unspecified, default is Mqtt.")]
|
||||
public DeviceProtocol protocol { get; } = DeviceProtocol.Mqtt;
|
||||
public DeviceProtocol Protocol { get; } = DeviceProtocol.Mqtt;
|
||||
|
||||
[Option("-cac|--x509-ca-cert-path", Description = "Path to a X.509 leaf certificate file in PEM format to be used for X.509 CA authentication.")]
|
||||
public string X509CACertPath { get; } = string.Empty;
|
||||
|
@ -98,7 +98,7 @@ Defaults:
|
|||
this.TrustedCACertificateFileName,
|
||||
this.EdgeHostName,
|
||||
this.EdgeGatewayDeviceId,
|
||||
this.protocol);
|
||||
this.Protocol);
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(this.X509PrimaryCertPath) &&
|
||||
!string.IsNullOrWhiteSpace(this.X509PrimaryKeyPath) &&
|
||||
|
|
|
@ -25,7 +25,8 @@ namespace LeafDeviceTest
|
|||
using Message = Microsoft.Azure.Devices.Client.Message;
|
||||
using ServiceClientTransportType = Microsoft.Azure.Devices.TransportType;
|
||||
|
||||
public enum DeviceProtocol {
|
||||
public enum DeviceProtocol
|
||||
{
|
||||
Amqp,
|
||||
AmqpWS,
|
||||
Mqtt,
|
||||
|
|
Загрузка…
Ссылка в новой задаче