From 6b8853e3e8a7f9a48457b1780e05c3c196878e94 Mon Sep 17 00:00:00 2001 From: Hans Gschossmann Date: Mon, 23 Oct 2017 08:54:59 +0200 Subject: [PATCH] Refactor published node configuration handling, refactor published node persistency, changes and fixes in publishers OPC UA server methods --- README.md | 28 +- src/OpcSession.cs | 647 ++++++++++++++++++++++-------- src/OpcStackConfiguration.cs | 1 - src/Program.cs | 130 +----- src/PublisherConfig.cs | 72 ---- src/PublisherNodeConfiguration.cs | 338 ++++++++++++++++ src/PublisherNodeManager.cs | 201 +++++----- src/publishednodes.json | 4 +- 8 files changed, 950 insertions(+), 471 deletions(-) delete mode 100644 src/PublisherConfig.cs create mode 100644 src/PublisherNodeConfiguration.cs diff --git a/README.md b/README.md index ca838b1..0ca2d29 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ This application uses the OPC Foundations's OPC UA reference stack and therefore |master|[![Build status](https://ci.appveyor.com/api/projects/status/6t7ru6ow7t9uv74r/branch/master?svg=true)](https://ci.appveyor.com/project/marcschier/iot-gateway-opc-ua-r4ba5/branch/master) [![Build Status](https://travis-ci.org/Azure/iot-gateway-opc-ua.svg?branch=master)](https://travis-ci.org/Azure/iot-gateway-opc-ua)| # Building the Application -The application requires the .NET Core SDK 1.1. +The application requires the .NET Core SDK 2.0. ## As native Windows application Open the OpcPublisher.sln project with Visual Studio 2017 and build the solution by hitting F7. @@ -34,7 +34,7 @@ From the root of the repository, in a console, type: The `-f` option for `docker build` is optional and the default is to use Dockerfile. Docker also support building directly from a git repository, which means you also can build a Linux container by: - docker build -t .https://github.com/Azure/iot-edge-opc-publisher + docker build -t https://github.com/Azure/iot-edge-opc-publisher # Configuring the OPC UA nodes to publish The OPC UA nodes whose values should be published to Azure IoT Hub can be configured by creating a JSON formatted configuration file (defaultname: "publishednodes.json"). This file is updated and persisted by the application, when using it's OPC UA server methods "PublishNode" or "UnpublishNode". @@ -71,9 +71,9 @@ The syntax of the configuration file is as follows: ] }, - // the format below is only supported for backward compatibility. you need to ensure that the + // the format below (NodeId format) is only supported for backward compatibility. you need to ensure that the // OPC UA server on the configured EndpointUrl has the namespaceindex you expect with your configuration. - // please use the ExpandedNodeId syntax instead. + // please use the ExpandedNodeId format as in the examples above instead. { "EndpointUrl": "opc.tcp://:/", "NodeId": { @@ -268,10 +268,10 @@ The following options are supported: -h, --help show this message and exit There are a couple of environment variables which can be used to control the application: -_HUB_CS: sets the IoTHub owner connectionstring -_GW_LOGP: sets the filename of the log file to use -_TPC_SP: sets the path to store certificates of trusted stations -_GW_PNFP: sets the filename of the publishing configuration file +* _HUB_CS: sets the IoTHub owner connectionstring +* _GW_LOGP: sets the filename of the log file to use +* _TPC_SP: sets the path to store certificates of trusted stations +* _GW_PNFP: sets the filename of the publishing configuration file Command line arguments overrule environment variable settings. @@ -303,7 +303,7 @@ The Publisher OPC UA server listens by default on port 62222. To expose this inb docker run -p 62222:62222 microsoft/iot-edge-opc-publisher [] [options] ### Enable intercontainer nameresolution -To enable name resolution from within the container to other containers, you need to create a user define docker bridge network and connect the container to this network using the `--network`option. +To enable name resolution from within the container to other containers, you need to create a user define docker bridge network and connect the container to this network using the `--network` option. Additionally you need to assign the container a name using the `--name` option as in this example: docker network create -d bridge iot_edge @@ -311,10 +311,18 @@ Additionally you need to assign the container a name using the `--name` option a The container can now be reached by other containers via the name `publisher`over the network. +### Access other hosts from within the container +Hosts which are implemented in other containers, could be reached with the parameters described in the "Enable intercontainer nameresolution" paragraph. +If the environment has DNS working, then accessing the host on which the docker runtime executes as was as any other DNS reachable host is no issue. +Problems occur in a network with Windows NetBIOS name resolution. To enable access to hosts (including the one on which the docker runtime executes) you need to start your container using the `--add-host` option. + + docker run --add-host mydevbox:192.168.178.23 microsoft/iot-edge-opc-publisher [] [options] + ### Assigning a hostname Publisher uses the hostname of the machine is running on for certificate and endpoint generation. docker chooses a random hostname if there is none set by the `-h` option. Here an example to set the internal hostname of the container to `publisher`: docker run -h publisher microsoft/iot-edge-opc-publisher [] [options] + ### Using bind mounts (shared filesystem) In certain use cases it may make sense to read configuration information from or write log files to locations on the host and not keep them in the container file system only. To achieve this you need to use the `-v` option of `docker run` in the bind mount mode. @@ -327,6 +335,8 @@ Storing X509 certificates does not work with bind mounts, since the permissions Open the OpcPublisher.sln project with Visual Studio 2017 and start debugging the app by hitting F5. +If you need to access the OPC UA server in the publisher, you should ensure that the firewall setting allow access to the port the server is listening on (default: 62222). + ## In a docker container Visual Studio 2017 supports debugging of application in docker container. This is done by using docker-compose. Since this does not allow to pass command line parameters it is not convenient. diff --git a/src/OpcSession.cs b/src/OpcSession.cs index b72121f..3e3bef1 100644 --- a/src/OpcSession.cs +++ b/src/OpcSession.cs @@ -9,9 +9,11 @@ namespace OpcPublisher using Opc.Ua; using System.Threading; using System.Threading.Tasks; + using static OpcPublisher.OpcMonitoredItem; using static OpcPublisher.Workarounds.TraceWorkaround; using static OpcStackConfiguration; using static Program; + using static PublisherNodeConfiguration; /// /// Class to manage the OPC monitored items, which are the nodes we need to publish. @@ -21,11 +23,17 @@ namespace OpcPublisher public enum OpcMonitoredItemState { Unmonitored = 0, - Monitoreded, - StopMonitoring, + UnmonitoredNamespaceUpdateRequested, + Monitored, + RemovalRequested, + } + + public enum OpcMonitoredItemConfigurationType + { + NodeId = 0, + ExpandedNodeId } - public ExpandedNodeId StartNodeId; public string DisplayName; public OpcMonitoredItemState State; public uint AttributeId; @@ -37,27 +45,39 @@ namespace OpcPublisher public MonitoredItemNotificationEventHandler Notification; public Uri EndpointUri; public MonitoredItem OpcUaClientMonitoredItem; - public string ConfigNodeId; + public NodeId ConfigNodeId; + public ExpandedNodeId ConfigExpandedNodeId; + public OpcMonitoredItemConfigurationType ConfigType; /// /// Ctor using NodeId (ns syntax for namespace). /// - public OpcMonitoredItem(NodeId nodeId, Uri sessionEndpointUri) + public OpcMonitoredItem(NodeId nodeId, Uri sessionEndpointUri, bool requestNamespaceUpdate = false) { - ConfigNodeId = nodeId.ToString(); - StartNodeId = new ExpandedNodeId(nodeId); + ConfigNodeId = nodeId; + ConfigExpandedNodeId = null; + ConfigType = OpcMonitoredItemConfigurationType.NodeId; Initialize(sessionEndpointUri); + if (requestNamespaceUpdate) + { + State = OpcMonitoredItemState.UnmonitoredNamespaceUpdateRequested; + } } /// /// Ctor using ExpandedNodeId (nsu syntax for namespace). /// - public OpcMonitoredItem(ExpandedNodeId expandedNodeId, Uri sessionEndpointUri) + public OpcMonitoredItem(ExpandedNodeId expandedNodeId, Uri sessionEndpointUri, bool requestNamespaceUpdate = false) { - ConfigNodeId = expandedNodeId.ToString(); - StartNodeId = expandedNodeId; + ConfigNodeId = null; + ConfigExpandedNodeId = expandedNodeId; + ConfigType = OpcMonitoredItemConfigurationType.ExpandedNodeId; Initialize(sessionEndpointUri); - } + if (requestNamespaceUpdate) + { + State = OpcMonitoredItemState.UnmonitoredNamespaceUpdateRequested; + } +} /// /// Init class variables. @@ -75,6 +95,62 @@ namespace OpcPublisher EndpointUri = sessionEndpointUri; } + /// + /// Checks if the monitored item does monitor the node described by the given objects. + /// + public bool IsMonitoringThisNode(NodeId nodeId, ExpandedNodeId expandedNodeId, NamespaceTable namespaceTable) + { + if (State == OpcMonitoredItemState.RemovalRequested) + { + return false; + } + if (ConfigType == OpcMonitoredItemConfigurationType.NodeId) + { + if (nodeId != null) + { + if (ConfigNodeId == nodeId) + { + return true; + } + } + if (expandedNodeId != null) + { + string namespaceUri = namespaceTable.ToArray().ElementAtOrDefault(ConfigNodeId.NamespaceIndex); + if (expandedNodeId.NamespaceUri != null && expandedNodeId.NamespaceUri.Equals(namespaceUri, StringComparison.OrdinalIgnoreCase)) + { + if (expandedNodeId.Identifier.ToString().Equals(ConfigNodeId.Identifier.ToString(), StringComparison.OrdinalIgnoreCase)) + { + return true; + } + } + } + } + if (ConfigType == OpcMonitoredItemConfigurationType.ExpandedNodeId) + { + if (nodeId != null) + { + int namespaceIndex = namespaceTable.GetIndex(ConfigExpandedNodeId?.NamespaceUri); + if (nodeId.NamespaceIndex == namespaceIndex) + { + if (nodeId.Identifier.ToString().Equals(ConfigExpandedNodeId.Identifier.ToString(), StringComparison.OrdinalIgnoreCase)) + { + return true; + } + } + } + if (expandedNodeId != null) + { + if (ConfigExpandedNodeId.NamespaceUri != null && + ConfigExpandedNodeId.NamespaceUri.Equals(expandedNodeId.NamespaceUri, StringComparison.OrdinalIgnoreCase) && + ConfigExpandedNodeId.Identifier.ToString().Equals(expandedNodeId.Identifier.ToString(), StringComparison.OrdinalIgnoreCase)) + { + return true; + } + } + } + return false; + } + /// /// The notification that the data for a monitored item has changed on an OPC UA server. /// @@ -106,7 +182,7 @@ namespace OpcPublisher encoder.WriteString("DisplayName", monitoredItem.DisplayName); // use the node Id as configured, to also have the namespace URI in case of a ExpandedNodeId. - encoder.WriteString("NodeId", ConfigNodeId); + encoder.WriteString("NodeId", ConfigType == OpcMonitoredItemConfigurationType.NodeId ? ConfigNodeId.ToString() : ConfigExpandedNodeId.ToString()); // suppress output of server timestamp in json by setting it to minvalue value.ServerTimestamp = DateTime.MinValue; @@ -212,60 +288,23 @@ namespace OpcPublisher /// public async Task ConnectAndMonitorAsync() { + bool updateConfigFileRequired = false; try { - await ConnectSessions(); + await ConnectSession(); - await MonitorNodes(); + updateConfigFileRequired = await MonitorNodes(); - // stop monitoring of nodes if requested and remove them from the monitored items list. - try + updateConfigFileRequired |= await StopMonitoringNodes(); + + await RemoveUnusedSubscriptions(); + + await RemoveUnusedSessions(); + + // update the config file if required + if (updateConfigFileRequired) { - await _opcSessionSemaphore.WaitAsync(); - foreach (var opcSubscription in OpcSubscriptions) - { - var itemsToRemove = opcSubscription.OpcMonitoredItems.Where(i => i.State == OpcMonitoredItem.OpcMonitoredItemState.StopMonitoring); - if (itemsToRemove.Any()) - { - Trace($"Remove nodes in subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'"); - opcSubscription.OpcUaClientSubscription.RemoveItems(itemsToRemove.Select(i => i.OpcUaClientMonitoredItem)); - } - } - } - finally - { - _opcSessionSemaphore.Release(); - } - - - // remove unused subscriptions. - try - { - await _opcSessionSemaphore.WaitAsync(); - foreach (var opcSubscription in OpcSubscriptions) - { - if (opcSubscription.OpcMonitoredItems.Count == 0) - { - Trace($"Subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri}' is not used and will be deleted."); - OpcUaClientSession.RemoveSubscription(opcSubscription.OpcUaClientSubscription); - opcSubscription.OpcUaClientSubscription = null; - } - } - } - finally - { - _opcSessionSemaphore.Release(); - } - - // shutdown unused sessions. - var unusedSessions = OpcSessions.Where(s => s.OpcSubscriptions.Count == 0); - foreach (var unusedSession in unusedSessions) - { - await OpcSessionsListSemaphore.WaitAsync(); - OpcSessions.Remove(unusedSession); - OpcSessionsListSemaphore.Release(); - - await unusedSession.ShutdownAsync(); + await UpdateNodeConfigurationFile(); } } catch (Exception e) @@ -275,89 +314,91 @@ namespace OpcPublisher } /// - /// Connects all disconnected sessions. + /// Connects the session if it is disconnected. /// - public async Task ConnectSessions() + public async Task ConnectSession() { try { await _opcSessionSemaphore.WaitAsync(); - // if the session is disconnected, create one. - if (State == SessionState.Disconnected) + // if the session is not disconnected, return + if (State != SessionState.Disconnected) { - Trace($"Connect and monitor session and nodes on endpoint '{EndpointUri.AbsoluteUri}'."); - State = SessionState.Connecting; - try + return; + } + + Trace($"Connect and monitor session and nodes on endpoint '{EndpointUri.AbsoluteUri}'."); + State = SessionState.Connecting; + try + { + // release the session to not block for high network timeouts. + _opcSessionSemaphore.Release(); + + // start connecting + EndpointDescription selectedEndpoint = CoreClientUtils.SelectEndpoint(EndpointUri.AbsoluteUri, true); + ConfiguredEndpoint configuredEndpoint = new ConfiguredEndpoint(null, selectedEndpoint, EndpointConfiguration.Create(PublisherOpcApplicationConfiguration)); + uint timeout = SessionTimeout * ((UnsuccessfulConnectionCount >= OpcSessionCreationBackoffMax) ? OpcSessionCreationBackoffMax : UnsuccessfulConnectionCount + 1); + Trace($"Create session for endpoint URI '{EndpointUri.AbsoluteUri}' with timeout of {timeout} ms."); + OpcUaClientSession = await Session.Create( + PublisherOpcApplicationConfiguration, + configuredEndpoint, + true, + false, + PublisherOpcApplicationConfiguration.ApplicationName, + timeout, + new UserIdentity(new AnonymousIdentityToken()), + null); + + if (OpcUaClientSession != null) { - // release the session to not block for high network timeouts. - _opcSessionSemaphore.Release(); - - // start connecting - EndpointDescription selectedEndpoint = CoreClientUtils.SelectEndpoint(EndpointUri.AbsoluteUri, true); - ConfiguredEndpoint configuredEndpoint = new ConfiguredEndpoint(null, selectedEndpoint, EndpointConfiguration.Create(PublisherOpcApplicationConfiguration)); - uint timeout = SessionTimeout * ((UnsuccessfulConnectionCount >= OpcSessionCreationBackoffMax) ? OpcSessionCreationBackoffMax : UnsuccessfulConnectionCount + 1); - Trace($"Create session for endpoint URI '{EndpointUri.AbsoluteUri}' with timeout of {timeout} ms."); - OpcUaClientSession = await Session.Create( - PublisherOpcApplicationConfiguration, - configuredEndpoint, - true, - false, - PublisherOpcApplicationConfiguration.ApplicationName, - timeout, - new UserIdentity(new AnonymousIdentityToken()), - null); - - if (OpcUaClientSession != null) + Trace($"Session successfully created with Id {OpcUaClientSession.SessionId}."); + if (!selectedEndpoint.EndpointUrl.Equals(configuredEndpoint.EndpointUrl.AbsoluteUri, StringComparison.OrdinalIgnoreCase)) { - Trace($"Session successfully created with Id {OpcUaClientSession.SessionId}."); - if (!selectedEndpoint.EndpointUrl.Equals(configuredEndpoint.EndpointUrl.AbsoluteUri)) - { - Trace($"the Server has updated the EndpointUrl to '{selectedEndpoint.EndpointUrl}'"); - } - - // init object state and install keep alive - UnsuccessfulConnectionCount = 0; - OpcUaClientSession.KeepAliveInterval = OpcKeepAliveIntervalInSec * 1000; - StandardKeepAliveEventHandlerAsync = async (session, keepAliveEventArgs) => await StandardClient_KeepAlive(session, keepAliveEventArgs); - OpcUaClientSession.KeepAlive += StandardKeepAliveEventHandlerAsync; - - // fetch the namespace array and cache it. it will not change as long the session exists. - DataValue namespaceArrayNodeValue = OpcUaClientSession.ReadValue(VariableIds.Server_NamespaceArray); - _namespaceTable.Update(namespaceArrayNodeValue.GetValue(null)); - - // show the available namespaces - Trace($"The session to endpoint '{selectedEndpoint.EndpointUrl}' has {_namespaceTable.Count} entries in its namespace array:"); - int i = 0; - foreach (var ns in _namespaceTable.ToArray()) - { - Trace($"Namespace index {i++}: {ns}"); - } - - // fetch the minimum supported item sampling interval from the server. - DataValue minSupportedSamplingInterval = OpcUaClientSession.ReadValue(VariableIds.Server_ServerCapabilities_MinSupportedSampleRate); - _minSupportedSamplingInterval = minSupportedSamplingInterval.GetValue(0); - Trace($"The server on endpoint '{selectedEndpoint.EndpointUrl}' supports a minimal sampling interval of {_minSupportedSamplingInterval} ms."); + Trace($"the Server has updated the EndpointUrl to '{selectedEndpoint.EndpointUrl}'"); } + + // init object state and install keep alive + UnsuccessfulConnectionCount = 0; + OpcUaClientSession.KeepAliveInterval = OpcKeepAliveIntervalInSec * 1000; + StandardKeepAliveEventHandlerAsync = async (session, keepAliveEventArgs) => await StandardClient_KeepAlive(session, keepAliveEventArgs); + OpcUaClientSession.KeepAlive += StandardKeepAliveEventHandlerAsync; + + // fetch the namespace array and cache it. it will not change as long the session exists. + DataValue namespaceArrayNodeValue = OpcUaClientSession.ReadValue(VariableIds.Server_NamespaceArray); + _namespaceTable.Update(namespaceArrayNodeValue.GetValue(null)); + + // show the available namespaces + Trace($"The session to endpoint '{selectedEndpoint.EndpointUrl}' has {_namespaceTable.Count} entries in its namespace array:"); + int i = 0; + foreach (var ns in _namespaceTable.ToArray()) + { + Trace($"Namespace index {i++}: {ns}"); + } + + // fetch the minimum supported item sampling interval from the server. + DataValue minSupportedSamplingInterval = OpcUaClientSession.ReadValue(VariableIds.Server_ServerCapabilities_MinSupportedSampleRate); + _minSupportedSamplingInterval = minSupportedSamplingInterval.GetValue(0); + Trace($"The server on endpoint '{selectedEndpoint.EndpointUrl}' supports a minimal sampling interval of {_minSupportedSamplingInterval} ms."); } - catch (Exception e) + } + catch (Exception e) + { + Trace(e, $"Session creation to endpoint '{EndpointUri.AbsoluteUri}' failed {++UnsuccessfulConnectionCount} time(s). Please verify if server is up and Publisher configuration is correct."); + State = SessionState.Disconnected; + OpcUaClientSession = null; + return; + } + finally + { + await _opcSessionSemaphore.WaitAsync(); + if (OpcUaClientSession != null) + { + State = SessionState.Connected; + } + else { - Trace(e, $"Session creation to endpoint '{EndpointUri.AbsoluteUri}' failed {++UnsuccessfulConnectionCount} time(s). Please verify if server is up and Publisher configuration is correct."); State = SessionState.Disconnected; - OpcUaClientSession = null; - return; - } - finally - { - await _opcSessionSemaphore.WaitAsync(); - if (OpcUaClientSession != null) - { - State = SessionState.Connected; - } - else - { - State = SessionState.Disconnected; - } } } } @@ -374,12 +415,19 @@ namespace OpcPublisher /// /// Monitoring for a node starts if it is required. /// - public async Task MonitorNodes() + public async Task MonitorNodes() { + bool requestConfigFileUpdate = false; try { await _opcSessionSemaphore.WaitAsync(); + // if session is not connected, return + if (State != SessionState.Connected) + { + return requestConfigFileUpdate; + } + // ensure all nodes in all subscriptions of this session are monitored. foreach (var opcSubscription in OpcSubscriptions) { @@ -393,8 +441,9 @@ namespace OpcPublisher } // process all unmonitored items. - var unmonitoredItems = opcSubscription.OpcMonitoredItems.Where(i => i.State == OpcMonitoredItem.OpcMonitoredItemState.Unmonitored); + var unmonitoredItems = opcSubscription.OpcMonitoredItems.Where(i => (i.State == OpcMonitoredItemState.Unmonitored || i.State == OpcMonitoredItemState.UnmonitoredNamespaceUpdateRequested)); + Trace($"Start monitoring nodes on endpoint '{EndpointUri.AbsoluteUri}'."); foreach (var item in unmonitoredItems) { // if the session is disconnected, we stop trying and wait for the next cycle @@ -403,18 +452,58 @@ namespace OpcPublisher break; } - Trace($"Start monitoring nodes on endpoint '{EndpointUri.AbsoluteUri}'."); - NodeId currentNodeId; + NodeId currentNodeId = null; try { - // lookup namespace index if ExpandedNodeId format has been used and build NodeId identifier. - if (!string.IsNullOrEmpty(item.StartNodeId.NamespaceUri)) + // update the namespace of the node if requested. there are two cases where this is requested: + // 1) publishing requests via the OPC server method are raised using a NodeId format. for those + // the NodeId format is converted into an ExpandedNodeId format + // 2) ExpandedNodeId configuration file entries do not have at parsing time a session to get + // the namespace index. this is set now. + if (item.State == OpcMonitoredItemState.UnmonitoredNamespaceUpdateRequested) { - currentNodeId = NodeId.Create(item.StartNodeId.Identifier, item.StartNodeId.NamespaceUri, _namespaceTable); + if (item.ConfigType == OpcMonitoredItemConfigurationType.ExpandedNodeId) + { + int namespaceIndex = GetNamespaceIndex(item.ConfigExpandedNodeId?.NamespaceUri); + if (namespaceIndex < 0) + { + Trace($"The namespace URI of node '{item.ConfigExpandedNodeId.ToString()}' could be not mapped to a namespace index."); + } + else + { + item.ConfigExpandedNodeId = new ExpandedNodeId(item.ConfigExpandedNodeId.Identifier, (ushort)namespaceIndex, item.ConfigExpandedNodeId?.NamespaceUri, 0); + } + } + if (item.ConfigType == OpcMonitoredItemConfigurationType.NodeId) + { + string namespaceUri = GetNamespaceUri(item.ConfigNodeId.NamespaceIndex); + if (string.IsNullOrEmpty(namespaceUri)) + { + Trace($"The namespace index of node '{item.ConfigNodeId.ToString()}' is invalid and the node format could not be updated."); + } + else + { + item.ConfigExpandedNodeId = new ExpandedNodeId(item.ConfigNodeId.Identifier, item.ConfigNodeId.NamespaceIndex, namespaceUri, 0); + item.ConfigType = OpcMonitoredItemConfigurationType.ExpandedNodeId; + } + } + item.State = OpcMonitoredItemState.Unmonitored; + } + + // lookup namespace index if ExpandedNodeId format has been used and build NodeId identifier. + if (item.ConfigType == OpcMonitoredItemConfigurationType.ExpandedNodeId) + { + int namespaceIndex = GetNamespaceIndex(item.ConfigExpandedNodeId?.NamespaceUri); + if (namespaceIndex < 0) + { + Trace($"Syntax or namespace URI of ExpandedNodeId '{item.ConfigExpandedNodeId.ToString()}' is invalid and will be ignored."); + continue; + } + currentNodeId = new NodeId(item.ConfigExpandedNodeId.Identifier, (ushort)namespaceIndex); } else { - currentNodeId = new NodeId((NodeId)item.StartNodeId); + currentNodeId = item.ConfigNodeId; } // if configured, get the DisplayName for the node, otherwise use the nodeId @@ -445,14 +534,15 @@ namespace OpcPublisher opcSubscription.OpcUaClientSubscription.SetPublishingMode(true); opcSubscription.OpcUaClientSubscription.ApplyChanges(); item.OpcUaClientMonitoredItem = monitoredItem; - item.State = OpcMonitoredItem.OpcMonitoredItemState.Monitoreded; + item.State = OpcMonitoredItemState.Monitored; item.EndpointUri = EndpointUri; - Trace($"Created monitored item for node '{currentNodeId}' in subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'"); + Trace($"Created monitored item for node '{currentNodeId.ToString()}' in subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'"); if (item.RequestedSamplingInterval != monitoredItem.SamplingInterval) { Trace($"Sampling interval: requested: {item.RequestedSamplingInterval}; revised: {monitoredItem.SamplingInterval}"); item.SamplingInterval = monitoredItem.SamplingInterval; } + requestConfigFileUpdate = true; } catch (Exception e) when (e.GetType() == typeof(ServiceResultException)) { @@ -469,20 +559,20 @@ namespace OpcPublisher case StatusCodes.BadNodeIdInvalid: case StatusCodes.BadNodeIdUnknown: { - Trace($"Failed to monitor node '{item.StartNodeId.Identifier}' on endpoint '{EndpointUri}'."); + Trace($"Failed to monitor node '{currentNodeId.Identifier}' on endpoint '{EndpointUri}'."); Trace($"OPC UA ServiceResultException is '{sre.Result}'. Please check your publisher configuration for this node."); break; } default: { - Trace($"Unhandled OPC UA ServiceResultException '{sre.Result}' when monitoring node '{item.StartNodeId.Identifier}' on endpoint '{EndpointUri}'. Continue."); + Trace($"Unhandled OPC UA ServiceResultException '{sre.Result}' when monitoring node '{currentNodeId.Identifier}' on endpoint '{EndpointUri}'. Continue."); break; } } } catch (Exception e) { - Trace(e, $"Failed to monitor node '{item.StartNodeId.Identifier}' on endpoint '{EndpointUri}'"); + Trace(e, $"Failed to monitor node '{currentNodeId.Identifier}' on endpoint '{EndpointUri}'"); } } } @@ -495,6 +585,117 @@ namespace OpcPublisher { _opcSessionSemaphore.Release(); } + return requestConfigFileUpdate; + } + + /// + /// Checks if there are monitored nodes tagged to stop monitoring and stop monitoring. + /// + public async Task StopMonitoringNodes() + { + bool requestConfigFileUpdate = false; + try + { + await _opcSessionSemaphore.WaitAsync(); + + // if session is not connected, return + if (State != SessionState.Connected) + { + return requestConfigFileUpdate; + } + + foreach (var opcSubscription in OpcSubscriptions) + { + // remove items tagged to stop in the stack + var itemsToRemove = opcSubscription.OpcMonitoredItems.Where(i => i.State == OpcMonitoredItemState.RemovalRequested); + if (itemsToRemove.Any()) + { + Trace($"Remove nodes in subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'"); + try + { + opcSubscription.OpcUaClientSubscription.RemoveItems(itemsToRemove.Select(i => i.OpcUaClientMonitoredItem)); + Trace($"There are now {opcSubscription.OpcUaClientSubscription.MonitoredItemCount} monitored items in this subscription."); + } + catch + { + // nodes may be tagged for stop before they are monitored, just continue + } + // remove them in our data structure + opcSubscription.OpcMonitoredItems.RemoveAll(i => i.State == OpcMonitoredItemState.RemovalRequested); + Trace($"There are now {opcSubscription.OpcMonitoredItems.Count} items managed by publisher for this subscription."); + requestConfigFileUpdate = true; + } + } + } + finally + { + _opcSessionSemaphore.Release(); + } + return requestConfigFileUpdate; + } + + /// + /// Checks if there are subscriptions without any monitored items and remove them. + /// + public async Task RemoveUnusedSubscriptions() + { + try + { + await _opcSessionSemaphore.WaitAsync(); + + // if session is not connected, return + if (State != SessionState.Connected) + { + return; + } + + // remove the subscriptions in the stack + var subscriptionsToRemove = OpcSubscriptions.Where(i => i.OpcMonitoredItems.Count == 0); + if (subscriptionsToRemove.Any()) + { + Trace($"Remove unused subscriptions on endpoint '{EndpointUri}'."); + OpcUaClientSession.RemoveSubscriptions(subscriptionsToRemove.Select(s => s.OpcUaClientSubscription)); + Trace($"There are now {OpcUaClientSession.SubscriptionCount} subscriptions in this sessopm."); + } + // remove them in our data structures + OpcSubscriptions.RemoveAll(s => s.OpcMonitoredItems.Count == 0); + } + finally + { + _opcSessionSemaphore.Release(); + } + + } + + /// + /// Checks if there are session without any subscriptions and remove them. + /// + public async Task RemoveUnusedSessions() + { + try + { + await OpcSessionsListSemaphore.WaitAsync(); + + // if session is not connected, return + if (State != SessionState.Connected) + { + return; + } + + // remove sssions in the stack + var sessionsToRemove = OpcSessions.Where(s => s.OpcSubscriptions.Count == 0); + foreach (var sessionToRemove in sessionsToRemove) + { + Trace($"Remove unused session on endpoint '{EndpointUri}'."); + await sessionToRemove.ShutdownAsync(); + } + // remove then in our data structures + OpcSessions.RemoveAll(s => s.OpcSubscriptions.Count == 0); + } + finally + { + OpcSessionsListSemaphore.Release(); + } } /// @@ -510,6 +711,41 @@ namespace OpcPublisher _opcSessionSemaphore.Release(); } + /// + /// Returns the namespace URI for a namespace index. + /// + public string GetNamespaceUri(int namespaceIndex) + { + try + { + _opcSessionSemaphore.WaitAsync(); + + return _namespaceTable.ToArray().ElementAtOrDefault(namespaceIndex); + } + finally + { + _opcSessionSemaphore.Release(); + } + } + + /// + /// Returns the namespace index for a namespace URI. + /// + public int GetNamespaceIndex(string namespaceUri) + { + try + { + _opcSessionSemaphore.WaitAsync(); + + return _namespaceTable.GetIndex(namespaceUri); + } + finally + { + _opcSessionSemaphore.Release(); + } + } + + /// /// Internal disconnect method. Caller must have taken the _opcSessionSemaphore. /// @@ -540,7 +776,11 @@ namespace OpcPublisher // mark all monitored items as unmonitored foreach (var opcMonitoredItem in opcSubscription.OpcMonitoredItems) { - opcMonitoredItem.State = OpcMonitoredItem.OpcMonitoredItemState.Unmonitored; + // tag all monitored items as unmonitored + if (opcMonitoredItem.State == OpcMonitoredItemState.Monitored) + { + opcMonitoredItem.State = OpcMonitoredItemState.Unmonitored; + } } } try @@ -562,10 +802,10 @@ namespace OpcPublisher } /// - /// Adds a node to be monitored. If there is no session to the endpoint, one is created. - /// If there is no spubscription with the requested publishing interval, one is created. + /// Adds a node to be monitored. If there is no subscription with the requested publishing interval, + /// one is created. /// - public async Task AddNodeForMonitoring(int publishingInterval, int samplingInterval, NodeId nodeId) + public async Task AddNodeForMonitoring(NodeId nodeId, ExpandedNodeId expandedNodeId, int opcPublishingInterval, int opcSamplingInterval) { try { @@ -577,30 +817,34 @@ namespace OpcPublisher } // check if there is already a subscription with the same publishing interval, which could be used to monitor the node - OpcSubscription opcSubscription = OpcSubscriptions.FirstOrDefault(s => s.RequestedPublishingInterval == publishingInterval); + OpcSubscription opcSubscription = OpcSubscriptions.FirstOrDefault(s => s.RequestedPublishingInterval == opcPublishingInterval); // if there was none found, create one if (opcSubscription == null) { - opcSubscription = new OpcSubscription(publishingInterval); + opcSubscription = new OpcSubscription(opcPublishingInterval); OpcSubscriptions.Add(opcSubscription); - Trace($"AddNodeForMonitoring: No matching subscription with publishing interval of {publishingInterval} found'. Requested to create a new one."); + Trace($"AddNodeForMonitoring: No matching subscription with publishing interval of {opcPublishingInterval} found'. Requested to create a new one."); } - // if it is already there, we just ignore it, otherwise we add a new item to monitor. - OpcMonitoredItem opcMonitoredItem = opcSubscription.OpcMonitoredItems.FirstOrDefault(m => m.StartNodeId == nodeId); - - // if there was none found, create one - if (opcMonitoredItem == null) + // if it is already published, we do nothing, else we create a new monitored item + if (!IsNodePublishedInSession(nodeId, expandedNodeId)) { + OpcMonitoredItem opcMonitoredItem = null; // add a new item to monitor - opcMonitoredItem = new OpcMonitoredItem(nodeId, EndpointUri) + if (expandedNodeId == null) { - RequestedSamplingInterval = samplingInterval - }; + opcMonitoredItem = new OpcMonitoredItem(nodeId, EndpointUri, true); + } + else + { + opcMonitoredItem = new OpcMonitoredItem(expandedNodeId, EndpointUri); + } + opcMonitoredItem.RequestedSamplingInterval = opcSamplingInterval; opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem); - Trace($"AddNodeForMonitoring: Added item with nodeId '{nodeId.ToString()}' for monitoring."); + Trace($"AddNodeForMonitoring: Added item with nodeId '{(expandedNodeId == null ? nodeId.ToString() : expandedNodeId.ToString())}' for monitoring."); + // update the publishing data // Start publishing. Task.Run(async () => await ConnectAndMonitorAsync()); } @@ -616,9 +860,9 @@ namespace OpcPublisher } /// - /// Tags a monitored node to stop monitoring. + /// Tags a monitored node to stop monitoring and remove it. /// - public async Task TagNodeForMonitoringStop(NodeId nodeId) + public async Task RequestMonitorItemRemoval(NodeId nodeId, ExpandedNodeId expandedNodeId) { try { @@ -629,17 +873,18 @@ namespace OpcPublisher return; } - // find all subscriptions the node is monitored on. - var opcSubscriptions = OpcSubscriptions.Where( s => s.OpcMonitoredItems.Any(m => m.StartNodeId == nodeId)); - // tag all monitored items with nodeId to stop monitoring. - foreach (var opcSubscription in opcSubscriptions) + // if the node to tag is specified as NodeId, it will also tag nodes configured + // in ExpandedNodeId format. + foreach (var opcSubscription in OpcSubscriptions) { - var opcMonitoredItems = opcSubscription.OpcMonitoredItems.Where(i => i.StartNodeId == nodeId); + var opcMonitoredItems = opcSubscription.OpcMonitoredItems.Where(m => { return m.IsMonitoringThisNode(nodeId, expandedNodeId, _namespaceTable); }); foreach (var opcMonitoredItem in opcMonitoredItems) { // tag it for removal. - opcMonitoredItem.State = OpcMonitoredItem.OpcMonitoredItemState.StopMonitoring; + opcMonitoredItem.State = OpcMonitoredItemState.RemovalRequested; + Trace("RequestMonitorItemRemoval: Node " + + $"'{(expandedNodeId == null ? nodeId.ToString() : expandedNodeId.ToString())}' tagged to stop monitoring."); } } @@ -648,7 +893,7 @@ namespace OpcPublisher } catch (Exception e) { - Trace(e, $"TagNodeForMonitoringStop: Exception while trying to tag node '{nodeId.ToString()}' to stop monitoring. (message: '{e.Message}'"); + Trace(e, $"RequestMonitorItemRemoval: Exception while trying to tag node '{nodeId.ToString()}' to stop monitoring. (message: '{e.Message}'"); } finally { @@ -657,9 +902,69 @@ namespace OpcPublisher } /// - /// Shutsdown all connected sessions. + /// Checks if the node specified by either the given NodeId or ExpandedNodeId on the given endpoint is published in the session. /// - public async Task ShutdownAsync() + private bool IsNodePublishedInSession(NodeId nodeId, ExpandedNodeId expandedNodeId) + { + try + { + _opcSessionSemaphore.WaitAsync(); + + foreach (var opcSubscription in OpcSubscriptions) + { + if (opcSubscription.OpcMonitoredItems.Any(m => { return m.IsMonitoringThisNode(nodeId, expandedNodeId, _namespaceTable); })) + { + return true; + } + } + } + catch (Exception e) + { + Trace(e, "Check if node is published failed."); + } + finally + { + _opcSessionSemaphore.Release(); + } + return false; + } + + /// + /// Checks if the node specified by either the given NodeId or ExpandedNodeId on the given endpoint is published. + /// + public static bool IsNodePublished(NodeId nodeId, ExpandedNodeId expandedNodeId, Uri endpointUri) + { + try + { + OpcSessionsListSemaphore.WaitAsync(); + + // itereate through all sessions, subscriptions and monitored items and create config file entries + foreach (var opcSession in OpcSessions) + { + if (opcSession.EndpointUri.AbsoluteUri.Equals(endpointUri.AbsoluteUri, StringComparison.OrdinalIgnoreCase)) + { + if (opcSession.IsNodePublishedInSession(nodeId, expandedNodeId)) + { + return true; + } + } + } + } + catch (Exception e) + { + Trace(e, "Check if node is published failed."); + } + finally + { + OpcSessionsListSemaphore.Release(); + } + return false; + } + + /// + /// Shutdown the current session if it is connected. + /// + public async Task ShutdownAsync() { try { diff --git a/src/OpcStackConfiguration.cs b/src/OpcStackConfiguration.cs index dc17bf2..a83c236 100644 --- a/src/OpcStackConfiguration.cs +++ b/src/OpcStackConfiguration.cs @@ -5,7 +5,6 @@ using System.Security.Cryptography.X509Certificates; namespace OpcPublisher { - using System.IO; using System.Threading.Tasks; using static Opc.Ua.CertificateStoreType; using static OpcPublisher.Workarounds.TraceWorkaround; diff --git a/src/Program.cs b/src/Program.cs index ec2c90b..112ac30 100644 --- a/src/Program.cs +++ b/src/Program.cs @@ -1,9 +1,7 @@  using Mono.Options; -using Newtonsoft.Json; using System; using System.Collections.Generic; -using System.IO; using System.Linq; using System.Reflection; using System.Threading; @@ -19,25 +17,14 @@ namespace OpcPublisher using static OpcPublisher.Workarounds.TraceWorkaround; using static OpcSession; using static OpcStackConfiguration; + using static PublisherNodeConfiguration; using static System.Console; public class Program { - public static List OpcSessions = new List(); - public static SemaphoreSlim OpcSessionsListSemaphore = new SemaphoreSlim(1); - public static List PublisherConfigFileEntries = new List(); - public static List PublishConfig = new List(); - public static SemaphoreSlim PublishDataSemaphore = new SemaphoreSlim(1); public static IotHubMessaging IotHubCommunication; public static CancellationTokenSource ShutdownTokenSource; - public static string NodesToPublishAbsFilename - { - get => _nodesToPublishAbsFilename; - set => _nodesToPublishAbsFilename = value; - } - private static string _nodesToPublishAbsFilename; - public static uint PublisherShutdownWaitPeriod { get => _publisherShutdownWaitPeriod; @@ -49,7 +36,6 @@ namespace OpcPublisher private static DateTime _lastServerSessionEventTime = DateTime.UtcNow; private static bool _opcTraceInitialized = false; private static int _publisherSessionConnectWaitSec = 10; - private static string _nodesToPublishAbsFilenameDefault = $"{System.IO.Directory.GetCurrentDirectory()}{Path.DirectorySeparatorChar}publishednodes.json"; /// /// Usage message. @@ -104,7 +90,7 @@ namespace OpcPublisher // command line options configuration Mono.Options.OptionSet options = new Mono.Options.OptionSet { // Publishing configuration options - { "pf|publishfile=", $"the filename to configure the nodes to publish.\nDefault: '{_nodesToPublishAbsFilenameDefault}'", (string p) => _nodesToPublishAbsFilename = p }, + { "pf|publishfile=", $"the filename to configure the nodes to publish.\nDefault: '{PublisherNodeConfigurationFilename}'", (string p) => PublisherNodeConfigurationFilename = p }, { "sd|shopfloordomain=", $"the domain of the shopfloor. if specified this domain is appended (delimited by a ':' to the 'ApplicationURI' property when telemetry is sent to IoTHub.\n" + "The value must follow the syntactical rules of a DNS hostname.\nDefault: not set", (string s) => { Regex domainNameRegex = new Regex("^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\\-]*[A-Za-z0-9])$"); @@ -440,121 +426,23 @@ namespace OpcPublisher return; } - // get information on the nodes to publish and validate the json by deserializing it. - try + // Read node configuration file + PublisherNodeConfiguration publisherNodeConfiguration = new PublisherNodeConfiguration(); + if (!await publisherNodeConfiguration.ReadConfigAsync()) { - await PublishDataSemaphore.WaitAsync(); - if (string.IsNullOrEmpty(_nodesToPublishAbsFilename)) - { - // check if we have an env variable specifying the published nodes path, otherwise use the default - _nodesToPublishAbsFilename = _nodesToPublishAbsFilenameDefault; - if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_GW_PNFP"))) - { - Trace("Publishing node configuration file path read from environment."); - _nodesToPublishAbsFilename = Environment.GetEnvironmentVariable("_GW_PNFP"); - } - } - - Trace($"Attempting to load nodes file from: {_nodesToPublishAbsFilename}"); - PublisherConfigFileEntries = JsonConvert.DeserializeObject>(File.ReadAllText(_nodesToPublishAbsFilename)); - Trace($"Loaded {PublisherConfigFileEntries.Count.ToString()} config file entry/entries."); - - foreach (var publisherConfigFileEntry in PublisherConfigFileEntries) - { - if (publisherConfigFileEntry.NodeId == null) - { - // new node configuration syntax. - foreach (var opcNode in publisherConfigFileEntry.OpcNodes) - { - PublishConfig.Add(new NodeToPublishConfig(ExpandedNodeId.Parse(opcNode.ExpandedNodeId), publisherConfigFileEntry.EndpointUri, opcNode.OpcSamplingInterval ?? OpcSamplingInterval, opcNode.OpcPublishingInterval ?? OpcPublishingInterval)); - } - } - else - { - // legacy (using ns=) node configuration syntax using default sampling and publishing interval. - PublishConfig.Add(new NodeToPublishConfig(publisherConfigFileEntry.NodeId, publisherConfigFileEntry.EndpointUri, OpcSamplingInterval, OpcPublishingInterval)); - } - } - } - catch (Exception e) - { - Trace(e, "Loading of the node configuration file failed. Does the file exist and has correct syntax?"); - Trace("exiting..."); return; } - finally - { - PublishDataSemaphore.Release(); - } - Trace($"There are {PublishConfig.Count.ToString()} nodes to publish."); // initialize and start IoTHub messaging IotHubCommunication = new IotHubMessaging(); - if (! await IotHubCommunication.InitAsync()) + if (!await IotHubCommunication.InitAsync()) { return; } - // create a list to manage sessions, subscriptions and monitored items. - try + if (!await publisherNodeConfiguration.CreateOpcPublishingDataAsync()) { - await PublishDataSemaphore.WaitAsync(); - await OpcSessionsListSemaphore.WaitAsync(); - var uniqueEndpointUrls = PublishConfig.Select(n => n.EndpointUri).Distinct(); - foreach (var endpointUrl in uniqueEndpointUrls) - { - // create new session info. - OpcSession opcSession = new OpcSession(endpointUrl, OpcSessionCreationTimeout); - - // create a subscription for each distinct publishing inverval - var nodesDistinctPublishingInterval = PublishConfig.Where(n => n.EndpointUri.Equals(endpointUrl)).Select(c => c.OpcPublishingInterval).Distinct(); - foreach (var nodeDistinctPublishingInterval in nodesDistinctPublishingInterval) - { - // create a subscription for the publishing interval and add it to the session. - OpcSubscription opcSubscription = new OpcSubscription(nodeDistinctPublishingInterval); - - // add all nodes with this OPC publishing interval to this subscription. - var nodesWithSamePublishingInterval = PublishConfig.Where(n => n.EndpointUri.Equals(endpointUrl)).Where(n => n.OpcPublishingInterval == nodeDistinctPublishingInterval); - foreach (var nodeInfo in nodesWithSamePublishingInterval) - { - // differentiate if legacy (using ns=) or new syntax (using nsu=) is used - if (nodeInfo.NodeId == null) - { - // create a monitored item for the node - OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.ExpandedNodeId, opcSession.EndpointUri) - { - RequestedSamplingInterval = nodeInfo.OpcSamplingInterval, - SamplingInterval = nodeInfo.OpcSamplingInterval - }; - opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem); - } - else - { - // give user a warning that the syntax is obsolete - Trace($"Please update the syntax of the configuration file and use ExpandedNodeId instead of NodeId property name for node with identifier '{nodeInfo.NodeId.ToString()}' on EndpointUrl '{nodeInfo.EndpointUri.AbsolutePath}'."); - - // create a monitored item for the node with the configured or default sampling interval - OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.NodeId, opcSession.EndpointUri) - { - RequestedSamplingInterval = nodeInfo.OpcSamplingInterval, - SamplingInterval = nodeInfo.OpcSamplingInterval - }; - opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem); - } - } - - // add subscription to session. - opcSession.OpcSubscriptions.Add(opcSubscription); - } - - // add session. - OpcSessions.Add(opcSession); - } - } - finally - { - OpcSessionsListSemaphore.Release(); - PublishDataSemaphore.Release(); + return; } // kick off the task to maintain all sessions @@ -573,7 +461,7 @@ namespace OpcPublisher WriteLine(""); ReadLine(); ShutdownTokenSource.Cancel(); - WriteLine("Publisher is shuting down..."); + WriteLine("Publisher is shutting down..."); // Wait for session connector completion await sessionConnectorAsync; diff --git a/src/PublisherConfig.cs b/src/PublisherConfig.cs deleted file mode 100644 index 565e802..0000000 --- a/src/PublisherConfig.cs +++ /dev/null @@ -1,72 +0,0 @@ - -using Newtonsoft.Json; -using Opc.Ua; -using System; -using System.Collections.Generic; - -namespace OpcPublisher -{ - /// - /// Class describing a list of nodes in the ExpandedNodeId format (using nsu as namespace syntax) - /// - public class OpcNodesOnEndpointUrl - { - public string ExpandedNodeId; - [JsonProperty(NullValueHandling = NullValueHandling.Ignore)] - public int? OpcSamplingInterval; - [JsonProperty(NullValueHandling = NullValueHandling.Ignore)] - public int? OpcPublishingInterval; - } - - /// - /// Class describing the nodes which should be published. It supports three formats: - /// - NodeId syntax using the namespace index (ns) syntax - /// - ExpandedNodeId syntax, using the namespace URI (nsu) syntax - /// - List of ExpandedNodeId syntax, to allow putting nodes with similar publishing and/or sampling intervals in one object - /// - public partial class PublisherConfigFileEntry - { - public PublisherConfigFileEntry() - { - } - - public PublisherConfigFileEntry(string nodeId, string endpointUrl) - { - NodeId = new NodeId(nodeId); - EndpointUri = new Uri(endpointUrl); - } - - [JsonProperty("EndpointUrl")] - public Uri EndpointUri; - [JsonProperty(NullValueHandling = NullValueHandling.Ignore)] - public NodeId NodeId; - [JsonProperty(NullValueHandling = NullValueHandling.Ignore)] - public List OpcNodes; - } - - public class NodeToPublishConfig - { - public Uri EndpointUri; - public NodeId NodeId; - public ExpandedNodeId ExpandedNodeId; - public int OpcSamplingInterval; - public int OpcPublishingInterval; - - public NodeToPublishConfig(NodeId nodeId, Uri endpointUri, int opcSamplingInterval, int opcPublishingInterval) - { - NodeId = nodeId; - ExpandedNodeId = null; - EndpointUri = endpointUri; - OpcSamplingInterval = opcSamplingInterval; - OpcPublishingInterval = opcPublishingInterval; - } - public NodeToPublishConfig(ExpandedNodeId expandedNodeId, Uri endpointUri, int opcSamplingInterval, int opcPublishingInterval) - { - NodeId = null; - ExpandedNodeId = expandedNodeId; - EndpointUri = endpointUri; - OpcSamplingInterval = opcSamplingInterval; - OpcPublishingInterval = opcPublishingInterval; - } - } -} diff --git a/src/PublisherNodeConfiguration.cs b/src/PublisherNodeConfiguration.cs new file mode 100644 index 0000000..ad2343f --- /dev/null +++ b/src/PublisherNodeConfiguration.cs @@ -0,0 +1,338 @@ + +using Newtonsoft.Json; +using Opc.Ua; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace OpcPublisher +{ + using System.IO; + using System.Linq; + using System.Threading; + using static OpcMonitoredItem; + using static OpcPublisher.Workarounds.TraceWorkaround; + using static OpcStackConfiguration; + + public class PublisherNodeConfiguration + { + public static SemaphoreSlim PublisherNodeConfigurationSemaphore = new SemaphoreSlim(1); + public static List OpcSessions = new List(); + public static SemaphoreSlim OpcSessionsListSemaphore = new SemaphoreSlim(1); + + public static string PublisherNodeConfigurationFilename + { + get => _publisherNodeConfigurationFilename; + set => _publisherNodeConfigurationFilename = value; + } + private static string _publisherNodeConfigurationFilename = $"{System.IO.Directory.GetCurrentDirectory()}{Path.DirectorySeparatorChar}publishednodes.json"; + + private List _nodePublishingConfiguration; + private static List _configurationFileEntries = new List(); + + public PublisherNodeConfiguration() + { + _nodePublishingConfiguration = new List(); + } + + /// + /// Read and parse the publisher node configuration file. + /// + /// + public async Task ReadConfigAsync() + { + // get information on the nodes to publish and validate the json by deserializing it. + try + { + await PublisherNodeConfigurationSemaphore.WaitAsync(); + if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_GW_PNFP"))) + { + Trace("Publishing node configuration file path read from environment."); + _publisherNodeConfigurationFilename = Environment.GetEnvironmentVariable("_GW_PNFP"); + } + + Trace($"Attempting to load nodes file from: {_publisherNodeConfigurationFilename}"); + _configurationFileEntries = JsonConvert.DeserializeObject>(File.ReadAllText(_publisherNodeConfigurationFilename)); + Trace($"Loaded {_configurationFileEntries.Count} config file entry/entries."); + + foreach (var publisherConfigFileEntry in _configurationFileEntries) + { + if (publisherConfigFileEntry.NodeId == null) + { + // new node configuration syntax. + foreach (var opcNode in publisherConfigFileEntry.OpcNodes) + { + ExpandedNodeId expandedNodeId = ExpandedNodeId.Parse(opcNode.ExpandedNodeId); + _nodePublishingConfiguration.Add(new NodePublishingConfiguration(expandedNodeId, publisherConfigFileEntry.EndpointUri, opcNode.OpcSamplingInterval ?? OpcSamplingInterval, opcNode.OpcPublishingInterval ?? OpcPublishingInterval)); + } + } + else + { + // NodeId (ns=) format node configuration syntax using default sampling and publishing interval. + _nodePublishingConfiguration.Add(new NodePublishingConfiguration(publisherConfigFileEntry.NodeId, publisherConfigFileEntry.EndpointUri, OpcSamplingInterval, OpcPublishingInterval)); + // give user a warning that the syntax is obsolete + Trace($"Please update the syntax of the configuration file and use ExpandedNodeId instead of NodeId property name for node with identifier '{publisherConfigFileEntry.NodeId.ToString()}' on EndpointUrl '{publisherConfigFileEntry.EndpointUri.AbsoluteUri}'."); + + } + } + } + catch (Exception e) + { + Trace(e, "Loading of the node configuration file failed. Does the file exist and has correct syntax?"); + Trace("exiting..."); + return false; + } + finally + { + PublisherNodeConfigurationSemaphore.Release(); + } + Trace($"There are {_nodePublishingConfiguration.Count.ToString()} nodes to publish."); + return true; + } + + /// + /// Create the publisher data structures to manage OPC sessions, subscriptions and monitored items. + /// + /// + public async Task CreateOpcPublishingDataAsync() + { + // create a list to manage sessions, subscriptions and monitored items. + try + { + await PublisherNodeConfigurationSemaphore.WaitAsync(); + await OpcSessionsListSemaphore.WaitAsync(); + + var uniqueEndpointUris = _nodePublishingConfiguration.Select(n => n.EndpointUri).Distinct(); + foreach (var endpointUri in uniqueEndpointUris) + { + // create new session info. + OpcSession opcSession = new OpcSession(endpointUri, OpcSessionCreationTimeout); + + // create a subscription for each distinct publishing inverval + var nodesDistinctPublishingInterval = _nodePublishingConfiguration.Where(n => n.EndpointUri.AbsoluteUri.Equals(endpointUri.AbsoluteUri, StringComparison.OrdinalIgnoreCase)).Select(c => c.OpcPublishingInterval).Distinct(); + foreach (var nodeDistinctPublishingInterval in nodesDistinctPublishingInterval) + { + // create a subscription for the publishing interval and add it to the session. + OpcSubscription opcSubscription = new OpcSubscription(nodeDistinctPublishingInterval); + + // add all nodes with this OPC publishing interval to this subscription. + var nodesWithSamePublishingInterval = _nodePublishingConfiguration.Where(n => n.EndpointUri.AbsoluteUri.Equals(endpointUri.AbsoluteUri, StringComparison.OrdinalIgnoreCase)).Where(n => n.OpcPublishingInterval == nodeDistinctPublishingInterval); + foreach (var nodeInfo in nodesWithSamePublishingInterval) + { + // differentiate if NodeId or ExpandedNodeId format is used + if (nodeInfo.NodeId == null) + { + // create a monitored item for the node, we do not have the namespace index without a connected session. + // so request a namespace update. + OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.ExpandedNodeId, opcSession.EndpointUri, true) + { + RequestedSamplingInterval = nodeInfo.OpcSamplingInterval, + SamplingInterval = nodeInfo.OpcSamplingInterval + }; + opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem); + } + else + { + // create a monitored item for the node with the configured or default sampling interval + OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.NodeId, opcSession.EndpointUri) + { + RequestedSamplingInterval = nodeInfo.OpcSamplingInterval, + SamplingInterval = nodeInfo.OpcSamplingInterval + }; + opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem); + } + } + + // add subscription to session. + opcSession.OpcSubscriptions.Add(opcSubscription); + } + + // add session. + OpcSessions.Add(opcSession); + } + } + catch (Exception e) + { + Trace(e, "Creation of the internal OPC data managment structures failed."); + Trace("exiting..."); + return false; + } + finally + { + OpcSessionsListSemaphore.Release(); + PublisherNodeConfigurationSemaphore.Release(); + } + return true; + } + + /// + /// Returns a list of all published nodes for a specific endpoint in config file format. + /// + /// + public static async Task> GetPublisherConfigurationFileEntries(Uri endpointUri, OpcMonitoredItemConfigurationType? requestedType, bool getAll) + { + List publisherConfigurationFileEntries = new List(); + try + { + await PublisherNodeConfigurationSemaphore.WaitAsync(); + + // itereate through all sessions, subscriptions and monitored items and create config file entries + foreach (var session in OpcSessions) + { + if (endpointUri == null || session.EndpointUri.AbsoluteUri.Equals(endpointUri.AbsoluteUri, StringComparison.OrdinalIgnoreCase)) + { + PublisherConfigurationFileEntry publisherConfigurationFileEntry = new PublisherConfigurationFileEntry(); + + publisherConfigurationFileEntry.EndpointUri = session.EndpointUri; + publisherConfigurationFileEntry.NodeId = null; + publisherConfigurationFileEntry.OpcNodes = null; + foreach (var subscription in session.OpcSubscriptions) + { + foreach (var monitoredItem in subscription.OpcMonitoredItems) + { + // ignore items tagged to stop + if (monitoredItem.State != OpcMonitoredItemState.RemovalRequested || getAll == true) + { + OpcNodeOnEndpointUrl opcNodeOnEndpointUrl = new OpcNodeOnEndpointUrl(); + if (monitoredItem.ConfigType == OpcMonitoredItemConfigurationType.ExpandedNodeId) + { + // for certain scenarios we support returning the NodeId format even so the + // actual configuration of the node was in ExpandedNodeId format + if (requestedType == OpcMonitoredItemConfigurationType.NodeId) + { + PublisherConfigurationFileEntry legacyPublisherConfigFileEntry = new PublisherConfigurationFileEntry(); + legacyPublisherConfigFileEntry.EndpointUri = session.EndpointUri; + legacyPublisherConfigFileEntry.NodeId = new NodeId(monitoredItem.ConfigExpandedNodeId.Identifier, (ushort)session.GetNamespaceIndex(monitoredItem.ConfigExpandedNodeId?.NamespaceUri)); + publisherConfigurationFileEntries.Add(legacyPublisherConfigFileEntry); + } + else + { + opcNodeOnEndpointUrl.ExpandedNodeId = monitoredItem.ConfigExpandedNodeId.ToString(); + opcNodeOnEndpointUrl.OpcPublishingInterval = (int)subscription.RequestedPublishingInterval; + opcNodeOnEndpointUrl.OpcSamplingInterval = monitoredItem.RequestedSamplingInterval; + if (publisherConfigurationFileEntry.OpcNodes == null) + { + publisherConfigurationFileEntry.OpcNodes = new List(); + } + publisherConfigurationFileEntry.OpcNodes.Add(opcNodeOnEndpointUrl); + } + } + else + { + // we do not convert nodes with legacy configuration to the new format to keep backward + // compatibility with external configurations. + // the conversion would only be possible, if the session is connected, to have access to the + // server namespace array. + PublisherConfigurationFileEntry legacyPublisherConfigFileEntry = new PublisherConfigurationFileEntry(); + legacyPublisherConfigFileEntry.EndpointUri = session.EndpointUri; + legacyPublisherConfigFileEntry.NodeId = monitoredItem.ConfigNodeId; + publisherConfigurationFileEntries.Add(legacyPublisherConfigFileEntry); + } + } + } + } + if (publisherConfigurationFileEntry.OpcNodes != null) + { + publisherConfigurationFileEntries.Add(publisherConfigurationFileEntry); + } + } + } + } + catch (Exception e) + { + Trace(e, "Creation of configuration file entries failed."); + } + finally + { + PublisherNodeConfigurationSemaphore.Release(); + } + return publisherConfigurationFileEntries; + } + + /// + /// Updates the configuration file to persist all currently published nodes + /// + public static async Task UpdateNodeConfigurationFile() + { + try + { + // itereate through all sessions, subscriptions and monitored items and create config file entries + List publisherNodeConfiguration = await GetPublisherConfigurationFileEntries(null, null, true); + + // update the config file + File.WriteAllText(PublisherNodeConfigurationFilename, JsonConvert.SerializeObject(publisherNodeConfiguration, Formatting.Indented)); + } + catch (Exception e) + { + Trace(e, "Update of node configuration file failed."); + } + } + } + + /// + /// Class describing a list of nodes in the ExpandedNodeId format + /// + public class OpcNodeOnEndpointUrl + { + public string ExpandedNodeId; + [JsonProperty(NullValueHandling = NullValueHandling.Ignore)] + public int? OpcSamplingInterval; + [JsonProperty(NullValueHandling = NullValueHandling.Ignore)] + public int? OpcPublishingInterval; + } + + /// + /// Class describing the nodes which should be published. It supports three formats: + /// - NodeId syntax using the namespace index (ns) syntax + /// - ExpandedNodeId syntax, using the namespace URI (nsu) syntax + /// - List of ExpandedNodeId syntax, to allow putting nodes with similar publishing and/or sampling intervals in one object + /// + public partial class PublisherConfigurationFileEntry + { + public PublisherConfigurationFileEntry() + { + } + + public PublisherConfigurationFileEntry(string nodeId, string endpointUrl) + { + NodeId = new NodeId(nodeId); + EndpointUri = new Uri(endpointUrl); + } + + [JsonProperty("EndpointUrl")] + public Uri EndpointUri; + [JsonProperty(NullValueHandling = NullValueHandling.Ignore)] + public NodeId NodeId; + [JsonProperty(NullValueHandling = NullValueHandling.Ignore)] + public List OpcNodes; + } + + /// + /// Describes the publishing information of a node. + /// + public class NodePublishingConfiguration + { + public Uri EndpointUri; + public NodeId NodeId; + public ExpandedNodeId ExpandedNodeId; + public int OpcSamplingInterval; + public int OpcPublishingInterval; + + public NodePublishingConfiguration(NodeId nodeId, Uri endpointUri, int opcSamplingInterval, int opcPublishingInterval) + { + NodeId = nodeId; + ExpandedNodeId = null; + EndpointUri = endpointUri; + OpcSamplingInterval = opcSamplingInterval; + OpcPublishingInterval = opcPublishingInterval; + } + public NodePublishingConfiguration(ExpandedNodeId expandedNodeId, Uri endpointUri, int opcSamplingInterval, int opcPublishingInterval) + { + NodeId = null; + ExpandedNodeId = expandedNodeId; + EndpointUri = endpointUri; + OpcSamplingInterval = opcSamplingInterval; + OpcPublishingInterval = opcPublishingInterval; + } + } +} diff --git a/src/PublisherNodeManager.cs b/src/PublisherNodeManager.cs index 303f1b2..0b09084 100644 --- a/src/PublisherNodeManager.cs +++ b/src/PublisherNodeManager.cs @@ -8,16 +8,17 @@ namespace OpcPublisher { using IoTHubCredentialTools; using Newtonsoft.Json; - using System.IO; using System.Linq; using static IotHubMessaging; + using static OpcPublisher.OpcMonitoredItem; using static OpcPublisher.Program; using static OpcPublisher.Workarounds.TraceWorkaround; using static OpcStackConfiguration; + using static PublisherNodeConfiguration; + public class PublisherNodeManager : CustomNodeManager2 { - public PublisherNodeManager(Opc.Ua.Server.IServerInternal server, ApplicationConfiguration configuration) : base(server, configuration, Namespaces.PublisherApplications) { @@ -114,8 +115,8 @@ namespace OpcPublisher MethodState unpublishNodeMethod = CreateMethod(methodsFolder, "UnpublishNode", "UnpublishNode"); SetUnpublishNodeMethodProperties(ref unpublishNodeMethod); - MethodState getListOfPublishedNodesMethod = CreateMethod(methodsFolder, "GetListOfPublishedNodes", "GetListOfPublishedNodes"); - SetGetListOfPublishedNodesMethodProperties(ref getListOfPublishedNodesMethod); + MethodState getPublishedNodesMethod = CreateMethod(methodsFolder, "GetPublishedNodes", "GetPublishedNodes"); + SetGetPublishedNodesMethodProperties(ref getPublishedNodesMethod); } catch (Exception e) { @@ -127,10 +128,27 @@ namespace OpcPublisher } /// - /// Sets properies of the GetListOfPublishedNodes method. + /// Sets properies of the GetPublishedNodes method. /// - private void SetGetListOfPublishedNodesMethodProperties(ref MethodState method) + private void SetGetPublishedNodesMethodProperties(ref MethodState method) { + // define input arguments + method.InputArguments = new PropertyState(method) + { + NodeId = new NodeId(method.BrowseName.Name + "InArgs", NamespaceIndex), + BrowseName = BrowseNames.InputArguments + }; + method.InputArguments.DisplayName = method.InputArguments.BrowseName.Name; + method.InputArguments.TypeDefinitionId = VariableTypeIds.PropertyType; + method.InputArguments.ReferenceTypeId = ReferenceTypeIds.HasProperty; + method.InputArguments.DataType = DataTypeIds.Argument; + method.InputArguments.ValueRank = ValueRanks.OneDimension; + + method.InputArguments.Value = new Argument[] + { + new Argument() { Name = "EndpointUri", Description = "Endpoint URI of the OPC UA server to return the published nodes for.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar } + }; + // set output arguments method.OutputArguments = new PropertyState(method) { @@ -147,7 +165,7 @@ namespace OpcPublisher { new Argument() { Name = "Published nodes", Description = "List of the nodes published by Publisher", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar } }; - method.OnCallMethod = new GenericMethodCalledEventHandler(OnGetListOfPublishedNodesCall); + method.OnCallMethod = new GenericMethodCalledEventHandler(OnGetPublishedNodesCall); } /// @@ -169,7 +187,7 @@ namespace OpcPublisher method.InputArguments.Value = new Argument[] { - new Argument() { Name = "NodeId", Description = "NodeId of the node to publish in 'ns=' syntax.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar }, + new Argument() { Name = "NodeId", Description = "NodeId of the node to publish in NodeId format.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar }, new Argument() { Name = "EndpointUri", Description = "Endpoint URI of the OPC UA server owning the node.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar } }; @@ -195,7 +213,7 @@ namespace OpcPublisher method.InputArguments.Value = new Argument[] { - new Argument() { Name = "NodeId", Description = "NodeId of the node to publish in 'ns=' syntax.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar }, + new Argument() { Name = "NodeId", Description = "NodeId of the node to publish in NodeId format.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar }, new Argument() { Name = "EndpointUri", Description = "Endpoint URI of the OPC UA server owning the node.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar }, }; @@ -412,25 +430,18 @@ namespace OpcPublisher /// private ServiceResult OnPublishNodeCall(ISystemContext context, MethodState method, IList inputArguments, IList outputArguments) { - if (inputArguments[0] == null || inputArguments[1] == null) + if (string.IsNullOrEmpty(inputArguments[0] as string) || string.IsNullOrEmpty(inputArguments[1] as string)) { Trace("PublishNode: Invalid Arguments when trying to publish a node."); - return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments!"); + return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments as strings!"); } - NodeToPublishConfig nodeToPublish; NodeId nodeId = null; Uri endpointUri = null; try { - if (string.IsNullOrEmpty(inputArguments[0] as string) || string.IsNullOrEmpty(inputArguments[1] as string)) - { - Trace($"PublishNode: Arguments (0 (nodeId), 1 (endpointUrl)) are not valid strings!"); - return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments as strings!"); - } nodeId = NodeId.Parse(inputArguments[0] as string); endpointUri = new Uri(inputArguments[1] as string); - nodeToPublish = new NodeToPublishConfig(nodeId, endpointUri, OpcSamplingInterval, OpcPublishingInterval); } catch (UriFormatException) { @@ -440,12 +451,15 @@ namespace OpcPublisher catch (Exception e) { Trace(e, $"PublishNode: The NodeId has an invalid format '{inputArguments[0] as string}'!"); - return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide a valid OPC UA NodeId in 'ns=' syntax as first argument!"); + return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide a valid OPC UA NodeId in NodeId format as first argument!"); } // find/create a session to the endpoint URL and start monitoring the node. try { + // lock the publishing configuration till we are done + OpcSessionsListSemaphore.WaitAsync(); + if (ShutdownTokenSource.IsCancellationRequested) { return ServiceResult.Create(StatusCodes.BadUnexpectedError, $"Publisher shutdown in progress."); @@ -453,59 +467,45 @@ namespace OpcPublisher // find the session we need to monitor the node OpcSession opcSession = null; - try - { - OpcSessionsListSemaphore.WaitAsync(); - opcSession = OpcSessions.FirstOrDefault(s => s.EndpointUri == nodeToPublish.EndpointUri); + opcSession = OpcSessions.FirstOrDefault(s => s.EndpointUri.AbsoluteUri.Equals(endpointUri.AbsoluteUri, StringComparison.OrdinalIgnoreCase)); + string namespaceUri = null; + ExpandedNodeId expandedNodeId = null; - // add a new session. - if (opcSession == null) + // add a new session. + if (opcSession == null) + { + // create new session info. + opcSession = new OpcSession(endpointUri, OpcSessionCreationTimeout); + OpcSessions.Add(opcSession); + Trace($"PublishNode: No matching session found for endpoint '{endpointUri.OriginalString}'. Requested to create a new one."); + } + else + { + Trace($"PublishNode: Session found for endpoint '{endpointUri.OriginalString}'"); + + // check if node is already published + namespaceUri = opcSession.GetNamespaceUri(nodeId.NamespaceIndex); + if (string.IsNullOrEmpty(namespaceUri)) { - // create new session info. - opcSession = new OpcSession(nodeToPublish.EndpointUri, OpcSessionCreationTimeout); - OpcSessions.Add(opcSession); - Trace($"PublishNode: No matching session found for endpoint '{nodeToPublish.EndpointUri.OriginalString}'. Requested to create a new one."); + return ServiceResult.Create(StatusCodes.BadUnexpectedError, $"The namespace index of the node id is invalid."); } - else - { - Trace($"PublishNode: Session found for endpoint '{nodeToPublish.EndpointUri.OriginalString}'"); - } - - // add the node info to the subscription with the default publishing interval, execute syncronously - opcSession.AddNodeForMonitoring(OpcPublishingInterval, OpcSamplingInterval, nodeToPublish.NodeId).Wait(); - Trace($"PublishNode: Requested to monitor item with NodeId '{nodeToPublish.NodeId.ToString()}' (PublishingInterval: {OpcPublishingInterval}, SamplingInterval: {OpcSamplingInterval})"); - } - finally - { - OpcSessionsListSemaphore.Release(); + expandedNodeId = new ExpandedNodeId(nodeId.Identifier, nodeId.NamespaceIndex, namespaceUri, 0); } - // update our data - try - { - PublishDataSemaphore.WaitAsync(); - PublishConfig.Add(nodeToPublish); - - // add it also to the publish file - var publisherConfigFileEntry = new PublisherConfigFileEntry() - { - EndpointUri = endpointUri, - NodeId = nodeId - }; - PublisherConfigFileEntries.Add(publisherConfigFileEntry); - File.WriteAllText(NodesToPublishAbsFilename, JsonConvert.SerializeObject(PublisherConfigFileEntries)); - } - finally - { - PublishDataSemaphore.Release(); - } - return ServiceResult.Good; + // add the node info to the subscription with the default publishing interval, execute syncronously + opcSession.AddNodeForMonitoring(nodeId, expandedNodeId, OpcPublishingInterval, OpcSamplingInterval).Wait(); + Trace($"PublishNode: Requested to monitor item with NodeId '{nodeId.ToString()}' (PublishingInterval: {OpcPublishingInterval}, SamplingInterval: {OpcSamplingInterval})"); } catch (Exception e) { - Trace(e, $"PublishNode: Exception while trying to configure publishing node '{nodeToPublish.NodeId.ToString()}'"); + Trace(e, $"PublishNode: Exception while trying to configure publishing node '{nodeId.ToString()}'"); return ServiceResult.Create(e, StatusCodes.BadUnexpectedError, $"Unexpected error publishing node: {e.Message}"); } + finally + { + OpcSessionsListSemaphore.Release(); + } + return ServiceResult.Good; } /// @@ -513,7 +513,7 @@ namespace OpcPublisher /// private ServiceResult OnUnpublishNodeCall(ISystemContext context, MethodState method, IList inputArguments, IList outputArguments) { - if (inputArguments[0] == null || inputArguments[1] == null) + if (string.IsNullOrEmpty(inputArguments[0] as string) || string.IsNullOrEmpty(inputArguments[1] as string)) { Trace("UnpublishNode: Invalid arguments!"); return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments!"); @@ -523,12 +523,7 @@ namespace OpcPublisher Uri endpointUri = null; try { - if (string.IsNullOrEmpty(inputArguments[0] as string) || string.IsNullOrEmpty(inputArguments[1] as string)) - { - Trace($"UnpublishNode: Arguments (0 (nodeId), 1 (endpointUrl)) are not valid strings!"); - return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments as strings!"); - } - nodeId = inputArguments[0] as string; + nodeId = NodeId.Parse(inputArguments[0] as string); endpointUri = new Uri(inputArguments[1] as string); } catch (UriFormatException) @@ -550,7 +545,7 @@ namespace OpcPublisher try { OpcSessionsListSemaphore.WaitAsync(); - opcSession = OpcSessions.FirstOrDefault(s => s.EndpointUri == endpointUri); + opcSession = OpcSessions.FirstOrDefault(s => s.EndpointUri.AbsoluteUri.Equals(endpointUri.AbsoluteUri, StringComparison.OrdinalIgnoreCase)); } catch { @@ -561,57 +556,73 @@ namespace OpcPublisher OpcSessionsListSemaphore.Release(); } + + + ExpandedNodeId expandedNodeId = null; if (opcSession == null) { // do nothing if there is no session for this endpoint. Trace($"UnpublishNode: Session for endpoint '{endpointUri.OriginalString}' not found."); - return ServiceResult.Create(StatusCodes.BadSessionIdInvalid, "Session for endpoint of published node not found!"); + return ServiceResult.Create(StatusCodes.BadSessionIdInvalid, "Session for endpoint of node to unpublished not found!"); } else { - Trace($"UnpublishNode: Session found for endpoint '{endpointUri.OriginalString}'"); + // check if node is already published + string namespaceUri = opcSession.GetNamespaceUri(nodeId.NamespaceIndex); + if (string.IsNullOrEmpty(namespaceUri)) + { + return ServiceResult.Create(StatusCodes.BadUnexpectedError, $"The namespace index of the node id is invalid."); + } + expandedNodeId = new ExpandedNodeId(nodeId.Identifier, nodeId.NamespaceIndex, namespaceUri, 0); + if (!OpcSession.IsNodePublished(nodeId, expandedNodeId, endpointUri)) + { + Trace($"UnpublishNode: Node with id '{nodeId.Identifier.ToString()}' on endpoint '{endpointUri.OriginalString}' is not published."); + return ServiceResult.Good; + } } // remove the node from the sessions monitored items list. - opcSession.TagNodeForMonitoringStop(nodeId).Wait(); + opcSession.RequestMonitorItemRemoval(nodeId, expandedNodeId).Wait(); Trace("UnpublishNode: Requested to stop monitoring of node."); - - // remove node from persisted config file - try - { - PublishDataSemaphore.WaitAsync(); - var entryToRemove = PublisherConfigFileEntries.Find(l => l.NodeId == nodeId && l.EndpointUri == endpointUri); - PublisherConfigFileEntries.Remove(entryToRemove); - File.WriteAllText(NodesToPublishAbsFilename, JsonConvert.SerializeObject(PublisherConfigFileEntries)); - } - finally - { - PublishDataSemaphore.Release(); - } } catch (Exception e) { Trace(e, $"UnpublishNode: Exception while trying to configure publishing node '{nodeId.ToString()}'"); - return ServiceResult.Create(e, StatusCodes.BadUnexpectedError, $"Unexpected error publishing node: {e.Message}"); + return ServiceResult.Create(e, StatusCodes.BadUnexpectedError, $"Unexpected error unpublishing node: {e.Message}"); } return ServiceResult.Good; } /// /// Method to get the list of published nodes. Executes synchronously. + /// The format of the returned node description is using NodeId format. The assumption + /// is that the caller is able to access the namespace array of the server + /// on the endpoint URL(s) themselve and do the correct mapping. /// - private ServiceResult OnGetListOfPublishedNodesCall(ISystemContext context, MethodState method, IList inputArguments, IList outputArguments) + private ServiceResult OnGetPublishedNodesCall(ISystemContext context, MethodState method, IList inputArguments, IList outputArguments) { - try + Uri endpointUri = null; + + if (string.IsNullOrEmpty(inputArguments[0] as string)) { - PublishDataSemaphore.WaitAsync(); - outputArguments[0] = JsonConvert.SerializeObject(PublisherConfigFileEntries); + Trace("GetPublishedNodes: Return all published nodes"); } - finally + else { - PublishDataSemaphore.Release(); + try + { + endpointUri = new Uri(inputArguments[0] as string); + } + catch (UriFormatException) + { + Trace($"GetPublishedNodes: The endpointUrl is invalid '{inputArguments[0] as string}'!"); + return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide a valid OPC UA endpoint URL as first argument!"); + } } - Trace("GetListOfPublishedNodes: Success!"); + + // get the list of published nodes in NodeId format + outputArguments[0] = JsonConvert.SerializeObject(GetPublisherConfigurationFileEntries(endpointUri, OpcMonitoredItemConfigurationType.NodeId, false)); + Trace("GetPublishedNodes: Success!"); return ServiceResult.Good; } @@ -633,7 +644,7 @@ namespace OpcPublisher // read current connection string and compare to the one passed in string currentConnectionString = SecureIoTHubToken.ReadAsync(PublisherOpcApplicationConfiguration.ApplicationName, IotDeviceCertStoreType, IotDeviceCertStorePath).Result; - if (string.Equals(connectionString, currentConnectionString, StringComparison.OrdinalIgnoreCase)) + if (connectionString.Equals(currentConnectionString, StringComparison.OrdinalIgnoreCase)) { Trace("ConnectionStringWrite: Connection string up to date!"); return ServiceResult.Create(StatusCodes.Bad, "Connection string already up-to-date!"); diff --git a/src/publishednodes.json b/src/publishednodes.json index 376925a..54c4e16 100644 --- a/src/publishednodes.json +++ b/src/publishednodes.json @@ -28,9 +28,9 @@ ] }, - // the format below is only supported for backward compatibility. you need to ensure that the + // the format below (NodeId format) is only supported for backward compatibility. you need to ensure that the // OPC UA server on the configured EndpointUrl has the namespaceindex you expect with your configuration. - // please use the ExpandedNodeId syntax instead. + // please use the ExpandedNodeId format as in the examples above instead. { "EndpointUrl": "opc.tcp://:/", "NodeId": {