зеркало из https://github.com/Azure/TypeEdge.git
provisioning
This commit is contained in:
Родитель
8d20b72c84
Коммит
9a9af57196
|
@ -3,7 +3,6 @@
|
|||
internal class Constants
|
||||
{
|
||||
public const string ConfigFileName = "appsettings_compose.json";
|
||||
public const string DeviceConnectionStringName = "deviceConnectionString";
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ namespace Microsoft.Azure.IoT.EdgeCompose.Hubs
|
|||
|
||||
private IConfigurationRoot HubServiceConfiguration { get; set; }
|
||||
|
||||
public override CreationResult Create(IConfigurationRoot configuration)
|
||||
public override CreationResult Configure(IConfigurationRoot configuration)
|
||||
{
|
||||
HubServiceConfiguration = new ConfigurationBuilder()
|
||||
.AddJsonFile(Constants.ConfigFileName)
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
using Microsoft.Azure.IoT.EdgeCompose.Modules;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
using Microsoft.Azure.IoT.EdgeCompose.Modules;
|
||||
|
||||
namespace Microsoft.Azure.IoT.EdgeCompose.Hubs
|
||||
{
|
||||
|
@ -10,5 +12,12 @@ namespace Microsoft.Azure.IoT.EdgeCompose.Hubs
|
|||
{
|
||||
JsonData = data;
|
||||
}
|
||||
|
||||
public IDictionary<string, string> Properties { get; set; }
|
||||
|
||||
public byte[] GetBytes()
|
||||
{
|
||||
return Encoding.UTF8.GetBytes(JsonData);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -15,6 +15,10 @@ using System.Reflection;
|
|||
using Microsoft.Azure.Devices;
|
||||
using System.Configuration;
|
||||
using Agent = Microsoft.Azure.Devices.Edge.Agent.Core;
|
||||
using Microsoft.Azure.Devices.Common.Exceptions;
|
||||
using Newtonsoft.Json;
|
||||
using Microsoft.Azure.Devices.Shared;
|
||||
using Newtonsoft.Json.Linq;
|
||||
|
||||
namespace Microsoft.Azure.IoT.EdgeCompose
|
||||
{
|
||||
|
@ -38,7 +42,6 @@ namespace Microsoft.Azure.IoT.EdgeCompose
|
|||
Modules = new ModuleCollection();
|
||||
|
||||
Hub = new EdgeHub();
|
||||
Modules.Add(Hub);
|
||||
|
||||
Compose();
|
||||
|
||||
|
@ -52,13 +55,19 @@ namespace Microsoft.Azure.IoT.EdgeCompose
|
|||
builder.Populate(services);
|
||||
builder.RegisterBuildCallback(c => { });
|
||||
|
||||
var edgeDeviceConnectionString = Configuration.GetValue<string>(Constants.DeviceConnectionStringName);
|
||||
var iotHubConnectionString = Configuration.GetValue<string>(Agent.Constants.IotHubConnectionStringKey);
|
||||
if (String.IsNullOrEmpty(iotHubConnectionString))
|
||||
throw new Exception($"Missing {Agent.Constants.IotHubConnectionStringKey} value in configuration");
|
||||
|
||||
if (String.IsNullOrEmpty(edgeDeviceConnectionString))
|
||||
throw new Exception($"Missing {Constants.DeviceConnectionStringName} in configuration");
|
||||
var deviceId = Configuration.GetValue<string>("DeviceId");
|
||||
if (String.IsNullOrEmpty(deviceId))
|
||||
throw new Exception($"Missing DeviceId value in configuration");
|
||||
|
||||
var hubConnectionString = $"{edgeDeviceConnectionString};{Agent.Constants.ModuleIdKey}={Agent.Constants.EdgeHubModuleIdentityName}";
|
||||
#region edge hub config
|
||||
|
||||
//configure the edge hub
|
||||
|
||||
|
||||
Environment.SetEnvironmentVariable(HubService.Constants.SslCertEnvName, "edge-hub-server.cert.pfx");
|
||||
Environment.SetEnvironmentVariable(HubService.Constants.SslCertPathEnvName, Path.Combine(
|
||||
Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location),
|
||||
|
@ -72,42 +81,176 @@ namespace Microsoft.Azure.IoT.EdgeCompose
|
|||
Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location),
|
||||
@"Storage");
|
||||
|
||||
|
||||
var hubStorageFolder = Path.Combine(storageFolder, HubService.Constants.EdgeHubStorageFolder);
|
||||
|
||||
if (!Directory.Exists(hubStorageFolder))
|
||||
Directory.CreateDirectory(hubStorageFolder);
|
||||
|
||||
Environment.SetEnvironmentVariable("storageFolder", storageFolder);
|
||||
Environment.SetEnvironmentVariable("IotHubConnectionString", iotHubConnectionString);
|
||||
#endregion
|
||||
|
||||
Environment.SetEnvironmentVariable("IotHubConnectionString", hubConnectionString.ToString());
|
||||
|
||||
var deviceSasKey = ProvisionDeviceAsync(iotHubConnectionString, deviceId, Modules).Result;
|
||||
|
||||
foreach (var module in Modules)
|
||||
{
|
||||
module.Create(Configuration);
|
||||
var moduleConnectionString = GetModuleConnectionStringAsync(iotHubConnectionString, deviceId, module.Name).Result;
|
||||
|
||||
Environment.SetEnvironmentVariable(Agent.Constants.EdgeHubConnectionStringKey, moduleConnectionString);
|
||||
|
||||
var moduleConfiguration = new ConfigurationBuilder()
|
||||
.AddEnvironmentVariables()
|
||||
.Build();
|
||||
|
||||
module.InternalConfigure(moduleConfiguration);
|
||||
}
|
||||
|
||||
var csBuilder = IotHubConnectionStringBuilder.Create(iotHubConnectionString);
|
||||
var edgeConnectionString = new Agent.ModuleConnectionString.ModuleConnectionStringBuilder(csBuilder.HostName, deviceId)
|
||||
.WithModuleId(Agent.Constants.EdgeHubModuleName)
|
||||
.WithModuleId(Agent.Constants.EdgeHubModuleIdentityName)
|
||||
.WithSharedAccessKey(deviceSasKey)
|
||||
.Build();
|
||||
Environment.SetEnvironmentVariable(Agent.Constants.EdgeHubConnectionStringKey, edgeConnectionString);
|
||||
Environment.SetEnvironmentVariable(Agent.Constants.IotHubConnectionStringKey, edgeConnectionString);
|
||||
|
||||
var edgeHubConfiguration = new ConfigurationBuilder()
|
||||
.AddEnvironmentVariables()
|
||||
.Build();
|
||||
|
||||
Hub.InternalConfigure(edgeHubConfiguration);
|
||||
|
||||
IContainer container = builder.Build();
|
||||
return container;
|
||||
}
|
||||
private async Task<string> ProvisionDeviceAsync(string iotHubConnectionString, string deviceId, ModuleCollection modules)
|
||||
{
|
||||
var csBuilder = IotHubConnectionStringBuilder.Create(iotHubConnectionString);
|
||||
|
||||
RegistryManager registryManager = RegistryManager.CreateFromConnectionString(iotHubConnectionString);
|
||||
string sasKey = null;
|
||||
try
|
||||
{
|
||||
var device = await registryManager.AddDeviceAsync(new Devices.Device(deviceId) { Capabilities = new Microsoft.Azure.Devices.Shared.DeviceCapabilities() { IotEdge = true } });
|
||||
sasKey = device.Authentication.SymmetricKey.PrimaryKey;
|
||||
}
|
||||
catch (DeviceAlreadyExistsException)
|
||||
{
|
||||
var device = await registryManager.GetDeviceAsync(deviceId);
|
||||
sasKey = device.Authentication.SymmetricKey.PrimaryKey;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var config = JsonConvert.DeserializeObject<ConfigurationContent>(File.ReadAllText("deviceconfig.json"));
|
||||
var modulesConfig = config.ModuleContent["$edgeAgent"].TargetContent["modules"] as JObject;
|
||||
foreach (var module in modules)
|
||||
{
|
||||
modulesConfig.Add(module.Name, JObject.FromObject(new
|
||||
{
|
||||
version = "1.0",
|
||||
type = "docker",
|
||||
status = "running",
|
||||
restartPolicy = "on-failure",
|
||||
settings = new
|
||||
{
|
||||
image = "devimage",
|
||||
createOptions = $" -e __MODULE_NAME='{module.Name}' "
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
var twinContent = new TwinContent();
|
||||
config.ModuleContent["$edgeHub"] = twinContent;
|
||||
|
||||
var desiredProperties = new
|
||||
{
|
||||
schemaVersion = "1.0",
|
||||
routes = new Dictionary<string, string>
|
||||
{
|
||||
["route1"] = "from /* INTO $upstream",
|
||||
["route2"] = "from /modules/module1 INTO BrokeredEndpoint(\"/modules/module2/inputs/input1\")",
|
||||
["route3"] = "from /modules/module2 INTO BrokeredEndpoint(\"/modules/module3/inputs/input1\")",
|
||||
["route4"] = "from /modules/module3 INTO BrokeredEndpoint(\"/modules/module4/inputs/input1\")",
|
||||
},
|
||||
storeAndForwardConfiguration = new
|
||||
{
|
||||
timeToLiveSecs = 20
|
||||
}
|
||||
};
|
||||
string patch = JsonConvert.SerializeObject(desiredProperties);
|
||||
|
||||
twinContent.TargetContent = new TwinCollection(patch);
|
||||
await registryManager.ApplyConfigurationContentOnDeviceAsync(deviceId, config);
|
||||
}
|
||||
catch
|
||||
{
|
||||
throw;
|
||||
}
|
||||
|
||||
return sasKey;
|
||||
}
|
||||
|
||||
private async Task<string> GetModuleConnectionStringAsync(string iotHubConnectionString, string deviceId, string moduleName)
|
||||
{
|
||||
var csBuilder = IotHubConnectionStringBuilder.Create(iotHubConnectionString);
|
||||
RegistryManager registryManager = RegistryManager.CreateFromConnectionString(iotHubConnectionString);
|
||||
string sasKey = null;
|
||||
try
|
||||
{
|
||||
var module = await registryManager.GetModuleAsync(deviceId, moduleName);
|
||||
sasKey = module.Authentication.SymmetricKey.PrimaryKey;
|
||||
}
|
||||
catch
|
||||
{
|
||||
throw;
|
||||
}
|
||||
return new Agent.ModuleConnectionString.ModuleConnectionStringBuilder(csBuilder.HostName, deviceId)
|
||||
.WithGatewayHostName(Environment.MachineName)
|
||||
.WithModuleId(moduleName)
|
||||
.WithSharedAccessKey(sasKey)
|
||||
.Build();
|
||||
}
|
||||
|
||||
//private async Task<string> ProvisionModuleAsync(string iotHubConnectionString, string deviceId, string moduleName)
|
||||
//{
|
||||
// var csBuilder = IotHubConnectionStringBuilder.Create(iotHubConnectionString);
|
||||
// RegistryManager registryManager = RegistryManager.CreateFromConnectionString(iotHubConnectionString);
|
||||
// string sasKey = null;
|
||||
// try
|
||||
// {
|
||||
// var module = await registryManager.AddModuleAsync(new Devices.Module(deviceId, moduleName));
|
||||
// sasKey = module.Authentication.SymmetricKey.PrimaryKey;
|
||||
// }
|
||||
// catch (ModuleAlreadyExistsException)
|
||||
// {
|
||||
// var module = await registryManager.GetModuleAsync(deviceId, moduleName);
|
||||
// sasKey = module.Authentication.SymmetricKey.PrimaryKey;
|
||||
// }
|
||||
// return new Agent.ModuleConnectionString.ModuleConnectionStringBuilder(csBuilder.IotHubName, deviceId)
|
||||
// .WithGatewayHostName(Environment.MachineName)
|
||||
// .WithModuleId(moduleName)
|
||||
// .WithSharedAccessKey(sasKey)
|
||||
// .Build();
|
||||
//}
|
||||
public abstract CompositionResult Compose();
|
||||
|
||||
public async Task RunAsync()
|
||||
{
|
||||
List<Task> tasks = new List<Task>();
|
||||
//configure all modules
|
||||
foreach (var module in Modules)
|
||||
{
|
||||
await module.InitAsync();
|
||||
}
|
||||
|
||||
tasks.Add(Hub.RunAsync());
|
||||
//start all modules
|
||||
foreach (var module in Modules)
|
||||
{
|
||||
tasks.Add(module.RunAsync());
|
||||
tasks.Add(module.InternalRunAsync());
|
||||
}
|
||||
|
||||
Task.WaitAll(tasks.ToArray());
|
||||
await Task.WhenAll(tasks.ToArray());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,9 @@
|
|||
<None Update="Certificates\edge-hub-server\cert\edge-hub-server.cert.pfx">
|
||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
|
||||
</None>
|
||||
<None Update="deviceconfig.json">
|
||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
|
||||
</None>
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
|
|
@ -4,7 +4,7 @@ using System.Collections.Generic;
|
|||
|
||||
namespace Microsoft.Azure.IoT.EdgeCompose
|
||||
{
|
||||
public class ModuleCollection : List<IEdgeModule>
|
||||
public class ModuleCollection : List<EdgeModule>
|
||||
{
|
||||
}
|
||||
}
|
|
@ -1,6 +1,10 @@
|
|||
namespace Microsoft.Azure.IoT.EdgeCompose.Modules
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace Microsoft.Azure.IoT.EdgeCompose.Modules
|
||||
{
|
||||
public interface IEdgeMessage
|
||||
{
|
||||
IDictionary<string, string> Properties { get; set; }
|
||||
byte[] GetBytes();
|
||||
}
|
||||
}
|
|
@ -7,6 +7,7 @@ using Microsoft.Azure.IoT.EdgeCompose.Hubs;
|
|||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Options;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Reflection;
|
||||
using System.Runtime.InteropServices;
|
||||
|
@ -19,10 +20,18 @@ using Agent = Microsoft.Azure.Devices.Edge.Agent.Core;
|
|||
|
||||
namespace Microsoft.Azure.IoT.EdgeCompose.Modules
|
||||
{
|
||||
public abstract class EdgeModule : IEdgeModule
|
||||
public abstract class EdgeModule
|
||||
{
|
||||
private string ConnectionString { get; set; }
|
||||
private DeviceClient IoTHubModuleClient { get; set; }
|
||||
private ITransportSettings[] TransportSettings { get; set; }
|
||||
|
||||
private Dictionary<string, MessageCallback> Subscriptions { get; set; }
|
||||
|
||||
public EdgeModule()
|
||||
{
|
||||
Subscriptions = new Dictionary<string, MessageCallback>();
|
||||
|
||||
var props = GetType().GetProperties();
|
||||
|
||||
foreach (var prop in props)
|
||||
|
@ -44,18 +53,97 @@ namespace Microsoft.Azure.IoT.EdgeCompose.Modules
|
|||
}
|
||||
public virtual string Name { get { return this.GetType().Name; } }
|
||||
|
||||
public virtual CreationResult Create(IConfigurationRoot configuration)
|
||||
public virtual CreationResult Configure(IConfigurationRoot configuration)
|
||||
{
|
||||
return CreationResult.OK;
|
||||
}
|
||||
public virtual Task<InitializationResult> InitAsync()
|
||||
internal CreationResult InternalConfigure(IConfigurationRoot configuration)
|
||||
{
|
||||
return Task.FromResult(InitializationResult.OK);
|
||||
ConnectionString = configuration.GetValue<string>(Agent.Constants.EdgeHubConnectionStringKey);
|
||||
|
||||
// Cert verification is not yet fully functional when using Windows OS for the container
|
||||
bool bypassCertVerification = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
|
||||
if (!bypassCertVerification)
|
||||
InstallCert();
|
||||
|
||||
Console.WriteLine("Connection String {0}", ConnectionString);
|
||||
|
||||
MqttTransportSettings mqttSetting = new MqttTransportSettings(Devices.Client.TransportType.Mqtt_Tcp_Only);
|
||||
// During dev you might want to bypass the cert verification. It is highly recommended to verify certs systematically in production
|
||||
if (bypassCertVerification)
|
||||
{
|
||||
mqttSetting.RemoteCertificateValidationCallback = (sender, certificate, chain, sslPolicyErrors) => true;
|
||||
}
|
||||
TransportSettings = new ITransportSettings[] { mqttSetting };
|
||||
|
||||
return Configure(configuration);
|
||||
}
|
||||
|
||||
void InstallCert()
|
||||
{
|
||||
string certPath = Environment.GetEnvironmentVariable("EdgeModuleCACertificateFile");
|
||||
if (string.IsNullOrWhiteSpace(certPath))
|
||||
{
|
||||
// We cannot proceed further without a proper cert file
|
||||
Console.WriteLine($"Missing path to certificate collection file: {certPath}");
|
||||
throw new InvalidOperationException("Missing path to certificate file.");
|
||||
}
|
||||
else if (!File.Exists(certPath))
|
||||
{
|
||||
// We cannot proceed further without a proper cert file
|
||||
Console.WriteLine($"Missing path to certificate collection file: {certPath}");
|
||||
throw new InvalidOperationException("Missing certificate file.");
|
||||
}
|
||||
X509Store store = new X509Store(StoreName.Root, StoreLocation.CurrentUser);
|
||||
store.Open(OpenFlags.ReadWrite);
|
||||
store.Add(new X509Certificate2(X509Certificate2.CreateFromCertFile(certPath)));
|
||||
Console.WriteLine("Added Cert: " + certPath);
|
||||
store.Close();
|
||||
}
|
||||
|
||||
internal async Task<ExecutionResult> InternalRunAsync()
|
||||
{
|
||||
// Open a connection to the Edge runtime
|
||||
IoTHubModuleClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportSettings);
|
||||
await IoTHubModuleClient.OpenAsync();
|
||||
Console.WriteLine("IoT Hub module client initialized.");
|
||||
|
||||
// Register callback to be called when a message is received by the module
|
||||
foreach (var subscription in Subscriptions)
|
||||
{
|
||||
await IoTHubModuleClient.SetInputMessageHandlerAsync(subscription.Key, SubscribedMessageHandler, subscription.Value);
|
||||
}
|
||||
return await RunAsync();
|
||||
}
|
||||
public virtual Task<ExecutionResult> RunAsync()
|
||||
{
|
||||
return Task.FromResult(ExecutionResult.OK);
|
||||
}
|
||||
|
||||
private async Task<MessageResponse> SubscribedMessageHandler(Devices.Client.Message message, object userContext)
|
||||
{
|
||||
var inputRoute = userContext as string;
|
||||
if (string.IsNullOrEmpty(inputRoute))
|
||||
throw new InvalidOperationException("UserContext doesn't contain a valid input route");
|
||||
|
||||
byte[] messageBytes = message.GetBytes();
|
||||
string messageString = Encoding.UTF8.GetString(messageBytes);
|
||||
Console.WriteLine($"Received message: Body: [{messageString}]");
|
||||
|
||||
var subscription = Subscriptions[inputRoute];
|
||||
|
||||
if (subscription == null)
|
||||
throw new InvalidOperationException("No subscription found for this input route");
|
||||
|
||||
var input = Activator.CreateInstance(subscription.MessageType);
|
||||
var result = await subscription.Handler(input);
|
||||
|
||||
if (result is MessageResult && ((MessageResult)result) == MessageResult.OK)
|
||||
return MessageResponse.Completed;
|
||||
|
||||
return MessageResponse.Abandoned;
|
||||
}
|
||||
|
||||
public virtual Task<TwinResult> TwinHandler(ModuleTwin newTwin)
|
||||
{
|
||||
return Task.FromResult(TwinResult.OK);
|
||||
|
@ -67,134 +155,30 @@ namespace Microsoft.Azure.IoT.EdgeCompose.Modules
|
|||
public Input<JsonMessage> DefaultInput { get; set; }
|
||||
public Output<JsonMessage> DefaultOutput { get; set; }
|
||||
|
||||
public void DependsOn(IEdgeModule module)
|
||||
public void DependsOn(EdgeModule module)
|
||||
{
|
||||
}
|
||||
|
||||
public async Task<PublishResult> PublishMessageAsync<T>(string outputName, T message) where T : IEdgeMessage
|
||||
public async Task<PublishResult> PublishAsync<T>(string outputName, T message)
|
||||
where T : IEdgeMessage
|
||||
{
|
||||
var edgeMessage = new Devices.Client.Message(message.GetBytes());
|
||||
if (message.Properties != null)
|
||||
foreach (var prop in edgeMessage.Properties)
|
||||
{
|
||||
edgeMessage.Properties.Add(prop.Key, prop.Value);
|
||||
}
|
||||
|
||||
await IoTHubModuleClient.SendEventAsync(outputName, edgeMessage);
|
||||
|
||||
Console.WriteLine("Received message sent");
|
||||
return PublishResult.OK;
|
||||
}
|
||||
|
||||
/*
|
||||
public void Configure(IConfigurationRoot configuration)
|
||||
{
|
||||
var edgeDeviceConnectionString = configuration.GetValue<string>(Constants.DeviceConnectionStringName);
|
||||
|
||||
Options = new TOptions();
|
||||
Options.DeviceConnectionString = edgeDeviceConnectionString;
|
||||
|
||||
PopulateOptions(configuration);
|
||||
}
|
||||
|
||||
public virtual void PopulateOptions(IConfigurationRoot configuration)
|
||||
{
|
||||
//stadardize the custom options loading here
|
||||
}
|
||||
|
||||
public void Subscribe(Upstream<TInputMessage> output)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
public void Subscribe<T>(Upstream<T> output, Func<T, TInputMessage> endpointTypeConverter)
|
||||
where T : IModuleMessage
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static void InstallCert()
|
||||
{
|
||||
string certPath = Environment.GetEnvironmentVariable("EdgeModuleCACertificateFile");
|
||||
if (string.IsNullOrWhiteSpace(certPath))
|
||||
public void Subscribe<T>(string name, Func<T, Task<MessageResult>> handler)
|
||||
where T : IEdgeMessage
|
||||
{
|
||||
// We cannot proceed further without a proper cert file
|
||||
Console.WriteLine($"Missing path to certificate collection file: {certPath}");
|
||||
throw new InvalidOperationException("Missing path to certificate file.");
|
||||
Subscriptions[name] = new MessageCallback(name, handler.GetMethodInfo(), handler, typeof(T));
|
||||
}
|
||||
else if (!File.Exists(certPath))
|
||||
{
|
||||
// We cannot proceed further without a proper cert file
|
||||
Console.WriteLine($"Missing path to certificate collection file: {certPath}");
|
||||
throw new InvalidOperationException("Missing certificate file.");
|
||||
}
|
||||
X509Store store = new X509Store(StoreName.Root, StoreLocation.CurrentUser);
|
||||
store.Open(OpenFlags.ReadWrite);
|
||||
store.Add(new X509Certificate2(X509Certificate2.CreateFromCertFile(certPath)));
|
||||
Console.WriteLine("Added Cert: " + certPath);
|
||||
store.Close();
|
||||
}
|
||||
|
||||
public async Task<CreationResult> CreateAsync()
|
||||
{
|
||||
return await CreateHandler(Options);
|
||||
}
|
||||
|
||||
public Task WhenCancelled(CancellationToken cancellationToken)
|
||||
{
|
||||
var tcs = new TaskCompletionSource<bool>();
|
||||
cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).SetResult(true), tcs);
|
||||
return tcs.Task;
|
||||
}
|
||||
|
||||
public async Task StartAsync()
|
||||
{
|
||||
// The Edge runtime gives us the connection string we need -- it is injected as an environment variable
|
||||
string connectionString = Environment.GetEnvironmentVariable("EdgeHubConnectionString");
|
||||
Console.WriteLine("Connection String {0}", connectionString);
|
||||
|
||||
// Cert verification is not yet fully functional when using Windows OS for the container
|
||||
bool bypassCertVerification = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
|
||||
if (!bypassCertVerification)
|
||||
InstallCert();
|
||||
|
||||
MqttTransportSettings mqttSetting = new MqttTransportSettings(Devices.Client.TransportType.Mqtt_Tcp_Only);
|
||||
|
||||
// During dev you might want to bypass the cert verification. It is highly recommended to verify certs systematically in production
|
||||
if (bypassCertVerification)
|
||||
mqttSetting.RemoteCertificateValidationCallback = (sender, certificate, chain, sslPolicyErrors) => true;
|
||||
|
||||
ITransportSettings[] settings = { mqttSetting };
|
||||
|
||||
// Open a connection to the Edge runtime
|
||||
IoTHubModuleClient = DeviceClient.CreateFromConnectionString(connectionString, settings);
|
||||
|
||||
await IoTHubModuleClient.OpenAsync();
|
||||
Console.WriteLine($"{Name} module DeviceClient initialized.");
|
||||
|
||||
// Register callback to be called when a message is received by the module
|
||||
await IoTHubModuleClient.SetInputMessageHandlerAsync("Input", (e, u) => this.InputMessageHandlerAsync(e, u), this);
|
||||
|
||||
// Wait until the app unloads or is cancelled
|
||||
var cts = new CancellationTokenSource();
|
||||
AssemblyLoadContext.Default.Unloading += (ctx) => cts.Cancel();
|
||||
|
||||
await ExecuteHandler(Output);
|
||||
|
||||
WhenCancelled(cts.Token).Wait();
|
||||
}
|
||||
|
||||
|
||||
public async Task<MessageResponse> InputMessageHandlerAsync(Devices.Client.Message message, object userContext)
|
||||
{
|
||||
var deviceClient = userContext as DeviceClient;
|
||||
if (deviceClient == null)
|
||||
{
|
||||
throw new InvalidOperationException("UserContext doesn't contain " + "expected values");
|
||||
}
|
||||
|
||||
byte[] messageBytes = message.GetBytes();
|
||||
string messageString = Encoding.UTF8.GetString(messageBytes);
|
||||
Console.WriteLine($"Body: [{messageString}]");
|
||||
|
||||
await IncomingMessageHandler(messageString, deviceClient);
|
||||
return MessageResponse.Completed;
|
||||
}
|
||||
|
||||
internal string ModuleConnectionString { get { return $"{Options.DeviceConnectionString};{Agent.Constants.ModuleIdKey}={Name}"; } }
|
||||
|
||||
public DeviceClient IoTHubModuleClient { get; private set; }
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,9 +3,9 @@
|
|||
public class Endpoint
|
||||
{
|
||||
public string Name { get; set; }
|
||||
public IEdgeModule Module { get; set; }
|
||||
public EdgeModule Module { get; set; }
|
||||
|
||||
public Endpoint(string name, IEdgeModule module)
|
||||
public Endpoint(string name, EdgeModule module)
|
||||
{
|
||||
Name = name;
|
||||
Module = module;
|
||||
|
|
|
@ -1,20 +1,26 @@
|
|||
using System.Threading.Tasks;
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.IoT.EdgeCompose.Hubs;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
|
||||
namespace Microsoft.Azure.IoT.EdgeCompose.Modules
|
||||
{
|
||||
public interface IEdgeModule
|
||||
{
|
||||
Input<JsonMessage> DefaultInput { get; }
|
||||
Output<JsonMessage> DefaultOutput { get; }
|
||||
string Name { get; }
|
||||
//public interface IEdgeModule
|
||||
//{
|
||||
// Input<JsonMessage> DefaultInput { get; }
|
||||
// Output<JsonMessage> DefaultOutput { get; }
|
||||
// string Name { get; }
|
||||
|
||||
CreationResult Create(IConfigurationRoot configuration);
|
||||
Task<InitializationResult> InitAsync();
|
||||
Task<PropertiesResult> PropertiesHandler(ModuleProperties newProps);
|
||||
Task<ExecutionResult> RunAsync();
|
||||
Task<TwinResult> TwinHandler(ModuleTwin newTwin);
|
||||
Task<PublishResult> PublishMessageAsync<T>(string outputName, T message) where T : IEdgeMessage;
|
||||
}
|
||||
// CreationResult Configure(IConfigurationRoot configuration);
|
||||
// Task<ExecutionResult> RunAsync();
|
||||
|
||||
// void Subscribe<T>(string name, Func<T, Task<MessageResult>> handler)
|
||||
// where T : IEdgeMessage;
|
||||
// Task<PublishResult> PublishAsync<T>(string outputName, T message)
|
||||
// where T : IEdgeMessage;
|
||||
|
||||
// //handlers
|
||||
// Task<PropertiesResult> PropertiesHandler(ModuleProperties newProps);
|
||||
// Task<TwinResult> TwinHandler(ModuleTwin newTwin);
|
||||
//}
|
||||
}
|
|
@ -6,13 +6,14 @@ namespace Microsoft.Azure.IoT.EdgeCompose.Modules
|
|||
public class Input<T> : Endpoint
|
||||
where T : IEdgeMessage
|
||||
{
|
||||
public Input(string name, IEdgeModule module) :
|
||||
public Input(string name, EdgeModule module) :
|
||||
base(name, module)
|
||||
{
|
||||
}
|
||||
|
||||
public void Subscribe(Output<T> output, Func<T, Task<MessageResult>> handler)
|
||||
{
|
||||
Module.Subscribe(output.Name, handler);
|
||||
}
|
||||
public void Subscribe<O>(Output<O> output, Func<O, Task<T>> convert)
|
||||
where O : IEdgeMessage
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
using System;
|
||||
using System.Reflection;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.Azure.IoT.EdgeCompose.Modules
|
||||
{
|
||||
public class MessageCallback
|
||||
{
|
||||
public string Name { get; set; }
|
||||
public MethodInfo MethodInfo { get; set; }
|
||||
public dynamic Handler { get; set; }
|
||||
|
||||
public Type MessageType { get; set; }
|
||||
|
||||
public MessageCallback(string name, MethodInfo methodInfo, Delegate handler, Type messageType)
|
||||
{
|
||||
this.Name = name;
|
||||
this.MethodInfo = methodInfo;
|
||||
this.Handler = handler;
|
||||
MessageType = messageType;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -6,13 +6,13 @@ namespace Microsoft.Azure.IoT.EdgeCompose.Modules
|
|||
public class Output<T> : Endpoint
|
||||
where T : IEdgeMessage
|
||||
{
|
||||
public Output(string name, IEdgeModule module) :
|
||||
public Output(string name, EdgeModule module) :
|
||||
base(name, module)
|
||||
{
|
||||
}
|
||||
public async Task<PublishResult> PublishAsync(T message)
|
||||
{
|
||||
return await Module.PublishMessageAsync(Name, message);
|
||||
return await Module.PublishAsync(Name, message);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
{
|
||||
"moduleContent": {
|
||||
"$edgeAgent": {
|
||||
"properties.desired": {
|
||||
"schemaVersion": "1.0",
|
||||
"runtime": {
|
||||
"type": "docker",
|
||||
"settings": {
|
||||
"minDockerVersion": "v1.25",
|
||||
"loggingOptions": ""
|
||||
}
|
||||
},
|
||||
"systemModules": {
|
||||
"edgeAgent": {
|
||||
"type": "docker",
|
||||
"settings": {
|
||||
"image": "microsoft/azureiotedge-agent:1.0-preview",
|
||||
"createOptions": ""
|
||||
}
|
||||
},
|
||||
"edgeHub": {
|
||||
"type": "docker",
|
||||
"status": "running",
|
||||
"restartPolicy": "always",
|
||||
"settings": {
|
||||
"image": "microsoft/azureiotedge-hub:1.0-preview",
|
||||
"createOptions": ""
|
||||
}
|
||||
}
|
||||
},
|
||||
"modules": {
|
||||
|
||||
}
|
||||
}
|
||||
},
|
||||
"$edgeHub": {
|
||||
"properties.desired": {
|
||||
"schemaVersion": "1.0",
|
||||
"routes": {
|
||||
"route": "FROM /* INTO $upstream"
|
||||
},
|
||||
"storeAndForwardConfiguration": {
|
||||
"timeToLiveSecs": 7200
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,10 +1,20 @@
|
|||
using Microsoft.Azure.IoT.EdgeCompose;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
using Microsoft.Azure.IoT.EdgeCompose;
|
||||
using Microsoft.Azure.IoT.EdgeCompose.Hubs;
|
||||
using Microsoft.Azure.IoT.EdgeCompose.Modules;
|
||||
using Newtonsoft.Json;
|
||||
|
||||
namespace ThermpostatEdgeApplication
|
||||
{
|
||||
public class TemperatureModuleInput : IEdgeMessage
|
||||
{
|
||||
public int MyData { get; set; }
|
||||
public IDictionary<string, string> Properties { get; set; }
|
||||
|
||||
public byte[] GetBytes()
|
||||
{
|
||||
return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(this));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,5 +1,8 @@
|
|||
using Microsoft.Azure.IoT.EdgeCompose;
|
||||
using Microsoft.Azure.IoT.EdgeCompose.Modules;
|
||||
using Newtonsoft.Json;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace ThermpostatEdgeApplication
|
||||
|
@ -8,6 +11,11 @@ namespace ThermpostatEdgeApplication
|
|||
{
|
||||
public TemperatureScale Scale { get; set; }
|
||||
public double Temperature { get; set; }
|
||||
public IDictionary<string, string> Properties { get; set; }
|
||||
|
||||
public byte[] GetBytes()
|
||||
{
|
||||
return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(this));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -6,6 +6,10 @@
|
|||
<LangVersion>latest</LangVersion>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Microsoft.Azure.IoT.EdgeCompose\Microsoft.Azure.IoT.EdgeCompose.csproj" />
|
||||
</ItemGroup>
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
{
|
||||
"appSettings": {
|
||||
},
|
||||
"deviceConnectionString": "HostName=iotedgedev-iothub-7389d7.azure-devices.net;DeviceId=iotedgedev-edgedevice;SharedAccessKey=oICqhBx3j0dyayGl/SchnK4NDFz2py7TfzWGORix7yQ=",
|
||||
"iotHubConnectionString": "HostName=iotedgedev-iothub-7389d7.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=68K3zvI8CKbN7NM5s46N9rlP+zvPHPJKg7shy1rwMRU=",
|
||||
"deviceId": "iotedgedev-edgedevice-new",
|
||||
"RuntimeLogLevel": "info"
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче