Refactor published node configuration handling, refactor published node

persistency, changes and fixes in publishers OPC UA server methods
This commit is contained in:
Hans Gschossmann 2017-10-23 08:54:59 +02:00
Родитель 79e9b53c64
Коммит 6b8853e3e8
8 изменённых файлов: 950 добавлений и 471 удалений

Просмотреть файл

@ -20,7 +20,7 @@ This application uses the OPC Foundations's OPC UA reference stack and therefore
|master|[![Build status](https://ci.appveyor.com/api/projects/status/6t7ru6ow7t9uv74r/branch/master?svg=true)](https://ci.appveyor.com/project/marcschier/iot-gateway-opc-ua-r4ba5/branch/master) [![Build Status](https://travis-ci.org/Azure/iot-gateway-opc-ua.svg?branch=master)](https://travis-ci.org/Azure/iot-gateway-opc-ua)| |master|[![Build status](https://ci.appveyor.com/api/projects/status/6t7ru6ow7t9uv74r/branch/master?svg=true)](https://ci.appveyor.com/project/marcschier/iot-gateway-opc-ua-r4ba5/branch/master) [![Build Status](https://travis-ci.org/Azure/iot-gateway-opc-ua.svg?branch=master)](https://travis-ci.org/Azure/iot-gateway-opc-ua)|
# Building the Application # Building the Application
The application requires the .NET Core SDK 1.1. The application requires the .NET Core SDK 2.0.
## As native Windows application ## As native Windows application
Open the OpcPublisher.sln project with Visual Studio 2017 and build the solution by hitting F7. Open the OpcPublisher.sln project with Visual Studio 2017 and build the solution by hitting F7.
@ -34,7 +34,7 @@ From the root of the repository, in a console, type:
The `-f` option for `docker build` is optional and the default is to use Dockerfile. Docker also support building directly from a git repository, which means you also can build a Linux container by: The `-f` option for `docker build` is optional and the default is to use Dockerfile. Docker also support building directly from a git repository, which means you also can build a Linux container by:
docker build -t <your-container-name> .https://github.com/Azure/iot-edge-opc-publisher docker build -t <your-container-name> https://github.com/Azure/iot-edge-opc-publisher
# Configuring the OPC UA nodes to publish # Configuring the OPC UA nodes to publish
The OPC UA nodes whose values should be published to Azure IoT Hub can be configured by creating a JSON formatted configuration file (defaultname: "publishednodes.json"). This file is updated and persisted by the application, when using it's OPC UA server methods "PublishNode" or "UnpublishNode". The OPC UA nodes whose values should be published to Azure IoT Hub can be configured by creating a JSON formatted configuration file (defaultname: "publishednodes.json"). This file is updated and persisted by the application, when using it's OPC UA server methods "PublishNode" or "UnpublishNode".
@ -71,9 +71,9 @@ The syntax of the configuration file is as follows:
] ]
}, },
// the format below is only supported for backward compatibility. you need to ensure that the // the format below (NodeId format) is only supported for backward compatibility. you need to ensure that the
// OPC UA server on the configured EndpointUrl has the namespaceindex you expect with your configuration. // OPC UA server on the configured EndpointUrl has the namespaceindex you expect with your configuration.
// please use the ExpandedNodeId syntax instead. // please use the ExpandedNodeId format as in the examples above instead.
{ {
"EndpointUrl": "opc.tcp://<your_opcua_server>:<your_opcua_server_port>/<your_opcua_server_path>", "EndpointUrl": "opc.tcp://<your_opcua_server>:<your_opcua_server_port>/<your_opcua_server_path>",
"NodeId": { "NodeId": {
@ -268,10 +268,10 @@ The following options are supported:
-h, --help show this message and exit -h, --help show this message and exit
There are a couple of environment variables which can be used to control the application: There are a couple of environment variables which can be used to control the application:
_HUB_CS: sets the IoTHub owner connectionstring * _HUB_CS: sets the IoTHub owner connectionstring
_GW_LOGP: sets the filename of the log file to use * _GW_LOGP: sets the filename of the log file to use
_TPC_SP: sets the path to store certificates of trusted stations * _TPC_SP: sets the path to store certificates of trusted stations
_GW_PNFP: sets the filename of the publishing configuration file * _GW_PNFP: sets the filename of the publishing configuration file
Command line arguments overrule environment variable settings. Command line arguments overrule environment variable settings.
@ -303,7 +303,7 @@ The Publisher OPC UA server listens by default on port 62222. To expose this inb
docker run -p 62222:62222 microsoft/iot-edge-opc-publisher <applicationname> [<iothubconnectionstring>] [options] docker run -p 62222:62222 microsoft/iot-edge-opc-publisher <applicationname> [<iothubconnectionstring>] [options]
### Enable intercontainer nameresolution ### Enable intercontainer nameresolution
To enable name resolution from within the container to other containers, you need to create a user define docker bridge network and connect the container to this network using the `--network`option. To enable name resolution from within the container to other containers, you need to create a user define docker bridge network and connect the container to this network using the `--network` option.
Additionally you need to assign the container a name using the `--name` option as in this example: Additionally you need to assign the container a name using the `--name` option as in this example:
docker network create -d bridge iot_edge docker network create -d bridge iot_edge
@ -311,10 +311,18 @@ Additionally you need to assign the container a name using the `--name` option a
The container can now be reached by other containers via the name `publisher`over the network. The container can now be reached by other containers via the name `publisher`over the network.
### Access other hosts from within the container
Hosts which are implemented in other containers, could be reached with the parameters described in the "Enable intercontainer nameresolution" paragraph.
If the environment has DNS working, then accessing the host on which the docker runtime executes as was as any other DNS reachable host is no issue.
Problems occur in a network with Windows NetBIOS name resolution. To enable access to hosts (including the one on which the docker runtime executes) you need to start your container using the `--add-host` option.
docker run --add-host mydevbox:192.168.178.23 microsoft/iot-edge-opc-publisher <applicationname> [<iothubconnectionstring>] [options]
### Assigning a hostname ### Assigning a hostname
Publisher uses the hostname of the machine is running on for certificate and endpoint generation. docker chooses a random hostname if there is none set by the `-h` option. Here an example to set the internal hostname of the container to `publisher`: Publisher uses the hostname of the machine is running on for certificate and endpoint generation. docker chooses a random hostname if there is none set by the `-h` option. Here an example to set the internal hostname of the container to `publisher`:
docker run -h publisher microsoft/iot-edge-opc-publisher <applicationname> [<iothubconnectionstring>] [options] docker run -h publisher microsoft/iot-edge-opc-publisher <applicationname> [<iothubconnectionstring>] [options]
### Using bind mounts (shared filesystem) ### Using bind mounts (shared filesystem)
In certain use cases it may make sense to read configuration information from or write log files to locations on the host and not keep them in the container file system only. To achieve this you need to use the `-v` option of `docker run` in the bind mount mode. In certain use cases it may make sense to read configuration information from or write log files to locations on the host and not keep them in the container file system only. To achieve this you need to use the `-v` option of `docker run` in the bind mount mode.
@ -327,6 +335,8 @@ Storing X509 certificates does not work with bind mounts, since the permissions
Open the OpcPublisher.sln project with Visual Studio 2017 and start debugging the app by hitting F5. Open the OpcPublisher.sln project with Visual Studio 2017 and start debugging the app by hitting F5.
If you need to access the OPC UA server in the publisher, you should ensure that the firewall setting allow access to the port the server is listening on (default: 62222).
## In a docker container ## In a docker container
Visual Studio 2017 supports debugging of application in docker container. This is done by using docker-compose. Since this does not allow to pass command line parameters it is not convenient. Visual Studio 2017 supports debugging of application in docker container. This is done by using docker-compose. Since this does not allow to pass command line parameters it is not convenient.

Просмотреть файл

@ -9,9 +9,11 @@ namespace OpcPublisher
using Opc.Ua; using Opc.Ua;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using static OpcPublisher.OpcMonitoredItem;
using static OpcPublisher.Workarounds.TraceWorkaround; using static OpcPublisher.Workarounds.TraceWorkaround;
using static OpcStackConfiguration; using static OpcStackConfiguration;
using static Program; using static Program;
using static PublisherNodeConfiguration;
/// <summary> /// <summary>
/// Class to manage the OPC monitored items, which are the nodes we need to publish. /// Class to manage the OPC monitored items, which are the nodes we need to publish.
@ -21,11 +23,17 @@ namespace OpcPublisher
public enum OpcMonitoredItemState public enum OpcMonitoredItemState
{ {
Unmonitored = 0, Unmonitored = 0,
Monitoreded, UnmonitoredNamespaceUpdateRequested,
StopMonitoring, Monitored,
RemovalRequested,
}
public enum OpcMonitoredItemConfigurationType
{
NodeId = 0,
ExpandedNodeId
} }
public ExpandedNodeId StartNodeId;
public string DisplayName; public string DisplayName;
public OpcMonitoredItemState State; public OpcMonitoredItemState State;
public uint AttributeId; public uint AttributeId;
@ -37,27 +45,39 @@ namespace OpcPublisher
public MonitoredItemNotificationEventHandler Notification; public MonitoredItemNotificationEventHandler Notification;
public Uri EndpointUri; public Uri EndpointUri;
public MonitoredItem OpcUaClientMonitoredItem; public MonitoredItem OpcUaClientMonitoredItem;
public string ConfigNodeId; public NodeId ConfigNodeId;
public ExpandedNodeId ConfigExpandedNodeId;
public OpcMonitoredItemConfigurationType ConfigType;
/// <summary> /// <summary>
/// Ctor using NodeId (ns syntax for namespace). /// Ctor using NodeId (ns syntax for namespace).
/// </summary> /// </summary>
public OpcMonitoredItem(NodeId nodeId, Uri sessionEndpointUri) public OpcMonitoredItem(NodeId nodeId, Uri sessionEndpointUri, bool requestNamespaceUpdate = false)
{ {
ConfigNodeId = nodeId.ToString(); ConfigNodeId = nodeId;
StartNodeId = new ExpandedNodeId(nodeId); ConfigExpandedNodeId = null;
ConfigType = OpcMonitoredItemConfigurationType.NodeId;
Initialize(sessionEndpointUri); Initialize(sessionEndpointUri);
if (requestNamespaceUpdate)
{
State = OpcMonitoredItemState.UnmonitoredNamespaceUpdateRequested;
}
} }
/// <summary> /// <summary>
/// Ctor using ExpandedNodeId (nsu syntax for namespace). /// Ctor using ExpandedNodeId (nsu syntax for namespace).
/// </summary> /// </summary>
public OpcMonitoredItem(ExpandedNodeId expandedNodeId, Uri sessionEndpointUri) public OpcMonitoredItem(ExpandedNodeId expandedNodeId, Uri sessionEndpointUri, bool requestNamespaceUpdate = false)
{ {
ConfigNodeId = expandedNodeId.ToString(); ConfigNodeId = null;
StartNodeId = expandedNodeId; ConfigExpandedNodeId = expandedNodeId;
ConfigType = OpcMonitoredItemConfigurationType.ExpandedNodeId;
Initialize(sessionEndpointUri); Initialize(sessionEndpointUri);
} if (requestNamespaceUpdate)
{
State = OpcMonitoredItemState.UnmonitoredNamespaceUpdateRequested;
}
}
/// <summary> /// <summary>
/// Init class variables. /// Init class variables.
@ -75,6 +95,62 @@ namespace OpcPublisher
EndpointUri = sessionEndpointUri; EndpointUri = sessionEndpointUri;
} }
/// <summary>
/// Checks if the monitored item does monitor the node described by the given objects.
/// </summary>
public bool IsMonitoringThisNode(NodeId nodeId, ExpandedNodeId expandedNodeId, NamespaceTable namespaceTable)
{
if (State == OpcMonitoredItemState.RemovalRequested)
{
return false;
}
if (ConfigType == OpcMonitoredItemConfigurationType.NodeId)
{
if (nodeId != null)
{
if (ConfigNodeId == nodeId)
{
return true;
}
}
if (expandedNodeId != null)
{
string namespaceUri = namespaceTable.ToArray().ElementAtOrDefault(ConfigNodeId.NamespaceIndex);
if (expandedNodeId.NamespaceUri != null && expandedNodeId.NamespaceUri.Equals(namespaceUri, StringComparison.OrdinalIgnoreCase))
{
if (expandedNodeId.Identifier.ToString().Equals(ConfigNodeId.Identifier.ToString(), StringComparison.OrdinalIgnoreCase))
{
return true;
}
}
}
}
if (ConfigType == OpcMonitoredItemConfigurationType.ExpandedNodeId)
{
if (nodeId != null)
{
int namespaceIndex = namespaceTable.GetIndex(ConfigExpandedNodeId?.NamespaceUri);
if (nodeId.NamespaceIndex == namespaceIndex)
{
if (nodeId.Identifier.ToString().Equals(ConfigExpandedNodeId.Identifier.ToString(), StringComparison.OrdinalIgnoreCase))
{
return true;
}
}
}
if (expandedNodeId != null)
{
if (ConfigExpandedNodeId.NamespaceUri != null &&
ConfigExpandedNodeId.NamespaceUri.Equals(expandedNodeId.NamespaceUri, StringComparison.OrdinalIgnoreCase) &&
ConfigExpandedNodeId.Identifier.ToString().Equals(expandedNodeId.Identifier.ToString(), StringComparison.OrdinalIgnoreCase))
{
return true;
}
}
}
return false;
}
/// <summary> /// <summary>
/// The notification that the data for a monitored item has changed on an OPC UA server. /// The notification that the data for a monitored item has changed on an OPC UA server.
/// </summary> /// </summary>
@ -106,7 +182,7 @@ namespace OpcPublisher
encoder.WriteString("DisplayName", monitoredItem.DisplayName); encoder.WriteString("DisplayName", monitoredItem.DisplayName);
// use the node Id as configured, to also have the namespace URI in case of a ExpandedNodeId. // use the node Id as configured, to also have the namespace URI in case of a ExpandedNodeId.
encoder.WriteString("NodeId", ConfigNodeId); encoder.WriteString("NodeId", ConfigType == OpcMonitoredItemConfigurationType.NodeId ? ConfigNodeId.ToString() : ConfigExpandedNodeId.ToString());
// suppress output of server timestamp in json by setting it to minvalue // suppress output of server timestamp in json by setting it to minvalue
value.ServerTimestamp = DateTime.MinValue; value.ServerTimestamp = DateTime.MinValue;
@ -212,60 +288,23 @@ namespace OpcPublisher
/// </summary> /// </summary>
public async Task ConnectAndMonitorAsync() public async Task ConnectAndMonitorAsync()
{ {
bool updateConfigFileRequired = false;
try try
{ {
await ConnectSessions(); await ConnectSession();
await MonitorNodes(); updateConfigFileRequired = await MonitorNodes();
// stop monitoring of nodes if requested and remove them from the monitored items list. updateConfigFileRequired |= await StopMonitoringNodes();
try
await RemoveUnusedSubscriptions();
await RemoveUnusedSessions();
// update the config file if required
if (updateConfigFileRequired)
{ {
await _opcSessionSemaphore.WaitAsync(); await UpdateNodeConfigurationFile();
foreach (var opcSubscription in OpcSubscriptions)
{
var itemsToRemove = opcSubscription.OpcMonitoredItems.Where(i => i.State == OpcMonitoredItem.OpcMonitoredItemState.StopMonitoring);
if (itemsToRemove.Any())
{
Trace($"Remove nodes in subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'");
opcSubscription.OpcUaClientSubscription.RemoveItems(itemsToRemove.Select(i => i.OpcUaClientMonitoredItem));
}
}
}
finally
{
_opcSessionSemaphore.Release();
}
// remove unused subscriptions.
try
{
await _opcSessionSemaphore.WaitAsync();
foreach (var opcSubscription in OpcSubscriptions)
{
if (opcSubscription.OpcMonitoredItems.Count == 0)
{
Trace($"Subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri}' is not used and will be deleted.");
OpcUaClientSession.RemoveSubscription(opcSubscription.OpcUaClientSubscription);
opcSubscription.OpcUaClientSubscription = null;
}
}
}
finally
{
_opcSessionSemaphore.Release();
}
// shutdown unused sessions.
var unusedSessions = OpcSessions.Where(s => s.OpcSubscriptions.Count == 0);
foreach (var unusedSession in unusedSessions)
{
await OpcSessionsListSemaphore.WaitAsync();
OpcSessions.Remove(unusedSession);
OpcSessionsListSemaphore.Release();
await unusedSession.ShutdownAsync();
} }
} }
catch (Exception e) catch (Exception e)
@ -275,89 +314,91 @@ namespace OpcPublisher
} }
/// <summary> /// <summary>
/// Connects all disconnected sessions. /// Connects the session if it is disconnected.
/// </summary> /// </summary>
public async Task ConnectSessions() public async Task ConnectSession()
{ {
try try
{ {
await _opcSessionSemaphore.WaitAsync(); await _opcSessionSemaphore.WaitAsync();
// if the session is disconnected, create one. // if the session is not disconnected, return
if (State == SessionState.Disconnected) if (State != SessionState.Disconnected)
{ {
Trace($"Connect and monitor session and nodes on endpoint '{EndpointUri.AbsoluteUri}'."); return;
State = SessionState.Connecting; }
try
Trace($"Connect and monitor session and nodes on endpoint '{EndpointUri.AbsoluteUri}'.");
State = SessionState.Connecting;
try
{
// release the session to not block for high network timeouts.
_opcSessionSemaphore.Release();
// start connecting
EndpointDescription selectedEndpoint = CoreClientUtils.SelectEndpoint(EndpointUri.AbsoluteUri, true);
ConfiguredEndpoint configuredEndpoint = new ConfiguredEndpoint(null, selectedEndpoint, EndpointConfiguration.Create(PublisherOpcApplicationConfiguration));
uint timeout = SessionTimeout * ((UnsuccessfulConnectionCount >= OpcSessionCreationBackoffMax) ? OpcSessionCreationBackoffMax : UnsuccessfulConnectionCount + 1);
Trace($"Create session for endpoint URI '{EndpointUri.AbsoluteUri}' with timeout of {timeout} ms.");
OpcUaClientSession = await Session.Create(
PublisherOpcApplicationConfiguration,
configuredEndpoint,
true,
false,
PublisherOpcApplicationConfiguration.ApplicationName,
timeout,
new UserIdentity(new AnonymousIdentityToken()),
null);
if (OpcUaClientSession != null)
{ {
// release the session to not block for high network timeouts. Trace($"Session successfully created with Id {OpcUaClientSession.SessionId}.");
_opcSessionSemaphore.Release(); if (!selectedEndpoint.EndpointUrl.Equals(configuredEndpoint.EndpointUrl.AbsoluteUri, StringComparison.OrdinalIgnoreCase))
// start connecting
EndpointDescription selectedEndpoint = CoreClientUtils.SelectEndpoint(EndpointUri.AbsoluteUri, true);
ConfiguredEndpoint configuredEndpoint = new ConfiguredEndpoint(null, selectedEndpoint, EndpointConfiguration.Create(PublisherOpcApplicationConfiguration));
uint timeout = SessionTimeout * ((UnsuccessfulConnectionCount >= OpcSessionCreationBackoffMax) ? OpcSessionCreationBackoffMax : UnsuccessfulConnectionCount + 1);
Trace($"Create session for endpoint URI '{EndpointUri.AbsoluteUri}' with timeout of {timeout} ms.");
OpcUaClientSession = await Session.Create(
PublisherOpcApplicationConfiguration,
configuredEndpoint,
true,
false,
PublisherOpcApplicationConfiguration.ApplicationName,
timeout,
new UserIdentity(new AnonymousIdentityToken()),
null);
if (OpcUaClientSession != null)
{ {
Trace($"Session successfully created with Id {OpcUaClientSession.SessionId}."); Trace($"the Server has updated the EndpointUrl to '{selectedEndpoint.EndpointUrl}'");
if (!selectedEndpoint.EndpointUrl.Equals(configuredEndpoint.EndpointUrl.AbsoluteUri))
{
Trace($"the Server has updated the EndpointUrl to '{selectedEndpoint.EndpointUrl}'");
}
// init object state and install keep alive
UnsuccessfulConnectionCount = 0;
OpcUaClientSession.KeepAliveInterval = OpcKeepAliveIntervalInSec * 1000;
StandardKeepAliveEventHandlerAsync = async (session, keepAliveEventArgs) => await StandardClient_KeepAlive(session, keepAliveEventArgs);
OpcUaClientSession.KeepAlive += StandardKeepAliveEventHandlerAsync;
// fetch the namespace array and cache it. it will not change as long the session exists.
DataValue namespaceArrayNodeValue = OpcUaClientSession.ReadValue(VariableIds.Server_NamespaceArray);
_namespaceTable.Update(namespaceArrayNodeValue.GetValue<string[]>(null));
// show the available namespaces
Trace($"The session to endpoint '{selectedEndpoint.EndpointUrl}' has {_namespaceTable.Count} entries in its namespace array:");
int i = 0;
foreach (var ns in _namespaceTable.ToArray())
{
Trace($"Namespace index {i++}: {ns}");
}
// fetch the minimum supported item sampling interval from the server.
DataValue minSupportedSamplingInterval = OpcUaClientSession.ReadValue(VariableIds.Server_ServerCapabilities_MinSupportedSampleRate);
_minSupportedSamplingInterval = minSupportedSamplingInterval.GetValue(0);
Trace($"The server on endpoint '{selectedEndpoint.EndpointUrl}' supports a minimal sampling interval of {_minSupportedSamplingInterval} ms.");
} }
// init object state and install keep alive
UnsuccessfulConnectionCount = 0;
OpcUaClientSession.KeepAliveInterval = OpcKeepAliveIntervalInSec * 1000;
StandardKeepAliveEventHandlerAsync = async (session, keepAliveEventArgs) => await StandardClient_KeepAlive(session, keepAliveEventArgs);
OpcUaClientSession.KeepAlive += StandardKeepAliveEventHandlerAsync;
// fetch the namespace array and cache it. it will not change as long the session exists.
DataValue namespaceArrayNodeValue = OpcUaClientSession.ReadValue(VariableIds.Server_NamespaceArray);
_namespaceTable.Update(namespaceArrayNodeValue.GetValue<string[]>(null));
// show the available namespaces
Trace($"The session to endpoint '{selectedEndpoint.EndpointUrl}' has {_namespaceTable.Count} entries in its namespace array:");
int i = 0;
foreach (var ns in _namespaceTable.ToArray())
{
Trace($"Namespace index {i++}: {ns}");
}
// fetch the minimum supported item sampling interval from the server.
DataValue minSupportedSamplingInterval = OpcUaClientSession.ReadValue(VariableIds.Server_ServerCapabilities_MinSupportedSampleRate);
_minSupportedSamplingInterval = minSupportedSamplingInterval.GetValue(0);
Trace($"The server on endpoint '{selectedEndpoint.EndpointUrl}' supports a minimal sampling interval of {_minSupportedSamplingInterval} ms.");
} }
catch (Exception e) }
catch (Exception e)
{
Trace(e, $"Session creation to endpoint '{EndpointUri.AbsoluteUri}' failed {++UnsuccessfulConnectionCount} time(s). Please verify if server is up and Publisher configuration is correct.");
State = SessionState.Disconnected;
OpcUaClientSession = null;
return;
}
finally
{
await _opcSessionSemaphore.WaitAsync();
if (OpcUaClientSession != null)
{
State = SessionState.Connected;
}
else
{ {
Trace(e, $"Session creation to endpoint '{EndpointUri.AbsoluteUri}' failed {++UnsuccessfulConnectionCount} time(s). Please verify if server is up and Publisher configuration is correct.");
State = SessionState.Disconnected; State = SessionState.Disconnected;
OpcUaClientSession = null;
return;
}
finally
{
await _opcSessionSemaphore.WaitAsync();
if (OpcUaClientSession != null)
{
State = SessionState.Connected;
}
else
{
State = SessionState.Disconnected;
}
} }
} }
} }
@ -374,12 +415,19 @@ namespace OpcPublisher
/// <summary> /// <summary>
/// Monitoring for a node starts if it is required. /// Monitoring for a node starts if it is required.
/// </summary> /// </summary>
public async Task MonitorNodes() public async Task<bool> MonitorNodes()
{ {
bool requestConfigFileUpdate = false;
try try
{ {
await _opcSessionSemaphore.WaitAsync(); await _opcSessionSemaphore.WaitAsync();
// if session is not connected, return
if (State != SessionState.Connected)
{
return requestConfigFileUpdate;
}
// ensure all nodes in all subscriptions of this session are monitored. // ensure all nodes in all subscriptions of this session are monitored.
foreach (var opcSubscription in OpcSubscriptions) foreach (var opcSubscription in OpcSubscriptions)
{ {
@ -393,8 +441,9 @@ namespace OpcPublisher
} }
// process all unmonitored items. // process all unmonitored items.
var unmonitoredItems = opcSubscription.OpcMonitoredItems.Where(i => i.State == OpcMonitoredItem.OpcMonitoredItemState.Unmonitored); var unmonitoredItems = opcSubscription.OpcMonitoredItems.Where(i => (i.State == OpcMonitoredItemState.Unmonitored || i.State == OpcMonitoredItemState.UnmonitoredNamespaceUpdateRequested));
Trace($"Start monitoring nodes on endpoint '{EndpointUri.AbsoluteUri}'.");
foreach (var item in unmonitoredItems) foreach (var item in unmonitoredItems)
{ {
// if the session is disconnected, we stop trying and wait for the next cycle // if the session is disconnected, we stop trying and wait for the next cycle
@ -403,18 +452,58 @@ namespace OpcPublisher
break; break;
} }
Trace($"Start monitoring nodes on endpoint '{EndpointUri.AbsoluteUri}'."); NodeId currentNodeId = null;
NodeId currentNodeId;
try try
{ {
// lookup namespace index if ExpandedNodeId format has been used and build NodeId identifier. // update the namespace of the node if requested. there are two cases where this is requested:
if (!string.IsNullOrEmpty(item.StartNodeId.NamespaceUri)) // 1) publishing requests via the OPC server method are raised using a NodeId format. for those
// the NodeId format is converted into an ExpandedNodeId format
// 2) ExpandedNodeId configuration file entries do not have at parsing time a session to get
// the namespace index. this is set now.
if (item.State == OpcMonitoredItemState.UnmonitoredNamespaceUpdateRequested)
{ {
currentNodeId = NodeId.Create(item.StartNodeId.Identifier, item.StartNodeId.NamespaceUri, _namespaceTable); if (item.ConfigType == OpcMonitoredItemConfigurationType.ExpandedNodeId)
{
int namespaceIndex = GetNamespaceIndex(item.ConfigExpandedNodeId?.NamespaceUri);
if (namespaceIndex < 0)
{
Trace($"The namespace URI of node '{item.ConfigExpandedNodeId.ToString()}' could be not mapped to a namespace index.");
}
else
{
item.ConfigExpandedNodeId = new ExpandedNodeId(item.ConfigExpandedNodeId.Identifier, (ushort)namespaceIndex, item.ConfigExpandedNodeId?.NamespaceUri, 0);
}
}
if (item.ConfigType == OpcMonitoredItemConfigurationType.NodeId)
{
string namespaceUri = GetNamespaceUri(item.ConfigNodeId.NamespaceIndex);
if (string.IsNullOrEmpty(namespaceUri))
{
Trace($"The namespace index of node '{item.ConfigNodeId.ToString()}' is invalid and the node format could not be updated.");
}
else
{
item.ConfigExpandedNodeId = new ExpandedNodeId(item.ConfigNodeId.Identifier, item.ConfigNodeId.NamespaceIndex, namespaceUri, 0);
item.ConfigType = OpcMonitoredItemConfigurationType.ExpandedNodeId;
}
}
item.State = OpcMonitoredItemState.Unmonitored;
}
// lookup namespace index if ExpandedNodeId format has been used and build NodeId identifier.
if (item.ConfigType == OpcMonitoredItemConfigurationType.ExpandedNodeId)
{
int namespaceIndex = GetNamespaceIndex(item.ConfigExpandedNodeId?.NamespaceUri);
if (namespaceIndex < 0)
{
Trace($"Syntax or namespace URI of ExpandedNodeId '{item.ConfigExpandedNodeId.ToString()}' is invalid and will be ignored.");
continue;
}
currentNodeId = new NodeId(item.ConfigExpandedNodeId.Identifier, (ushort)namespaceIndex);
} }
else else
{ {
currentNodeId = new NodeId((NodeId)item.StartNodeId); currentNodeId = item.ConfigNodeId;
} }
// if configured, get the DisplayName for the node, otherwise use the nodeId // if configured, get the DisplayName for the node, otherwise use the nodeId
@ -445,14 +534,15 @@ namespace OpcPublisher
opcSubscription.OpcUaClientSubscription.SetPublishingMode(true); opcSubscription.OpcUaClientSubscription.SetPublishingMode(true);
opcSubscription.OpcUaClientSubscription.ApplyChanges(); opcSubscription.OpcUaClientSubscription.ApplyChanges();
item.OpcUaClientMonitoredItem = monitoredItem; item.OpcUaClientMonitoredItem = monitoredItem;
item.State = OpcMonitoredItem.OpcMonitoredItemState.Monitoreded; item.State = OpcMonitoredItemState.Monitored;
item.EndpointUri = EndpointUri; item.EndpointUri = EndpointUri;
Trace($"Created monitored item for node '{currentNodeId}' in subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'"); Trace($"Created monitored item for node '{currentNodeId.ToString()}' in subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'");
if (item.RequestedSamplingInterval != monitoredItem.SamplingInterval) if (item.RequestedSamplingInterval != monitoredItem.SamplingInterval)
{ {
Trace($"Sampling interval: requested: {item.RequestedSamplingInterval}; revised: {monitoredItem.SamplingInterval}"); Trace($"Sampling interval: requested: {item.RequestedSamplingInterval}; revised: {monitoredItem.SamplingInterval}");
item.SamplingInterval = monitoredItem.SamplingInterval; item.SamplingInterval = monitoredItem.SamplingInterval;
} }
requestConfigFileUpdate = true;
} }
catch (Exception e) when (e.GetType() == typeof(ServiceResultException)) catch (Exception e) when (e.GetType() == typeof(ServiceResultException))
{ {
@ -469,20 +559,20 @@ namespace OpcPublisher
case StatusCodes.BadNodeIdInvalid: case StatusCodes.BadNodeIdInvalid:
case StatusCodes.BadNodeIdUnknown: case StatusCodes.BadNodeIdUnknown:
{ {
Trace($"Failed to monitor node '{item.StartNodeId.Identifier}' on endpoint '{EndpointUri}'."); Trace($"Failed to monitor node '{currentNodeId.Identifier}' on endpoint '{EndpointUri}'.");
Trace($"OPC UA ServiceResultException is '{sre.Result}'. Please check your publisher configuration for this node."); Trace($"OPC UA ServiceResultException is '{sre.Result}'. Please check your publisher configuration for this node.");
break; break;
} }
default: default:
{ {
Trace($"Unhandled OPC UA ServiceResultException '{sre.Result}' when monitoring node '{item.StartNodeId.Identifier}' on endpoint '{EndpointUri}'. Continue."); Trace($"Unhandled OPC UA ServiceResultException '{sre.Result}' when monitoring node '{currentNodeId.Identifier}' on endpoint '{EndpointUri}'. Continue.");
break; break;
} }
} }
} }
catch (Exception e) catch (Exception e)
{ {
Trace(e, $"Failed to monitor node '{item.StartNodeId.Identifier}' on endpoint '{EndpointUri}'"); Trace(e, $"Failed to monitor node '{currentNodeId.Identifier}' on endpoint '{EndpointUri}'");
} }
} }
} }
@ -495,6 +585,117 @@ namespace OpcPublisher
{ {
_opcSessionSemaphore.Release(); _opcSessionSemaphore.Release();
} }
return requestConfigFileUpdate;
}
/// <summary>
/// Checks if there are monitored nodes tagged to stop monitoring and stop monitoring.
/// </summary>
public async Task<bool> StopMonitoringNodes()
{
bool requestConfigFileUpdate = false;
try
{
await _opcSessionSemaphore.WaitAsync();
// if session is not connected, return
if (State != SessionState.Connected)
{
return requestConfigFileUpdate;
}
foreach (var opcSubscription in OpcSubscriptions)
{
// remove items tagged to stop in the stack
var itemsToRemove = opcSubscription.OpcMonitoredItems.Where(i => i.State == OpcMonitoredItemState.RemovalRequested);
if (itemsToRemove.Any())
{
Trace($"Remove nodes in subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'");
try
{
opcSubscription.OpcUaClientSubscription.RemoveItems(itemsToRemove.Select(i => i.OpcUaClientMonitoredItem));
Trace($"There are now {opcSubscription.OpcUaClientSubscription.MonitoredItemCount} monitored items in this subscription.");
}
catch
{
// nodes may be tagged for stop before they are monitored, just continue
}
// remove them in our data structure
opcSubscription.OpcMonitoredItems.RemoveAll(i => i.State == OpcMonitoredItemState.RemovalRequested);
Trace($"There are now {opcSubscription.OpcMonitoredItems.Count} items managed by publisher for this subscription.");
requestConfigFileUpdate = true;
}
}
}
finally
{
_opcSessionSemaphore.Release();
}
return requestConfigFileUpdate;
}
/// <summary>
/// Checks if there are subscriptions without any monitored items and remove them.
/// </summary>
public async Task RemoveUnusedSubscriptions()
{
try
{
await _opcSessionSemaphore.WaitAsync();
// if session is not connected, return
if (State != SessionState.Connected)
{
return;
}
// remove the subscriptions in the stack
var subscriptionsToRemove = OpcSubscriptions.Where(i => i.OpcMonitoredItems.Count == 0);
if (subscriptionsToRemove.Any())
{
Trace($"Remove unused subscriptions on endpoint '{EndpointUri}'.");
OpcUaClientSession.RemoveSubscriptions(subscriptionsToRemove.Select(s => s.OpcUaClientSubscription));
Trace($"There are now {OpcUaClientSession.SubscriptionCount} subscriptions in this sessopm.");
}
// remove them in our data structures
OpcSubscriptions.RemoveAll(s => s.OpcMonitoredItems.Count == 0);
}
finally
{
_opcSessionSemaphore.Release();
}
}
/// <summary>
/// Checks if there are session without any subscriptions and remove them.
/// </summary>
public async Task RemoveUnusedSessions()
{
try
{
await OpcSessionsListSemaphore.WaitAsync();
// if session is not connected, return
if (State != SessionState.Connected)
{
return;
}
// remove sssions in the stack
var sessionsToRemove = OpcSessions.Where(s => s.OpcSubscriptions.Count == 0);
foreach (var sessionToRemove in sessionsToRemove)
{
Trace($"Remove unused session on endpoint '{EndpointUri}'.");
await sessionToRemove.ShutdownAsync();
}
// remove then in our data structures
OpcSessions.RemoveAll(s => s.OpcSubscriptions.Count == 0);
}
finally
{
OpcSessionsListSemaphore.Release();
}
} }
/// <summary> /// <summary>
@ -510,6 +711,41 @@ namespace OpcPublisher
_opcSessionSemaphore.Release(); _opcSessionSemaphore.Release();
} }
/// <summary>
/// Returns the namespace URI for a namespace index.
/// </summary>
public string GetNamespaceUri(int namespaceIndex)
{
try
{
_opcSessionSemaphore.WaitAsync();
return _namespaceTable.ToArray().ElementAtOrDefault(namespaceIndex);
}
finally
{
_opcSessionSemaphore.Release();
}
}
/// <summary>
/// Returns the namespace index for a namespace URI.
/// </summary>
public int GetNamespaceIndex(string namespaceUri)
{
try
{
_opcSessionSemaphore.WaitAsync();
return _namespaceTable.GetIndex(namespaceUri);
}
finally
{
_opcSessionSemaphore.Release();
}
}
/// <summary> /// <summary>
/// Internal disconnect method. Caller must have taken the _opcSessionSemaphore. /// Internal disconnect method. Caller must have taken the _opcSessionSemaphore.
/// </summary> /// </summary>
@ -540,7 +776,11 @@ namespace OpcPublisher
// mark all monitored items as unmonitored // mark all monitored items as unmonitored
foreach (var opcMonitoredItem in opcSubscription.OpcMonitoredItems) foreach (var opcMonitoredItem in opcSubscription.OpcMonitoredItems)
{ {
opcMonitoredItem.State = OpcMonitoredItem.OpcMonitoredItemState.Unmonitored; // tag all monitored items as unmonitored
if (opcMonitoredItem.State == OpcMonitoredItemState.Monitored)
{
opcMonitoredItem.State = OpcMonitoredItemState.Unmonitored;
}
} }
} }
try try
@ -562,10 +802,10 @@ namespace OpcPublisher
} }
/// <summary> /// <summary>
/// Adds a node to be monitored. If there is no session to the endpoint, one is created. /// Adds a node to be monitored. If there is no subscription with the requested publishing interval,
/// If there is no spubscription with the requested publishing interval, one is created. /// one is created.
/// </summary> /// </summary>
public async Task AddNodeForMonitoring(int publishingInterval, int samplingInterval, NodeId nodeId) public async Task AddNodeForMonitoring(NodeId nodeId, ExpandedNodeId expandedNodeId, int opcPublishingInterval, int opcSamplingInterval)
{ {
try try
{ {
@ -577,30 +817,34 @@ namespace OpcPublisher
} }
// check if there is already a subscription with the same publishing interval, which could be used to monitor the node // check if there is already a subscription with the same publishing interval, which could be used to monitor the node
OpcSubscription opcSubscription = OpcSubscriptions.FirstOrDefault(s => s.RequestedPublishingInterval == publishingInterval); OpcSubscription opcSubscription = OpcSubscriptions.FirstOrDefault(s => s.RequestedPublishingInterval == opcPublishingInterval);
// if there was none found, create one // if there was none found, create one
if (opcSubscription == null) if (opcSubscription == null)
{ {
opcSubscription = new OpcSubscription(publishingInterval); opcSubscription = new OpcSubscription(opcPublishingInterval);
OpcSubscriptions.Add(opcSubscription); OpcSubscriptions.Add(opcSubscription);
Trace($"AddNodeForMonitoring: No matching subscription with publishing interval of {publishingInterval} found'. Requested to create a new one."); Trace($"AddNodeForMonitoring: No matching subscription with publishing interval of {opcPublishingInterval} found'. Requested to create a new one.");
} }
// if it is already there, we just ignore it, otherwise we add a new item to monitor. // if it is already published, we do nothing, else we create a new monitored item
OpcMonitoredItem opcMonitoredItem = opcSubscription.OpcMonitoredItems.FirstOrDefault(m => m.StartNodeId == nodeId); if (!IsNodePublishedInSession(nodeId, expandedNodeId))
// if there was none found, create one
if (opcMonitoredItem == null)
{ {
OpcMonitoredItem opcMonitoredItem = null;
// add a new item to monitor // add a new item to monitor
opcMonitoredItem = new OpcMonitoredItem(nodeId, EndpointUri) if (expandedNodeId == null)
{ {
RequestedSamplingInterval = samplingInterval opcMonitoredItem = new OpcMonitoredItem(nodeId, EndpointUri, true);
}; }
else
{
opcMonitoredItem = new OpcMonitoredItem(expandedNodeId, EndpointUri);
}
opcMonitoredItem.RequestedSamplingInterval = opcSamplingInterval;
opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem); opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem);
Trace($"AddNodeForMonitoring: Added item with nodeId '{nodeId.ToString()}' for monitoring."); Trace($"AddNodeForMonitoring: Added item with nodeId '{(expandedNodeId == null ? nodeId.ToString() : expandedNodeId.ToString())}' for monitoring.");
// update the publishing data
// Start publishing. // Start publishing.
Task.Run(async () => await ConnectAndMonitorAsync()); Task.Run(async () => await ConnectAndMonitorAsync());
} }
@ -616,9 +860,9 @@ namespace OpcPublisher
} }
/// <summary> /// <summary>
/// Tags a monitored node to stop monitoring. /// Tags a monitored node to stop monitoring and remove it.
/// </summary> /// </summary>
public async Task TagNodeForMonitoringStop(NodeId nodeId) public async Task RequestMonitorItemRemoval(NodeId nodeId, ExpandedNodeId expandedNodeId)
{ {
try try
{ {
@ -629,17 +873,18 @@ namespace OpcPublisher
return; return;
} }
// find all subscriptions the node is monitored on.
var opcSubscriptions = OpcSubscriptions.Where( s => s.OpcMonitoredItems.Any(m => m.StartNodeId == nodeId));
// tag all monitored items with nodeId to stop monitoring. // tag all monitored items with nodeId to stop monitoring.
foreach (var opcSubscription in opcSubscriptions) // if the node to tag is specified as NodeId, it will also tag nodes configured
// in ExpandedNodeId format.
foreach (var opcSubscription in OpcSubscriptions)
{ {
var opcMonitoredItems = opcSubscription.OpcMonitoredItems.Where(i => i.StartNodeId == nodeId); var opcMonitoredItems = opcSubscription.OpcMonitoredItems.Where(m => { return m.IsMonitoringThisNode(nodeId, expandedNodeId, _namespaceTable); });
foreach (var opcMonitoredItem in opcMonitoredItems) foreach (var opcMonitoredItem in opcMonitoredItems)
{ {
// tag it for removal. // tag it for removal.
opcMonitoredItem.State = OpcMonitoredItem.OpcMonitoredItemState.StopMonitoring; opcMonitoredItem.State = OpcMonitoredItemState.RemovalRequested;
Trace("RequestMonitorItemRemoval: Node " +
$"'{(expandedNodeId == null ? nodeId.ToString() : expandedNodeId.ToString())}' tagged to stop monitoring.");
} }
} }
@ -648,7 +893,7 @@ namespace OpcPublisher
} }
catch (Exception e) catch (Exception e)
{ {
Trace(e, $"TagNodeForMonitoringStop: Exception while trying to tag node '{nodeId.ToString()}' to stop monitoring. (message: '{e.Message}'"); Trace(e, $"RequestMonitorItemRemoval: Exception while trying to tag node '{nodeId.ToString()}' to stop monitoring. (message: '{e.Message}'");
} }
finally finally
{ {
@ -657,9 +902,69 @@ namespace OpcPublisher
} }
/// <summary> /// <summary>
/// Shutsdown all connected sessions. /// Checks if the node specified by either the given NodeId or ExpandedNodeId on the given endpoint is published in the session.
/// </summary> /// </summary>
public async Task ShutdownAsync() private bool IsNodePublishedInSession(NodeId nodeId, ExpandedNodeId expandedNodeId)
{
try
{
_opcSessionSemaphore.WaitAsync();
foreach (var opcSubscription in OpcSubscriptions)
{
if (opcSubscription.OpcMonitoredItems.Any(m => { return m.IsMonitoringThisNode(nodeId, expandedNodeId, _namespaceTable); }))
{
return true;
}
}
}
catch (Exception e)
{
Trace(e, "Check if node is published failed.");
}
finally
{
_opcSessionSemaphore.Release();
}
return false;
}
/// <summary>
/// Checks if the node specified by either the given NodeId or ExpandedNodeId on the given endpoint is published.
/// </summary>
public static bool IsNodePublished(NodeId nodeId, ExpandedNodeId expandedNodeId, Uri endpointUri)
{
try
{
OpcSessionsListSemaphore.WaitAsync();
// itereate through all sessions, subscriptions and monitored items and create config file entries
foreach (var opcSession in OpcSessions)
{
if (opcSession.EndpointUri.AbsoluteUri.Equals(endpointUri.AbsoluteUri, StringComparison.OrdinalIgnoreCase))
{
if (opcSession.IsNodePublishedInSession(nodeId, expandedNodeId))
{
return true;
}
}
}
}
catch (Exception e)
{
Trace(e, "Check if node is published failed.");
}
finally
{
OpcSessionsListSemaphore.Release();
}
return false;
}
/// <summary>
/// Shutdown the current session if it is connected.
/// </summary>
public async Task ShutdownAsync()
{ {
try try
{ {

Просмотреть файл

@ -5,7 +5,6 @@ using System.Security.Cryptography.X509Certificates;
namespace OpcPublisher namespace OpcPublisher
{ {
using System.IO;
using System.Threading.Tasks; using System.Threading.Tasks;
using static Opc.Ua.CertificateStoreType; using static Opc.Ua.CertificateStoreType;
using static OpcPublisher.Workarounds.TraceWorkaround; using static OpcPublisher.Workarounds.TraceWorkaround;

Просмотреть файл

@ -1,9 +1,7 @@
 
using Mono.Options; using Mono.Options;
using Newtonsoft.Json;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Linq; using System.Linq;
using System.Reflection; using System.Reflection;
using System.Threading; using System.Threading;
@ -19,25 +17,14 @@ namespace OpcPublisher
using static OpcPublisher.Workarounds.TraceWorkaround; using static OpcPublisher.Workarounds.TraceWorkaround;
using static OpcSession; using static OpcSession;
using static OpcStackConfiguration; using static OpcStackConfiguration;
using static PublisherNodeConfiguration;
using static System.Console; using static System.Console;
public class Program public class Program
{ {
public static List<OpcSession> OpcSessions = new List<OpcSession>();
public static SemaphoreSlim OpcSessionsListSemaphore = new SemaphoreSlim(1);
public static List<PublisherConfigFileEntry> PublisherConfigFileEntries = new List<PublisherConfigFileEntry>();
public static List<NodeToPublishConfig> PublishConfig = new List<NodeToPublishConfig>();
public static SemaphoreSlim PublishDataSemaphore = new SemaphoreSlim(1);
public static IotHubMessaging IotHubCommunication; public static IotHubMessaging IotHubCommunication;
public static CancellationTokenSource ShutdownTokenSource; public static CancellationTokenSource ShutdownTokenSource;
public static string NodesToPublishAbsFilename
{
get => _nodesToPublishAbsFilename;
set => _nodesToPublishAbsFilename = value;
}
private static string _nodesToPublishAbsFilename;
public static uint PublisherShutdownWaitPeriod public static uint PublisherShutdownWaitPeriod
{ {
get => _publisherShutdownWaitPeriod; get => _publisherShutdownWaitPeriod;
@ -49,7 +36,6 @@ namespace OpcPublisher
private static DateTime _lastServerSessionEventTime = DateTime.UtcNow; private static DateTime _lastServerSessionEventTime = DateTime.UtcNow;
private static bool _opcTraceInitialized = false; private static bool _opcTraceInitialized = false;
private static int _publisherSessionConnectWaitSec = 10; private static int _publisherSessionConnectWaitSec = 10;
private static string _nodesToPublishAbsFilenameDefault = $"{System.IO.Directory.GetCurrentDirectory()}{Path.DirectorySeparatorChar}publishednodes.json";
/// <summary> /// <summary>
/// Usage message. /// Usage message.
@ -104,7 +90,7 @@ namespace OpcPublisher
// command line options configuration // command line options configuration
Mono.Options.OptionSet options = new Mono.Options.OptionSet { Mono.Options.OptionSet options = new Mono.Options.OptionSet {
// Publishing configuration options // Publishing configuration options
{ "pf|publishfile=", $"the filename to configure the nodes to publish.\nDefault: '{_nodesToPublishAbsFilenameDefault}'", (string p) => _nodesToPublishAbsFilename = p }, { "pf|publishfile=", $"the filename to configure the nodes to publish.\nDefault: '{PublisherNodeConfigurationFilename}'", (string p) => PublisherNodeConfigurationFilename = p },
{ "sd|shopfloordomain=", $"the domain of the shopfloor. if specified this domain is appended (delimited by a ':' to the 'ApplicationURI' property when telemetry is sent to IoTHub.\n" + { "sd|shopfloordomain=", $"the domain of the shopfloor. if specified this domain is appended (delimited by a ':' to the 'ApplicationURI' property when telemetry is sent to IoTHub.\n" +
"The value must follow the syntactical rules of a DNS hostname.\nDefault: not set", (string s) => { "The value must follow the syntactical rules of a DNS hostname.\nDefault: not set", (string s) => {
Regex domainNameRegex = new Regex("^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\\-]*[A-Za-z0-9])$"); Regex domainNameRegex = new Regex("^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\\-]*[A-Za-z0-9])$");
@ -440,121 +426,23 @@ namespace OpcPublisher
return; return;
} }
// get information on the nodes to publish and validate the json by deserializing it. // Read node configuration file
try PublisherNodeConfiguration publisherNodeConfiguration = new PublisherNodeConfiguration();
if (!await publisherNodeConfiguration.ReadConfigAsync())
{ {
await PublishDataSemaphore.WaitAsync();
if (string.IsNullOrEmpty(_nodesToPublishAbsFilename))
{
// check if we have an env variable specifying the published nodes path, otherwise use the default
_nodesToPublishAbsFilename = _nodesToPublishAbsFilenameDefault;
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_GW_PNFP")))
{
Trace("Publishing node configuration file path read from environment.");
_nodesToPublishAbsFilename = Environment.GetEnvironmentVariable("_GW_PNFP");
}
}
Trace($"Attempting to load nodes file from: {_nodesToPublishAbsFilename}");
PublisherConfigFileEntries = JsonConvert.DeserializeObject<List<PublisherConfigFileEntry>>(File.ReadAllText(_nodesToPublishAbsFilename));
Trace($"Loaded {PublisherConfigFileEntries.Count.ToString()} config file entry/entries.");
foreach (var publisherConfigFileEntry in PublisherConfigFileEntries)
{
if (publisherConfigFileEntry.NodeId == null)
{
// new node configuration syntax.
foreach (var opcNode in publisherConfigFileEntry.OpcNodes)
{
PublishConfig.Add(new NodeToPublishConfig(ExpandedNodeId.Parse(opcNode.ExpandedNodeId), publisherConfigFileEntry.EndpointUri, opcNode.OpcSamplingInterval ?? OpcSamplingInterval, opcNode.OpcPublishingInterval ?? OpcPublishingInterval));
}
}
else
{
// legacy (using ns=) node configuration syntax using default sampling and publishing interval.
PublishConfig.Add(new NodeToPublishConfig(publisherConfigFileEntry.NodeId, publisherConfigFileEntry.EndpointUri, OpcSamplingInterval, OpcPublishingInterval));
}
}
}
catch (Exception e)
{
Trace(e, "Loading of the node configuration file failed. Does the file exist and has correct syntax?");
Trace("exiting...");
return; return;
} }
finally
{
PublishDataSemaphore.Release();
}
Trace($"There are {PublishConfig.Count.ToString()} nodes to publish.");
// initialize and start IoTHub messaging // initialize and start IoTHub messaging
IotHubCommunication = new IotHubMessaging(); IotHubCommunication = new IotHubMessaging();
if (! await IotHubCommunication.InitAsync()) if (!await IotHubCommunication.InitAsync())
{ {
return; return;
} }
// create a list to manage sessions, subscriptions and monitored items. if (!await publisherNodeConfiguration.CreateOpcPublishingDataAsync())
try
{ {
await PublishDataSemaphore.WaitAsync(); return;
await OpcSessionsListSemaphore.WaitAsync();
var uniqueEndpointUrls = PublishConfig.Select(n => n.EndpointUri).Distinct();
foreach (var endpointUrl in uniqueEndpointUrls)
{
// create new session info.
OpcSession opcSession = new OpcSession(endpointUrl, OpcSessionCreationTimeout);
// create a subscription for each distinct publishing inverval
var nodesDistinctPublishingInterval = PublishConfig.Where(n => n.EndpointUri.Equals(endpointUrl)).Select(c => c.OpcPublishingInterval).Distinct();
foreach (var nodeDistinctPublishingInterval in nodesDistinctPublishingInterval)
{
// create a subscription for the publishing interval and add it to the session.
OpcSubscription opcSubscription = new OpcSubscription(nodeDistinctPublishingInterval);
// add all nodes with this OPC publishing interval to this subscription.
var nodesWithSamePublishingInterval = PublishConfig.Where(n => n.EndpointUri.Equals(endpointUrl)).Where(n => n.OpcPublishingInterval == nodeDistinctPublishingInterval);
foreach (var nodeInfo in nodesWithSamePublishingInterval)
{
// differentiate if legacy (using ns=) or new syntax (using nsu=) is used
if (nodeInfo.NodeId == null)
{
// create a monitored item for the node
OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.ExpandedNodeId, opcSession.EndpointUri)
{
RequestedSamplingInterval = nodeInfo.OpcSamplingInterval,
SamplingInterval = nodeInfo.OpcSamplingInterval
};
opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem);
}
else
{
// give user a warning that the syntax is obsolete
Trace($"Please update the syntax of the configuration file and use ExpandedNodeId instead of NodeId property name for node with identifier '{nodeInfo.NodeId.ToString()}' on EndpointUrl '{nodeInfo.EndpointUri.AbsolutePath}'.");
// create a monitored item for the node with the configured or default sampling interval
OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.NodeId, opcSession.EndpointUri)
{
RequestedSamplingInterval = nodeInfo.OpcSamplingInterval,
SamplingInterval = nodeInfo.OpcSamplingInterval
};
opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem);
}
}
// add subscription to session.
opcSession.OpcSubscriptions.Add(opcSubscription);
}
// add session.
OpcSessions.Add(opcSession);
}
}
finally
{
OpcSessionsListSemaphore.Release();
PublishDataSemaphore.Release();
} }
// kick off the task to maintain all sessions // kick off the task to maintain all sessions
@ -573,7 +461,7 @@ namespace OpcPublisher
WriteLine(""); WriteLine("");
ReadLine(); ReadLine();
ShutdownTokenSource.Cancel(); ShutdownTokenSource.Cancel();
WriteLine("Publisher is shuting down..."); WriteLine("Publisher is shutting down...");
// Wait for session connector completion // Wait for session connector completion
await sessionConnectorAsync; await sessionConnectorAsync;

Просмотреть файл

@ -1,72 +0,0 @@

using Newtonsoft.Json;
using Opc.Ua;
using System;
using System.Collections.Generic;
namespace OpcPublisher
{
/// <summary>
/// Class describing a list of nodes in the ExpandedNodeId format (using nsu as namespace syntax)
/// </summary>
public class OpcNodesOnEndpointUrl
{
public string ExpandedNodeId;
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
public int? OpcSamplingInterval;
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
public int? OpcPublishingInterval;
}
/// <summary>
/// Class describing the nodes which should be published. It supports three formats:
/// - NodeId syntax using the namespace index (ns) syntax
/// - ExpandedNodeId syntax, using the namespace URI (nsu) syntax
/// - List of ExpandedNodeId syntax, to allow putting nodes with similar publishing and/or sampling intervals in one object
/// </summary>
public partial class PublisherConfigFileEntry
{
public PublisherConfigFileEntry()
{
}
public PublisherConfigFileEntry(string nodeId, string endpointUrl)
{
NodeId = new NodeId(nodeId);
EndpointUri = new Uri(endpointUrl);
}
[JsonProperty("EndpointUrl")]
public Uri EndpointUri;
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
public NodeId NodeId;
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
public List<OpcNodesOnEndpointUrl> OpcNodes;
}
public class NodeToPublishConfig
{
public Uri EndpointUri;
public NodeId NodeId;
public ExpandedNodeId ExpandedNodeId;
public int OpcSamplingInterval;
public int OpcPublishingInterval;
public NodeToPublishConfig(NodeId nodeId, Uri endpointUri, int opcSamplingInterval, int opcPublishingInterval)
{
NodeId = nodeId;
ExpandedNodeId = null;
EndpointUri = endpointUri;
OpcSamplingInterval = opcSamplingInterval;
OpcPublishingInterval = opcPublishingInterval;
}
public NodeToPublishConfig(ExpandedNodeId expandedNodeId, Uri endpointUri, int opcSamplingInterval, int opcPublishingInterval)
{
NodeId = null;
ExpandedNodeId = expandedNodeId;
EndpointUri = endpointUri;
OpcSamplingInterval = opcSamplingInterval;
OpcPublishingInterval = opcPublishingInterval;
}
}
}

Просмотреть файл

@ -0,0 +1,338 @@

using Newtonsoft.Json;
using Opc.Ua;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace OpcPublisher
{
using System.IO;
using System.Linq;
using System.Threading;
using static OpcMonitoredItem;
using static OpcPublisher.Workarounds.TraceWorkaround;
using static OpcStackConfiguration;
public class PublisherNodeConfiguration
{
public static SemaphoreSlim PublisherNodeConfigurationSemaphore = new SemaphoreSlim(1);
public static List<OpcSession> OpcSessions = new List<OpcSession>();
public static SemaphoreSlim OpcSessionsListSemaphore = new SemaphoreSlim(1);
public static string PublisherNodeConfigurationFilename
{
get => _publisherNodeConfigurationFilename;
set => _publisherNodeConfigurationFilename = value;
}
private static string _publisherNodeConfigurationFilename = $"{System.IO.Directory.GetCurrentDirectory()}{Path.DirectorySeparatorChar}publishednodes.json";
private List<NodePublishingConfiguration> _nodePublishingConfiguration;
private static List<PublisherConfigurationFileEntry> _configurationFileEntries = new List<PublisherConfigurationFileEntry>();
public PublisherNodeConfiguration()
{
_nodePublishingConfiguration = new List<NodePublishingConfiguration>();
}
/// <summary>
/// Read and parse the publisher node configuration file.
/// </summary>
/// <returns></returns>
public async Task<bool> ReadConfigAsync()
{
// get information on the nodes to publish and validate the json by deserializing it.
try
{
await PublisherNodeConfigurationSemaphore.WaitAsync();
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_GW_PNFP")))
{
Trace("Publishing node configuration file path read from environment.");
_publisherNodeConfigurationFilename = Environment.GetEnvironmentVariable("_GW_PNFP");
}
Trace($"Attempting to load nodes file from: {_publisherNodeConfigurationFilename}");
_configurationFileEntries = JsonConvert.DeserializeObject<List<PublisherConfigurationFileEntry>>(File.ReadAllText(_publisherNodeConfigurationFilename));
Trace($"Loaded {_configurationFileEntries.Count} config file entry/entries.");
foreach (var publisherConfigFileEntry in _configurationFileEntries)
{
if (publisherConfigFileEntry.NodeId == null)
{
// new node configuration syntax.
foreach (var opcNode in publisherConfigFileEntry.OpcNodes)
{
ExpandedNodeId expandedNodeId = ExpandedNodeId.Parse(opcNode.ExpandedNodeId);
_nodePublishingConfiguration.Add(new NodePublishingConfiguration(expandedNodeId, publisherConfigFileEntry.EndpointUri, opcNode.OpcSamplingInterval ?? OpcSamplingInterval, opcNode.OpcPublishingInterval ?? OpcPublishingInterval));
}
}
else
{
// NodeId (ns=) format node configuration syntax using default sampling and publishing interval.
_nodePublishingConfiguration.Add(new NodePublishingConfiguration(publisherConfigFileEntry.NodeId, publisherConfigFileEntry.EndpointUri, OpcSamplingInterval, OpcPublishingInterval));
// give user a warning that the syntax is obsolete
Trace($"Please update the syntax of the configuration file and use ExpandedNodeId instead of NodeId property name for node with identifier '{publisherConfigFileEntry.NodeId.ToString()}' on EndpointUrl '{publisherConfigFileEntry.EndpointUri.AbsoluteUri}'.");
}
}
}
catch (Exception e)
{
Trace(e, "Loading of the node configuration file failed. Does the file exist and has correct syntax?");
Trace("exiting...");
return false;
}
finally
{
PublisherNodeConfigurationSemaphore.Release();
}
Trace($"There are {_nodePublishingConfiguration.Count.ToString()} nodes to publish.");
return true;
}
/// <summary>
/// Create the publisher data structures to manage OPC sessions, subscriptions and monitored items.
/// </summary>
/// <returns></returns>
public async Task<bool> CreateOpcPublishingDataAsync()
{
// create a list to manage sessions, subscriptions and monitored items.
try
{
await PublisherNodeConfigurationSemaphore.WaitAsync();
await OpcSessionsListSemaphore.WaitAsync();
var uniqueEndpointUris = _nodePublishingConfiguration.Select(n => n.EndpointUri).Distinct();
foreach (var endpointUri in uniqueEndpointUris)
{
// create new session info.
OpcSession opcSession = new OpcSession(endpointUri, OpcSessionCreationTimeout);
// create a subscription for each distinct publishing inverval
var nodesDistinctPublishingInterval = _nodePublishingConfiguration.Where(n => n.EndpointUri.AbsoluteUri.Equals(endpointUri.AbsoluteUri, StringComparison.OrdinalIgnoreCase)).Select(c => c.OpcPublishingInterval).Distinct();
foreach (var nodeDistinctPublishingInterval in nodesDistinctPublishingInterval)
{
// create a subscription for the publishing interval and add it to the session.
OpcSubscription opcSubscription = new OpcSubscription(nodeDistinctPublishingInterval);
// add all nodes with this OPC publishing interval to this subscription.
var nodesWithSamePublishingInterval = _nodePublishingConfiguration.Where(n => n.EndpointUri.AbsoluteUri.Equals(endpointUri.AbsoluteUri, StringComparison.OrdinalIgnoreCase)).Where(n => n.OpcPublishingInterval == nodeDistinctPublishingInterval);
foreach (var nodeInfo in nodesWithSamePublishingInterval)
{
// differentiate if NodeId or ExpandedNodeId format is used
if (nodeInfo.NodeId == null)
{
// create a monitored item for the node, we do not have the namespace index without a connected session.
// so request a namespace update.
OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.ExpandedNodeId, opcSession.EndpointUri, true)
{
RequestedSamplingInterval = nodeInfo.OpcSamplingInterval,
SamplingInterval = nodeInfo.OpcSamplingInterval
};
opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem);
}
else
{
// create a monitored item for the node with the configured or default sampling interval
OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.NodeId, opcSession.EndpointUri)
{
RequestedSamplingInterval = nodeInfo.OpcSamplingInterval,
SamplingInterval = nodeInfo.OpcSamplingInterval
};
opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem);
}
}
// add subscription to session.
opcSession.OpcSubscriptions.Add(opcSubscription);
}
// add session.
OpcSessions.Add(opcSession);
}
}
catch (Exception e)
{
Trace(e, "Creation of the internal OPC data managment structures failed.");
Trace("exiting...");
return false;
}
finally
{
OpcSessionsListSemaphore.Release();
PublisherNodeConfigurationSemaphore.Release();
}
return true;
}
/// <summary>
/// Returns a list of all published nodes for a specific endpoint in config file format.
/// </summary>
/// <returns></returns>
public static async Task<List<PublisherConfigurationFileEntry>> GetPublisherConfigurationFileEntries(Uri endpointUri, OpcMonitoredItemConfigurationType? requestedType, bool getAll)
{
List<PublisherConfigurationFileEntry> publisherConfigurationFileEntries = new List<PublisherConfigurationFileEntry>();
try
{
await PublisherNodeConfigurationSemaphore.WaitAsync();
// itereate through all sessions, subscriptions and monitored items and create config file entries
foreach (var session in OpcSessions)
{
if (endpointUri == null || session.EndpointUri.AbsoluteUri.Equals(endpointUri.AbsoluteUri, StringComparison.OrdinalIgnoreCase))
{
PublisherConfigurationFileEntry publisherConfigurationFileEntry = new PublisherConfigurationFileEntry();
publisherConfigurationFileEntry.EndpointUri = session.EndpointUri;
publisherConfigurationFileEntry.NodeId = null;
publisherConfigurationFileEntry.OpcNodes = null;
foreach (var subscription in session.OpcSubscriptions)
{
foreach (var monitoredItem in subscription.OpcMonitoredItems)
{
// ignore items tagged to stop
if (monitoredItem.State != OpcMonitoredItemState.RemovalRequested || getAll == true)
{
OpcNodeOnEndpointUrl opcNodeOnEndpointUrl = new OpcNodeOnEndpointUrl();
if (monitoredItem.ConfigType == OpcMonitoredItemConfigurationType.ExpandedNodeId)
{
// for certain scenarios we support returning the NodeId format even so the
// actual configuration of the node was in ExpandedNodeId format
if (requestedType == OpcMonitoredItemConfigurationType.NodeId)
{
PublisherConfigurationFileEntry legacyPublisherConfigFileEntry = new PublisherConfigurationFileEntry();
legacyPublisherConfigFileEntry.EndpointUri = session.EndpointUri;
legacyPublisherConfigFileEntry.NodeId = new NodeId(monitoredItem.ConfigExpandedNodeId.Identifier, (ushort)session.GetNamespaceIndex(monitoredItem.ConfigExpandedNodeId?.NamespaceUri));
publisherConfigurationFileEntries.Add(legacyPublisherConfigFileEntry);
}
else
{
opcNodeOnEndpointUrl.ExpandedNodeId = monitoredItem.ConfigExpandedNodeId.ToString();
opcNodeOnEndpointUrl.OpcPublishingInterval = (int)subscription.RequestedPublishingInterval;
opcNodeOnEndpointUrl.OpcSamplingInterval = monitoredItem.RequestedSamplingInterval;
if (publisherConfigurationFileEntry.OpcNodes == null)
{
publisherConfigurationFileEntry.OpcNodes = new List<OpcNodeOnEndpointUrl>();
}
publisherConfigurationFileEntry.OpcNodes.Add(opcNodeOnEndpointUrl);
}
}
else
{
// we do not convert nodes with legacy configuration to the new format to keep backward
// compatibility with external configurations.
// the conversion would only be possible, if the session is connected, to have access to the
// server namespace array.
PublisherConfigurationFileEntry legacyPublisherConfigFileEntry = new PublisherConfigurationFileEntry();
legacyPublisherConfigFileEntry.EndpointUri = session.EndpointUri;
legacyPublisherConfigFileEntry.NodeId = monitoredItem.ConfigNodeId;
publisherConfigurationFileEntries.Add(legacyPublisherConfigFileEntry);
}
}
}
}
if (publisherConfigurationFileEntry.OpcNodes != null)
{
publisherConfigurationFileEntries.Add(publisherConfigurationFileEntry);
}
}
}
}
catch (Exception e)
{
Trace(e, "Creation of configuration file entries failed.");
}
finally
{
PublisherNodeConfigurationSemaphore.Release();
}
return publisherConfigurationFileEntries;
}
/// <summary>
/// Updates the configuration file to persist all currently published nodes
/// </summary>
public static async Task UpdateNodeConfigurationFile()
{
try
{
// itereate through all sessions, subscriptions and monitored items and create config file entries
List<PublisherConfigurationFileEntry> publisherNodeConfiguration = await GetPublisherConfigurationFileEntries(null, null, true);
// update the config file
File.WriteAllText(PublisherNodeConfigurationFilename, JsonConvert.SerializeObject(publisherNodeConfiguration, Formatting.Indented));
}
catch (Exception e)
{
Trace(e, "Update of node configuration file failed.");
}
}
}
/// <summary>
/// Class describing a list of nodes in the ExpandedNodeId format
/// </summary>
public class OpcNodeOnEndpointUrl
{
public string ExpandedNodeId;
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
public int? OpcSamplingInterval;
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
public int? OpcPublishingInterval;
}
/// <summary>
/// Class describing the nodes which should be published. It supports three formats:
/// - NodeId syntax using the namespace index (ns) syntax
/// - ExpandedNodeId syntax, using the namespace URI (nsu) syntax
/// - List of ExpandedNodeId syntax, to allow putting nodes with similar publishing and/or sampling intervals in one object
/// </summary>
public partial class PublisherConfigurationFileEntry
{
public PublisherConfigurationFileEntry()
{
}
public PublisherConfigurationFileEntry(string nodeId, string endpointUrl)
{
NodeId = new NodeId(nodeId);
EndpointUri = new Uri(endpointUrl);
}
[JsonProperty("EndpointUrl")]
public Uri EndpointUri;
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
public NodeId NodeId;
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
public List<OpcNodeOnEndpointUrl> OpcNodes;
}
/// <summary>
/// Describes the publishing information of a node.
/// </summary>
public class NodePublishingConfiguration
{
public Uri EndpointUri;
public NodeId NodeId;
public ExpandedNodeId ExpandedNodeId;
public int OpcSamplingInterval;
public int OpcPublishingInterval;
public NodePublishingConfiguration(NodeId nodeId, Uri endpointUri, int opcSamplingInterval, int opcPublishingInterval)
{
NodeId = nodeId;
ExpandedNodeId = null;
EndpointUri = endpointUri;
OpcSamplingInterval = opcSamplingInterval;
OpcPublishingInterval = opcPublishingInterval;
}
public NodePublishingConfiguration(ExpandedNodeId expandedNodeId, Uri endpointUri, int opcSamplingInterval, int opcPublishingInterval)
{
NodeId = null;
ExpandedNodeId = expandedNodeId;
EndpointUri = endpointUri;
OpcSamplingInterval = opcSamplingInterval;
OpcPublishingInterval = opcPublishingInterval;
}
}
}

Просмотреть файл

@ -8,16 +8,17 @@ namespace OpcPublisher
{ {
using IoTHubCredentialTools; using IoTHubCredentialTools;
using Newtonsoft.Json; using Newtonsoft.Json;
using System.IO;
using System.Linq; using System.Linq;
using static IotHubMessaging; using static IotHubMessaging;
using static OpcPublisher.OpcMonitoredItem;
using static OpcPublisher.Program; using static OpcPublisher.Program;
using static OpcPublisher.Workarounds.TraceWorkaround; using static OpcPublisher.Workarounds.TraceWorkaround;
using static OpcStackConfiguration; using static OpcStackConfiguration;
using static PublisherNodeConfiguration;
public class PublisherNodeManager : CustomNodeManager2 public class PublisherNodeManager : CustomNodeManager2
{ {
public PublisherNodeManager(Opc.Ua.Server.IServerInternal server, ApplicationConfiguration configuration) public PublisherNodeManager(Opc.Ua.Server.IServerInternal server, ApplicationConfiguration configuration)
: base(server, configuration, Namespaces.PublisherApplications) : base(server, configuration, Namespaces.PublisherApplications)
{ {
@ -114,8 +115,8 @@ namespace OpcPublisher
MethodState unpublishNodeMethod = CreateMethod(methodsFolder, "UnpublishNode", "UnpublishNode"); MethodState unpublishNodeMethod = CreateMethod(methodsFolder, "UnpublishNode", "UnpublishNode");
SetUnpublishNodeMethodProperties(ref unpublishNodeMethod); SetUnpublishNodeMethodProperties(ref unpublishNodeMethod);
MethodState getListOfPublishedNodesMethod = CreateMethod(methodsFolder, "GetListOfPublishedNodes", "GetListOfPublishedNodes"); MethodState getPublishedNodesMethod = CreateMethod(methodsFolder, "GetPublishedNodes", "GetPublishedNodes");
SetGetListOfPublishedNodesMethodProperties(ref getListOfPublishedNodesMethod); SetGetPublishedNodesMethodProperties(ref getPublishedNodesMethod);
} }
catch (Exception e) catch (Exception e)
{ {
@ -127,10 +128,27 @@ namespace OpcPublisher
} }
/// <summary> /// <summary>
/// Sets properies of the GetListOfPublishedNodes method. /// Sets properies of the GetPublishedNodes method.
/// </summary> /// </summary>
private void SetGetListOfPublishedNodesMethodProperties(ref MethodState method) private void SetGetPublishedNodesMethodProperties(ref MethodState method)
{ {
// define input arguments
method.InputArguments = new PropertyState<Argument[]>(method)
{
NodeId = new NodeId(method.BrowseName.Name + "InArgs", NamespaceIndex),
BrowseName = BrowseNames.InputArguments
};
method.InputArguments.DisplayName = method.InputArguments.BrowseName.Name;
method.InputArguments.TypeDefinitionId = VariableTypeIds.PropertyType;
method.InputArguments.ReferenceTypeId = ReferenceTypeIds.HasProperty;
method.InputArguments.DataType = DataTypeIds.Argument;
method.InputArguments.ValueRank = ValueRanks.OneDimension;
method.InputArguments.Value = new Argument[]
{
new Argument() { Name = "EndpointUri", Description = "Endpoint URI of the OPC UA server to return the published nodes for.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar }
};
// set output arguments // set output arguments
method.OutputArguments = new PropertyState<Argument[]>(method) method.OutputArguments = new PropertyState<Argument[]>(method)
{ {
@ -147,7 +165,7 @@ namespace OpcPublisher
{ {
new Argument() { Name = "Published nodes", Description = "List of the nodes published by Publisher", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar } new Argument() { Name = "Published nodes", Description = "List of the nodes published by Publisher", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar }
}; };
method.OnCallMethod = new GenericMethodCalledEventHandler(OnGetListOfPublishedNodesCall); method.OnCallMethod = new GenericMethodCalledEventHandler(OnGetPublishedNodesCall);
} }
/// <summary> /// <summary>
@ -169,7 +187,7 @@ namespace OpcPublisher
method.InputArguments.Value = new Argument[] method.InputArguments.Value = new Argument[]
{ {
new Argument() { Name = "NodeId", Description = "NodeId of the node to publish in 'ns=' syntax.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar }, new Argument() { Name = "NodeId", Description = "NodeId of the node to publish in NodeId format.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar },
new Argument() { Name = "EndpointUri", Description = "Endpoint URI of the OPC UA server owning the node.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar } new Argument() { Name = "EndpointUri", Description = "Endpoint URI of the OPC UA server owning the node.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar }
}; };
@ -195,7 +213,7 @@ namespace OpcPublisher
method.InputArguments.Value = new Argument[] method.InputArguments.Value = new Argument[]
{ {
new Argument() { Name = "NodeId", Description = "NodeId of the node to publish in 'ns=' syntax.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar }, new Argument() { Name = "NodeId", Description = "NodeId of the node to publish in NodeId format.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar },
new Argument() { Name = "EndpointUri", Description = "Endpoint URI of the OPC UA server owning the node.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar }, new Argument() { Name = "EndpointUri", Description = "Endpoint URI of the OPC UA server owning the node.", DataType = DataTypeIds.String, ValueRank = ValueRanks.Scalar },
}; };
@ -412,25 +430,18 @@ namespace OpcPublisher
/// </summary> /// </summary>
private ServiceResult OnPublishNodeCall(ISystemContext context, MethodState method, IList<object> inputArguments, IList<object> outputArguments) private ServiceResult OnPublishNodeCall(ISystemContext context, MethodState method, IList<object> inputArguments, IList<object> outputArguments)
{ {
if (inputArguments[0] == null || inputArguments[1] == null) if (string.IsNullOrEmpty(inputArguments[0] as string) || string.IsNullOrEmpty(inputArguments[1] as string))
{ {
Trace("PublishNode: Invalid Arguments when trying to publish a node."); Trace("PublishNode: Invalid Arguments when trying to publish a node.");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments!"); return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments as strings!");
} }
NodeToPublishConfig nodeToPublish;
NodeId nodeId = null; NodeId nodeId = null;
Uri endpointUri = null; Uri endpointUri = null;
try try
{ {
if (string.IsNullOrEmpty(inputArguments[0] as string) || string.IsNullOrEmpty(inputArguments[1] as string))
{
Trace($"PublishNode: Arguments (0 (nodeId), 1 (endpointUrl)) are not valid strings!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments as strings!");
}
nodeId = NodeId.Parse(inputArguments[0] as string); nodeId = NodeId.Parse(inputArguments[0] as string);
endpointUri = new Uri(inputArguments[1] as string); endpointUri = new Uri(inputArguments[1] as string);
nodeToPublish = new NodeToPublishConfig(nodeId, endpointUri, OpcSamplingInterval, OpcPublishingInterval);
} }
catch (UriFormatException) catch (UriFormatException)
{ {
@ -440,12 +451,15 @@ namespace OpcPublisher
catch (Exception e) catch (Exception e)
{ {
Trace(e, $"PublishNode: The NodeId has an invalid format '{inputArguments[0] as string}'!"); Trace(e, $"PublishNode: The NodeId has an invalid format '{inputArguments[0] as string}'!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide a valid OPC UA NodeId in 'ns=' syntax as first argument!"); return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide a valid OPC UA NodeId in NodeId format as first argument!");
} }
// find/create a session to the endpoint URL and start monitoring the node. // find/create a session to the endpoint URL and start monitoring the node.
try try
{ {
// lock the publishing configuration till we are done
OpcSessionsListSemaphore.WaitAsync();
if (ShutdownTokenSource.IsCancellationRequested) if (ShutdownTokenSource.IsCancellationRequested)
{ {
return ServiceResult.Create(StatusCodes.BadUnexpectedError, $"Publisher shutdown in progress."); return ServiceResult.Create(StatusCodes.BadUnexpectedError, $"Publisher shutdown in progress.");
@ -453,59 +467,45 @@ namespace OpcPublisher
// find the session we need to monitor the node // find the session we need to monitor the node
OpcSession opcSession = null; OpcSession opcSession = null;
try opcSession = OpcSessions.FirstOrDefault(s => s.EndpointUri.AbsoluteUri.Equals(endpointUri.AbsoluteUri, StringComparison.OrdinalIgnoreCase));
{ string namespaceUri = null;
OpcSessionsListSemaphore.WaitAsync(); ExpandedNodeId expandedNodeId = null;
opcSession = OpcSessions.FirstOrDefault(s => s.EndpointUri == nodeToPublish.EndpointUri);
// add a new session. // add a new session.
if (opcSession == null) if (opcSession == null)
{
// create new session info.
opcSession = new OpcSession(endpointUri, OpcSessionCreationTimeout);
OpcSessions.Add(opcSession);
Trace($"PublishNode: No matching session found for endpoint '{endpointUri.OriginalString}'. Requested to create a new one.");
}
else
{
Trace($"PublishNode: Session found for endpoint '{endpointUri.OriginalString}'");
// check if node is already published
namespaceUri = opcSession.GetNamespaceUri(nodeId.NamespaceIndex);
if (string.IsNullOrEmpty(namespaceUri))
{ {
// create new session info. return ServiceResult.Create(StatusCodes.BadUnexpectedError, $"The namespace index of the node id is invalid.");
opcSession = new OpcSession(nodeToPublish.EndpointUri, OpcSessionCreationTimeout);
OpcSessions.Add(opcSession);
Trace($"PublishNode: No matching session found for endpoint '{nodeToPublish.EndpointUri.OriginalString}'. Requested to create a new one.");
} }
else expandedNodeId = new ExpandedNodeId(nodeId.Identifier, nodeId.NamespaceIndex, namespaceUri, 0);
{
Trace($"PublishNode: Session found for endpoint '{nodeToPublish.EndpointUri.OriginalString}'");
}
// add the node info to the subscription with the default publishing interval, execute syncronously
opcSession.AddNodeForMonitoring(OpcPublishingInterval, OpcSamplingInterval, nodeToPublish.NodeId).Wait();
Trace($"PublishNode: Requested to monitor item with NodeId '{nodeToPublish.NodeId.ToString()}' (PublishingInterval: {OpcPublishingInterval}, SamplingInterval: {OpcSamplingInterval})");
}
finally
{
OpcSessionsListSemaphore.Release();
} }
// update our data // add the node info to the subscription with the default publishing interval, execute syncronously
try opcSession.AddNodeForMonitoring(nodeId, expandedNodeId, OpcPublishingInterval, OpcSamplingInterval).Wait();
{ Trace($"PublishNode: Requested to monitor item with NodeId '{nodeId.ToString()}' (PublishingInterval: {OpcPublishingInterval}, SamplingInterval: {OpcSamplingInterval})");
PublishDataSemaphore.WaitAsync();
PublishConfig.Add(nodeToPublish);
// add it also to the publish file
var publisherConfigFileEntry = new PublisherConfigFileEntry()
{
EndpointUri = endpointUri,
NodeId = nodeId
};
PublisherConfigFileEntries.Add(publisherConfigFileEntry);
File.WriteAllText(NodesToPublishAbsFilename, JsonConvert.SerializeObject(PublisherConfigFileEntries));
}
finally
{
PublishDataSemaphore.Release();
}
return ServiceResult.Good;
} }
catch (Exception e) catch (Exception e)
{ {
Trace(e, $"PublishNode: Exception while trying to configure publishing node '{nodeToPublish.NodeId.ToString()}'"); Trace(e, $"PublishNode: Exception while trying to configure publishing node '{nodeId.ToString()}'");
return ServiceResult.Create(e, StatusCodes.BadUnexpectedError, $"Unexpected error publishing node: {e.Message}"); return ServiceResult.Create(e, StatusCodes.BadUnexpectedError, $"Unexpected error publishing node: {e.Message}");
} }
finally
{
OpcSessionsListSemaphore.Release();
}
return ServiceResult.Good;
} }
/// <summary> /// <summary>
@ -513,7 +513,7 @@ namespace OpcPublisher
/// </summary> /// </summary>
private ServiceResult OnUnpublishNodeCall(ISystemContext context, MethodState method, IList<object> inputArguments, IList<object> outputArguments) private ServiceResult OnUnpublishNodeCall(ISystemContext context, MethodState method, IList<object> inputArguments, IList<object> outputArguments)
{ {
if (inputArguments[0] == null || inputArguments[1] == null) if (string.IsNullOrEmpty(inputArguments[0] as string) || string.IsNullOrEmpty(inputArguments[1] as string))
{ {
Trace("UnpublishNode: Invalid arguments!"); Trace("UnpublishNode: Invalid arguments!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments!"); return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments!");
@ -523,12 +523,7 @@ namespace OpcPublisher
Uri endpointUri = null; Uri endpointUri = null;
try try
{ {
if (string.IsNullOrEmpty(inputArguments[0] as string) || string.IsNullOrEmpty(inputArguments[1] as string)) nodeId = NodeId.Parse(inputArguments[0] as string);
{
Trace($"UnpublishNode: Arguments (0 (nodeId), 1 (endpointUrl)) are not valid strings!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments as strings!");
}
nodeId = inputArguments[0] as string;
endpointUri = new Uri(inputArguments[1] as string); endpointUri = new Uri(inputArguments[1] as string);
} }
catch (UriFormatException) catch (UriFormatException)
@ -550,7 +545,7 @@ namespace OpcPublisher
try try
{ {
OpcSessionsListSemaphore.WaitAsync(); OpcSessionsListSemaphore.WaitAsync();
opcSession = OpcSessions.FirstOrDefault(s => s.EndpointUri == endpointUri); opcSession = OpcSessions.FirstOrDefault(s => s.EndpointUri.AbsoluteUri.Equals(endpointUri.AbsoluteUri, StringComparison.OrdinalIgnoreCase));
} }
catch catch
{ {
@ -561,57 +556,73 @@ namespace OpcPublisher
OpcSessionsListSemaphore.Release(); OpcSessionsListSemaphore.Release();
} }
ExpandedNodeId expandedNodeId = null;
if (opcSession == null) if (opcSession == null)
{ {
// do nothing if there is no session for this endpoint. // do nothing if there is no session for this endpoint.
Trace($"UnpublishNode: Session for endpoint '{endpointUri.OriginalString}' not found."); Trace($"UnpublishNode: Session for endpoint '{endpointUri.OriginalString}' not found.");
return ServiceResult.Create(StatusCodes.BadSessionIdInvalid, "Session for endpoint of published node not found!"); return ServiceResult.Create(StatusCodes.BadSessionIdInvalid, "Session for endpoint of node to unpublished not found!");
} }
else else
{ {
Trace($"UnpublishNode: Session found for endpoint '{endpointUri.OriginalString}'"); // check if node is already published
string namespaceUri = opcSession.GetNamespaceUri(nodeId.NamespaceIndex);
if (string.IsNullOrEmpty(namespaceUri))
{
return ServiceResult.Create(StatusCodes.BadUnexpectedError, $"The namespace index of the node id is invalid.");
}
expandedNodeId = new ExpandedNodeId(nodeId.Identifier, nodeId.NamespaceIndex, namespaceUri, 0);
if (!OpcSession.IsNodePublished(nodeId, expandedNodeId, endpointUri))
{
Trace($"UnpublishNode: Node with id '{nodeId.Identifier.ToString()}' on endpoint '{endpointUri.OriginalString}' is not published.");
return ServiceResult.Good;
}
} }
// remove the node from the sessions monitored items list. // remove the node from the sessions monitored items list.
opcSession.TagNodeForMonitoringStop(nodeId).Wait(); opcSession.RequestMonitorItemRemoval(nodeId, expandedNodeId).Wait();
Trace("UnpublishNode: Requested to stop monitoring of node."); Trace("UnpublishNode: Requested to stop monitoring of node.");
// remove node from persisted config file
try
{
PublishDataSemaphore.WaitAsync();
var entryToRemove = PublisherConfigFileEntries.Find(l => l.NodeId == nodeId && l.EndpointUri == endpointUri);
PublisherConfigFileEntries.Remove(entryToRemove);
File.WriteAllText(NodesToPublishAbsFilename, JsonConvert.SerializeObject(PublisherConfigFileEntries));
}
finally
{
PublishDataSemaphore.Release();
}
} }
catch (Exception e) catch (Exception e)
{ {
Trace(e, $"UnpublishNode: Exception while trying to configure publishing node '{nodeId.ToString()}'"); Trace(e, $"UnpublishNode: Exception while trying to configure publishing node '{nodeId.ToString()}'");
return ServiceResult.Create(e, StatusCodes.BadUnexpectedError, $"Unexpected error publishing node: {e.Message}"); return ServiceResult.Create(e, StatusCodes.BadUnexpectedError, $"Unexpected error unpublishing node: {e.Message}");
} }
return ServiceResult.Good; return ServiceResult.Good;
} }
/// <summary> /// <summary>
/// Method to get the list of published nodes. Executes synchronously. /// Method to get the list of published nodes. Executes synchronously.
/// The format of the returned node description is using NodeId format. The assumption
/// is that the caller is able to access the namespace array of the server
/// on the endpoint URL(s) themselve and do the correct mapping.
/// </summary> /// </summary>
private ServiceResult OnGetListOfPublishedNodesCall(ISystemContext context, MethodState method, IList<object> inputArguments, IList<object> outputArguments) private ServiceResult OnGetPublishedNodesCall(ISystemContext context, MethodState method, IList<object> inputArguments, IList<object> outputArguments)
{ {
try Uri endpointUri = null;
if (string.IsNullOrEmpty(inputArguments[0] as string))
{ {
PublishDataSemaphore.WaitAsync(); Trace("GetPublishedNodes: Return all published nodes");
outputArguments[0] = JsonConvert.SerializeObject(PublisherConfigFileEntries);
} }
finally else
{ {
PublishDataSemaphore.Release(); try
{
endpointUri = new Uri(inputArguments[0] as string);
}
catch (UriFormatException)
{
Trace($"GetPublishedNodes: The endpointUrl is invalid '{inputArguments[0] as string}'!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide a valid OPC UA endpoint URL as first argument!");
}
} }
Trace("GetListOfPublishedNodes: Success!");
// get the list of published nodes in NodeId format
outputArguments[0] = JsonConvert.SerializeObject(GetPublisherConfigurationFileEntries(endpointUri, OpcMonitoredItemConfigurationType.NodeId, false));
Trace("GetPublishedNodes: Success!");
return ServiceResult.Good; return ServiceResult.Good;
} }
@ -633,7 +644,7 @@ namespace OpcPublisher
// read current connection string and compare to the one passed in // read current connection string and compare to the one passed in
string currentConnectionString = SecureIoTHubToken.ReadAsync(PublisherOpcApplicationConfiguration.ApplicationName, IotDeviceCertStoreType, IotDeviceCertStorePath).Result; string currentConnectionString = SecureIoTHubToken.ReadAsync(PublisherOpcApplicationConfiguration.ApplicationName, IotDeviceCertStoreType, IotDeviceCertStorePath).Result;
if (string.Equals(connectionString, currentConnectionString, StringComparison.OrdinalIgnoreCase)) if (connectionString.Equals(currentConnectionString, StringComparison.OrdinalIgnoreCase))
{ {
Trace("ConnectionStringWrite: Connection string up to date!"); Trace("ConnectionStringWrite: Connection string up to date!");
return ServiceResult.Create(StatusCodes.Bad, "Connection string already up-to-date!"); return ServiceResult.Create(StatusCodes.Bad, "Connection string already up-to-date!");

Просмотреть файл

@ -28,9 +28,9 @@
] ]
}, },
// the format below is only supported for backward compatibility. you need to ensure that the // the format below (NodeId format) is only supported for backward compatibility. you need to ensure that the
// OPC UA server on the configured EndpointUrl has the namespaceindex you expect with your configuration. // OPC UA server on the configured EndpointUrl has the namespaceindex you expect with your configuration.
// please use the ExpandedNodeId syntax instead. // please use the ExpandedNodeId format as in the examples above instead.
{ {
"EndpointUrl": "opc.tcp://<your_opcua_server>:<your_opcua_server_port>/<your_opcua_server_path>", "EndpointUrl": "opc.tcp://<your_opcua_server>:<your_opcua_server_port>/<your_opcua_server_path>",
"NodeId": { "NodeId": {