Updated Publisher to new IoT Edge architecture and now using official IoT Device SDK, now that it supports .NetCore.

This commit is contained in:
Erich Barnstedt 2017-06-30 18:38:14 -07:00
Родитель d68c3bfa5c
Коммит fdab87759f
30 изменённых файлов: 250 добавлений и 1568 удалений

Просмотреть файл

@ -1,49 +0,0 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26430.13
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{0E241D3D-7C66-4E4F-99AB-9FF5780180D1}"
ProjectSection(SolutionItems) = preProject
.travis.yml = .travis.yml
appveyor.yml = appveyor.yml
Dockerfile = Dockerfile
License.txt = License.txt
README.md = README.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GatewayApp.NetCore", "src\GatewayApp.NetCore\GatewayApp.NetCore.csproj", "{592A483D-5F91-400E-B4EF-C25092143A4F}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Opc.Ua.Publisher.Module", "src\Opc.Ua.Publisher.Module\Opc.Ua.Publisher.Module.csproj", "{A1CC3B57-A186-4C3E-9E36-30A54A947369}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "IoTHubCredentialTools", "src\IoTHubCredentialTools\IoTHubCredentialTools.csproj", "{2605E447-DF6E-4FB8-B226-CD1DB643F186}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Opc.Ua.IoTHub.Module", "src\Opc.Ua.IoTHub.Module\Opc.Ua.IoTHub.Module.csproj", "{B7B97F4D-B753-4E1B-A745-2E99B9D11D74}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{592A483D-5F91-400E-B4EF-C25092143A4F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{592A483D-5F91-400E-B4EF-C25092143A4F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{592A483D-5F91-400E-B4EF-C25092143A4F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{592A483D-5F91-400E-B4EF-C25092143A4F}.Release|Any CPU.Build.0 = Release|Any CPU
{A1CC3B57-A186-4C3E-9E36-30A54A947369}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A1CC3B57-A186-4C3E-9E36-30A54A947369}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A1CC3B57-A186-4C3E-9E36-30A54A947369}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A1CC3B57-A186-4C3E-9E36-30A54A947369}.Release|Any CPU.Build.0 = Release|Any CPU
{2605E447-DF6E-4FB8-B226-CD1DB643F186}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2605E447-DF6E-4FB8-B226-CD1DB643F186}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2605E447-DF6E-4FB8-B226-CD1DB643F186}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2605E447-DF6E-4FB8-B226-CD1DB643F186}.Release|Any CPU.Build.0 = Release|Any CPU
{B7B97F4D-B753-4E1B-A745-2E99B9D11D74}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B7B97F4D-B753-4E1B-A745-2E99B9D11D74}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B7B97F4D-B753-4E1B-A745-2E99B9D11D74}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B7B97F4D-B753-4E1B-A745-2E99B9D11D74}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
EndGlobal

Просмотреть файл

@ -1,44 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<AssemblyName>GatewayApp.NetCore</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>GatewayApp.NetCore</PackageId>
<RuntimeIdentifiers>win10-x64;win81-x64;win8-x64;win7-x64;debian.8-x64;ubuntu.16.04-x64</RuntimeIdentifiers>
<RuntimeFrameworkVersion>1.1.2</RuntimeFrameworkVersion>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>true</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<GenerateAssemblyVersionAttribute>true</GenerateAssemblyVersionAttribute>
<Description />
<Company>Microsoft</Company>
</PropertyGroup>
<ItemGroup>
<None Remove="gatewayconfig.json" />
<None Remove="publishednodes.json" />
</ItemGroup>
<ItemGroup>
<Content Include="gatewayconfig.json">
<CopyToPublishDirectory>PreserveNewest</CopyToPublishDirectory>
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
<Content Include="publishednodes.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Opc.Ua.IoTHub.Module\Opc.Ua.IoTHub.Module.csproj" />
<ProjectReference Include="..\Opc.Ua.Publisher.Module\Opc.Ua.Publisher.Module.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Devices.Gateway.Native.Debian.x64" Version="1.1.1" />
<PackageReference Include="Microsoft.Azure.Devices.Gateway.Native.Ubuntu.x64" Version="1.1.1" />
<PackageReference Include="Microsoft.Azure.Devices.Gateway.Native.Windows.x64" Version="1.1.2" />
</ItemGroup>
</Project>

Просмотреть файл

@ -1,124 +0,0 @@

using System;
using System.IO;
using Microsoft.Azure.Devices.Gateway;
using IoTHubCredentialTools;
using System.Runtime.InteropServices;
using System.Collections.Generic;
namespace GatewayApp.NetCore
{
public class Program
{
public static void Main(string[] args)
{
// check for OSX, which we don't support
if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
throw new NotSupportedException("OSX is not supported by the Gateway App on .Net Core");
}
// patch IoT Hub module DLL name
string gatewayConfigFile = "gatewayconfig.json";
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
{
Console.WriteLine("Target system is Linux.");
File.WriteAllText(gatewayConfigFile, File.ReadAllText(gatewayConfigFile).Replace("iothub.dll", "libiothub.so"));
}
else
{
Console.WriteLine("Target system is Windows.");
File.WriteAllText(gatewayConfigFile, File.ReadAllText(gatewayConfigFile).Replace("libiothub.so", "iothub.dll"));
}
Console.WriteLine(RuntimeInformation.OSDescription);
// print target system info
if (IsX64Process())
{
Console.WriteLine("Target system is 64-bit.");
}
else
{
Console.WriteLine("Target system is 32-bit.");
throw new Exception("32-bit systems are currently not supported.");
}
// check if we got command line arguments to patch our gateway config file and register ourselves with IoT Hub
if ((args.Length > 0) && !string.IsNullOrEmpty(args[0]))
{
string applicationName = args[0];
File.WriteAllText(gatewayConfigFile, File.ReadAllText(gatewayConfigFile).Replace("<ReplaceWithYourApplicationName>", applicationName));
Console.WriteLine("Gateway config file patched with application name: " + applicationName);
// check if we also received an owner connection string to register ourselves with IoT Hub
if ((args.Length > 1) && !string.IsNullOrEmpty(args[1]))
{
string ownerConnectionString = args[1];
Console.WriteLine("Attemping to register ourselves with IoT Hub using owner connection string: " + ownerConnectionString);
string deviceConnectionString = IoTHubRegistration.RegisterDeviceWithIoTHub(applicationName, ownerConnectionString);
if (!string.IsNullOrEmpty(deviceConnectionString))
{
SecureIoTHubToken.Write(applicationName, deviceConnectionString);
}
else
{
Console.WriteLine("Could not register ourselves with IoT Hub using owner connection string: " + ownerConnectionString);
}
}
else
{
Console.WriteLine("IoT Hub owner connection string not passed as argument, registration with IoT Hub abandoned.");
}
// try to read connection string from secure store and patch gateway config file
Console.WriteLine("Attemping to read connection string from secure store with certificate name: " + applicationName);
string connectionString = SecureIoTHubToken.Read(applicationName);
if (!string.IsNullOrEmpty(connectionString))
{
Console.WriteLine("Attemping to configure publisher with connection string: " + connectionString);
string[] parsedConnectionString = IoTHubRegistration.ParseConnectionString(connectionString, true);
if ((parsedConnectionString != null) && (parsedConnectionString.Length == 3))
{
string _IoTHubName = parsedConnectionString[0];
if (_IoTHubName.Contains("."))
{
_IoTHubName = _IoTHubName.Substring(0, _IoTHubName.IndexOf('.'));
}
File.WriteAllText(gatewayConfigFile, File.ReadAllText(gatewayConfigFile).Replace("<ReplaceWithYourIoTHubName>", _IoTHubName));
Console.WriteLine("Gateway config file patched with IoT Hub name: " + _IoTHubName);
}
else
{
throw new Exception("Could not parse persisted device connection string!");
}
}
else
{
Console.WriteLine("Device connection string not found in secure store.");
}
}
else
{
Console.WriteLine("Application name not passed as argument, patching gateway config file abandoned");
}
IntPtr gateway = GatewayInterop.CreateFromJson(gatewayConfigFile);
if (gateway != IntPtr.Zero)
{
Console.WriteLine(".NET Core Gateway is running. Press enter to quit.");
Console.ReadLine();
GatewayInterop.Destroy(gateway);
}
else
{
Console.WriteLine(".NET Core Gateway failed to initialize. Please make sure you have published the GatewayApp.NetCore app to make sure the depend DLLs are available!");
}
}
private static bool IsX64Process()
{
return (IntPtr.Size == 8);
}
}
}

Просмотреть файл

