Rework async execution paths, move variables in corresponding classes.

This commit is contained in:
Hans Gschossmann 2017-10-10 17:36:56 +02:00
Родитель 9919ec90bd
Коммит 9f33f8e8d3
8 изменённых файлов: 585 добавлений и 399 удалений

Просмотреть файл

@ -13,14 +13,59 @@ namespace OpcPublisher
using Microsoft.Azure.Devices;
using Microsoft.Azure.Devices.Client;
using Opc.Ua;
using static Opc.Ua.CertificateStoreType;
using static OpcPublisher.Workarounds.TraceWorkaround;
using static Program;
using static OpcStackConfiguration;
/// <summary>
/// Class to handle all IoTHub communication.
/// </summary>
public class IotHubMessaging
{
public static string IotHubOwnerConnectionString
{
get => _iotHubOwnerConnectionString;
set => _iotHubOwnerConnectionString = value;
}
private static string _iotHubOwnerConnectionString = string.Empty;
public static Microsoft.Azure.Devices.Client.TransportType IotHubProtocol
{
get => _iotHubProtocol;
set => _iotHubProtocol = value;
}
private static Microsoft.Azure.Devices.Client.TransportType _iotHubProtocol = Microsoft.Azure.Devices.Client.TransportType.Mqtt;
public static uint MaxSizeOfIoTHubMessageBytes
{
get => _maxSizeOfIoTHubMessageBytes;
set => _maxSizeOfIoTHubMessageBytes = value;
}
private static uint _maxSizeOfIoTHubMessageBytes = 4096;
public static int DefaultSendIntervalSeconds
{
get => _defaultSendIntervalSeconds;
set => _defaultSendIntervalSeconds = value;
}
private static int _defaultSendIntervalSeconds = 1;
public static string IotDeviceCertStoreType
{
get => _iotDeviceCertStoreType;
set => _iotDeviceCertStoreType = value;
}
private static string _iotDeviceCertStoreType = X509Store;
public static string IotDeviceCertDirectoryStorePathDefault => "CertificateStores/IoTHub";
public static string IotDeviceCertX509StorePathDefault => "My";
public static string IotDeviceCertStorePath
{
get => _iotDeviceCertStorePath;
set => _iotDeviceCertStorePath = value;
}
private static string _iotDeviceCertStorePath = IotDeviceCertX509StorePathDefault;
/// <summary>
/// Classes for the telemetry message sent to IoTHub.
/// </summary>
@ -42,8 +87,6 @@ namespace OpcPublisher
private int _currentSizeOfIotHubMessageBytes;
private List<OpcUaMessage> _messageList;
private SemaphoreSlim _messageListSemaphore;
private uint _maxSizeOfIoTHubMessageBytes;
private int _defaultSendIntervalSeconds;
private CancellationTokenSource _tokenSource;
private Task _dequeueAndSendTask;
private Timer _sendTimer;
@ -66,22 +109,19 @@ namespace OpcPublisher
/// Initializes the communication with secrets and details for (batched) send process.
/// </summary>
/// <returns></returns>
public bool Init(string iotHubOwnerConnectionString, uint maxSizeOfIoTHubMessageBytes, int defaultSendIntervalSeconds)
public async Task<bool> InitAsync()
{
_maxSizeOfIoTHubMessageBytes = maxSizeOfIoTHubMessageBytes;
_defaultSendIntervalSeconds = defaultSendIntervalSeconds;
try
{
// check if we also received an owner connection string
if (string.IsNullOrEmpty(iotHubOwnerConnectionString))
if (string.IsNullOrEmpty(_iotHubOwnerConnectionString))
{
Trace("IoT Hub owner connection string not passed as argument.");
// check if we have an environment variable to register ourselves with IoT Hub
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_HUB_CS")))
{
iotHubOwnerConnectionString = Environment.GetEnvironmentVariable("_HUB_CS");
_iotHubOwnerConnectionString = Environment.GetEnvironmentVariable("_HUB_CS");
Trace("IoT Hub owner connection string read from environment.");
}
}
@ -90,49 +130,50 @@ namespace OpcPublisher
string deviceConnectionString;
Trace($"IoTHub device cert store type is: {IotDeviceCertStoreType}");
Trace($"IoTHub device cert path is: {IotDeviceCertStorePath}");
if (string.IsNullOrEmpty(iotHubOwnerConnectionString))
if (string.IsNullOrEmpty(_iotHubOwnerConnectionString))
{
Trace("IoT Hub owner connection string not specified. Assume device connection string already in cert store.");
}
else
{
Trace($"Attempting to register ourselves with IoT Hub using owner connection string: {iotHubOwnerConnectionString}");
RegistryManager manager = RegistryManager.CreateFromConnectionString(iotHubOwnerConnectionString);
Trace($"Attempting to register ourselves with IoT Hub using owner connection string: {_iotHubOwnerConnectionString}");
RegistryManager manager = RegistryManager.CreateFromConnectionString(_iotHubOwnerConnectionString);
// remove any existing device
Device existingDevice = manager.GetDeviceAsync(ApplicationName).Result;
Device existingDevice = await manager.GetDeviceAsync(ApplicationName);
if (existingDevice != null)
{
Trace($"Device '{ApplicationName}' found in IoTHub registry. Remove it.");
manager.RemoveDeviceAsync(ApplicationName).Wait();
await manager.RemoveDeviceAsync(ApplicationName);
}
Trace($"Adding device '{ApplicationName}' to IoTHub registry.");
Device newDevice = manager.AddDeviceAsync(new Device(ApplicationName)).Result;
Device newDevice = await manager.AddDeviceAsync(new Device(ApplicationName));
if (newDevice != null)
{
string hostname = iotHubOwnerConnectionString.Substring(0, iotHubOwnerConnectionString.IndexOf(";"));
string hostname = _iotHubOwnerConnectionString.Substring(0, _iotHubOwnerConnectionString.IndexOf(";"));
deviceConnectionString = hostname + ";DeviceId=" + ApplicationName + ";SharedAccessKey=" + newDevice.Authentication.SymmetricKey.PrimaryKey;
Trace($"Device connection string is: {deviceConnectionString}");
Trace($"Adding it to device cert store.");
SecureIoTHubToken.Write(ApplicationName, deviceConnectionString, IotDeviceCertStoreType, IotDeviceCertStoreType);
await SecureIoTHubToken.WriteAsync(ApplicationName, deviceConnectionString, IotDeviceCertStoreType, IotDeviceCertStorePath);
}
else
{
Trace($"Could not register ourselves with IoT Hub using owner connection string: {iotHubOwnerConnectionString}");
Trace($"Could not register ourselves with IoT Hub using owner connection string: {_iotHubOwnerConnectionString}");
Trace("exiting...");
return false;
}
}
// try to read connection string from secure store and open IoTHub client
Trace($"Attempting to read device connection string from cert store using subject name: {ApplicationName}");
deviceConnectionString = SecureIoTHubToken.Read(ApplicationName, IotDeviceCertStoreType, IotDeviceCertStorePath);
deviceConnectionString = await SecureIoTHubToken.ReadAsync(ApplicationName, IotDeviceCertStoreType, IotDeviceCertStorePath);
if (!string.IsNullOrEmpty(deviceConnectionString))
{
Trace($"Create Publisher IoTHub client with device connection string: '{deviceConnectionString}' using '{IotHubProtocol}' for communication.");
_iotHubClient = DeviceClient.CreateFromConnectionString(deviceConnectionString, IotHubProtocol);
_iotHubClient.OpenAsync().Wait();
await _iotHubClient.OpenAsync();
}
else
{
@ -160,11 +201,11 @@ namespace OpcPublisher
/// Method to write the IoTHub owner connection string into the cert store.
/// </summary>
/// <param name="iotHubOwnerConnectionString"></param>
public void ConnectionStringWrite(string iotHubOwnerConnectionString)
public async Task ConnectionStringWriteAsync(string iotHubOwnerConnectionString)
{
DeviceClient newClient = DeviceClient.CreateFromConnectionString(iotHubOwnerConnectionString, IotHubProtocol);
newClient.OpenAsync().Wait();
SecureIoTHubToken.Write(OpcConfiguration.ApplicationName, iotHubOwnerConnectionString, IotDeviceCertStoreType, IotDeviceCertStorePath);
await newClient.OpenAsync();
await SecureIoTHubToken.WriteAsync(PublisherOpcApplicationConfiguration.ApplicationName, iotHubOwnerConnectionString, IotDeviceCertStoreType, IotDeviceCertStorePath);
_iotHubClient = newClient;
}

Просмотреть файл

@ -37,9 +37,7 @@
<PackageReference Include="Microsoft.Azure.Devices" Version="1.3.2" />
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.5.0" />
<PackageReference Include="Mono.Options" Version="5.3.0.1" />
<PackageReference Include="Mono.Posix" Version="4.0.0" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua" Version="0.4.5" />
<PackageReference Include="System.Threading.Timer" Version="4.3.0" />
</ItemGroup>
<ItemGroup>

Просмотреть файл

@ -10,6 +10,7 @@ namespace OpcPublisher
using System.Threading;
using System.Threading.Tasks;
using static OpcPublisher.Workarounds.TraceWorkaround;
using static OpcStackConfiguration;
using static Program;
/// <summary>
@ -35,7 +36,7 @@ namespace OpcPublisher
public bool DiscardOldest;
public MonitoredItemNotificationEventHandler Notification;
public Uri EndpointUri;
public MonitoredItem MonitoredItem;
public MonitoredItem OpcUaClientMonitoredItem;
public string ConfigNodeId;
/// <summary>
@ -101,7 +102,7 @@ namespace OpcPublisher
JsonEncoder encoder = new JsonEncoder(monitoredItem.Subscription.Session.MessageContext, false);
string applicationURI = monitoredItem.Subscription.Session.Endpoint.Server.ApplicationUri;
encoder.WriteString("ApplicationUri", (applicationURI + (string.IsNullOrEmpty(ShopfloorDomain) ? "" : $":{ShopfloorDomain}")));
encoder.WriteString("ApplicationUri", (applicationURI + (string.IsNullOrEmpty(OpcSession.ShopfloorDomain) ? "" : $":{OpcSession.ShopfloorDomain}")));
encoder.WriteString("DisplayName", monitoredItem.DisplayName);
// use the node Id as configured, to also have the namespace URI in case of a ExpandedNodeId.
@ -115,7 +116,7 @@ namespace OpcPublisher
// add message to fifo send queue
Trace(Utils.TraceMasks.OperationDetail, $"Enqueue a new message from subscription {monitoredItem.Subscription.Id} (publishing interval: {monitoredItem.Subscription.PublishingInterval}, sampling interval: {monitoredItem.SamplingInterval}):");
Trace(Utils.TraceMasks.OperationDetail, " ApplicationUri: " + (applicationURI + (string.IsNullOrEmpty(ShopfloorDomain) ? "" : $":{ShopfloorDomain}")));
Trace(Utils.TraceMasks.OperationDetail, " ApplicationUri: " + (applicationURI + (string.IsNullOrEmpty(OpcSession.ShopfloorDomain) ? "" : $":{OpcSession.ShopfloorDomain}")));
Trace(Utils.TraceMasks.OperationDetail, $" DisplayName: {monitoredItem.DisplayName}");
Trace(Utils.TraceMasks.OperationDetail, $" Value: {value}");
IotHubCommunication.Enqueue(json);
@ -137,7 +138,7 @@ namespace OpcPublisher
public List<OpcMonitoredItem> OpcMonitoredItems;
public int RequestedPublishingInterval;
public double PublishingInterval;
public Subscription Subscription;
public Subscription OpcUaClientSubscription;
public OpcSubscription(int? publishingInterval)
{
@ -157,9 +158,22 @@ namespace OpcPublisher
Disconnected = 0,
Connected,
}
public static bool FetchOpcNodeDisplayName
{
get => _fetchOpcNodeDisplayName;
set => _fetchOpcNodeDisplayName = value;
}
private static bool _fetchOpcNodeDisplayName = false;
public static string ShopfloorDomain
{
get => _shopfloorDomain;
set => _shopfloorDomain = value;
}
private static string _shopfloorDomain;
public Uri EndpointUri;
public Session Session;
public Session OpcUaClientSession;
public SessionState State;
public List<OpcSubscription> OpcSubscriptions;
public uint SessionTimeout { get; }
@ -169,6 +183,7 @@ namespace OpcPublisher
private SemaphoreSlim _opcSessionSemaphore;
private NamespaceTable _namespaceTable;
private double _minSupportedSamplingInterval;
public KeepAliveEventHandler StandardKeepAliveEventHandlerAsync;
/// <summary>
/// Ctor for the session.
@ -197,11 +212,12 @@ namespace OpcPublisher
/// - sessions with out subscriptions are removed.
/// </summary>
/// <returns></returns>
public async Task ConnectAndMonitor()
public async Task ConnectAndMonitorAsync()
{
await _opcSessionSemaphore.WaitAsync();
try
{
await _opcSessionSemaphore.WaitAsync();
// if the session is disconnected, create one.
if (State == SessionState.Disconnected)
{
@ -213,22 +229,22 @@ namespace OpcPublisher
// start connecting
EndpointDescription selectedEndpoint = CoreClientUtils.SelectEndpoint(EndpointUri.AbsoluteUri, true);
ConfiguredEndpoint configuredEndpoint = new ConfiguredEndpoint(null, selectedEndpoint, EndpointConfiguration.Create(OpcConfiguration));
ConfiguredEndpoint configuredEndpoint = new ConfiguredEndpoint(null, selectedEndpoint, EndpointConfiguration.Create(PublisherOpcApplicationConfiguration));
uint timeout = SessionTimeout * ((UnsuccessfulConnectionCount >= OpcSessionCreationBackoffMax) ? OpcSessionCreationBackoffMax : UnsuccessfulConnectionCount + 1);
Trace($"Create session for endpoint URI '{EndpointUri.AbsoluteUri}' with timeout of {timeout} ms.");
Session = await Session.Create(
OpcConfiguration,
OpcUaClientSession = await Session.Create(
PublisherOpcApplicationConfiguration,
configuredEndpoint,
true,
false,
OpcConfiguration.ApplicationName,
PublisherOpcApplicationConfiguration.ApplicationName,
timeout,
new UserIdentity(new AnonymousIdentityToken()),
null);
if (Session != null)
if (OpcUaClientSession != null)
{
Trace($"Session successfully created with Id {Session.SessionId}.");
Trace($"Session successfully created with Id {OpcUaClientSession.SessionId}.");
if (!selectedEndpoint.EndpointUrl.Equals(configuredEndpoint.EndpointUrl.AbsoluteUri))
{
Trace($"the Server has updated the EndpointUrl to '{selectedEndpoint.EndpointUrl}'");
@ -236,11 +252,12 @@ namespace OpcPublisher
// init object state and install keep alive
UnsuccessfulConnectionCount = 0;
Session.KeepAliveInterval = OpcKeepAliveIntervalInSec * 1000;
Session.KeepAlive += StandardClient_KeepAlive;
OpcUaClientSession.KeepAliveInterval = OpcKeepAliveIntervalInSec * 1000;
StandardKeepAliveEventHandlerAsync = async (session, keepAliveEventArgs) => await StandardClient_KeepAlive(session, keepAliveEventArgs);
OpcUaClientSession.KeepAlive += StandardKeepAliveEventHandlerAsync;
// fetch the namespace array and cache it. it will not change as long the session exists.
DataValue namespaceArrayNodeValue = Session.ReadValue(VariableIds.Server_NamespaceArray);
DataValue namespaceArrayNodeValue = OpcUaClientSession.ReadValue(VariableIds.Server_NamespaceArray);
_namespaceTable.Update(namespaceArrayNodeValue.GetValue<string[]>(null));
// show the available namespaces
@ -252,7 +269,7 @@ namespace OpcPublisher
}
// fetch the minimum supported item sampling interval from the server.
DataValue minSupportedSamplingInterval = Session.ReadValue(VariableIds.Server_ServerCapabilities_MinSupportedSampleRate);
DataValue minSupportedSamplingInterval = OpcUaClientSession.ReadValue(VariableIds.Server_ServerCapabilities_MinSupportedSampleRate);
_minSupportedSamplingInterval = minSupportedSamplingInterval.GetValue(0);
Trace($"The server on endpoint '{selectedEndpoint.EndpointUrl}' supports a minimal sampling interval of {_minSupportedSamplingInterval} ms.");
}
@ -261,13 +278,13 @@ namespace OpcPublisher
{
Trace(e, $"Session creation to endpoint '{EndpointUri.AbsoluteUri}' failed {++UnsuccessfulConnectionCount} time(s). Please verify if server is up and Publisher configuration is correct.");
State = SessionState.Disconnected;
Session = null;
OpcUaClientSession = null;
return;
}
finally
{
await _opcSessionSemaphore.WaitAsync();
if (Session != null)
if (OpcUaClientSession != null)
{
State = SessionState.Connected;
}
@ -280,8 +297,8 @@ namespace OpcPublisher
var itemsToRemove = opcSubscription.OpcMonitoredItems.Where(i => i.State == OpcMonitoredItem.OpcMonitoredItemState.StopMonitoring);
if (itemsToRemove.Any())
{
Trace($"Remove nodes in subscription with id {opcSubscription.Subscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'");
opcSubscription.Subscription.RemoveItems(itemsToRemove.Select( i => i.MonitoredItem ));
Trace($"Remove nodes in subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'");
opcSubscription.OpcUaClientSubscription.RemoveItems(itemsToRemove.Select( i => i.OpcUaClientMonitoredItem ));
}
}
@ -289,10 +306,10 @@ namespace OpcPublisher
foreach (var opcSubscription in OpcSubscriptions)
{
// create the subscription, if it is not yet there.
if (opcSubscription.Subscription == null)
if (opcSubscription.OpcUaClientSubscription == null)
{
int revisedPublishingInterval;
opcSubscription.Subscription = CreateSubscription(opcSubscription.RequestedPublishingInterval, out revisedPublishingInterval);
opcSubscription.OpcUaClientSubscription = CreateSubscription(opcSubscription.RequestedPublishingInterval, out revisedPublishingInterval);
opcSubscription.PublishingInterval = revisedPublishingInterval;
Trace($"Create subscription on endpoint '{EndpointUri.AbsoluteUri}' requested OPC publishing interval is {opcSubscription.RequestedPublishingInterval} ms. (revised: {revisedPublishingInterval} ms)");
}
@ -326,7 +343,7 @@ namespace OpcPublisher
Node node;
if (FetchOpcNodeDisplayName == true)
{
node = Session.ReadNode(currentNodeId);
node = OpcUaClientSession.ReadNode(currentNodeId);
item.DisplayName = node.DisplayName.Text ?? currentNodeId.ToString();
}
else
@ -346,13 +363,13 @@ namespace OpcPublisher
DiscardOldest = item.DiscardOldest
};
monitoredItem.Notification += item.Notification;
opcSubscription.Subscription.AddItem(monitoredItem);
opcSubscription.Subscription.SetPublishingMode(true);
opcSubscription.Subscription.ApplyChanges();
item.MonitoredItem = monitoredItem;
opcSubscription.OpcUaClientSubscription.AddItem(monitoredItem);
opcSubscription.OpcUaClientSubscription.SetPublishingMode(true);
opcSubscription.OpcUaClientSubscription.ApplyChanges();
item.OpcUaClientMonitoredItem = monitoredItem;
item.State = OpcMonitoredItem.OpcMonitoredItemState.Monitoreded;
item.EndpointUri = EndpointUri;
Trace($"Created monitored item for node '{currentNodeId}' in subscription with id {opcSubscription.Subscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'");
Trace($"Created monitored item for node '{currentNodeId}' in subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'");
if (item.RequestedSamplingInterval != monitoredItem.SamplingInterval)
{
Trace($"Sampling interval: requested: {item.RequestedSamplingInterval}; revised: {monitoredItem.SamplingInterval}");
@ -366,10 +383,10 @@ namespace OpcPublisher
{
case StatusCodes.BadSessionIdInvalid:
{
Trace($"Session with Id {Session.SessionId} is no longer available on endpoint '{EndpointUri}'. Cleaning up.");
Trace($"Session with Id {OpcUaClientSession.SessionId} is no longer available on endpoint '{EndpointUri}'. Cleaning up.");
// clean up the session
_opcSessionSemaphore.Release();
await Disconnect();
await DisconnectAsync();
break;
}
case StatusCodes.BadNodeIdInvalid:
@ -398,21 +415,21 @@ namespace OpcPublisher
{
if (opcSubscription.OpcMonitoredItems.Count == 0)
{
Trace($"Subscription with id {opcSubscription.Subscription.Id} on endpoint '{EndpointUri}' is not used and will be deleted.");
Session.RemoveSubscription(opcSubscription.Subscription);
opcSubscription.Subscription = null;
Trace($"Subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri}' is not used and will be deleted.");
OpcUaClientSession.RemoveSubscription(opcSubscription.OpcUaClientSubscription);
opcSubscription.OpcUaClientSubscription = null;
}
}
// shutdown unused sessions.
try
{
await OpcSessionsSemaphore.WaitAsync();
await OpcSessionsListSemaphore.WaitAsync();
var unusedSessions = OpcSessions.Where(s => s.OpcSubscriptions.Count == 0);
foreach (var unusedSession in unusedSessions)
{
OpcSessions.Remove(unusedSession);
await unusedSession.Shutdown();
await unusedSession.ShutdownAsync();
}
// Shutdown everything on shutdown.
@ -422,13 +439,13 @@ namespace OpcPublisher
foreach (var session in allSessions)
{
OpcSessions.Remove(session);
await session.Shutdown();
await session.ShutdownAsync();
}
}
}
finally
{
OpcSessionsSemaphore.Release();
OpcSessionsListSemaphore.Release();
}
}
catch (Exception e)
@ -446,7 +463,7 @@ namespace OpcPublisher
/// as unmonitored.
/// </summary>
/// <returns></returns>
public async Task Disconnect()
public async Task DisconnectAsync()
{
await _opcSessionSemaphore.WaitAsync();
try
@ -455,7 +472,7 @@ namespace OpcPublisher
{
try
{
Session.RemoveSubscription(opcSubscription.Subscription);
OpcUaClientSession.RemoveSubscription(opcSubscription.OpcUaClientSubscription);
}
catch
{
@ -463,13 +480,13 @@ namespace OpcPublisher
}
try
{
opcSubscription.Subscription.Delete(true);
opcSubscription.OpcUaClientSubscription.Delete(true);
}
catch
{
// the subscription might be already invalidated. ignore.
}
opcSubscription.Subscription = null;
opcSubscription.OpcUaClientSubscription = null;
// mark all monitored items as unmonitored
foreach (var opcMonitoredItem in opcSubscription.OpcMonitoredItems)
@ -479,13 +496,13 @@ namespace OpcPublisher
}
try
{
Session.Close();
OpcUaClientSession.Close();
}
catch
{
// the session might be already invalidated. ignore.
}
Session = null;
catch
{
// the session might be already invalidated. ignore.
}
OpcUaClientSession = null;
}
catch (Exception e)
{
@ -504,11 +521,12 @@ namespace OpcPublisher
/// <param name="publishingInterval"></param>
/// <param name="samplingInterval"></param>
/// <param name="nodeId"></param>
public void AddNodeForMonitoring(int publishingInterval, int samplingInterval, NodeId nodeId)
public async Task AddNodeForMonitoring(int publishingInterval, int samplingInterval, NodeId nodeId)
{
_opcSessionSemaphore.Wait();
try
{
await _opcSessionSemaphore.WaitAsync();
if (PublisherShutdownInProgress)
{
return;
@ -540,9 +558,13 @@ namespace OpcPublisher
Trace($"AddNodeForMonitoring: Added item with nodeId '{nodeId.ToString()}' for monitoring.");
// Start publishing.
Task.Run(async () => await ConnectAndMonitor());
await ConnectAndMonitorAsync();
}
}
catch (Exception e)
{
Trace(e, $"AddNodeForMonitoring: Exception while trying to add node '{nodeId.ToString()}' for monitoring. (message: '{e.Message}'");
}
finally
{
_opcSessionSemaphore.Release();
@ -553,11 +575,12 @@ namespace OpcPublisher
/// Tags a monitored node to stop monitoring.
/// </summary>
/// <param name="nodeId"></param>
public void TagNodeForMonitoringStop(NodeId nodeId)
public async Task TagNodeForMonitoringStop(NodeId nodeId)
{
_opcSessionSemaphore.Wait();
try
{
await _opcSessionSemaphore.WaitAsync();
if (PublisherShutdownInProgress)
{
return;
@ -578,7 +601,11 @@ namespace OpcPublisher
}
// Stop publishing.
Task.Run(async () => await ConnectAndMonitor());
await ConnectAndMonitorAsync();
}
catch (Exception e)
{
Trace(e, $"TagNodeForMonitoringStop: Exception while trying to tag node '{nodeId.ToString()}' to stop monitoring. (message: '{e.Message}'");
}
finally
{
@ -590,11 +617,12 @@ namespace OpcPublisher
/// Shutsdown all connected sessions.
/// </summary>
/// <returns></returns>
public async Task Shutdown()
public async Task ShutdownAsync()
{
await _opcSessionSemaphore.WaitAsync();
try
{
await _opcSessionSemaphore.WaitAsync();
// if the session is connected, close it.
if (State == SessionState.Connected)
{
@ -602,13 +630,19 @@ namespace OpcPublisher
{
foreach (var opcSubscription in OpcSubscriptions)
{
Trace($"Removing {opcSubscription.Subscription.MonitoredItemCount} monitored items from subscription with id {opcSubscription.Subscription.Id}.");
opcSubscription.Subscription.RemoveItems(opcSubscription.Subscription.MonitoredItems);
Trace($"Removing {opcSubscription.OpcUaClientSubscription.MonitoredItemCount} monitored items from subscription with id {opcSubscription.OpcUaClientSubscription.Id}.");
opcSubscription.OpcUaClientSubscription.RemoveItems(opcSubscription.OpcUaClientSubscription.MonitoredItems);
}
Trace($"Removing {OpcUaClientSession.SubscriptionCount} subscriptions from session.");
while (OpcSubscriptions.Count > 0)
{
OpcSubscription opcSubscription = OpcSubscriptions.ElementAt(0);
OpcSubscriptions.RemoveAt(0);
Subscription opcUaClientSubscription = opcSubscription.OpcUaClientSubscription;
opcUaClientSubscription.Delete(true);
}
Trace($"Removing {Session.SubscriptionCount} subscriptions from session.");
Session.RemoveSubscriptions(Session.Subscriptions);
Trace($"Closing session to endpoint URI '{EndpointUri.AbsoluteUri}' closed successfully.");
Session.Close();
OpcUaClientSession.Close();
State = SessionState.Disconnected;
Trace($"Session to endpoint URI '{EndpointUri.AbsoluteUri}' closed successfully.");
}
@ -640,7 +674,7 @@ namespace OpcPublisher
PublishingInterval = requestedPublishingInterval,
};
// need to happen before the create to set the Session property.
Session.AddSubscription(subscription);
OpcUaClientSession.AddSubscription(subscription);
subscription.Create();
Trace($"Created subscription with id {subscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'");
if (requestedPublishingInterval != subscription.PublishingInterval)
@ -654,7 +688,7 @@ namespace OpcPublisher
/// <summary>
/// Handler for the standard "keep alive" event sent by all OPC UA servers
/// </summary>
private static void StandardClient_KeepAlive(Session session, KeepAliveEventArgs e)
private async Task StandardClient_KeepAlive(Session session, KeepAliveEventArgs e)
{
// Ignore if we are shutting down.
if (PublisherShutdownInProgress == true)
@ -662,17 +696,10 @@ namespace OpcPublisher
return;
}
if (e != null && session != null && session.ConfiguredEndpoint != null)
if (e != null && session != null && session.ConfiguredEndpoint != null && OpcUaClientSession != null)
{
OpcSession opcSession = null;
try
{
OpcSessionsSemaphore.Wait();
var opcSessions = OpcSessions.Where(s => s.Session != null);
opcSession = opcSessions.Where(s => s.Session.ConfiguredEndpoint.EndpointUrl.Equals(session.ConfiguredEndpoint.EndpointUrl)).FirstOrDefault();
if (!ServiceResult.IsGood(e.Status))
{
Trace($"Session endpoint: {session.ConfiguredEndpoint.EndpointUrl} has Status: {e.Status}");
@ -680,32 +707,31 @@ namespace OpcPublisher
Trace($"Good publish requests: {session.GoodPublishRequestCount}, KeepAlive interval: {session.KeepAliveInterval}");
Trace($"SessionId: {session.SessionId}");
if (opcSession != null && opcSession.State == SessionState.Connected)
if (State == SessionState.Connected)
{
opcSession.MissedKeepAlives++;
Trace($"Missed KeepAlives: {opcSession.MissedKeepAlives}");
if (opcSession.MissedKeepAlives >= OpcKeepAliveDisconnectThreshold)
MissedKeepAlives++;
Trace($"Missed KeepAlives: {MissedKeepAlives}");
if (MissedKeepAlives >= OpcKeepAliveDisconnectThreshold)
{
Trace($"Hit configured missed keep alive threshold of {Program.OpcKeepAliveDisconnectThreshold}. Disconnecting the session to endpoint {session.ConfiguredEndpoint.EndpointUrl}.");
session.KeepAlive -= StandardClient_KeepAlive;
Task disconnectTask = Task.Run(() => opcSession.Disconnect());
disconnectTask.Wait();
Trace($"Hit configured missed keep alive threshold of {OpcKeepAliveDisconnectThreshold}. Disconnecting the session to endpoint {session.ConfiguredEndpoint.EndpointUrl}.");
session.KeepAlive -= StandardKeepAliveEventHandlerAsync;
Task.Run(async () => await DisconnectAsync());
}
}
}
else
{
if (opcSession != null && opcSession.MissedKeepAlives != 0)
if (MissedKeepAlives != 0)
{
// Reset missed keep alive count
Trace($"Session endpoint: {session.ConfiguredEndpoint.EndpointUrl} got a keep alive after {opcSession.MissedKeepAlives} {(opcSession.MissedKeepAlives == 1 ? "was" : "were")} missed.");
opcSession.MissedKeepAlives = 0;
Trace($"Session endpoint: {session.ConfiguredEndpoint.EndpointUrl} got a keep alive after {MissedKeepAlives} {(MissedKeepAlives == 1 ? "was" : "were")} missed.");
MissedKeepAlives = 0;
}
}
}
finally
catch (Exception ex)
{
OpcSessionsSemaphore.Release();
Trace(ex, $"Error during keep alive handling for endpoint '{session.ConfiguredEndpoint.EndpointUrl}'. (message: '{ex.Message}'");
}
}
else

Просмотреть файл

@ -6,57 +6,242 @@ using System.Security.Cryptography.X509Certificates;
namespace OpcPublisher
{
using System.IO;
using System.Threading.Tasks;
using static Opc.Ua.CertificateStoreType;
using static OpcPublisher.Workarounds.TraceWorkaround;
public class OpcStackConfiguration
{
/// <summary>
/// Opc client configuration
/// </summary>
public ApplicationConfiguration Configuration;
public OpcStackConfiguration(string applicationName)
public static string ApplicationName
{
get => _applicationName;
set => _applicationName = value;
}
private static string _applicationName = "publisher";
public static string LogFileName
{
get => _logFileName;
set => _logFileName = value;
}
private static string _logFileName;
public static ushort PublisherServerPort
{
get => _publisherServerPort;
set => _publisherServerPort = value;
}
private static ushort _publisherServerPort = 62222;
public static string PublisherServerPath
{
get => _publisherServerPath;
set => _publisherServerPath = value;
}
private static string _publisherServerPath = "/UA/Publisher";
public static int OpcOperationTimeout
{
get => _opcOperationTimeout;
set => _opcOperationTimeout = value;
}
private static int _opcOperationTimeout = 120000;
public static bool TrustMyself
{
get => _trustMyself;
set => _trustMyself = value;
}
private static bool _trustMyself = true;
// Enable Utils.TraceMasks.OperationDetail to get output for IoTHub telemetry operations. Current: 0x287 (647), with OperationDetail: 0x2C7 (711)
public static int OpcStackTraceMask
{
get => _opcStackTraceMask;
set => _opcStackTraceMask = value;
}
private static int _opcStackTraceMask = Utils.TraceMasks.Error | Utils.TraceMasks.Security | Utils.TraceMasks.StackTrace | Utils.TraceMasks.StartStop;
public static bool OpcPublisherAutoTrustServerCerts
{
get => _opcPublisherAutoTrustServerCerts;
set => _opcPublisherAutoTrustServerCerts = value;
}
private static bool _opcPublisherAutoTrustServerCerts = false;
public static uint OpcSessionCreationTimeout
{
get => _opcSessionCreationTimeout;
set => _opcSessionCreationTimeout = value;
}
private static uint _opcSessionCreationTimeout = 10;
public static uint OpcSessionCreationBackoffMax
{
get => _opcSessionCreationBackoffMax;
set => _opcSessionCreationBackoffMax = value;
}
private static uint _opcSessionCreationBackoffMax = 5;
public static uint OpcKeepAliveDisconnectThreshold
{
get => _opcKeepAliveDisconnectThreshold;
set => _opcKeepAliveDisconnectThreshold = value;
}
private static uint _opcKeepAliveDisconnectThreshold = 5;
public static int OpcKeepAliveIntervalInSec
{
get => _opcKeepAliveIntervalInSec;
set => _opcKeepAliveIntervalInSec = value;
}
private static int _opcKeepAliveIntervalInSec = 2;
public static int OpcSamplingInterval
{
get => _opcSamplingInterval;
set => _opcSamplingInterval = value;
}
private static int _opcSamplingInterval = 1000;
public static int OpcPublishingInterval
{
get => _opcPublishingInterval;
set => _opcPublishingInterval = value;
}
private static int _opcPublishingInterval = 0;
public static string PublisherServerSecurityPolicy
{
get => _publisherServerSecurityPolicy;
set => _publisherServerSecurityPolicy = value;
}
private static string _publisherServerSecurityPolicy = SecurityPolicies.Basic128Rsa15;
public static string OpcOwnCertStoreType
{
get => _opcOwnCertStoreType;
set => _opcOwnCertStoreType = value;
}
private static string _opcOwnCertStoreType = X509Store;
public static string OpcOwnCertDirectoryStorePathDefault => "CertificateStores/own";
public static string OpcOwnCertX509StorePathDefault => "CurrentUser\\UA_MachineDefault";
public static string OpcOwnCertStorePath
{
get => _opcOwnCertStorePath;
set => _opcOwnCertStorePath = value;
}
private static string _opcOwnCertStorePath = OpcOwnCertX509StorePathDefault;
public static string OpcTrustedCertStoreType
{
get => _opcTrustedCertStoreType;
set => _opcTrustedCertStoreType = value;
}
private static string _opcTrustedCertStoreType = CertificateStoreType.Directory;
public static string OpcTrustedCertDirectoryStorePathDefault => "CertificateStores/trusted";
public static string OpcTrustedCertX509StorePathDefault => "CurrentUser\\UA_MachineDefault";
public static string OpcTrustedCertStorePath
{
get => _opcTrustedCertStorePath;
set => _opcTrustedCertStorePath = value;
}
private static string _opcTrustedCertStorePath = null;
public static string OpcRejectedCertStoreType
{
get => _opcRejectedCertStoreType;
set => _opcRejectedCertStoreType = value;
}
private static string _opcRejectedCertStoreType = CertificateStoreType.Directory;
public static string OpcRejectedCertDirectoryStorePathDefault => "CertificateStores/rejected";
public static string OpcRejectedCertX509StorePathDefault => "CurrentUser\\UA_MachineDefault";
public static string OpcRejectedCertStorePath
{
get => _opcRejectedCertStorePath;
set => _opcRejectedCertStorePath = value;
}
private static string _opcRejectedCertStorePath = OpcRejectedCertDirectoryStorePathDefault;
public static string OpcIssuerCertStoreType
{
get => _opcIssuerCertStoreType;
set => _opcIssuerCertStoreType = value;
}
private static string _opcIssuerCertStoreType = CertificateStoreType.Directory;
public static string OpcIssuerCertDirectoryStorePathDefault => "CertificateStores/issuers";
public static string OpcIssuerCertX509StorePathDefault => "CurrentUser\\UA_MachineDefault";
public static string OpcIssuerCertStorePath
{
get => _opcIssuerCertStorePath;
set => _opcIssuerCertStorePath = value;
}
private static string _opcIssuerCertStorePath = OpcIssuerCertDirectoryStorePathDefault;
public static int LdsRegistrationInterval
{
get => _ldsRegistrationInterval;
set => _ldsRegistrationInterval = value;
}
private static int _ldsRegistrationInterval = 0;
public static ApplicationConfiguration PublisherOpcApplicationConfiguration
{
get => _configuration;
}
private static ApplicationConfiguration _configuration;
/// <summary>
/// Configures all OPC stack settings
/// </summary>
public async Task ConfigureAsync()
{
// Instead of using a Config.xml we configure everything programmatically.
//
// OPC UA Application configuration
//
Configuration = new ApplicationConfiguration();
_configuration = new ApplicationConfiguration();
// Passed in as command line argument
Configuration.ApplicationName = applicationName;
Configuration.ApplicationUri = $"urn:{Utils.GetHostName()}:{Configuration.ApplicationName}:microsoft:";
Configuration.ProductUri = "https://github.com/Azure/iot-edge-opc-publisher";
Configuration.ApplicationType = ApplicationType.ClientAndServer;
_configuration.ApplicationName = _applicationName;
_configuration.ApplicationUri = $"urn:{Utils.GetHostName()}:{_configuration.ApplicationName}:microsoft:";
_configuration.ProductUri = "https://github.com/Azure/iot-edge-opc-publisher";
_configuration.ApplicationType = ApplicationType.ClientAndServer;
//
// Security configuration
//
Configuration.SecurityConfiguration = new SecurityConfiguration();
_configuration.SecurityConfiguration = new SecurityConfiguration();
// Application certificate
Configuration.SecurityConfiguration.ApplicationCertificate = new CertificateIdentifier();
Configuration.SecurityConfiguration.ApplicationCertificate.StoreType = Program.OpcOwnCertStoreType;
Configuration.SecurityConfiguration.ApplicationCertificate.StorePath = Program.OpcOwnCertStorePath;
Configuration.SecurityConfiguration.ApplicationCertificate.SubjectName = Configuration.ApplicationName;
Trace($"Application Certificate store type is: {Configuration.SecurityConfiguration.ApplicationCertificate.StoreType}");
Trace($"Application Certificate store path is: {Configuration.SecurityConfiguration.ApplicationCertificate.StorePath}");
Trace($"Application Certificate subject name is: {Configuration.SecurityConfiguration.ApplicationCertificate.SubjectName}");
_configuration.SecurityConfiguration.ApplicationCertificate = new CertificateIdentifier();
_configuration.SecurityConfiguration.ApplicationCertificate.StoreType = _opcOwnCertStoreType;
_configuration.SecurityConfiguration.ApplicationCertificate.StorePath = _opcOwnCertStorePath;
_configuration.SecurityConfiguration.ApplicationCertificate.SubjectName = _configuration.ApplicationName;
Trace($"Application Certificate store type is: {_configuration.SecurityConfiguration.ApplicationCertificate.StoreType}");
Trace($"Application Certificate store path is: {_configuration.SecurityConfiguration.ApplicationCertificate.StorePath}");
Trace($"Application Certificate subject name is: {_configuration.SecurityConfiguration.ApplicationCertificate.SubjectName}");
// Use existing certificate, if it is there.
X509Certificate2 certificate = Configuration.SecurityConfiguration.ApplicationCertificate.Find(true).Result;
X509Certificate2 certificate = await _configuration.SecurityConfiguration.ApplicationCertificate.Find(true);
if (certificate == null)
{
Trace($"No existing Application certificate found. Create a self-signed Application certificate valid from yesterday for {CertificateFactory.defaultLifeTime} months,");
Trace($"with a {CertificateFactory.defaultKeySize} bit key and {CertificateFactory.defaultHashSize} bit hash.");
certificate = CertificateFactory.CreateCertificate(
Configuration.SecurityConfiguration.ApplicationCertificate.StoreType,
Configuration.SecurityConfiguration.ApplicationCertificate.StorePath,
_configuration.SecurityConfiguration.ApplicationCertificate.StoreType,
_configuration.SecurityConfiguration.ApplicationCertificate.StorePath,
null,
Configuration.ApplicationUri,
Configuration.ApplicationName,
Configuration.ApplicationName,
_configuration.ApplicationUri,
_configuration.ApplicationName,
_configuration.ApplicationName,
null,
CertificateFactory.defaultKeySize,
DateTime.UtcNow - TimeSpan.FromDays(1),
@ -66,81 +251,81 @@ namespace OpcPublisher
null,
null
);
Configuration.SecurityConfiguration.ApplicationCertificate.Certificate = certificate ?? throw new Exception("OPC UA application certificate could not be created! Cannot continue without it!");
_configuration.SecurityConfiguration.ApplicationCertificate.Certificate = certificate ?? throw new Exception("OPC UA application certificate could not be created! Cannot continue without it!");
}
else
{
Trace("Application certificate found in Application Certificate Store");
}
Configuration.ApplicationUri = Utils.GetApplicationUriFromCertificate(certificate);
Trace($"Application certificate is for Application URI '{Configuration.ApplicationUri}', Application '{Configuration.ApplicationName} and has Subject '{Configuration.ApplicationName}'");
_configuration.ApplicationUri = Utils.GetApplicationUriFromCertificate(certificate);
Trace($"Application certificate is for Application URI '{_configuration.ApplicationUri}', Application '{_configuration.ApplicationName} and has Subject '{_configuration.ApplicationName}'");
// TrustedIssuerCertificates
Configuration.SecurityConfiguration.TrustedIssuerCertificates = new CertificateTrustList();
Configuration.SecurityConfiguration.TrustedIssuerCertificates.StoreType = Program.OpcIssuerCertStoreType;
Configuration.SecurityConfiguration.TrustedIssuerCertificates.StorePath = Program.OpcIssuerCertStorePath;
Trace($"Trusted Issuer store type is: {Configuration.SecurityConfiguration.TrustedIssuerCertificates.StoreType}");
Trace($"Trusted Issuer Certificate store path is: {Configuration.SecurityConfiguration.TrustedIssuerCertificates.StorePath}");
_configuration.SecurityConfiguration.TrustedIssuerCertificates = new CertificateTrustList();
_configuration.SecurityConfiguration.TrustedIssuerCertificates.StoreType = _opcIssuerCertStoreType;
_configuration.SecurityConfiguration.TrustedIssuerCertificates.StorePath = _opcIssuerCertStorePath;
Trace($"Trusted Issuer store type is: {_configuration.SecurityConfiguration.TrustedIssuerCertificates.StoreType}");
Trace($"Trusted Issuer Certificate store path is: {_configuration.SecurityConfiguration.TrustedIssuerCertificates.StorePath}");
// TrustedPeerCertificates
Configuration.SecurityConfiguration.TrustedPeerCertificates = new CertificateTrustList();
Configuration.SecurityConfiguration.TrustedPeerCertificates.StoreType = Program.OpcTrustedCertStoreType;
if (string.IsNullOrEmpty(Program.OpcTrustedCertStorePath))
_configuration.SecurityConfiguration.TrustedPeerCertificates = new CertificateTrustList();
_configuration.SecurityConfiguration.TrustedPeerCertificates.StoreType = _opcTrustedCertStoreType;
if (string.IsNullOrEmpty(_opcTrustedCertStorePath))
{
// Set default.
Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath = Program.OpcTrustedCertStoreType == CertificateStoreType.X509Store ? Program.OpcTrustedCertX509StorePathDefault : Program.OpcTrustedCertDirectoryStorePathDefault;
_configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath = _opcTrustedCertStoreType == X509Store ? OpcTrustedCertX509StorePathDefault : OpcTrustedCertDirectoryStorePathDefault;
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_TPC_SP")))
{
// Use environment variable.
Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath = Environment.GetEnvironmentVariable("_TPC_SP");
_configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath = Environment.GetEnvironmentVariable("_TPC_SP");
}
}
else
{
Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath = Program.OpcTrustedCertStorePath;
_configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath = _opcTrustedCertStorePath;
}
Trace($"Trusted Peer Certificate store type is: {Configuration.SecurityConfiguration.TrustedPeerCertificates.StoreType}");
Trace($"Trusted Peer Certificate store path is: {Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
Trace($"Trusted Peer Certificate store type is: {_configuration.SecurityConfiguration.TrustedPeerCertificates.StoreType}");
Trace($"Trusted Peer Certificate store path is: {_configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
// RejectedCertificateStore
Configuration.SecurityConfiguration.RejectedCertificateStore = new CertificateTrustList();
Configuration.SecurityConfiguration.RejectedCertificateStore.StoreType = Program.OpcRejectedCertStoreType;
Configuration.SecurityConfiguration.RejectedCertificateStore.StorePath = Program.OpcRejectedCertStorePath;
Trace($"Rejected certificate store type is: {Configuration.SecurityConfiguration.RejectedCertificateStore.StoreType}");
Trace($"Rejected Certificate store path is: {Configuration.SecurityConfiguration.RejectedCertificateStore.StorePath}");
_configuration.SecurityConfiguration.RejectedCertificateStore = new CertificateTrustList();
_configuration.SecurityConfiguration.RejectedCertificateStore.StoreType = _opcRejectedCertStoreType;
_configuration.SecurityConfiguration.RejectedCertificateStore.StorePath = _opcRejectedCertStorePath;
Trace($"Rejected certificate store type is: {_configuration.SecurityConfiguration.RejectedCertificateStore.StoreType}");
Trace($"Rejected Certificate store path is: {_configuration.SecurityConfiguration.RejectedCertificateStore.StorePath}");
// AutoAcceptUntrustedCertificates
// This is a security risk and should be set to true only for debugging purposes.
Configuration.SecurityConfiguration.AutoAcceptUntrustedCertificates = false;
_configuration.SecurityConfiguration.AutoAcceptUntrustedCertificates = false;
// RejectSHA1SignedCertificates
// We allow SHA1 certificates for now as many OPC Servers still use them
Configuration.SecurityConfiguration.RejectSHA1SignedCertificates = false;
Trace($"Rejection of SHA1 signed certificates is {(Configuration.SecurityConfiguration.RejectSHA1SignedCertificates ? "enabled" : "disabled")}");
_configuration.SecurityConfiguration.RejectSHA1SignedCertificates = false;
Trace($"Rejection of SHA1 signed certificates is {(_configuration.SecurityConfiguration.RejectSHA1SignedCertificates ? "enabled" : "disabled")}");
// MinimunCertificatesKeySize
// We allow a minimum key size of 1024 bit, as many OPC UA servers still use them
Configuration.SecurityConfiguration.MinimumCertificateKeySize = 1024;
Trace($"Minimum certificate key size set to {Configuration.SecurityConfiguration.MinimumCertificateKeySize}");
_configuration.SecurityConfiguration.MinimumCertificateKeySize = 1024;
Trace($"Minimum certificate key size set to {_configuration.SecurityConfiguration.MinimumCertificateKeySize}");
// We make the default reference stack behavior configurable to put our own certificate into the trusted peer store.
if (Program.TrustMyself)
if (_trustMyself)
{
// Ensure it is trusted
try
{
ICertificateStore store = Configuration.SecurityConfiguration.TrustedPeerCertificates.OpenStore();
ICertificateStore store = _configuration.SecurityConfiguration.TrustedPeerCertificates.OpenStore();
if (store == null)
{
Trace($"Could not open trusted peer store. StorePath={Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
Trace($"Could not open trusted peer store. StorePath={_configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
}
else
{
try
{
Trace($"Adding publisher certificate to trusted peer store. StorePath={Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
Trace($"Adding publisher certificate to trusted peer store. StorePath={_configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
X509Certificate2 publicKey = new X509Certificate2(certificate.RawData);
store.Add(publicKey).Wait();
await store.Add(publicKey);
}
finally
{
@ -150,7 +335,7 @@ namespace OpcPublisher
}
catch (Exception e)
{
Trace(e, $"Could not add publisher certificate to trusted peer store. StorePath={Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
Trace(e, $"Could not add publisher certificate to trusted peer store. StorePath={_configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
}
}
else
@ -163,24 +348,24 @@ namespace OpcPublisher
// TransportConfigurations
//
Configuration.TransportQuotas = new TransportQuotas();
_configuration.TransportQuotas = new TransportQuotas();
// the OperationTimeout should be twice the minimum value for PublishingInterval * KeepAliveCount, so set to 120s
Configuration.TransportQuotas.OperationTimeout = Program.OpcOperationTimeout;
Trace($"OperationTimeout set to {Configuration.TransportQuotas.OperationTimeout}");
_configuration.TransportQuotas.OperationTimeout = _opcOperationTimeout;
Trace($"OperationTimeout set to {_configuration.TransportQuotas.OperationTimeout}");
//
// ServerConfiguration
//
Configuration.ServerConfiguration = new ServerConfiguration();
_configuration.ServerConfiguration = new ServerConfiguration();
// BaseAddresses
if (Configuration.ServerConfiguration.BaseAddresses.Count == 0)
if (_configuration.ServerConfiguration.BaseAddresses.Count == 0)
{
// We do not use the localhost replacement mechanism of the configuration loading, to immediately show the base address here
Configuration.ServerConfiguration.BaseAddresses.Add($"opc.tcp://{Utils.GetHostName()}:{Program.PublisherServerPort}{Program.PublisherServerPath}");
_configuration.ServerConfiguration.BaseAddresses.Add($"opc.tcp://{Utils.GetHostName()}:{_publisherServerPort}{_publisherServerPath}");
}
foreach (var endpoint in Configuration.ServerConfiguration.BaseAddresses)
foreach (var endpoint in _configuration.ServerConfiguration.BaseAddresses)
{
Trace($"Publisher server base address: {endpoint}");
}
@ -192,53 +377,53 @@ namespace OpcPublisher
SecurityMode = MessageSecurityMode.SignAndEncrypt,
SecurityPolicyUri = SecurityPolicies.Basic256Sha256
};
Configuration.ServerConfiguration.SecurityPolicies.Add(newPolicy);
_configuration.ServerConfiguration.SecurityPolicies.Add(newPolicy);
Trace($"Security policy {newPolicy.SecurityPolicyUri} with mode {newPolicy.SecurityMode} added");
// MaxRegistrationInterval
Configuration.ServerConfiguration.MaxRegistrationInterval = Program.LdsRegistrationInterval;
Trace($"LDS(-ME) registration intervall set to {Program.LdsRegistrationInterval} ms (0 means no registration)");
_configuration.ServerConfiguration.MaxRegistrationInterval = _ldsRegistrationInterval;
Trace($"LDS(-ME) registration intervall set to {_ldsRegistrationInterval} ms (0 means no registration)");
//
// TraceConfiguration
//
Configuration.TraceConfiguration = new TraceConfiguration();
_configuration.TraceConfiguration = new TraceConfiguration();
// Due to a bug in a stack we need to do console output ourselve.
Utils.SetTraceOutput(Utils.TraceOutput.FileOnly);
// OutputFilePath
if (string.IsNullOrEmpty(Program.LogFileName))
if (string.IsNullOrEmpty(_logFileName))
{
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_GW_LOGP")))
{
Configuration.TraceConfiguration.OutputFilePath = Environment.GetEnvironmentVariable("_GW_LOGP");
_configuration.TraceConfiguration.OutputFilePath = Environment.GetEnvironmentVariable("_GW_LOGP");
}
else
{
Configuration.TraceConfiguration.OutputFilePath = "./Logs/" + Configuration.ApplicationName + ".log.txt";
_configuration.TraceConfiguration.OutputFilePath = "./Logs/" + _configuration.ApplicationName + ".log.txt";
}
}
else
{
Configuration.TraceConfiguration.OutputFilePath = Program.LogFileName;
_configuration.TraceConfiguration.OutputFilePath = _logFileName;
}
// DeleteOnLoad
Configuration.TraceConfiguration.DeleteOnLoad = false;
_configuration.TraceConfiguration.DeleteOnLoad = false;
// TraceMasks
Configuration.TraceConfiguration.TraceMasks = Program.OpcStackTraceMask;
_configuration.TraceConfiguration.TraceMasks = _opcStackTraceMask;
// Apply the settings
Configuration.TraceConfiguration.ApplySettings();
Trace($"Current directory is: {Directory.GetCurrentDirectory()}");
Trace($"Log file is: {Utils.GetAbsoluteFilePath(Configuration.TraceConfiguration.OutputFilePath, true, false, false, true)}");
Trace($"opcstacktracemask set to: 0x{Program.OpcStackTraceMask:X} ({Program.OpcStackTraceMask})");
_configuration.TraceConfiguration.ApplySettings();
Trace($"Current directory is: {System.IO.Directory.GetCurrentDirectory()}");
Trace($"Log file is: {Utils.GetAbsoluteFilePath(_configuration.TraceConfiguration.OutputFilePath, true, false, false, true)}");
Trace($"opcstacktracemask set to: 0x{_opcStackTraceMask:X} ({_opcStackTraceMask})");
Configuration.ClientConfiguration = new ClientConfiguration();
_configuration.ClientConfiguration = new ClientConfiguration();
// validate the configuration now
Configuration.Validate(Configuration.ApplicationType).Wait();
await _configuration.Validate(_configuration.ApplicationType);
}
}
}

Просмотреть файл

@ -14,88 +14,48 @@ namespace OpcPublisher
using Opc.Ua;
using Opc.Ua.Server;
using System.Text.RegularExpressions;
using static IotHubMessaging;
using static Opc.Ua.CertificateStoreType;
using static OpcPublisher.Workarounds.TraceWorkaround;
using static OpcSession;
using static OpcStackConfiguration;
using static System.Console;
public class Program
{
//
// Publisher app related
//
public static int PublisherSessionConnectWaitSec = 10;
public static List<OpcSession> OpcSessions = new List<OpcSession>();
public static SemaphoreSlim OpcSessionsSemaphore = new SemaphoreSlim(1);
public static SemaphoreSlim OpcSessionsListSemaphore = new SemaphoreSlim(1);
public static List<PublisherConfigFileEntry> PublisherConfigFileEntries = new List<PublisherConfigFileEntry>();
public static List<NodeToPublishConfig> PublishConfig = new List<NodeToPublishConfig>();
public static string NodesToPublishAbsFilenameDefault = $"{System.IO.Directory.GetCurrentDirectory()}{Path.DirectorySeparatorChar}publishednodes.json";
public static string NodesToPublishAbsFilename;
public static SemaphoreSlim PublishDataSemaphore = new SemaphoreSlim(1);
public static string ShopfloorDomain;
public static bool VerboseConsole;
public static bool PublisherShutdownInProgress = false;
public static IotHubMessaging IotHubCommunication;
public static string NodesToPublishAbsFilename
{
get => _nodesToPublishAbsFilename;
set => _nodesToPublishAbsFilename = value;
}
private static string _nodesToPublishAbsFilename;
public static bool PublisherShutdownInProgress
{
get => _publisherShutdownInProgress;
set => _publisherShutdownInProgress = value;
}
private static bool _publisherShutdownInProgress = false;
public static uint PublisherShutdownWaitPeriod
{
get => _publisherShutdownWaitPeriod;
set => _publisherShutdownWaitPeriod = value;
}
private static uint _publisherShutdownWaitPeriod = 10;
private static PublisherServer _publisherServer;
private static DateTime _lastServerSessionEventTime = DateTime.UtcNow;
public static uint PublisherShutdownWaitPeriod = 10;
//
// IoTHub related
//
private static string _IotHubOwnerConnectionString;
public static Microsoft.Azure.Devices.Client.TransportType IotHubProtocol = Microsoft.Azure.Devices.Client.TransportType.Mqtt;
public static IotHubMessaging IotHubCommunication;
private static uint _MaxSizeOfIoTHubMessageBytes = 4096;
private static int _DefaultSendIntervalSeconds = 1;
public static string IotDeviceCertStoreType = X509Store;
private const string _iotDeviceCertDirectoryStorePathDefault = "CertificateStores/IoTHub";
private const string _iotDeviceCertX509StorePathDefault = "IoTHub";
public static string IotDeviceCertStorePath = _iotDeviceCertX509StorePathDefault;
//
// OPC component related
//
public static ApplicationConfiguration OpcConfiguration = null;
public static string ApplicationName;
public static string LogFileName;
public static ushort PublisherServerPort = 62222;
public static string PublisherServerPath = "/UA/Publisher";
public static int LdsRegistrationInterval = 0;
public static int OpcOperationTimeout = 120000;
public static bool TrustMyself = true;
// Enable Utils.TraceMasks.OperationDetail to get output for IoTHub telemetry operations. Current: 0x287 (647), with OperationDetail: 0x2C7 (711)
public static int OpcStackTraceMask = Utils.TraceMasks.Error | Utils.TraceMasks.Security | Utils.TraceMasks.StackTrace | Utils.TraceMasks.StartStop;
public static bool OpcPublisherAutoTrustServerCerts = false;
public static uint OpcSessionCreationTimeout = 10;
public static uint OpcSessionCreationBackoffMax = 5;
public static uint OpcKeepAliveDisconnectThreshold = 5;
public static int OpcKeepAliveIntervalInSec = 2;
public static int OpcSamplingInterval = 1000;
public static int OpcPublishingInterval = 0;
public static bool FetchOpcNodeDisplayName = false;
public static string PublisherServerSecurityPolicy = SecurityPolicies.Basic128Rsa15;
public static string OpcOwnCertStoreType = X509Store;
private const string _opcOwnCertDirectoryStorePathDefault = "CertificateStores/own";
private const string _opcOwnCertX509StorePathDefault = "CurrentUser\\UA_MachineDefault";
public static string OpcOwnCertStorePath = _opcOwnCertX509StorePathDefault;
public static string OpcTrustedCertStoreType = Directory;
public static string OpcTrustedCertDirectoryStorePathDefault = "CertificateStores/UA Applications";
public static string OpcTrustedCertX509StorePathDefault = "CurrentUser\\UA_MachineDefault";
public static string OpcTrustedCertStorePath = null;
public static string OpcRejectedCertStoreType = Directory;
private const string _opcRejectedCertDirectoryStorePathDefault = "CertificateStores/Rejected Certificates";
private const string _opcRejectedCertX509StorePathDefault = "CurrentUser\\UA_MachineDefault";
public static string OpcRejectedCertStorePath = _opcRejectedCertDirectoryStorePathDefault;
public static string OpcIssuerCertStoreType = Directory;
private const string _opcIssuerCertDirectoryStorePathDefault = "CertificateStores/UA Certificate Authorities";
private const string _opcIssuerCertX509StorePathDefault = "CurrentUser\\UA_MachineDefault";
public static string OpcIssuerCertStorePath = _opcIssuerCertDirectoryStorePathDefault;
private static bool _opcTraceInitialized = false;
private static int _publisherSessionConnectWaitSec = 10;
private static string _nodesToPublishAbsFilenameDefault = $"{System.IO.Directory.GetCurrentDirectory()}{Path.DirectorySeparatorChar}publishednodes.json";
/// <summary>
/// Usage message.
@ -130,9 +90,19 @@ namespace OpcPublisher
options.WriteOptionDescriptions(Console.Out);
}
/// <summary>
/// Synchronous main method of the app.
/// </summary>
public static void Main(string[] args)
{
var opcTraceInitialized = false;
MainAsync(args).Wait();
}
/// <summary>
/// Asynchronous part of the main method of the app.
/// </summary>
public async static Task MainAsync(string[] args)
{
try
{
var shouldShowHelp = false;
@ -140,7 +110,7 @@ namespace OpcPublisher
// command line options configuration
Mono.Options.OptionSet options = new Mono.Options.OptionSet {
// Publishing configuration options
{ "pf|publishfile=", $"the filename to configure the nodes to publish.\nDefault: '{NodesToPublishAbsFilenameDefault}'", (string p) => NodesToPublishAbsFilename = p },
{ "pf|publishfile=", $"the filename to configure the nodes to publish.\nDefault: '{_nodesToPublishAbsFilenameDefault}'", (string p) => _nodesToPublishAbsFilename = p },
{ "sd|shopfloordomain=", $"the domain of the shopfloor. if specified this domain is appended (delimited by a ':' to the 'ApplicationURI' property when telemetry is sent to IoTHub.\n" +
"The value must follow the syntactical rules of a DNS hostname.\nDefault: not set", (string s) => {
Regex domainNameRegex = new Regex("^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\\-]*[A-Za-z0-9])$");
@ -154,10 +124,10 @@ namespace OpcPublisher
}
}
},
{ "sw|sessionconnectwait=", $"specify the wait time in seconds publisher is trying to connect to disconnected endpoints and starts monitoring unmonitored items\nMin: 10\nDefault: {PublisherSessionConnectWaitSec}", (int i) => {
{ "sw|sessionconnectwait=", $"specify the wait time in seconds publisher is trying to connect to disconnected endpoints and starts monitoring unmonitored items\nMin: 10\nDefault: {_publisherSessionConnectWaitSec}", (int i) => {
if (i > 10)
{
PublisherSessionConnectWaitSec = i;
_publisherSessionConnectWaitSec = i;
}
else
{
@ -171,10 +141,10 @@ namespace OpcPublisher
{ "ih|iothubprotocol=", $"the protocol to use for communication with Azure IoTHub (allowed values: {string.Join(", ", Enum.GetNames(IotHubProtocol.GetType()))}).\nDefault: {Enum.GetName(IotHubProtocol.GetType(), IotHubProtocol)}",
(Microsoft.Azure.Devices.Client.TransportType p) => IotHubProtocol = p
},
{ "ms|iothubmessagesize=", $"the max size of a message which can be send to IoTHub. when telemetry of this size is available it will be sent.\n0 will enforce immediate send when telemetry is available\nMin: 0\nMax: 256 * 1024\nDefault: {_MaxSizeOfIoTHubMessageBytes}", (uint u) => {
{ "ms|iothubmessagesize=", $"the max size of a message which can be send to IoTHub. when telemetry of this size is available it will be sent.\n0 will enforce immediate send when telemetry is available\nMin: 0\nMax: 256 * 1024\nDefault: {MaxSizeOfIoTHubMessageBytes}", (uint u) => {
if (u >= 0 && u <= 256 * 1024)
{
_MaxSizeOfIoTHubMessageBytes = u;
MaxSizeOfIoTHubMessageBytes = u;
}
else
{
@ -182,10 +152,10 @@ namespace OpcPublisher
}
}
},
{ "si|iothubsendinterval=", $"the interval in seconds when telemetry should be send to IoTHub. If 0, then only the iothubmessagesize parameter controls when telemetry is sent.\nDefault: '{_DefaultSendIntervalSeconds}'", (int i) => {
{ "si|iothubsendinterval=", $"the interval in seconds when telemetry should be send to IoTHub. If 0, then only the iothubmessagesize parameter controls when telemetry is sent.\nDefault: '{DefaultSendIntervalSeconds}'", (int i) => {
if (i >= 0)
{
_DefaultSendIntervalSeconds = i;
DefaultSendIntervalSeconds = i;
}
else
{
@ -279,7 +249,7 @@ namespace OpcPublisher
}
}
},
{ "st|opcstacktracemask=", $"the trace mask for the OPC stack. See github OPC .NET stack for definitions.\nTo enable IoTHub telemetry tracing set it to 711.\nDefault: {OpcStackTraceMask:X} ({Program.OpcStackTraceMask})", (int i) => {
{ "st|opcstacktracemask=", $"the trace mask for the OPC stack. See github OPC .NET stack for definitions.\nTo enable IoTHub telemetry tracing set it to 711.\nDefault: {OpcStackTraceMask:X} ({OpcStackTraceMask})", (int i) => {
if (i >= 0)
{
OpcStackTraceMask = i;
@ -303,7 +273,7 @@ namespace OpcPublisher
if (s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) || s.Equals(Directory, StringComparison.OrdinalIgnoreCase))
{
OpcOwnCertStoreType = s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) ? X509Store : Directory;
OpcOwnCertStorePath = s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) ? _opcOwnCertX509StorePathDefault : _opcOwnCertDirectoryStorePathDefault;
OpcOwnCertStorePath = s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) ? OpcOwnCertX509StorePathDefault : OpcOwnCertDirectoryStorePathDefault;
}
else
{
@ -312,8 +282,8 @@ namespace OpcPublisher
}
},
{ "ap|appcertstorepath=", $"the path where the own application cert should be stored\nDefault (depends on store type):\n" +
$"X509Store: '{_opcOwnCertX509StorePathDefault}'\n" +
$"Directory: '{_opcOwnCertDirectoryStorePathDefault}'", (string s) => OpcOwnCertStorePath = s
$"X509Store: '{OpcOwnCertX509StorePathDefault}'\n" +
$"Directory: '{OpcOwnCertDirectoryStorePathDefault}'", (string s) => OpcOwnCertStorePath = s
},
// trusted cert store options
@ -340,7 +310,7 @@ namespace OpcPublisher
if (s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) || s.Equals(Directory, StringComparison.OrdinalIgnoreCase))
{
OpcRejectedCertStoreType = s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) ? X509Store : Directory;
OpcRejectedCertStorePath = s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) ? _opcRejectedCertX509StorePathDefault : _opcRejectedCertDirectoryStorePathDefault;
OpcRejectedCertStorePath = s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) ? OpcRejectedCertX509StorePathDefault : OpcRejectedCertDirectoryStorePathDefault;
}
else
{
@ -349,8 +319,8 @@ namespace OpcPublisher
}
},
{ "rp|rejectedcertstorepath=", $"the path of the rejected cert store\nDefault (depends on store type):\n" +
$"X509Store: '{_opcRejectedCertX509StorePathDefault}'\n" +
$"Directory: '{_opcRejectedCertDirectoryStorePathDefault}'", (string s) => OpcRejectedCertStorePath = s
$"X509Store: '{OpcRejectedCertX509StorePathDefault}'\n" +
$"Directory: '{OpcRejectedCertDirectoryStorePathDefault}'", (string s) => OpcRejectedCertStorePath = s
},
// issuer cert store options
@ -359,7 +329,7 @@ namespace OpcPublisher
if (s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) || s.Equals(Directory, StringComparison.OrdinalIgnoreCase))
{
OpcIssuerCertStoreType = s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) ? X509Store : Directory;
OpcIssuerCertStorePath = s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) ? _opcIssuerCertX509StorePathDefault : _opcIssuerCertDirectoryStorePathDefault;
OpcIssuerCertStorePath = s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) ? OpcIssuerCertX509StorePathDefault : OpcIssuerCertDirectoryStorePathDefault;
}
else
{
@ -368,8 +338,8 @@ namespace OpcPublisher
}
},
{ "ip|issuercertstorepath=", $"the path of the trusted issuer cert store\nDefault (depends on store type):\n" +
$"X509Store: '{_opcIssuerCertX509StorePathDefault}'\n" +
$"Directory: '{_opcIssuerCertDirectoryStorePathDefault}'", (string s) => OpcIssuerCertStorePath = s
$"X509Store: '{OpcIssuerCertX509StorePathDefault}'\n" +
$"Directory: '{OpcIssuerCertDirectoryStorePathDefault}'", (string s) => OpcIssuerCertStorePath = s
},
// device connection string cert store options
@ -377,7 +347,7 @@ namespace OpcPublisher
if (s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) || s.Equals(Directory, StringComparison.OrdinalIgnoreCase))
{
IotDeviceCertStoreType = s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) ? X509Store : Directory;
IotDeviceCertStorePath = s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) ? _iotDeviceCertX509StorePathDefault : _iotDeviceCertDirectoryStorePathDefault;
IotDeviceCertStorePath = s.Equals(X509Store, StringComparison.OrdinalIgnoreCase) ? IotDeviceCertX509StorePathDefault : IotDeviceCertDirectoryStorePathDefault;
}
else
{
@ -386,8 +356,8 @@ namespace OpcPublisher
}
},
{ "dp|devicecertstorepath=", $"the path of the iot device cert store\nDefault Default (depends on store type):\n" +
$"X509Store: '{_iotDeviceCertX509StorePathDefault}'\n" +
$"Directory: '{_iotDeviceCertDirectoryStorePathDefault}'", (string s) => IotDeviceCertStorePath = s
$"X509Store: '{IotDeviceCertX509StorePathDefault}'\n" +
$"Directory: '{IotDeviceCertDirectoryStorePathDefault}'", (string s) => IotDeviceCertStorePath = s
},
// misc
@ -418,23 +388,23 @@ namespace OpcPublisher
else if (arguments.Count == 2)
{
ApplicationName = arguments[0];
_IotHubOwnerConnectionString = arguments[1];
IotHubOwnerConnectionString = arguments[1];
}
else if (arguments.Count == 1)
{
ApplicationName = arguments[0];
}
else {
else
{
ApplicationName = Utils.GetHostName();
}
WriteLine("Publisher is starting up...");
// init OPC configuration and tracing
Init(OpcStackTraceMask, VerboseConsole);
OpcStackConfiguration opcStackConfiguration = new OpcStackConfiguration(ApplicationName);
opcTraceInitialized = true;
OpcConfiguration = opcStackConfiguration.Configuration;
OpcStackConfiguration opcStackConfiguration = new OpcStackConfiguration();
await opcStackConfiguration.ConfigureAsync();
_opcTraceInitialized = true;
// log shopfloor domain setting
if (string.IsNullOrEmpty(ShopfloorDomain))
@ -450,20 +420,20 @@ namespace OpcPublisher
if (OpcPublisherAutoTrustServerCerts)
{
Trace("Publisher configured to auto trust server certificates of the servers it is connecting to.");
OpcConfiguration.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_AutoTrustServerCerts);
PublisherOpcApplicationConfiguration.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_AutoTrustServerCerts);
}
else
{
Trace("Publisher configured to not auto trust server certificates. When connecting to servers, you need to manually copy the rejected server certs to the trusted store to trust them.");
OpcConfiguration.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_Default);
PublisherOpcApplicationConfiguration.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_Default);
}
// start our server interface
try
{
Trace($"Starting server on endpoint {OpcConfiguration.ServerConfiguration.BaseAddresses[0].ToString()} ...");
Trace($"Starting server on endpoint {PublisherOpcApplicationConfiguration.ServerConfiguration.BaseAddresses[0].ToString()} ...");
_publisherServer = new PublisherServer();
_publisherServer.Start(OpcConfiguration);
_publisherServer.Start(PublisherOpcApplicationConfiguration);
Trace("Server started.");
}
catch (Exception e)
@ -476,20 +446,20 @@ namespace OpcPublisher
// get information on the nodes to publish and validate the json by deserializing it.
try
{
PublishDataSemaphore.Wait();
if (string.IsNullOrEmpty(NodesToPublishAbsFilename))
await PublishDataSemaphore.WaitAsync();
if (string.IsNullOrEmpty(_nodesToPublishAbsFilename))
{
// check if we have an env variable specifying the published nodes path, otherwise use the default
NodesToPublishAbsFilename = NodesToPublishAbsFilenameDefault;
_nodesToPublishAbsFilename = _nodesToPublishAbsFilenameDefault;
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_GW_PNFP")))
{
Trace("Publishing node configuration file path read from environment.");
NodesToPublishAbsFilename = Environment.GetEnvironmentVariable("_GW_PNFP");
_nodesToPublishAbsFilename = Environment.GetEnvironmentVariable("_GW_PNFP");
}
}
Trace($"Attempting to load nodes file from: {NodesToPublishAbsFilename}");
PublisherConfigFileEntries = JsonConvert.DeserializeObject<List<PublisherConfigFileEntry>>(File.ReadAllText(NodesToPublishAbsFilename));
Trace($"Attempting to load nodes file from: {_nodesToPublishAbsFilename}");
PublisherConfigFileEntries = JsonConvert.DeserializeObject<List<PublisherConfigFileEntry>>(File.ReadAllText(_nodesToPublishAbsFilename));
Trace($"Loaded {PublisherConfigFileEntries.Count.ToString()} config file entry/entries.");
foreach (var publisherConfigFileEntry in PublisherConfigFileEntries)
@ -523,7 +493,7 @@ namespace OpcPublisher
// initialize and start IoTHub messaging
IotHubCommunication = new IotHubMessaging();
if (!IotHubCommunication.Init(_IotHubOwnerConnectionString, _MaxSizeOfIoTHubMessageBytes, _DefaultSendIntervalSeconds))
if (! await IotHubCommunication.InitAsync())
{
return;
}
@ -531,8 +501,8 @@ namespace OpcPublisher
// create a list to manage sessions, subscriptions and monitored items.
try
{
PublishDataSemaphore.Wait();
OpcSessionsSemaphore.Wait();
await PublishDataSemaphore.WaitAsync();
await OpcSessionsListSemaphore.WaitAsync();
var uniqueEndpointUrls = PublishConfig.Select(n => n.EndpointUri).Distinct();
foreach (var endpointUrl in uniqueEndpointUrls)
{
@ -586,13 +556,13 @@ namespace OpcPublisher
}
finally
{
OpcSessionsSemaphore.Release();
OpcSessionsListSemaphore.Release();
PublishDataSemaphore.Release();
}
// kick off the task to maintain all sessions
var cts = new CancellationTokenSource();
Task.Run( async () => await SessionConnector(cts.Token));
Task.Run(() => SessionConnectorAsync(cts.Token));
// Show notification on session events
_publisherServer.CurrentInstance.SessionManager.SessionActivated += ServerEventStatus;
@ -616,14 +586,14 @@ namespace OpcPublisher
_publisherServer.Stop();
// Clean up Publisher sessions
Task.Run(async () => await SessionShutdown()).Wait();
SessionShutdownAsync().Wait();
// shutdown the IoTHub messaging
IotHubCommunication.Shutdown();
}
catch (Exception e)
{
if (opcTraceInitialized)
if (_opcTraceInitialized)
{
Trace(e, e.StackTrace);
e = e.InnerException ?? null;
@ -653,31 +623,49 @@ namespace OpcPublisher
/// <summary>
/// Kicks of the work horse of the publisher regularily for all sessions.
/// </summary>
public static async Task SessionConnector(CancellationToken cancellationtoken)
public static async Task SessionConnectorAsync(CancellationToken cancellationtoken)
{
while (true)
while (true && !PublisherShutdownInProgress)
{
try
{
// get tasks for all disconnected sessions and start them
OpcSessionsSemaphore.Wait();
var singleSessionHandlerTaskList = OpcSessions.Select(s => s.ConnectAndMonitor());
OpcSessionsSemaphore.Release();
await OpcSessionsListSemaphore.WaitAsync();
var singleSessionHandlerTaskList = OpcSessions.Select(s => s.ConnectAndMonitorAsync());
OpcSessionsListSemaphore.Release();
await Task.WhenAll(singleSessionHandlerTaskList);
}
catch (Exception e)
{
Trace(e, $"Failed to connect and monitor a disconnected server. {(e.InnerException != null ? e.InnerException.Message : "")}");
}
Thread.Sleep(PublisherSessionConnectWaitSec * 1000);
await Task.Delay(_publisherSessionConnectWaitSec * 1000);
}
}
/// <summary>
/// Shutdown all sessions.
/// Wait till all sessions are shutdown.
/// async tasks.
/// </summary>
public static async Task SessionShutdown()
public async static Task SessionShutdownAsync()
{
// Shutdown all sessions.
try
{
await OpcSessionsListSemaphore.WaitAsync();
while (OpcSessions.Count > 0)
{
OpcSession opcSession = OpcSessions.ElementAt(0);
OpcSessions.RemoveAt(0);
await opcSession.ShutdownAsync();
}
}
finally
{
OpcSessionsListSemaphore.Release();
}
// Wait and continue after a while.
uint maxTries = PublisherShutdownWaitPeriod;
while (true)
{
@ -688,11 +676,11 @@ namespace OpcPublisher
}
if (maxTries-- == 0)
{
Trace($"There are stil {sessionCount} sessions alive. Ignore and continue shutdown.");
Trace($"There are still {sessionCount} sessions alive. Ignore and continue shutdown.");
return;
}
Trace($"Publisher is shutting down. Wait {PublisherSessionConnectWaitSec} seconds, since there are stil {sessionCount} sessions alive...");
await Task.Delay(PublisherSessionConnectWaitSec * 1000);
Trace($"Publisher is shutting down. Wait {_publisherSessionConnectWaitSec} seconds, since there are stil {sessionCount} sessions alive...");
await Task.Delay(_publisherSessionConnectWaitSec * 1000);
}
}
@ -705,9 +693,9 @@ namespace OpcPublisher
{
Trace($"The Publisher does not trust the server with the certificate subject '{e.Certificate.Subject}'.");
Trace("If you want to trust this certificate, please copy it from the directory:");
Trace($"{OpcConfiguration.SecurityConfiguration.RejectedCertificateStore.StorePath}/certs");
Trace($"{PublisherOpcApplicationConfiguration.SecurityConfiguration.RejectedCertificateStore.StorePath}/certs");
Trace("to the directory:");
Trace($"{OpcConfiguration.SecurityConfiguration.TrustedPeerCertificates.StorePath}/certs");
Trace($"{PublisherOpcApplicationConfiguration.SecurityConfiguration.TrustedPeerCertificates.StorePath}/certs");
}
}

Просмотреть файл

@ -10,8 +10,10 @@ namespace OpcPublisher
using Newtonsoft.Json;
using System.IO;
using System.Linq;
using static IotHubMessaging;
using static OpcPublisher.Program;
using static OpcPublisher.Workarounds.TraceWorkaround;
using static OpcStackConfiguration;
public class PublisherNodeManager : CustomNodeManager2
{
@ -417,7 +419,7 @@ namespace OpcPublisher
}
/// <summary>
/// Method to start monitoring a node and publish the data to IoTHub.
/// Method to start monitoring a node and publish the data to IoTHub. Executes synchronously.
/// </summary>
private ServiceResult OnPublishNodeCall(ISystemContext context, MethodState method, IList<object> inputArguments, IList<object> outputArguments)
{
@ -464,7 +466,7 @@ namespace OpcPublisher
OpcSession opcSession = null;
try
{
OpcSessionsSemaphore.Wait();
OpcSessionsListSemaphore.WaitAsync();
opcSession = OpcSessions.FirstOrDefault(s => s.EndpointUri == nodeToPublish.EndpointUri);
// add a new session.
@ -480,19 +482,19 @@ namespace OpcPublisher
Trace($"PublishNode: Session found for endpoint '{nodeToPublish.EndpointUri.OriginalString}'");
}
// add the node info to the subscription with the default publishing interval
opcSession.AddNodeForMonitoring(OpcPublishingInterval, OpcSamplingInterval, nodeToPublish.NodeId);
// add the node info to the subscription with the default publishing interval, execute syncronously
opcSession.AddNodeForMonitoring(OpcPublishingInterval, OpcSamplingInterval, nodeToPublish.NodeId).Wait();
Trace($"PublishNode: Requested to monitor item with NodeId '{nodeToPublish.NodeId.ToString()}' (PublishingInterval: {OpcPublishingInterval}, SamplingInterval: {OpcSamplingInterval})");
}
finally
{
OpcSessionsSemaphore.Release();
OpcSessionsListSemaphore.Release();
}
// update our data
try
{
PublishDataSemaphore.Wait();
PublishDataSemaphore.WaitAsync();
PublishConfig.Add(nodeToPublish);
// add it also to the publish file
@ -518,7 +520,7 @@ namespace OpcPublisher
}
/// <summary>
/// Method to remove the node from the subscription and stop publishing telemetry to IoTHub.
/// Method to remove the node from the subscription and stop publishing telemetry to IoTHub. Executes synchronously.
/// </summary>
private ServiceResult OnUnpublishNodeCall(ISystemContext context, MethodState method, IList<object> inputArguments, IList<object> outputArguments)
{
@ -558,7 +560,7 @@ namespace OpcPublisher
OpcSession opcSession = null;
try
{
OpcSessionsSemaphore.Wait();
OpcSessionsListSemaphore.WaitAsync();
opcSession = OpcSessions.FirstOrDefault(s => s.EndpointUri == endpointUri);
}
catch
@ -567,7 +569,7 @@ namespace OpcPublisher
}
finally
{
OpcSessionsSemaphore.Release();
OpcSessionsListSemaphore.Release();
}
if (opcSession == null)
@ -582,13 +584,13 @@ namespace OpcPublisher
}
// remove the node from the sessions monitored items list.
opcSession.TagNodeForMonitoringStop(nodeId);
opcSession.TagNodeForMonitoringStop(nodeId).Wait();
Trace("UnpublishNode: Requested to stop monitoring of node.");
// remove node from persisted config file
try
{
PublishDataSemaphore.Wait();
PublishDataSemaphore.WaitAsync();
var entryToRemove = PublisherConfigFileEntries.Find(l => l.NodeId == nodeId && l.EndpointUri == endpointUri);
PublisherConfigFileEntries.Remove(entryToRemove);
File.WriteAllText(NodesToPublishAbsFilename, JsonConvert.SerializeObject(PublisherConfigFileEntries));
@ -607,13 +609,13 @@ namespace OpcPublisher
}
/// <summary>
/// Method to get the list of published nodes.
/// Method to get the list of published nodes. Executes synchronously.
/// </summary>
private ServiceResult OnGetListOfPublishedNodesCall(ISystemContext context, MethodState method, IList<object> inputArguments, IList<object> outputArguments)
{
try
{
PublishDataSemaphore.Wait();
PublishDataSemaphore.WaitAsync();
outputArguments[0] = JsonConvert.SerializeObject(PublisherConfigFileEntries);
}
finally
@ -626,7 +628,7 @@ namespace OpcPublisher
}
/// <summary>
/// Data node in the server which registers ourselves with IoT Hub when this node is written.
/// Data node in the server which registers ourselves with IoT Hub when this node is written. Executes synchronously.
/// </summary>
public ServiceResult OnConnectionStringWrite(ISystemContext context, NodeState node, NumericRange indexRange, QualifiedName dataEncoding, ref object value, ref StatusCode statusCode, ref DateTime timestamp)
{
@ -641,7 +643,7 @@ namespace OpcPublisher
timestamp = DateTime.Now;
// read current connection string and compare to the one passed in
string currentConnectionString = SecureIoTHubToken.Read(OpcConfiguration.ApplicationName, IotDeviceCertStoreType, IotDeviceCertStorePath);
string currentConnectionString = SecureIoTHubToken.ReadAsync(PublisherOpcApplicationConfiguration.ApplicationName, IotDeviceCertStoreType, IotDeviceCertStorePath).Result;
if (string.Equals(connectionString, currentConnectionString, StringComparison.OrdinalIgnoreCase))
{
Trace("ConnectionStringWrite: Connection string up to date!");
@ -653,7 +655,7 @@ namespace OpcPublisher
// configure publisher and write connection string
try
{
IotHubCommunication.ConnectionStringWrite(connectionString);
IotHubCommunication.ConnectionStringWriteAsync(connectionString).Wait();
}
catch (Exception e)
{

Просмотреть файл

@ -13,10 +13,10 @@ using Org.BouncyCastle.X509;
using System;
using System.Collections;
using System.IO;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;
namespace IoTHubCredentialTools
{
@ -56,7 +56,7 @@ namespace IoTHubCredentialTools
/// Returns the token from the cert in the given cert store.
/// </summary>
/// <returns></returns>
public static string Read(string name, string storeType, string storePath)
public async static Task<string> ReadAsync(string name, string storeType, string storePath)
{
string token = null;
@ -69,7 +69,7 @@ namespace IoTHubCredentialTools
using (DirectoryCertificateStore store = new DirectoryCertificateStore())
{
store.Open(storePath);
X509CertificateCollection certificates = store.Enumerate().Result;
X509CertificateCollection certificates = await store.Enumerate();
foreach (X509Certificate2 cert in certificates)
{
@ -110,7 +110,7 @@ namespace IoTHubCredentialTools
/// <summary>
/// Creates a cert with the connectionstring (token) and stores it in the given cert store.
/// </summary>
public static void Write(string name, string connectionString, string storeType, string storePath)
public async static Task WriteAsync(string name, string connectionString, string storeType, string storePath)
{
if (string.IsNullOrEmpty(connectionString))
{
@ -176,13 +176,13 @@ namespace IoTHubCredentialTools
{
Pkcs12Store pkcsStore = new Pkcs12StoreBuilder().Build();
X509CertificateEntry[] chain = new X509CertificateEntry[1];
string passcode = "passcode";
string passcode = Guid.NewGuid().ToString();
chain[0] = new X509CertificateEntry(x509);
pkcsStore.SetKeyEntry(name, new AsymmetricKeyEntry(keys.Private), chain);
pkcsStore.Save(pfxData, passcode.ToCharArray(), random);
// create X509Certificate2 object from PKCS12 file
certificate = CreateCertificateFromPKCS12(pfxData.ToArray(), passcode);
certificate = CertificateFactory.CreateCertificateFromPKCS12(pfxData.ToArray(), passcode);
// handle each store type differently
switch (storeType)
@ -193,7 +193,7 @@ namespace IoTHubCredentialTools
using (DirectoryCertificateStore store = new DirectoryCertificateStore())
{
store.Open(storePath);
X509CertificateCollection certificates = store.Enumerate().Result;
X509CertificateCollection certificates = await store.Enumerate();
// remove any existing cert with our name from the store
foreach (X509Certificate2 cert in certificates)
@ -212,7 +212,7 @@ namespace IoTHubCredentialTools
case CertificateStoreType.X509Store:
{
// Add to X509Store
using (X509Store store = new X509Store("iothub", StoreLocation.CurrentUser))
using (X509Store store = new X509Store(storePath, StoreLocation.CurrentUser))
{
store.Open(OpenFlags.ReadWrite);
@ -228,16 +228,6 @@ namespace IoTHubCredentialTools
// add new cert to store
try
{
// .NET Core workaround as described here: https://github.com/dotnet/core/blob/master/release-notes/1.0/Known-Issues-RC2.md
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
{
Mono.Unix.Native.FilePermissions mode = Mono.Unix.Native.FilePermissions.S_IRWXU;
Mono.Unix.Native.Syscall.chmod("/root/.dotnet", mode);
Mono.Unix.Native.Syscall.chmod("/root/.dotnet/corefx", mode);
Mono.Unix.Native.Syscall.chmod("/root/.dotnet/corefx/cryptography", mode);
Mono.Unix.Native.Syscall.chmod("/root/.dotnet/corefx/cryptography/x509stores", mode);
Mono.Unix.Native.Syscall.chmod("/root/.dotnet/corefx/cryptography/x509stores/iothub", mode);
}
store.Add(certificate);
}
catch (Exception e)
@ -256,48 +246,5 @@ namespace IoTHubCredentialTools
}
}
/// <summary>
/// Creates a X509 cert from a PKCS512 raw data stream.
/// </summary>
/// <returns></returns>
private static X509Certificate2 CreateCertificateFromPKCS12(byte[] rawData, string password)
{
Exception ex = null;
int flagsRetryCounter = 0;
X509Certificate2 certificate = null;
X509KeyStorageFlags[] storageFlags = {
X509KeyStorageFlags.Exportable | X509KeyStorageFlags.PersistKeySet | X509KeyStorageFlags.UserKeySet,
X509KeyStorageFlags.Exportable | X509KeyStorageFlags.PersistKeySet | X509KeyStorageFlags.MachineKeySet
};
// try some combinations of storage flags, support is platform dependent
while (certificate == null &&
flagsRetryCounter < storageFlags.Length)
{
try
{
// merge first cert with private key into X509Certificate2
certificate = new X509Certificate2(
rawData,
(password == null) ? String.Empty : password,
storageFlags[flagsRetryCounter]);
// can we really access the private key?
using (RSA rsa = certificate.GetRSAPrivateKey()) { }
}
catch (Exception e)
{
ex = e;
certificate = null;
}
flagsRetryCounter++;
}
if (certificate == null)
{
throw new NotSupportedException("Creating X509Certificate from PKCS #12 store failed", ex);
}
return certificate;
}
}
}

Просмотреть файл

@ -10,14 +10,13 @@ namespace OpcPublisher.Workarounds
/// </summary>
public static class TraceWorkaround
{
private static int _opcStackTraceMask;
private static bool _verboseConsole;
public static void Init(int opcStackTraceMask, bool verboseConsole)
public static bool VerboseConsole
{
_opcStackTraceMask = opcStackTraceMask;
_verboseConsole = verboseConsole;
get => _verboseConsole;
set => _verboseConsole = value;
}
private static bool _verboseConsole = false;
/// <summary>
/// Trace message helper
/// </summary>
@ -42,7 +41,7 @@ namespace OpcPublisher.Workarounds
public static void Trace(int traceMask, string format, params object[] args)
{
Utils.Trace(traceMask, format, args);
if (_verboseConsole && (_opcStackTraceMask & traceMask) != 0)
if (_verboseConsole && (OpcStackConfiguration.OpcStackTraceMask & traceMask) != 0)
{
WriteLine(DateTime.Now.ToString() + ": " + format, args);
}