@ -1,14 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
This file is used by the publish/package process of your Web project. You can customize the behavior of this process
by editing this MSBuild file. In order to learn more about this please visit https://go.microsoft.com/fwlink/?LinkID=208121.
-->
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<PublishProtocol>FileSystem</PublishProtocol>
<Configuration>Debug</Configuration>
<TargetFramework>netcoreapp1.1</TargetFramework>
<PublishDir>bin\Debug\netcoreapp1.1\</PublishDir>
<RuntimeIdentifier>win10-x64</RuntimeIdentifier>
</PropertyGroup>
</Project>

Просмотреть файл

@ -1,9 +0,0 @@
{
"profiles": {
"GatewayApp.NetCore": {
"commandName": "Project",
"commandLineArgs": "myapp",
"workingDirectory": ".\\bin\\Debug\\netcoreapp1.1"
}
}
}

Просмотреть файл

@ -1,38 +0,0 @@
{
"modules": [
{
"name": "OpcUa",
"loader": {
"name": "dotnetcore",
"entrypoint": {
"assembly.name": "Opc.Ua.Publisher.Module",
"entry.type": "Opc.Ua.Publisher.Module"
}
},
"args": {
"Configuration": {
"ApplicationName": "<ReplaceWithYourApplicationName>",
"ApplicationType": "ClientAndServer",
"ApplicationUri": "urn:localhost:microsoft:publisher"
}
}
},
{
"name": "IoTHub",
"loader": {
"name": "dotnetcore",
"entrypoint": {
"assembly.name": "Opc.Ua.IoTHub.Module",
"entry.type": "Opc.Ua.IoTHub.Module"
}
},
"args": "<ReplaceWithYourApplicationName>"
}
],
"links": [
{
"source": "OpcUa",
"sink": "IoTHub"
}
]
}

Просмотреть файл

@ -1,21 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard1.3</TargetFramework>
<AssemblyName>IoTHubCredentialTools</AssemblyName>
<PackageId>IoTHubCredentialTools</PackageId>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>true</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<GenerateAssemblyVersionAttribute>true</GenerateAssemblyVersionAttribute>
<Description />
<Company>Microsoft</Company>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Portable.BouncyCastle" Version="1.8.1.2" />
<PackageReference Include="System.Collections.NonGeneric" Version="4.3.0" />
</ItemGroup>
</Project>

Просмотреть файл

@ -1,175 +0,0 @@

using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
namespace IoTHubCredentialTools
{
public class IoTHubRegistration
{
public const string _IoTHubAPIVersion = "?api-version=2016-11-14";
/// <summary>
/// Returns an array of the parsed parts of a connection string
/// </summary>
public static string[] ParseConnectionString(string connectionString, bool isDevice)
{
string[] connectionStringParts = connectionString.Split(';');
if (connectionStringParts.Length == 3)
{
if (connectionStringParts[0].StartsWith("HostName="))
{
connectionStringParts[0] = connectionStringParts[0].Substring(connectionStringParts[0].IndexOf('=') + 1);
}
else
{
return null;
}
if (connectionStringParts[1].StartsWith("DeviceId=") && (isDevice == true))
{
connectionStringParts[1] = connectionStringParts[1].Substring(connectionStringParts[1].IndexOf('=') + 1);
}
else if (connectionStringParts[1].StartsWith("SharedAccessKeyName=") && (isDevice == false))
{
connectionStringParts[1] = connectionStringParts[1].Substring(connectionStringParts[1].IndexOf('=') + 1);
}
else
{
return null;
}
if (connectionStringParts[2].StartsWith("SharedAccessKey="))
{
connectionStringParts[2] = connectionStringParts[2].Substring(connectionStringParts[2].IndexOf('=') + 1);
}
else
{
return null;
}
return connectionStringParts;
}
else
{
return null;
}
}
/// <summary>
/// Creates a device in the IoT Hub device registry using the IoT Hub REST API
/// </summary>
public static async Task<string> CreateDeviceInIoTHubDeviceRegistry(HttpClient httpClient, string deviceName)
{
// check if device already registered
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "/devices/" + deviceName + _IoTHubAPIVersion);
HttpResponseMessage response = await httpClient.SendAsync(request).ConfigureAwait(false);
if (response.IsSuccessStatusCode)
{
// already registered, delete existing device first
request = new HttpRequestMessage(HttpMethod.Delete, "/devices/" + deviceName + _IoTHubAPIVersion);
request.Headers.IfMatch.Add(new EntityTagHeaderValue("\"*\""));
response = await httpClient.SendAsync(request).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
throw new Exception("Delete device failed with " + response.Content.ReadAsStringAsync().Result);
}
}
// now create a new one
string jsonMessage = "{\"deviceId\": \"" + deviceName + "\"}";
request = new HttpRequestMessage(HttpMethod.Put, "/devices/" + deviceName + _IoTHubAPIVersion)
{
Content = new StringContent(jsonMessage, Encoding.ASCII, "application/json")
};
response = await httpClient.SendAsync(request).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
throw new Exception("Create device failed with " + response.Content.ReadAsStringAsync().Result);
}
string result = response.Content.ReadAsStringAsync().Result;
if (result.Contains("primaryKey"))
{
const string keyIdentifier = "\"primaryKey\":\"";
const int keylength = 44;
return result.Substring(result.IndexOf(keyIdentifier) + keyIdentifier.Length, keylength);
}
else
{
throw new Exception("Could not find primary key in response: " + response.Content.ReadAsStringAsync().Result);
}
}
/// <summary>
/// Registers a device with IoT Hub
/// </summary>
public static string RegisterDeviceWithIoTHub(string deviceName, string IoTHubOwnerConnectionString)
{
string[] parsedConnectionString = ParseConnectionString(IoTHubOwnerConnectionString, false);
string deviceConnectionString = string.Empty;
if ((parsedConnectionString != null) && (parsedConnectionString.Length == 3))
{
string IoTHubName = parsedConnectionString[0];
string name = parsedConnectionString[1];
string accessToken = parsedConnectionString[2];
using (HttpClient httpClient = new HttpClient())
{
httpClient.BaseAddress = new UriBuilder { Scheme = "https", Host = IoTHubName }.Uri;
string sharedAccessSignature = GenerateSharedAccessToken(name, Convert.FromBase64String(accessToken), IoTHubName, 60000);
httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("SharedAccessSignature", sharedAccessSignature);
deviceConnectionString = CreateDeviceInIoTHubDeviceRegistry(httpClient, deviceName.Replace(" ", "")).Result;
// prepend the rest of the connection string
deviceConnectionString = "HostName=" + IoTHubName + ";DeviceId=" + deviceName.Replace(" ", "") + ";SharedAccessKey=" + deviceConnectionString;
return deviceConnectionString;
}
}
else
{
throw new Exception("Could not parse IoT Hub owner connection string: " + IoTHubOwnerConnectionString);
}
}
/// <summary>
/// Sas token generation
/// </summary>
/// <param name="keyName"></param>
/// <param name="key"></param>
/// <param name="tokenScope"></param>
/// <param name="ttl"></param>
/// <returns>shared access token</returns>
public static string GenerateSharedAccessToken(string keyName, byte[] key, string tokenScope, int ttl)
{
// http://msdn.microsoft.com/en-us/library/azure/dn170477.aspx
// signature is computed from joined encoded request Uri string and expiry string
DateTime expiryTime = DateTime.UtcNow + TimeSpan.FromMilliseconds(ttl);
string expiry = ((long)(expiryTime - new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds).ToString();
string encodedScope = Uri.EscapeDataString(tokenScope);
string sig;
// the connection string signature is base64 encoded
using (var hmac = new HMACSHA256(key))
{
sig = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(encodedScope + "\n" + expiry)));
}
return string.Format(
"sr={0}&sig={1}&se={2}&skn={3}",
encodedScope,
Uri.EscapeDataString(sig),
Uri.EscapeDataString(expiry),
Uri.EscapeDataString(keyName)
);
}
}
}

Просмотреть файл

@ -0,0 +1,11 @@
************************* Logging started at 06/30/2017 18:35:20
18:35:20.513 Attemping to load nodes file from: C:\Users\erichb\Source\Repos\iot-gateway-opc-ua\src\publishednodes.json
18:35:20.823 Loaded 2 nodes.
18:35:20.825 Starting server on endpoint opc.tcp://myapp:62222/UA/Publisher...
18:35:21.379 Server: Session Monitor Thread Started.
18:35:21.411 Server: Publish Subscriptions Thread Started.
18:35:21.414 Server started.
18:35:21.414 Attemping to connect to servers...
18:35:21.415 Connecting to server: opc.tcp://myopcservername:51210/UA/SampleServer
18:35:21.435 Channel 0 in Connecting state.

Просмотреть файл

@ -1,8 +1,5 @@

using Newtonsoft.Json;
using System;
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Security.Cryptography.X509Certificates;
namespace Opc.Ua.Publisher
@ -10,26 +7,19 @@ namespace Opc.Ua.Publisher
/// <summary>
/// Module configuration object to deserialize / serialize
/// </summary>
[JsonObject(MemberSerialization.OptIn)]
public class ModuleConfiguration
{
/// <summary>
/// Opc client configuration
/// </summary>
[JsonProperty]
public ApplicationConfiguration Configuration { get; set; }
/// <summary>
/// Called when the object is deserialized
/// </summary>
/// <param name="context"></param>
[OnDeserialized]
internal void OnDeserializedMethod(StreamingContext context)
public ModuleConfiguration(string applicationName)
{
// Validate configuration and set reasonable defaults
Configuration.ApplicationUri = Configuration.ApplicationUri.Replace("localhost", Utils.GetHostName());
// set reasonable defaults
Configuration = new ApplicationConfiguration();
Configuration.ApplicationName = applicationName;
Configuration.ApplicationUri = "urn:" + Utils.GetHostName() + ":microsoft:" + Configuration.ApplicationName;
Configuration.ApplicationType = ApplicationType.ClientAndServer;
Configuration.TransportQuotas = new TransportQuotas { OperationTimeout = 15000 };
Configuration.ClientConfiguration = new ClientConfiguration();
@ -120,13 +110,13 @@ namespace Opc.Ua.Publisher
ICertificateStore store = Configuration.SecurityConfiguration.TrustedPeerCertificates.OpenStore();
if (store == null)
{
Module.Trace("Could not open trusted peer store. StorePath={0}", Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath);
Program.Trace("Could not open trusted peer store. StorePath={0}", Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath);
}
else
{
try
{
Module.Trace(Utils.TraceMasks.Information, "Adding certificate to trusted peer store. StorePath={0}", Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath);
Program.Trace(Utils.TraceMasks.Information, "Adding certificate to trusted peer store. StorePath={0}", Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath);
X509Certificate2 publicKey = new X509Certificate2(certificate.RawData);
store.Add(publicKey).Wait();
}
@ -138,7 +128,7 @@ namespace Opc.Ua.Publisher
}
catch (Exception e)
{
Module.Trace(e, "Could not add certificate to trusted peer store. StorePath={0}", Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath);
Program.Trace(e, "Could not add certificate to trusted peer store. StorePath={0}", Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath);
}
// patch our base address

Просмотреть файл

Просмотреть файл

@ -1,599 +0,0 @@

using System;
using System.Text;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
using Amqp.Sasl;
using System.IO;
using System.Net;
using System.Threading;
using System.Diagnostics;
using System.Collections.Generic;
using IoTHubCredentialTools;
namespace Opc.Ua.IoTHub
{
public class AmqpConnection
{
#region Serialized Configuration Properties
public string Host { get; set; }
public int Port { get; set; }
public string Endpoint { get; set; }
public string WebSocketEndpoint { get; set; }
public string KeyName { get; set; }
public string KeyValue { get; set; }
public string KeyEncoding { get; set; }
public bool UseCbs { get; set; }
public string TokenType { get; set; }
public string TokenScope { get; set; }
public int TokenLifetime { get; set; }
#endregion
#region Private members
private Connection m_connection;
private Session m_session;
private SenderLink m_link;
private LinkedList<ArraySegment<byte>> messages;
private DateTime m_currentExpiryTime;
private Timer m_tokenRenewalTimer;
private bool m_closed;
private int m_sendCounter;
private int m_sendAcceptedCounter;
private int m_sendRejectedCounter;
private object m_sending;
private int m_sendallthreads;
#endregion
#region Constructor
/// <summary>
/// Default Constructor
/// </summary>
public AmqpConnection()
{
Initialize();
}
/// <summary>
/// Initialize the connection class
/// </summary>
private void Initialize()
{
m_connection = null;
m_session = null;
m_link = null;
m_closed = true;
m_sending = new object();
m_sendallthreads = 0;
messages = new LinkedList<ArraySegment<byte>>();
m_sendCounter = 0;
m_sendAcceptedCounter = 0;
m_sendRejectedCounter = 0;
}
#endregion
/// <summary>
/// Open the connection
/// </summary>
public async Task OpenAsync()
{
// make sure only one SendAll or OpenAsync task is active
if (Interlocked.Increment(ref m_sendallthreads) != 1)
{
Interlocked.Decrement(ref m_sendallthreads);
return;
}
try
{
Close();
ConnectionFactory factory = new ConnectionFactory();
factory.AMQP.ContainerId = Guid.NewGuid().ToString();
if (UseCbs)
{
factory.SASL.Profile = SaslProfile.External;
}
m_connection = await factory.CreateAsync(GetAddress());
m_connection.Closed += new ClosedCallback(OnConnectionClosed);
if (UseCbs && KeyName != null && KeyValue != null)
{
await StartCbs();
}
else
{
await ResetLinkAsync();
}
Module.Trace("AMQP Connection opened, connected to '{0}'...", Endpoint);
m_closed = false;
}
catch (Exception e)
{
Module.Trace("AMQP Connection failed to open, exception: {0}...", e.Message);
}
finally
{
Interlocked.Decrement(ref m_sendallthreads);
}
// Push out the messages we have so far
SendAll();
}
/// <summary>
/// Publish a JSON message
/// </summary>
/// <param name="body"></param>
public void Publish(ArraySegment<byte> body)
{
lock (messages)
{
messages.AddLast(body);
}
if (IsClosed())
{
Task.Run(OpenAsync);
}
else
{
// Push out the messages we have so far
Task.Run(new Action(SendAll));
}
}
/// <summary>
/// Work until all messages have been send...
/// </summary>
protected void SendAll()
{
// make sure only one send all task is active
if (Interlocked.Increment(ref m_sendallthreads) != 1)
{
Interlocked.Decrement(ref m_sendallthreads);
return;
}
try
{
while (!IsClosed())
{
ArraySegment<byte> onemessage;
lock (messages)
{
if (messages.Count == 0)
{
break;
}
onemessage = messages.First.Value;
messages.RemoveFirst();
}
bool sent;
lock (m_sending)
{
sent = SendOneAsync(onemessage, SendAllCallback);
}
if (!sent)
{
lock (messages)
{
messages.AddFirst(onemessage);
}
}
}
}
catch (Exception)
{
Close();
}
finally
{
Interlocked.Decrement(ref m_sendallthreads);
}
}
/// <summary>
/// Send outcome callback
/// </summary>
/// <param name="body"></param>
/// <returns>Whether message was sent</returns>
void SendAllCallback(Message message, Outcome outcome, object state)
{
if (outcome.Descriptor.Code == 36)
{
// accepted
m_sendAcceptedCounter++;
}
else
{
// rejected or other fail reason
m_sendRejectedCounter++;
}
if (((m_sendRejectedCounter + m_sendAcceptedCounter) % 100) == 0)
{
Module.Trace("Send Statistics: {0} sent {1} accepted {2} rejected",
m_sendCounter, m_sendAcceptedCounter, m_sendRejectedCounter);
}
}
/// <summary>
/// Send message
/// </summary>
/// <param name="body"></param>
/// <returns>Whether message was sent</returns>
protected bool SendOneAsync(ArraySegment<byte> body, OutcomeCallback SendCallback)
{
if (IsClosed())
{
return false;
}
using (var istrm = new MemoryStream(body.Array, body.Offset, body.Count, false))
{
Message message = new Message()
{
BodySection = new Data() { Binary = istrm.ToArray() }
};
message.Properties = new Properties()
{
MessageId = Guid.NewGuid().ToString(),
ContentType = "application/opcua+json"
};
if (m_link != null)
{
m_sendCounter++;
m_link.Send(message, SendCallback, null);
return true;
}
}
return false;
}
/// <summary>
/// Close and therefore dispose of all resources
/// </summary>
public void Close()
{
m_closed = true;
if (m_tokenRenewalTimer != null)
{
m_tokenRenewalTimer.Dispose();
m_tokenRenewalTimer = null;
}
Dispose(true);
}
/// <summary>
/// is the connection closed?
/// </summary>
/// <returns>true or false</returns>
public bool IsClosed()
{
return m_closed;
}
/// <summary>
/// Destructor
/// </summary>
public void Dispose()
{
Dispose(true);
}
/// <summary>
/// Close all resources
/// </summary>
/// <param name="disposing"></param>
public virtual void Dispose(bool disposing)
{
if (disposing)
{
if (m_tokenRenewalTimer != null)
{
m_tokenRenewalTimer.Dispose();
m_tokenRenewalTimer = null;
}
if (m_link != null)
{
try
{
m_link.Close(3000);
}
catch(Exception)
{
}
m_link = null;
}
if (m_session != null)
{
try
{
m_session.Close(3000);
}
catch (Exception)
{
}
m_session = null;
}
if (m_connection != null)
{
try
{
m_connection.Close(3000);
}
catch (Exception)
{
}
m_connection = null;
}
}
}
/// <summary>
/// Returns the amqp.net lite broker address to connect to.
/// </summary>
/// <returns>Address to connect to</returns>
protected Address GetAddress()
{
if (Port == 0)
{
// Set default port
if (WebSocketEndpoint != null)
Port = 443;
else
Port = 5671;
}
if (WebSocketEndpoint != null)
{
return new Address(Host, Port, null, null, WebSocketEndpoint, "wss");
}
else if (UseCbs)
{
return new Address(Host, Port);
}
else
{
return new Address(Host, Port, KeyName.Trim(), KeyValue.Trim());
}
}
/// <summary>
/// Start cbs protocol on the underlying connection
/// </summary>
/// <returns>Task to wait on</returns>
protected async Task StartCbs()
{
if (m_connection == null)
{
throw new Exception("No connection to run cbs renewal on!");
}
if (TokenType == null || TokenScope == null)
{
throw new Exception("Must specifiy token scope and type");
}
if (TokenLifetime == 0)
{
TokenLifetime = 60000;
}
// Ensure we have a token
await RenewTokenAsync(GenerateSharedAccessToken());
// then start the periodic renewal
int interval = (int)(TokenLifetime * 0.8);
m_tokenRenewalTimer = new Timer(OnTokenRenewal, null, interval, interval);
}
/// <summary>
/// Return decoded key from configured key value
/// </summary>
/// <returns>decoded key</returns>
protected byte[] DecodeKey()
{
if (!KeyEncoding.Equals("base64", StringComparison.CurrentCultureIgnoreCase))
{
return Encoding.UTF8.GetBytes(KeyValue.Trim());
}
else
{
return Convert.FromBase64String(KeyValue.Trim());
}
}
/// <summary>
/// Generate token for member values
/// </summary>
/// <returns>Token string</returns>
protected string GenerateSharedAccessToken()
{
m_currentExpiryTime = DateTime.UtcNow + TimeSpan.FromMilliseconds(TokenLifetime);
return "SharedAccessSignature " + IoTHubRegistration.GenerateSharedAccessToken(KeyName.Trim(), DecodeKey(), TokenScope.Trim(), TokenLifetime);
}
/// <summary>
/// Callback for connection close events
/// </summary>
/// <param name="sender"></param>
/// <param name="error"></param>
protected virtual void OnConnectionClosed(AmqpObject sender, Error error)
{
if (error != null)
{
Debug.WriteLine("Connection Closed {0} {1}", error.Condition, error.Description);
}
m_closed = true;
}
/// <summary>
/// Callback for session close event
/// </summary>
/// <param name="sender"></param>
/// <param name="error"></param>
protected virtual void OnSessionClosed(AmqpObject sender, Error error)
{
if (error != null)
{
Debug.WriteLine("Session Closed {0} {1}", error.Condition, error.Description);
}
}
/// <summary>
/// Callback for link close events
/// </summary>
/// <param name="sender"></param>
/// <param name="error"></param>
protected virtual void OnLinkClosed(AmqpObject sender, Error error)
{
if (error != null)
{
Debug.WriteLine("Link Closed {0} {1}", error.Condition, error.Description);
}
}
/// <summary>
/// Timer callback for token renewal
/// </summary>
/// <param name="state"></param>
private void OnTokenRenewal(object state)
{
try
{
lock (m_sending)
{
bool result = RenewTokenAsync(GenerateSharedAccessToken()).Wait(TokenLifetime);
if (!result)
{
Module.Trace("Unexpected timeout error renewing token.");
}
}
}
catch (Exception e)
{
Module.Trace(e, "Unexpected error renewing token.");
if (e is AggregateException ae)
{
foreach (var ie in ae.InnerExceptions)
{
Module.Trace("[{0}] {1}", ie.GetType().Name, ie.Message);
}
}
}
}
/// <summary>
/// Reset the link and session
/// </summary>
/// <returns>Task to wait on</returns>
private async Task ResetLinkAsync()
{
SenderLink link;
Session session;
session = new Session(m_connection);
session.Closed += new ClosedCallback(OnSessionClosed);
link = new SenderLink(session, Guid.NewGuid().ToString(), Endpoint);
link.Closed += new ClosedCallback(OnLinkClosed);
if (m_link != null)
{
await m_link.CloseAsync();
}
if (m_session != null)
{
await m_session.CloseAsync();
}
m_session = session;
m_link = link;
}
/// <summary>
/// renews the cbs token
/// </summary>
/// <param name="sharedAccessToken">token to renew</param>
/// <returns>Task to wait on</returns>
private async Task RenewTokenAsync(string sharedAccessToken)
{
var session = new Session(m_connection);
string cbsClientAddress = "cbs-client-reply-to";
var cbsSender = new SenderLink(session, "cbs-sender", "$cbs");
var receiverAttach = new Attach()
{
Source = new Source() { Address = "$cbs" },
Target = new Target() { Address = cbsClientAddress }
};
var cbsReceiver = new ReceiverLink(session, "cbs-receiver", receiverAttach, null);
// construct the put-token message
var request = new Message(sharedAccessToken);
request.Properties = new Properties();
request.Properties.MessageId = "1";
request.Properties.ReplyTo = cbsClientAddress;
request.ApplicationProperties = new ApplicationProperties();
request.ApplicationProperties["operation"] = "put-token";
request.ApplicationProperties["type"] = TokenType;
request.ApplicationProperties["name"] = TokenScope.Trim();
await cbsSender.SendAsync(request);
// receive the response
var response = await cbsReceiver.ReceiveAsync();
if (response == null || response.Properties == null || response.ApplicationProperties == null)
{
throw new Exception("invalid response received");
}
int statusCode = (int)response.ApplicationProperties["status-code"];
await cbsSender.CloseAsync();
await cbsReceiver.CloseAsync();
await session.CloseAsync();
if (statusCode != (int)HttpStatusCode.Accepted && statusCode != (int)HttpStatusCode.OK)
{
throw new Exception("put-token message was not accepted. Error code: " + statusCode);
}
// Now create new link
await ResetLinkAsync();
}
}
}

Просмотреть файл

@ -1,137 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using IoTHubCredentialTools;
using Microsoft.Azure.Devices.Gateway;
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
namespace Opc.Ua.IoTHub
{
/// <summary>
/// Gateway module that acts as IoT Hub connectivity
/// </summary>
public class Module : IGatewayModule
{
private static AmqpConnection m_publisher = new AmqpConnection();
private static StreamWriter m_trace = null;
/// <summary>
/// Trace message helper
/// </summary>
public static void Trace(string message, params object[] args)
{
m_trace.WriteLine(message, args);
Console.WriteLine(message, args);
}
public static void Trace(int traceMask, string format, params object[] args)
{
m_trace.WriteLine(format, args);
Console.WriteLine(format, args);
}
public static void Trace(Exception e, string format, params object[] args)
{
m_trace.WriteLine(e.ToString());
m_trace.WriteLine(format, args);
Console.WriteLine(e.ToString());
Console.WriteLine(format, args);
}
/// <summary>
/// Create module, throws if configuration is bad
/// </summary>
public void Create(Broker broker, byte[] configuration)
{
string appName = Encoding.UTF8.GetString(configuration).Replace("\"","");
// enable logging
m_trace = new StreamWriter(File.Open("./Logs/" + appName + ".IoTHub.Module.log.txt", FileMode.Create));
Trace("Opc.Ua.IoTHub.Module: Creating...");
// configure connection
try
{
ConfigureAMQPConnectionToIoTHub(appName).Wait();
}
catch (Exception ex)
{
Module.Trace(ex, "Failed to configure AMQP connection, dropping....");
}
Trace("Opc.Ua.IoTHub.Module: Created.");
}
/// <summary>
/// Disconnect all sessions
/// </summary>
public void Destroy()
{
m_publisher.Close();
Trace("Opc.Ua.IoTHub.Module: Closed.");
m_trace.Flush();
m_trace.Dispose();
}
/// <summary>
/// Receive message from broker
/// </summary>
public void Receive(Message received_message)
{
try
{
if (!m_publisher.IsClosed())
{
m_publisher.Publish(new ArraySegment<byte>(received_message.Content));
}
}
catch (Exception ex)
{
Module.Trace(ex, "Failed to publish message, dropping....");
}
}
/// <summary>
/// Publish message to bus
/// </summary>
public static void Publish(Message message)
{
// NO-OP
}
/// <summary>
/// Configures the AMQP (telemetry) connection to IoT Hub
/// </summary>
public static async Task ConfigureAMQPConnectionToIoTHub(string appName)
{
Trace("Opc.Ua.IoTHub.Module: Attemping to read connection string from secure store with certificate name: " + appName);
string connectionString = SecureIoTHubToken.Read(appName);
Trace("Opc.Ua.IoTHub.Module: Attemping to configure IoTHub module with connection string: " + connectionString);
string[] parsedConnectionString = IoTHubRegistration.ParseConnectionString(connectionString, true);
string IoTHubName = parsedConnectionString[0];
string deviceName = parsedConnectionString[1];
string accessToken = parsedConnectionString[2];
m_publisher.Endpoint = "/devices/" + deviceName + "/messages/events";
m_publisher.WebSocketEndpoint = null; // not used
m_publisher.Host = IoTHubName;
m_publisher.Port = 0; // use default
m_publisher.KeyName = ""; // not used
m_publisher.KeyValue = accessToken;
m_publisher.KeyEncoding = "Base64";
m_publisher.UseCbs = true;
m_publisher.TokenType = "servicebus.windows.net:sastoken";
m_publisher.TokenScope = m_publisher.Host + "/devices/" + deviceName;
m_publisher.TokenLifetime = 60000;
await m_publisher.OpenAsync();
}
}
}

Просмотреть файл

@ -1,27 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard1.3</TargetFramework>
<DebugType>portable</DebugType>
<AssemblyName>Opc.Ua.IoTHub.Module</AssemblyName>
<PackageId>Opc.Ua.IoTHub.Module</PackageId>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>true</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<GenerateAssemblyVersionAttribute>true</GenerateAssemblyVersionAttribute>
<Description />
<Company>Microsoft</Company>
<RootNamespace>Opc.Ua.IoTHub</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AMQPNetLite" Version="1.2.3" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.SDK" Version="0.2.4" />
<PackageReference Include="Microsoft.Azure.Devices.Gateway.Module.NetStandard" Version="1.0.4" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\IoTHubCredentialTools\IoTHubCredentialTools.csproj" />
</ItemGroup>
</Project>

Просмотреть файл

@ -1,30 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard1.3</TargetFramework>
<DebugType>portable</DebugType>
<AssemblyName>Opc.Ua.Publisher.Module</AssemblyName>
<PackageId>Opc.Ua.Publisher.Module</PackageId>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>true</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<GenerateAssemblyVersionAttribute>true</GenerateAssemblyVersionAttribute>
<Description />
<Company>Microsoft</Company>
<RootNamespace>Opc.Ua.Publisher</RootNamespace>
</PropertyGroup>
<ItemGroup>
<EmbeddedResource Include="Publisher.PredefinedNodes.uanodes" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\IoTHubCredentialTools\IoTHubCredentialTools.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Devices.Gateway.Module.NetStandard" Version="1.0.4" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.SDK" Version="0.2.4" />
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,36 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<AssemblyName>Opc.Ua.Publisher</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Opc.Ua.Publisher</PackageId>
<RuntimeFrameworkVersion>1.1.2</RuntimeFrameworkVersion>
<GenerateAssemblyCompanyAttribute>true</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyVersionAttribute>true</GenerateAssemblyVersionAttribute>
<Description />
<Company>Microsoft</Company>
<Version>1.1.2</Version>
</PropertyGroup>
<ItemGroup>
<None Remove="publishednodes.json" />
</ItemGroup>
<ItemGroup>
<Content Include="publishednodes.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="Publisher.PredefinedNodes.uanodes" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Devices" Version="1.2.8" />
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.2.13" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.SDK" Version="0.2.4" />
</ItemGroup>
</Project>

22
src/Opc.Ua.Publisher.sln Normal file
Просмотреть файл

@ -0,0 +1,22 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26430.14
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Opc.Ua.Publisher", "Opc.Ua.Publisher.csproj", "{EAC47E1C-39F4-4E51-A241-88432552D461}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{EAC47E1C-39F4-4E51-A241-88432552D461}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EAC47E1C-39F4-4E51-A241-88432552D461}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EAC47E1C-39F4-4E51-A241-88432552D461}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EAC47E1C-39F4-4E51-A241-88432552D461}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
EndGlobal

Просмотреть файл

@ -1,38 +1,28 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using IoTHubCredentialTools;
using Microsoft.Azure.Devices;
using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;
using Opc.Ua.Client;
using Publisher;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Text;
using Newtonsoft.Json;
using Microsoft.Azure.Devices.Gateway;
using System.Net.Http;
using System.Net.Http.Headers;
using Publisher;
using Opc.Ua.Client;
using IoTHubCredentialTools;
using System.IO;
using System.Net;
using System.Text;
using System.Threading.Tasks;
namespace Opc.Ua.Publisher
{
/// <summary>
/// Gateway module that acts as Opc.Ua Publisher and Server
/// </summary>
public class Module : IGatewayModule, IGatewayModuleStart
public class Program
{
public static ApplicationConfiguration m_configuration = null;
public static List<Session> m_sessions = new List<Session>();
public static PublishedNodesCollection m_nodesLookups = new PublishedNodesCollection();
public static List<Uri> m_endpointUrls = new List<Uri>();
public static string m_deviceName = string.Empty;
public static string m_accessKey = string.Empty;
public static string m_applicationName = string.Empty;
public static DeviceClient m_deviceClient = null;
private static Broker m_broker = null;
private PublisherServer m_server = new PublisherServer();
private const string m_IoTHubAPIVersion = "?api-version=2016-11-14";
private static PublisherServer m_server = new PublisherServer();
/// <summary>
/// Trace message helper
@ -56,29 +46,79 @@ namespace Opc.Ua.Publisher
Console.WriteLine(format, args);
}
/// <summary>
/// Create module, throws if configuration is bad
/// </summary>
public void Create(Broker broker, byte[] configuration)
public static void Main(string[] args)
{
Trace("Opc.Ua.Publisher.Module: Creating...");
m_broker = broker;
string configString = Encoding.UTF8.GetString(configuration);
// Deserialize from configuration string
ModuleConfiguration moduleConfiguration = null;
try
if ((args.Length == 0) || string.IsNullOrEmpty(args[0]))
{
moduleConfiguration = JsonConvert.DeserializeObject<ModuleConfiguration>(configString);
}
catch (Exception ex)
{
Trace("Opc.Ua.Publisher.Module: Module config string " + configString + " could not be deserialized: " + ex.Message);
throw;
Console.WriteLine("Please specify an application name as argument!");
return;
}
m_applicationName = args[0];
string ownerConnectionString = string.Empty;
// check if we also received an owner connection string
if ((args.Length > 1) && !string.IsNullOrEmpty(args[1]))
{
ownerConnectionString = args[1];
}
else
{
Trace("IoT Hub owner connection string not passed as argument.");
// check if we have an environment variable to register ourselves with IoT Hub
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_HUB_CS")))
{
ownerConnectionString = Environment.GetEnvironmentVariable("_HUB_CS");
}
}
// register ourselves with IoT Hub
if (ownerConnectionString != string.Empty)
{
Trace("Attemping to register ourselves with IoT Hub using owner connection string: " + ownerConnectionString);
RegistryManager manager = RegistryManager.CreateFromConnectionString(ownerConnectionString);
// remove any existing device
Device existingDevice = manager.GetDeviceAsync(m_applicationName).Result;
if (existingDevice != null)
{
manager.RemoveDeviceAsync(m_applicationName).Wait();
}
Device newDevice = manager.AddDeviceAsync(new Device(m_applicationName)).Result;
if (newDevice != null)
{
string hostname = ownerConnectionString.Substring(0, ownerConnectionString.IndexOf(";"));
string deviceConnectionString = hostname + ";DeviceId=" + m_applicationName + ";SharedAccessKey=" + newDevice.Authentication.SymmetricKey.PrimaryKey;
SecureIoTHubToken.Write(m_applicationName, deviceConnectionString);
}
else
{
Trace("Could not register ourselves with IoT Hub using owner connection string: " + ownerConnectionString);
}
}
else
{
Trace("IoT Hub owner connection string not found, registration with IoT Hub abandoned.");
}
// try to read connection string from secure store and open IoTHub client
Trace("Attemping to read connection string from secure store with certificate name: " + m_applicationName);
string connectionString = SecureIoTHubToken.Read(m_applicationName);
if (!string.IsNullOrEmpty(connectionString))
{
Trace("Attemping to configure publisher with connection string: " + connectionString);
m_deviceClient = DeviceClient.CreateFromConnectionString(connectionString, Microsoft.Azure.Devices.Client.TransportType.Mqtt);
m_deviceClient.RetryPolicy = RetryPolicyType.Exponential_Backoff_With_Jitter;
m_deviceClient.OpenAsync().Wait();
}
else
{
Trace("Device connection string not found in secure store.");
}
ModuleConfiguration moduleConfiguration = new ModuleConfiguration(m_applicationName);
m_configuration = moduleConfiguration.Configuration;
m_configuration.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_CertificateValidation);
@ -99,15 +139,15 @@ namespace Opc.Ua.Publisher
publishedNodesFilePath = Environment.GetEnvironmentVariable("_GW_PNFP");
}
Trace("Opc.Ua.Publisher.Module: Attemping to load nodes file from: " + publishedNodesFilePath);
Trace("Attemping to load nodes file from: " + publishedNodesFilePath);
m_nodesLookups = JsonConvert.DeserializeObject<PublishedNodesCollection>(File.ReadAllText(publishedNodesFilePath));
Trace("Opc.Ua.Publisher.Module: Loaded " + m_nodesLookups.Count.ToString() + " nodes.");
Trace("Loaded " + m_nodesLookups.Count.ToString() + " nodes.");
}
catch (Exception ex)
{
Trace("Opc.Ua.Publisher.Module: Nodes file loading failed with: " + ex.Message);
Trace("Nodes file loading failed with: " + ex.Message);
}
foreach (NodeLookup nodeLookup in m_nodesLookups)
{
if (!m_endpointUrls.Contains(nodeLookup.EndPointURL))
@ -119,46 +159,23 @@ namespace Opc.Ua.Publisher
// start the server
try
{
Trace("Opc.Ua.Publisher.Module: Starting server on endpoint " + m_configuration.ServerConfiguration.BaseAddresses[0].ToString() + "...");
Trace("Starting server on endpoint " + m_configuration.ServerConfiguration.BaseAddresses[0].ToString() + "...");
m_server.Start(m_configuration);
Trace("Opc.Ua.Publisher.Module: Server started.");
Trace("Server started.");
}
catch (Exception ex)
{
Trace("Opc.Ua.Publisher.Module: Starting server failed with: " + ex.Message);
Trace("Starting server failed with: " + ex.Message);
}
// check if we have an environment variable to register ourselves with IoT Hub
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_HUB_CS")))
{
string ownerConnectionString = Environment.GetEnvironmentVariable("_HUB_CS");
if ((m_configuration != null) && (!string.IsNullOrEmpty(m_configuration.ApplicationName)))
{
Trace("Attemping to register ourselves with IoT Hub using owner connection string: " + ownerConnectionString);
string deviceConnectionString = IoTHubRegistration.RegisterDeviceWithIoTHub(m_configuration.ApplicationName, ownerConnectionString);
if (!string.IsNullOrEmpty(deviceConnectionString))
{
SecureIoTHubToken.Write(m_configuration.ApplicationName, deviceConnectionString);
}
else
{
Trace("Could not register ourselves with IoT Hub using owner connection string: " + ownerConnectionString);
}
}
}
// try to configure our publisher component
TryConfigurePublisherAsync().Wait();
// connect to servers
Trace("Opc.Ua.Publisher.Module: Attemping to connect to servers...");
Trace("Attemping to connect to servers...");
try
{
List<Task> connectionAttempts = new List<Task>();
foreach (Uri endpointUrl in m_endpointUrls)
{
Trace("Opc.Ua.Publisher.Module: Connecting to server: " + endpointUrl);
Trace("Connecting to server: " + endpointUrl);
connectionAttempts.Add(EndpointConnect(endpointUrl));
}
@ -167,105 +184,11 @@ namespace Opc.Ua.Publisher
}
catch (Exception ex)
{
Trace("Opc.Ua.Publisher.Module: Exception: " + ex.ToString() + "\r\n" + ex.InnerException != null ? ex.InnerException.ToString() : null);
Trace("Exception: " + ex.ToString() + "\r\n" + ex.InnerException != null ? ex.InnerException.ToString() : null);
}
Trace("Opc.Ua.Publisher.Module: Created.");
}
/// <summary>
/// Disconnect all sessions
/// </summary>
public void Destroy()
{
foreach (Session session in m_sessions)
{
session.Close();
}
Trace("Opc.Ua.Publisher.Module: All sessions closed.");
}
/// <summary>
/// Receive message from broker
/// </summary>
public void Receive(Message received_message)
{
// No-op
}
/// <summary>
/// Try to configure our Publisher settings
/// </summary>
public static async Task TryConfigurePublisherAsync()
{
// read connection string from secure store and configure publisher, if possible
if ((m_configuration != null) && (!string.IsNullOrEmpty(m_configuration.ApplicationName)))
{
Trace("Opc.Ua.Publisher.Module: Attemping to read connection string from secure store with certificate name: " + m_configuration.ApplicationName);
string connectionString = SecureIoTHubToken.Read(m_configuration.ApplicationName);
if (!string.IsNullOrEmpty(connectionString))
{
Trace("Opc.Ua.Publisher.Module: Attemping to configure publisher with connection string: " + connectionString);
string[] parsedConnectionString = IoTHubRegistration.ParseConnectionString(connectionString, true);
if ((parsedConnectionString != null) && (parsedConnectionString.Length == 3))
{
// note: IoTHub name can't be changed during runtime in the GW IoTHub module
string _IoTHubName = parsedConnectionString[0];
m_deviceName = parsedConnectionString[1];
m_accessKey = parsedConnectionString[2];
Trace("Opc.Ua.Publisher.Module: Publisher configured for device: " + m_deviceName);
// try to connect to IoT Hub
using (HttpClient httpClient = new HttpClient())
{
httpClient.BaseAddress = new UriBuilder { Scheme = "https", Host = _IoTHubName }.Uri;
string sharedAccessToken = IoTHubRegistration.GenerateSharedAccessToken(string.Empty, Convert.FromBase64String(m_accessKey), _IoTHubName, 60000);
httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("SharedAccessSignature", sharedAccessToken);
// send an empty d2c message
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, "/devices/" + m_deviceName + "/messages/events" + IoTHubRegistration._IoTHubAPIVersion);
HttpResponseMessage response = await httpClient.SendAsync(request).ConfigureAwait(false);
if (response.StatusCode != HttpStatusCode.NoContent)
{
throw new Exception("Opc.Ua.Publisher.Module: Could not connect to IoT Hub. Response: " + response.ToString());
}
}
}
else
{
throw new Exception("Opc.Ua.Publisher.Module: Publisher configuration failed!");
}
}
else
{
Trace("Opc.Ua.Publisher.Module: Connection string not found in secure store.");
}
}
}
/// <summary>
/// Publish message to bus
/// </summary>
public static void Publish(Message message)
{
if (m_broker != null)
{
m_broker.Publish(message);
}
}
/// <summary>
/// Called when gateway starts, establishes the connections to endpoints
/// </summary>
public void Start()
{
Trace("Opc.Ua.Publisher.Module: Starting...");
// subscribe to preconfigured nodes
Trace("Opc.Ua.Publisher.Module: Attemping to subscribe to published nodes...");
Trace("Attemping to subscribe to published nodes...");
if (m_nodesLookups != null)
{
foreach (NodeLookup nodeLookup in m_nodesLookups)
@ -276,43 +199,20 @@ namespace Opc.Ua.Publisher
}
catch (Exception ex)
{
Trace("Opc.Ua.Publisher.Module: Unexpected error publishing node: " + ex.Message + "\r\nIgnoring node: " + nodeLookup.EndPointURL.AbsoluteUri + ", " + nodeLookup.NodeID.ToString());
Trace("Unexpected error publishing node: " + ex.Message + "\r\nIgnoring node: " + nodeLookup.EndPointURL.AbsoluteUri + ", " + nodeLookup.NodeID.ToString());
}
}
}
Trace("Opc.Ua.Publisher.Module: Started.");
}
/// <summary>
/// Registers ourselves with IoTHub so we can send messages to it
/// </summary>
private void SelfRegisterWithIoTHub(string ownerConnectionString)
{
string[] parsedConnectionString = IoTHubRegistration.ParseConnectionString(ownerConnectionString, false);
string deviceConnectionString = string.Empty;
if ((parsedConnectionString != null) && (parsedConnectionString.Length == 3))
Console.WriteLine("Publisher is running. Press enter to quit.");
Console.ReadLine();
foreach (Session session in m_sessions)
{
string IoTHubName = parsedConnectionString[0];
string name = parsedConnectionString[1];
string accessToken = parsedConnectionString[2];
using (HttpClient httpClient = new HttpClient())
{
httpClient.BaseAddress = new UriBuilder { Scheme = "https", Host = IoTHubName }.Uri;
string sharedAccessSignature = IoTHubRegistration.GenerateSharedAccessToken(name, Convert.FromBase64String(accessToken), IoTHubName, 60000);
httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("SharedAccessSignature", sharedAccessSignature);
deviceConnectionString = IoTHubRegistration.CreateDeviceInIoTHubDeviceRegistry(httpClient, m_configuration.ApplicationName.Replace(" ", "")).Result;
// prepend the rest of the connection string
deviceConnectionString = "HostName=" + IoTHubName + ";DeviceId=" + m_configuration.ApplicationName.Replace(" ", "") + ";SharedAccessKey=" + deviceConnectionString;
SecureIoTHubToken.Write(m_configuration.ApplicationName, deviceConnectionString);
}
}
else
{
Trace("Opc.Ua.Publisher.Module: Could not parse IoT Hub owner connection string: " + ownerConnectionString);
session.Close();
}
m_deviceClient.CloseAsync().Wait();
}
/// <summary>
@ -336,7 +236,7 @@ namespace Opc.Ua.Publisher
if (newSession != null)
{
Trace("Opc.Ua.Publisher.Module: Created session with updated endpoint " + selectedEndpoint.EndpointUrl + " from server!");
Trace("Created session with updated endpoint " + selectedEndpoint.EndpointUrl + " from server!");
newSession.KeepAlive += new KeepAliveEventHandler((sender, e) => StandardClient_KeepAlive(sender, e, newSession));
m_sessions.Add(newSession);
}
@ -386,14 +286,14 @@ namespace Opc.Ua.Publisher
monitoredItem.QueueSize = 0;
monitoredItem.DiscardOldest = true;
monitoredItem.Notification += new MonitoredItemNotificationEventHandler(Module.MonitoredItem_Notification);
monitoredItem.Notification += new MonitoredItemNotificationEventHandler(MonitoredItem_Notification);
subscription.AddItem(monitoredItem);
subscription.ApplyChanges();
}
else
{
Trace("Opc.Ua.Publisher.Module: ERROR: Could not find endpoint URL " + nodeLookup.EndPointURL.ToString() + " in active server sessions, NodeID " + nodeLookup.NodeID.Identifier.ToString() + " NOT published!");
Trace("Opc.Ua.Publisher.Module: To fix this, please update your publishednodes.json file with the updated endpoint URL!");
Trace("ERROR: Could not find endpoint URL " + nodeLookup.EndPointURL.ToString() + " in active server sessions, NodeID " + nodeLookup.NodeID.Identifier.ToString() + " NOT published!");
Trace("To fix this, please update your publishednodes.json file with the updated endpoint URL!");
}
}
@ -425,7 +325,7 @@ namespace Opc.Ua.Publisher
string applicationURI = monitoredItem.Subscription.Session.Endpoint.Server.ApplicationUri;
encoder.WriteString("ApplicationUri", applicationURI);
encoder.WriteString("DisplayName", monitoredItem.DisplayName);
// write NodeId as ns=x;i=y
NodeId nodeId = monitoredItem.ResolvedNodeId;
encoder.WriteString("NodeId", new NodeId(nodeId.Identifier, nodeId.NamespaceIndex).ToString());
@ -435,31 +335,25 @@ namespace Opc.Ua.Publisher
encoder.WriteDataValue("Value", value);
string json = encoder.CloseAndReturnText();
// publish
var properties = new Dictionary<string, string>();
properties.Add("content-type", "application/opcua+uajson");
properties.Add("deviceName", m_deviceName);
var eventMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(json));
if (m_accessKey != null)
{
properties.Add("source", "mapping");
properties.Add("deviceKey", m_accessKey);
}
// publish
eventMessage.Properties.Add("content-type", "application/opcua+uajson");
eventMessage.Properties.Add("deviceName", m_applicationName);
try
{
Publish(new Message(json, properties));
m_deviceClient.SendEventAsync(eventMessage).Wait();
}
catch (Exception ex)
{
Trace("Opc.Ua.Publisher.Module: Failed to publish message, dropping...");
Trace("Failed to publish message, dropping...");
Trace(ex.ToString());
}
}
catch (Exception exception)
{
Trace("Opc.Ua.Publisher.Module: Error processing monitored item notification: " + exception.ToString());
Trace("Error processing monitored item notification: " + exception.ToString());
}
}
@ -500,8 +394,8 @@ namespace Opc.Ua.Publisher
}
catch (Exception e)
{
Trace("Opc.Ua.Publisher.Module: Could not fetch endpoints from url: " + discoveryUrl.ToString());
Trace("Opc.Ua.Publisher.Module: Reason = " + e.Message);
Trace("Could not fetch endpoints from url: " + discoveryUrl.ToString());
Trace("Reason = " + e.Message);
throw e;
}
}
@ -554,11 +448,11 @@ namespace Opc.Ua.Publisher
/// <summary>
/// Standard certificate validation callback
/// </summary>
private void CertificateValidator_CertificateValidation(CertificateValidator validator, CertificateValidationEventArgs e)
private static void CertificateValidator_CertificateValidation(CertificateValidator validator, CertificateValidationEventArgs e)
{
if (e.Error.StatusCode == StatusCodes.BadCertificateUntrusted)
{
Trace("Opc.Ua.Publisher.Module: Certificate \""
Trace("Certificate \""
+ e.Certificate.Subject
+ "\" not trusted. If you want to trust this certificate, please copy it from the \""
+ m_configuration.SecurityConfiguration.RejectedCertificateStore.StorePath + "/certs"
@ -567,5 +461,6 @@ namespace Opc.Ua.Publisher
+ "\" folder. A restart of the gateway is NOT required.");
}
}
}
}

Просмотреть файл

@ -0,0 +1,8 @@
{
"profiles": {
"Opc.Ua.Publisher": {
"commandName": "Project",
"commandLineArgs": "myApp"
}
}
}

Просмотреть файл

Просмотреть файл

Просмотреть файл

@ -1,5 +1,6 @@

using IoTHubCredentialTools;
using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;
using Opc.Ua;
using Opc.Ua.Client;
@ -33,7 +34,7 @@ namespace Publisher
{
if (inputArguments[0] == null || inputArguments[1] == null)
{
Module.Trace("PublishNodeMethod: Invalid Arguments!");
Program.Trace("PublishNodeMethod: Invalid Arguments!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments!");
}
@ -41,7 +42,7 @@ namespace Publisher
string uri = inputArguments[1] as string;
if (string.IsNullOrEmpty(nodeID) || string.IsNullOrEmpty(uri))
{
Module.Trace("PublishNodeMethod: Arguments are not valid strings!");
Program.Trace("PublishNodeMethod: Arguments are not valid strings!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments as strings!");
}
@ -53,20 +54,20 @@ namespace Publisher
}
catch (UriFormatException)
{
Module.Trace("PublishNodeMethod: Invalid endpoint URL!");
Program.Trace("PublishNodeMethod: Invalid endpoint URL!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide a valid OPC UA endpoint URL as second argument!");
}
// create session, if it doesn't exist already and complete asynchonourly (do to thread dependencies in the UA stack)
if (!Module.m_endpointUrls.Contains(lookup.EndPointURL))
if (!Program.m_endpointUrls.Contains(lookup.EndPointURL))
{
try
{
Task.Run(() =>
{
Module.Trace("PublishNodeMethod: Session not found, creating one for " + lookup.EndPointURL);
Module.EndpointConnect(lookup.EndPointURL).Wait();
Module.Trace("PublishNodeMethod: Session created.");
Program.Trace("PublishNodeMethod: Session not found, creating one for " + lookup.EndPointURL);
Program.EndpointConnect(lookup.EndPointURL).Wait();
Program.Trace("PublishNodeMethod: Session created.");
return DoPublish(lookup);
});
@ -75,7 +76,7 @@ namespace Publisher
}
catch (Exception ex)
{
Module.Trace("PublishNodeMethod: Exception: " + ex.ToString());
Program.Trace("PublishNodeMethod: Exception: " + ex.ToString());
return ServiceResult.Create(ex, StatusCodes.BadUnexpectedError, "Unexpected error publishing node: " + ex.Message);
}
}
@ -95,7 +96,7 @@ namespace Publisher
{
// find the right session using our lookup
Session matchingSession = null;
foreach (Session session in Module.m_sessions)
foreach (Session session in Program.m_sessions)
{
char[] trimChars = { '/', ' ' };
if (session.Endpoint.EndpointUrl.TrimEnd(trimChars).StartsWith(lookup.EndPointURL.ToString().TrimEnd(trimChars), StringComparison.OrdinalIgnoreCase))
@ -108,10 +109,10 @@ namespace Publisher
if (matchingSession == null)
{
Module.Trace("PublishNodeMethod: No matching session found for " + lookup.EndPointURL.ToString());
Program.Trace("PublishNodeMethod: No matching session found for " + lookup.EndPointURL.ToString());
return ServiceResult.Create(StatusCodes.BadSessionIdInvalid, "Session for published node not found!");
}
Module.Trace("PublishNodeMethod: Session found.");
Program.Trace("PublishNodeMethod: Session found.");
// check if the node has already been published
@ -119,20 +120,20 @@ namespace Publisher
{
if (item.StartNodeId == lookup.NodeID)
{
Module.Trace("PublishNodeMethod: Node ID has already been published " + lookup.NodeID.ToString());
Program.Trace("PublishNodeMethod: Node ID has already been published " + lookup.NodeID.ToString());
return ServiceResult.Create(StatusCodes.BadNodeIdExists, "Node has already been published!");
}
}
// subscribe to the node
Module.CreateMonitoredItem(lookup);
Module.Trace("PublishNodeMethod: Monitored item created.");
Program.CreateMonitoredItem(lookup);
Program.Trace("PublishNodeMethod: Monitored item created.");
// update our data
Module.m_nodesLookups.Add(lookup);
if (!Module.m_endpointUrls.Contains(lookup.EndPointURL))
Program.m_nodesLookups.Add(lookup);
if (!Program.m_endpointUrls.Contains(lookup.EndPointURL))
{
Module.m_endpointUrls.Add(lookup.EndPointURL);
Program.m_endpointUrls.Add(lookup.EndPointURL);
}
//serialize Program.m_nodesLookups to disk
@ -141,14 +142,14 @@ namespace Publisher
{
publishedNodesFilePath = Environment.GetEnvironmentVariable("_GW_PNFP");
}
File.WriteAllText(publishedNodesFilePath, JsonConvert.SerializeObject(Module.m_nodesLookups));
File.WriteAllText(publishedNodesFilePath, JsonConvert.SerializeObject(Program.m_nodesLookups));
Module.Trace("PublishNodeMethod: Successful publish: " + lookup.ToString());
Program.Trace("PublishNodeMethod: Successful publish: " + lookup.ToString());
return ServiceResult.Good;
}
catch (Exception ex)
{
Module.Trace("PublishNodeMethod: Exception: " + ex.ToString());
Program.Trace("PublishNodeMethod: Exception: " + ex.ToString());
return ServiceResult.Create(ex, StatusCodes.BadUnexpectedError, "Unexpected error publishing node: " + ex.Message);
}
}
@ -160,7 +161,7 @@ namespace Publisher
{
if (inputArguments[0] == null || inputArguments[1] == null)
{
Module.Trace("UnPublishNodeMethod: Invalid arguments!");
Program.Trace("UnPublishNodeMethod: Invalid arguments!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments!");
}
@ -168,7 +169,7 @@ namespace Publisher
string uri = inputArguments[1] as string;
if (string.IsNullOrEmpty(nodeID) || string.IsNullOrEmpty(uri))
{
Module.Trace("UnPublishNodeMethod: Arguments are not valid strings!");
Program.Trace("UnPublishNodeMethod: Arguments are not valid strings!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments as strings!");
}
@ -180,13 +181,13 @@ namespace Publisher
}
catch (UriFormatException)
{
Module.Trace("UnPublishNodeMethod: Invalid endpoint URL!");
Program.Trace("UnPublishNodeMethod: Invalid endpoint URL!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide a valid OPC UA endpoint URL as second argument!");
}
// find the right session using our lookup
Session matchingSession = null;
foreach (Session session in Module.m_sessions)
foreach (Session session in Program.m_sessions)
{
char[] trimChars = { '/', ' ' };
if (session.Endpoint.EndpointUrl.TrimEnd(trimChars).Equals(lookup.EndPointURL.ToString().TrimEnd(trimChars), StringComparison.OrdinalIgnoreCase))
@ -198,7 +199,7 @@ namespace Publisher
if (matchingSession == null)
{
Module.Trace("UnPublishNodeMethod: Session for published node not found: " + lookup.EndPointURL.ToString());
Program.Trace("UnPublishNodeMethod: Session for published node not found: " + lookup.EndPointURL.ToString());
return ServiceResult.Create(StatusCodes.BadSessionIdInvalid, "Session for published node not found!");
}
@ -208,11 +209,11 @@ namespace Publisher
if (item.StartNodeId == lookup.NodeID)
{
matchingSession.DefaultSubscription.RemoveItem(item);
Module.Trace("UnPublishNodeMethod: Successful unpublish: " + lookup.NodeID.ToString());
Program.Trace("UnPublishNodeMethod: Successful unpublish: " + lookup.NodeID.ToString());
// update our data on success only
// we keep the session to the server, as there may be other nodes still published on it
Module.m_nodesLookups.Remove(lookup);
Program.m_nodesLookups.Remove(lookup);
//serialize Program.m_nodesLookups to disk
string publishedNodesFilePath = Directory.GetCurrentDirectory() + Path.DirectorySeparatorChar + "publishednodes.json";
@ -220,13 +221,13 @@ namespace Publisher
{
publishedNodesFilePath = Environment.GetEnvironmentVariable("_GW_PNFP");
}
File.WriteAllText(publishedNodesFilePath, JsonConvert.SerializeObject(Module.m_nodesLookups));
File.WriteAllText(publishedNodesFilePath, JsonConvert.SerializeObject(Program.m_nodesLookups));
return ServiceResult.Good;
}
}
Module.Trace("UnPublishNodeMethod: Monitored item for node ID not found " + lookup.NodeID.ToString());
Program.Trace("UnPublishNodeMethod: Monitored item for node ID not found " + lookup.NodeID.ToString());
return ServiceResult.Create(StatusCodes.BadNodeIdInvalid, "Monitored item for node ID not found!");
}
@ -235,8 +236,8 @@ namespace Publisher
/// </summary>
private ServiceResult GetListOfPublishedNodesMethod(ISystemContext context, MethodState method, IList<object> inputArguments, IList<object> outputArguments)
{
outputArguments[0] = JsonConvert.SerializeObject(Module.m_nodesLookups);
Module.Trace("GetListOfPublishedNodesMethod: Success!");
outputArguments[0] = JsonConvert.SerializeObject(Program.m_nodesLookups);
Program.Trace("GetListOfPublishedNodesMethod: Success!");
return ServiceResult.Good;
}
@ -249,7 +250,7 @@ namespace Publisher
var connectionString = value as string;
if (string.IsNullOrEmpty(connectionString))
{
Module.Trace("ConnectionStringWrite: Invalid Argument!");
Program.Trace("ConnectionStringWrite: Invalid Argument!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments as strings!");
}
@ -257,47 +258,33 @@ namespace Publisher
timestamp = DateTime.Now;
// read current connection string and compare to the one passed in
string currentConnectionString = SecureIoTHubToken.Read(Module.m_configuration.ApplicationName);
string currentConnectionString = SecureIoTHubToken.Read(Program.m_configuration.ApplicationName);
if (string.Equals(connectionString, currentConnectionString, StringComparison.OrdinalIgnoreCase))
{
Module.Trace("ConnectionStringWrite: Connection string up to date!");
Program.Trace("ConnectionStringWrite: Connection string up to date!");
return ServiceResult.Create(StatusCodes.Bad, "Connection string already up-to-date!");
}
// try to parse connection string
string[] parsedConnectionString = IoTHubRegistration.ParseConnectionString(connectionString, true);
if ((parsedConnectionString == null) || (parsedConnectionString.Length != 3))
{
Module.Trace("ConnectionStringWrite: Connection string parsing error: " + connectionString);
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Could not parse connection string!");
}
// write connection string
Program.Trace("Attemping to configure publisher with connection string: " + connectionString);
// configure publisher and write connection string
try
{
SecureIoTHubToken.Write(Module.m_configuration.ApplicationName, connectionString);
DeviceClient newClient = DeviceClient.CreateFromConnectionString(connectionString, Microsoft.Azure.Devices.Client.TransportType.Mqtt);
newClient.OpenAsync().Wait();
newClient.RetryPolicy = RetryPolicyType.Exponential_Backoff_With_Jitter;
SecureIoTHubToken.Write(Program.m_configuration.ApplicationName, connectionString);
Program.m_deviceClient = newClient;
}
catch (Exception ex)
{
statusCode = StatusCodes.Bad;
Module.Trace("ConnectionStringWrite: Exception: " + ex.ToString());
Program.Trace("ConnectionStringWrite: Exception: " + ex.ToString());
return ServiceResult.Create(StatusCodes.Bad, "Publisher registration failed: " + ex.Message);
}
try
{
// try to configure our publisher component
Module.TryConfigurePublisherAsync().Wait();
}
catch (Exception ex)
{
statusCode = StatusCodes.Bad;
Module.Trace("ConnectionStringWrite: Exception: " + ex.ToString());
return ServiceResult.Create(StatusCodes.Bad, "Publisher configuration failed: " + ex.Message);
}
statusCode = StatusCodes.Good;
Module.Trace("ConnectionStringWrite: Success!");
Program.Trace("ConnectionStringWrite: Success!");
return statusCode;
}

Просмотреть файл