Refactor publishing path, add diagnostics, update documentation (#1)

* Refactor publishing path, add diagnostics, update documentation.

* Refactor publishing path, add diagnostics, update documentation.
This commit is contained in:
Hans Gschossmann 2017-10-25 17:19:28 +02:00 коммит произвёл GitHub
Родитель b8d84d96cc
Коммит a43e00e6a6
7 изменённых файлов: 915 добавлений и 355 удалений

526
README.md
Просмотреть файл

@ -88,9 +88,10 @@ The syntax of the configuration file is as follows:
## Command line options
The complete usage of the application can be shown using the `--help` command line option and is as follows:
OpcPublisher.exe <applicationname> [<iothubconnectionstring>] [<options>]
Usage: OpcPublisher.exe <applicationname> [<iothubconnectionstring>] [<options>]
with:
OPC Edge Publisher to subscribe to configured OPC UA servers and send telemetry to Azure IoTHub.
To exit the application, just press ENTER while it is running.
applicationname: the OPC UA application name to use, required
The application name is also used to register the publisher under this name in the
@ -98,175 +99,195 @@ with:
iothubconnectionstring: the IoTHub owner connectionstring, optional
The following options are supported:
There are a couple of environment variables which can be used to control the application:
_HUB_CS: sets the IoTHub owner connectionstring
_GW_LOGP: sets the filename of the log file to use
_TPC_SP: sets the path to store certificates of trusted stations
_GW_PNFP: sets the filename of the publishing configuration file
--pf, --publishfile=VALUE
the filename to configure the nodes to publish.
Default: './publishednodes.json'
--sd, --shopfloordomain=VALUE
the domain of the shopfloor. if specified this
domain is appended (delimited by a ':' to the '
ApplicationURI' property when telemetry is
sent to IoTHub.
The value must follow the syntactical rules of a
DNS hostname.
Default: not set
--sw, --sessionconnectwait=VALUE
specify the wait time in seconds publisher is
trying to connect to disconnected endpoints and
starts monitoring unmonitored items
Min: 10
Default: 10
--vc, --verboseconsole=VALUE
the output of publisher is shown on the console.
Default: False
--ih, --iothubprotocol=VALUE
the protocol to use for communication with Azure
IoTHub (allowed values: Amqp, Http1, Amqp_
WebSocket_Only, Amqp_Tcp_Only, Mqtt, Mqtt_
WebSocket_Only, Mqtt_Tcp_Only).
Default: Mqtt
--ms, --iothubmessagesize=VALUE
the max size of a message which can be send to
IoTHub. when telemetry of this size is available
it will be sent.
0 will enforce immediate send when telemetry is
available
Min: 0
Max: 256 * 1024
Default: 4096
--si, --iothubsendinterval=VALUE
the interval in seconds when telemetry should be
send to IoTHub. If 0, then only the
iothubmessagesize parameter controls when
telemetry is sent.
Default: '1'
--lf, --logfile=VALUE the filename of the logfile to use.
Default: './Logs/<applicationname>.log.txt'
--pn, --portnum=VALUE the server port of the publisher OPC server
endpoint.
Default: 62222
--pa, --path=VALUE the enpoint URL path part of the publisher OPC
server endpoint.
Default: '/UA/Publisher'
--lr, --ldsreginterval=VALUE
the LDS(-ME) registration interval in ms. If 0,
then the registration is disabled.
Default: 0
--ot, --operationtimeout=VALUE
the operation timeout of the publisher OPC UA
client in ms.
Default: 120000
--oi, --opcsamplinginterval=VALUE
the publisher is using this as default value in
milliseconds to request the servers to sample
the nodes with this interval
this value might be revised by the OPC UA
servers to a supported sampling interval.
please check the OPC UA specification for
details how this is handled by the OPC UA stack.
a negative value will set the sampling interval
to the publishing interval of the subscription
this node is on.
0 will configure the OPC UA server to sample in
the highest possible resolution and should be
taken with care.
Default: 1000
--op, --opcpublishinginterval=VALUE
the publisher is using this as default value in
milliseconds for the publishing interval setting
of the subscriptions established to the OPC UA
servers.
please check the OPC UA specification for
details how this is handled by the OPC UA stack.
a value less than or equal zero will let the
server revise the publishing interval.
Default: 0
--ct, --createsessiontimeout=VALUE
specify the timeout in seconds used when creating
a session to an endpoint. On unsuccessful
connection attemps a backoff up to 5 times the
specified timeout value is used.
Min: 1
Default: 10
--ki, --keepaliveinterval=VALUE
specify the interval in seconds the publisher is
sending keep alive messages to the OPC servers
on the endpoints it is connected to.
Min: 2
Default: 2
--kt, --keepalivethreshold=VALUE
specify the number of keep alive packets a server
can miss, before the session is disconneced
Min: 1
Default: 5
--st, --opcstacktracemask=VALUE
the trace mask for the OPC stack. See github OPC .
NET stack for definitions.
To enable IoTHub telemetry tracing set it to 711.
Command line arguments overrule environment variable settings.
Default: 285 (645)
--as, --autotrustservercerts=VALUE
the publisher trusts all servers it is
establishing a connection to.
Default: False
--tm, --trustmyself=VALUE
the publisher certificate is put into the trusted
certificate store automatically.
Default: True
--fd, --fetchdisplayname=VALUE
enable to read the display name of a published
node from the server. this will increase the
runtime.
Default: False
--at, --appcertstoretype=VALUE
the own application cert store type.
(allowed values: Directory, X509Store)
Default: 'X509Store'
--ap, --appcertstorepath=VALUE
the path where the own application cert should be
stored
Default (depends on store type):
X509Store: 'CurrentUser\UA_MachineDefault'
Directory: 'CertificateStores/own'
--tt, --trustedcertstoretype=VALUE
the trusted cert store type.
(allowed values: Directory, X509Store)
Default: Directory
--tp, --trustedcertstorepath=VALUE
the path of the trusted cert store
Default (depends on store type):
X509Store: 'CurrentUser\UA_MachineDefault'
Directory: 'CertificateStores/trusted'
--rt, --rejectedcertstoretype=VALUE
the rejected cert store type.
(allowed values: Directory, X509Store)
Default: Directory
--rp, --rejectedcertstorepath=VALUE
the path of the rejected cert store
Default (depends on store type):
X509Store: 'CurrentUser\UA_MachineDefault'
Directory: 'CertificateStores/rejected'
--it, --issuercertstoretype=VALUE
the trusted issuer cert store type.
(allowed values: Directory, X509Store)
Default: Directory
--ip, --issuercertstorepath=VALUE
the path of the trusted issuer cert store
Default (depends on store type):
X509Store: 'CurrentUser\UA_MachineDefault'
Directory: 'CertificateStores/issuers'
--dt, --devicecertstoretype=VALUE
the iothub device cert store type.
(allowed values: Directory, X509Store)
Default: X509Store
--dp, --devicecertstorepath=VALUE
the path of the iot device cert store
Default Default (depends on store type):
X509Store: 'My'
Directory: 'CertificateStores/IoTHub'
Options:
--pf, --publishfile=VALUE
the filename to configure the nodes to publish.
Default: 'D:\Repos\hg\iot-edge-opc-publisher\src\
publishednodes.json'
--sd, --shopfloordomain=VALUE
the domain of the shopfloor. if specified this
domain is appended (delimited by a ':' to the '
ApplicationURI' property when telemetry is sent
to IoTHub.
The value must follow the syntactical rules of a
DNS hostname.
Default: not set
--sw, --sessionconnectwait=VALUE
specify the wait time in seconds publisher is
trying to connect to disconnected endpoints and
starts monitoring unmonitored items
Min: 10
Default: 10
--mq, --monitoreditemqueuecapacity=VALUE
specify how many notifications of monitored items
could be stored in the internal queue, if the
data could not be sent quick enough to IoTHub
Min: 1024
Default: 8192
--di, --diagnosticsinterval=VALUE
shows publisher diagnostic info at the specified
interval in seconds. 0 disables diagnostic
output.
Default: 0
--vc, --verboseconsole=VALUE
the output of publisher is shown on the console.
Default: False
--ih, --iothubprotocol=VALUE
the protocol to use for communication with Azure
IoTHub (allowed values: Amqp, Http1, Amqp_
WebSocket_Only, Amqp_Tcp_Only, Mqtt, Mqtt_
WebSocket_Only, Mqtt_Tcp_Only).
Default: Mqtt
--ms, --iothubmessagesize=VALUE
the max size of a message which can be send to
IoTHub. when telemetry of this size is available
it will be sent.
0 will enforce immediate send when telemetry is
available
Min: 0
Max: 262144
Default: 4096
--si, --iothubsendinterval=VALUE
the interval in seconds when telemetry should be
send to IoTHub. If 0, then only the
iothubmessagesize parameter controls when
telemetry is sent.
Default: '1'
--lf, --logfile=VALUE the filename of the logfile to use.
Default: './Logs/<applicationname>.log.txt'
--pn, --portnum=VALUE the server port of the publisher OPC server
endpoint.
Default: 62222
--pa, --path=VALUE the enpoint URL path part of the publisher OPC
server endpoint.
Default: '/UA/Publisher'
--lr, --ldsreginterval=VALUE
the LDS(-ME) registration interval in ms. If 0,
then the registration is disabled.
Default: 0
--ot, --operationtimeout=VALUE
the operation timeout of the publisher OPC UA
client in ms.
Default: 120000
--oi, --opcsamplinginterval=VALUE
the publisher is using this as default value in
milliseconds to request the servers to sample
the nodes with this interval
this value might be revised by the OPC UA
servers to a supported sampling interval.
please check the OPC UA specification for
details how this is handled by the OPC UA stack.
a negative value will set the sampling interval
to the publishing interval of the subscription
this node is on.
0 will configure the OPC UA server to sample in
the highest possible resolution and should be
taken with care.
Default: 1000
--op, --opcpublishinginterval=VALUE
the publisher is using this as default value in
milliseconds for the publishing interval setting
of the subscriptions established to the OPC UA
servers.
please check the OPC UA specification for
details how this is handled by the OPC UA stack.
a value less than or equal zero will let the
server revise the publishing interval.
Default: 0
--ct, --createsessiontimeout=VALUE
specify the timeout in seconds used when creating
a session to an endpoint. On unsuccessful
connection attemps a backoff up to 5 times the
specified timeout value is used.
Min: 1
Default: 10
--ki, --keepaliveinterval=VALUE
specify the interval in seconds the publisher is
sending keep alive messages to the OPC servers
on the endpoints it is connected to.
Min: 2
Default: 2
--kt, --keepalivethreshold=VALUE
specify the number of keep alive packets a server
can miss, before the session is disconneced
Min: 1
Default: 5
--st, --opcstacktracemask=VALUE
the trace mask for the OPC stack. See github OPC .
NET stack for definitions.
To enable IoTHub telemetry tracing set it to 711.
Default: 285 (645)
--as, --autotrustservercerts=VALUE
the publisher trusts all servers it is
establishing a connection to.
Default: False
--tm, --trustmyself=VALUE
the publisher certificate is put into the trusted
certificate store automatically.
Default: True
--fd, --fetchdisplayname=VALUE
enable to read the display name of a published
node from the server. this will increase the
runtime.
Default: False
--at, --appcertstoretype=VALUE
the own application cert store type.
(allowed values: Directory, X509Store)
Default: 'X509Store'
--ap, --appcertstorepath=VALUE
the path where the own application cert should be
stored
Default (depends on store type):
X509Store: 'CurrentUser\UA_MachineDefault'
Directory: 'CertificateStores/own'
--tt, --trustedcertstoretype=VALUE
the trusted cert store type.
(allowed values: Directory, X509Store)
Default: Directory
--tp, --trustedcertstorepath=VALUE
the path of the trusted cert store
Default (depends on store type):
X509Store: 'CurrentUser\UA_MachineDefault'
Directory: 'CertificateStores/trusted'
--rt, --rejectedcertstoretype=VALUE
the rejected cert store type.
(allowed values: Directory, X509Store)
Default: Directory
--rp, --rejectedcertstorepath=VALUE
the path of the rejected cert store
Default (depends on store type):
X509Store: 'CurrentUser\UA_MachineDefault'
Directory: 'CertificateStores/rejected'
--it, --issuercertstoretype=VALUE
the trusted issuer cert store type.
(allowed values: Directory, X509Store)
Default: Directory
--ip, --issuercertstorepath=VALUE
the path of the trusted issuer cert store
Default (depends on store type):
X509Store: 'CurrentUser\UA_MachineDefault'
Directory: 'CertificateStores/issuers'
--dt, --devicecertstoretype=VALUE
the iothub device cert store type.
(allowed values: Directory, X509Store)
Default: X509Store
--dp, --devicecertstorepath=VALUE
the path of the iot device cert store
Default Default (depends on store type):
X509Store: 'My'
Directory: 'CertificateStores/IoTHub'
-h, --help show this message and exit
There are a couple of environment variables which can be used to control the application:
* _HUB_CS: sets the IoTHub owner connectionstring
* _GW_LOGP: sets the filename of the log file to use
@ -330,6 +351,177 @@ In certain use cases it may make sense to read configuration information from or
### Store for X509 certificates
Storing X509 certificates does not work with bind mounts, since the permissions of the path to the store need to be `rw` for the owner. Instead you need to use the `-v` option of `docker run` in the volume mode.
## Performance and memory considerations
### Commandline parameters contolling performance and memory
When running Publisher you need to be aware of your performance requirements and the memory resources you have available on your platform.
Since both are interdependent and both depend on the configuration of how many nodes are configured to publish, you should ensure that the parameters you are using for:
* IoTHub send interval (`--si`)
* IoTHub message size (`--ms`)
* Monitored Items queue capacity (`--mq`)
do meet your requirements.
The `--mq` parameter controls the upper bound of the capacity of the internal queue, which buffers all notifications if a value of an OPC node changes. If Publisher is not able to send messages to IoTHub fast enough,
then this queue buffers those notifications. The parameter sets the number of notifications which could be buffered. If you seen the number of items in this queue increasing in your test runs, you need to:
* decrease the IoTHub send interval (`--si`)
* increase the IoTHub message size (`--ms`)
otherwise you will loose the data values of those OPC node changes. The `--mq` parameter at the same time allows to prevent controlling the upper bound of the memory resources used by Publisher.
The `--si` parameter enforces Publisher to send messages to IoTHub as the specified interval. If there is an IoTHub message size specified via the `--ms` parameter (or by the default value for it),
then a message will be sent either when the message size is reached (in this case the interval is restarted) or when the specified interval time has passed. If you disable the message size by `--ms 0`, Publisher
uses the maximal possible IoTHub message size of 256 kB to batch data.
The `--ms` parameter allows you to enable batching of messages sent to IoTHub. Depending on the protocol you are using, the overhead to send a message to IoTHub is high compared to the actual time of sending the payload.
If your scenario allows latency for the data ingested, you should configure Publisher to use the maximal message size of 256 kB.
Before you use Publisher in production scenarios, you need to test the performance and memory under production conditions. You could use the `--di` commandline parameter to specify a interval in seconds,
which will trigger the output of diagnostic information at this interval.
### Test measurements
Here are some measurements with different values for `--si` and `--ms` parameters publishing 497 nodes with an OPC publishing interval of 1 second.
Publisher was used as debug build on Windows 10 natively for 120 seconds. The IoTHub protocol was configured to use HTTP (`--ih Http1`).
#### Default configuration (--si 1 --ms 4096)
======================================================================
OpcPublisher status @ 25.10.2017 11:08:25
---------------------------------
OPC sessions: 1
connected OPC sessions: 1
connected OPC subscriptions: 1
OPC monitored items: 497
---------------------------------
monitored items queue bounded capacity: 8192
monitored items queue current items: 8107
monitored item notifications enqueued: 57076
monitored item notifications enqueue failure: 28551
monitored item notifications dequeued: 20418
---------------------------------
messages sent to IoTHub: 1200
bytes sent to IoTHub: 4772773
avg msg size: 3977
time in ms for sent msgs: 113744
min time in ms for msg: 84
max time in ms for msg: 441
avg time in ms for msg: 94
msg send failures: 0
time in ms for failed msgs: 0
avg time in ms for failed msg: 0
messages too large to sent to IoTHub: 0
times we missed send interval: 0
---------------------------------
current working set in MB: 90
======================================================================
The default configuration sends data to IoTHub each second or when 4kB of data to ingest is available. In this configuration we loose 28551 OPC node value updates (`monitored item notifications enqueue failure`).
#### Constant send inverval (--si 1 --ms 0)
======================================================================
OpcPublisher status @ 25.10.2017 11:18:20
---------------------------------
OPC sessions: 1
connected OPC sessions: 1
connected OPC subscriptions: 1
OPC monitored items: 497
---------------------------------
monitored items queue bounded capacity: 8192
monitored items queue current items: 0
monitored item notifications enqueued: 56682
monitored item notifications enqueue failure: 0
monitored item notifications dequeued: 56682
---------------------------------
messages sent to IoTHub: 114
bytes sent to IoTHub: 13130523
avg msg size: 115180
time in ms for sent msgs: 14454
min time in ms for msg: 100
max time in ms for msg: 705
avg time in ms for msg: 126
msg send failures: 0
time in ms for failed msgs: 0
avg time in ms for failed msg: 0
messages too large to sent to IoTHub: 0
times we missed send interval: 0
---------------------------------
current working set in MB: 87
======================================================================
When the message size is set to 0 and there is a send interval configured (or the default of 1 second is used), then Publisher does use internally batch data using the maximal supported IoTHub message size, which is 256 kB. As you see in the diagnostic output,
the average message size is 115180 byte. In this configuration we do not loose any OPC node value udpates. The average time to send a message to IoTHub (`avg time in ms for msg`) was only 22ms higher than in the
default configuration, which has a average message size of 3877 byte.
#### Send each OPC node value update (--si 0 --ms 0)
======================================================================
OpcPublisher status @ 25.10.2017 10:08:03
---------------------------------
OPC sessions: 1
connected OPC sessions: 1
connected OPC subscriptions: 1
OPC monitored items: 497
---------------------------------
monitored items queue bounded capacity: 8192
monitored items queue current items: 8192
monitored item notifications enqueued: 57323
monitored item notifications enqueue failure: 48116
monitored item notifications dequeued: 1015
---------------------------------
messages sent to IoTHub: 1014
bytes sent to IoTHub: 237292
avg msg size: 234
time in ms for sent msgs: 114268
min time in ms for msg: 84
max time in ms for msg: 330
avg time in ms for msg: 112
msg send failures: 0
time in ms for failed msgs: 0
avg time in ms for failed msg: 0
messages too large to sent to IoTHub: 0
times we missed send interval: 0
---------------------------------
current working set in MB: 92
======================================================================
This configuration sends for each OPC node value change a message to IoTHub. You see the average message size of 234 byte is pretty small and the average time required to send such a message was
112 ms - which is compared to larger message sizes - a high value. The advantage of this configuration is that Publisher does not add any latency to the ingest data path. The number of
lost OPC node value updates (`monitored item notifications enqueue failure: 48116`) is the highest of all compared configurations.
#### Maximum batching (--si 0 --ms 262144)
======================================================================
OpcPublisher status @ 25.10.2017 10:05:23
---------------------------------
OPC sessions: 1
connected OPC sessions: 1
connected OPC subscriptions: 1
OPC monitored items: 497
---------------------------------
monitored items queue bounded capacity: 8192
monitored items queue current items: 0
monitored item notifications enqueued: 57086
monitored item notifications enqueue failure: 0
monitored item notifications dequeued: 57086
---------------------------------
messages sent to IoTHub: 50
bytes sent to IoTHub: 13096746
avg msg size: 261934
time in ms for sent msgs: 8648
min time in ms for msg: 136
max time in ms for msg: 596
avg time in ms for msg: 172
msg send failures: 0
time in ms for failed msgs: 0
avg time in ms for failed msg: 0
messages too large to sent to IoTHub: 0
times we missed send interval: 0
---------------------------------
current working set in MB: 89
======================================================================
This configuration batches as much OPC node value udpates as possible. The maximum IoTHub message size is 256 kB, which is configured here. There is no send interval requested, which makes the time when data is ingested
completly controlled by the data itself. This configuration has the least probability of loosing any OPC node values and could be used for publishing a high number of nodes.
When using this configuration you need to ensure, that your scenario does not have conditions where high latency is introduced (because the message size of 256 kB is not reached).
# Debugging the Application
## Native on Winodws

112
src/Diagnostics.cs Normal file
Просмотреть файл

@ -0,0 +1,112 @@

using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace OpcPublisher
{
using static OpcPublisher.Workarounds.TraceWorkaround;
using static IotHubMessaging;
using static PublisherNodeConfiguration;
/// <summary>
/// Class to enable output to the console.
/// </summary>
public static class Diagnostics
{
public static uint DiagnosticsInterval
{
get => _diagnosticsInterval;
set => _diagnosticsInterval = value;
}
private static uint _diagnosticsInterval = 0;
public static int IotHubMessagingMessagesSentCount
{
get => _messagesSentCount;
}
private static int _messagesSentCount = 0;
private static CancellationTokenSource _shutdownTokenSource;
private static Task _showDiagnosticsInfoTask;
public static void Init()
{
// init data
_showDiagnosticsInfoTask = null;
_shutdownTokenSource = new CancellationTokenSource();
// kick off the task to show diagnostic info
if (_diagnosticsInterval > 0)
{
_showDiagnosticsInfoTask = Task.Run(async () => await ShowDiagnosticsInfoAsync(_shutdownTokenSource.Token));
}
}
public async static Task Shutdown()
{
// wait for diagnostic task completion if it is enabled
if (_showDiagnosticsInfoTask != null)
{
_shutdownTokenSource.Cancel();
await _showDiagnosticsInfoTask;
}
}
/// <summary>
/// Kicks of the task to show diagnostic information each 30 seconds.
/// </summary>
public static async Task ShowDiagnosticsInfoAsync(CancellationToken cancellationtoken)
{
while (true)
{
try
{
if (cancellationtoken.IsCancellationRequested)
{
return;
}
await Task.Delay((int)_diagnosticsInterval * 1000);
Trace("======================================================================");
Trace($"OpcPublisher status @ {System.DateTime.UtcNow}");
Trace("---------------------------------");
Trace($"OPC sessions: {NumberOfOpcSessions}");
Trace($"connected OPC sessions: {NumberOfConnectedOpcSessions}");
Trace($"connected OPC subscriptions: {NumberOfConnectedOpcSubscriptions}");
Trace($"OPC monitored items: {NumberOfMonitoredItems}");
Trace("---------------------------------");
Trace($"monitored items queue bounded capacity: {MonitoredItemsQueueCapacity}");
Trace($"monitored items queue current items: {MonitoredItemsQueueCount}");
Trace($"monitored item notifications enqueued: {EnqueueCount}");
Trace($"monitored item notifications enqueue failure: {EnqueueFailureCount}");
Trace($"monitored item notifications dequeued: {DequeueCount}");
Trace("---------------------------------");
Trace($"messages sent to IoTHub: {SentMessages}");
Trace($"bytes sent to IoTHub: {SentBytes}");
Trace($"avg msg size: {SentBytes / (SentMessages == 0 ? 1 : SentMessages)}");
Trace($"time in ms for sent msgs: {SentTime}");
Trace($"min time in ms for msg: {MinSentTime}");
Trace($"max time in ms for msg: {MaxSentTime}");
Trace($"avg time in ms for msg: {SentTime / (SentMessages == 0 ? 1 : SentMessages)}");
Trace($"msg send failures: {FailedMessages}");
Trace($"time in ms for failed msgs: {FailedTime}");
Trace($"avg time in ms for failed msg: {FailedTime / (FailedMessages == 0 ? 1 : FailedMessages)}");
Trace($"messages too large to sent to IoTHub: {TooLargeCount}");
Trace($"times we missed send interval: {MissedSendIntervalCount}");
Trace("---------------------------------");
Trace($"current working set in MB: {Process.GetCurrentProcess().WorkingSet64 / (1024 * 1024)}");
Trace("======================================================================");
}
catch
{
}
}
}
}
}

Просмотреть файл

@ -1,8 +1,5 @@

using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@ -13,6 +10,9 @@ namespace OpcPublisher
using Microsoft.Azure.Devices;
using Microsoft.Azure.Devices.Client;
using Opc.Ua;
using System;
using System.Diagnostics;
using System.IO;
using static Opc.Ua.CertificateStoreType;
using static OpcPublisher.Workarounds.TraceWorkaround;
using static OpcStackConfiguration;
@ -36,12 +36,13 @@ namespace OpcPublisher
}
private static Microsoft.Azure.Devices.Client.TransportType _iotHubProtocol = Microsoft.Azure.Devices.Client.TransportType.Mqtt;
public static uint MaxSizeOfIoTHubMessageBytes
public const uint IotHubMessageSizeMax = (256 * 1024);
public static uint IotHubMessageSize
{
get => _maxSizeOfIoTHubMessageBytes;
set => _maxSizeOfIoTHubMessageBytes = value;
get => _iotHubMessageSize;
set => _iotHubMessageSize = value;
}
private static uint _maxSizeOfIoTHubMessageBytes = 4096;
private static uint _iotHubMessageSize = 4096;
public static int DefaultSendIntervalSeconds
{
@ -66,6 +67,88 @@ namespace OpcPublisher
}
private static string _iotDeviceCertStorePath = IotDeviceCertX509StorePathDefault;
public static int MonitoredItemsQueueCapacity
{
get => _monitoredItemsQueueCapacity;
set => _monitoredItemsQueueCapacity = value;
}
private static int _monitoredItemsQueueCapacity = 8192;
public static long MonitoredItemsQueueCount => _monitoredItemsDataQueue.Count;
public static long EnqueueCount
{
get => _enqueueCount;
}
private static long _enqueueCount;
public static long EnqueueFailureCount
{
get => _enqueueFailureCount;
}
private static long _enqueueFailureCount;
public static long DequeueCount
{
get => _dequeueCount;
}
private static long _dequeueCount;
public static long MissedSendIntervalCount
{
get => _missedSendIntervalCount;
}
private static long _missedSendIntervalCount;
public static long TooLargeCount
{
get => _tooLargeCount;
}
private static long _tooLargeCount;
public static long SentBytes
{
get => _sentBytes;
}
private static long _sentBytes;
public static long SentMessages
{
get => _sentMessages;
}
private static long _sentMessages;
public static long SentTime
{
get => _sentTime;
}
private static long _sentTime;
public static long MinSentTime
{
get => _minSentTime;
}
private static long _minSentTime = long.MaxValue;
public static long MaxSentTime
{
get => _maxSentTime;
}
private static long _maxSentTime;
public static long FailedMessages
{
get => _failedMessages;
}
private static long _failedMessages;
public static long FailedTime
{
get => _failedTime;
}
private static long _failedTime;
/// <summary>
/// Classes for the telemetry message sent to IoTHub.
/// </summary>
@ -83,26 +166,16 @@ namespace OpcPublisher
public string SourceTimestamp;
}
private ConcurrentQueue<string> _sendQueue;
private int _currentSizeOfIotHubMessageBytes;
private List<OpcUaMessage> _messageList;
private SemaphoreSlim _messageListSemaphore;
private CancellationTokenSource _tokenSource;
private Task _dequeueAndSendTask;
private Timer _sendTimer;
private AutoResetEvent _sendQueueEvent;
private DeviceClient _iotHubClient;
private static BlockingCollection<string> _monitoredItemsDataQueue;
private static CancellationTokenSource _tokenSource;
private static Task _monitoredItemsProcessorTask;
private static DeviceClient _iotHubClient;
/// <summary>
/// Ctor for the class.
/// </summary>
public IotHubMessaging()
{
_sendQueue = new ConcurrentQueue<string>();
_sendQueueEvent = new AutoResetEvent(false);
_messageList = new List<OpcUaMessage>();
_messageListSemaphore = new SemaphoreSlim(1);
_currentSizeOfIotHubMessageBytes = 0;
_monitoredItemsDataQueue = new BlockingCollection<string>(_monitoredItemsQueueCapacity);
}
/// <summary>
@ -172,7 +245,7 @@ namespace OpcPublisher
{
Trace($"Create Publisher IoTHub client with device connection string: '{deviceConnectionString}' using '{IotHubProtocol}' for communication.");
_iotHubClient = DeviceClient.CreateFromConnectionString(deviceConnectionString, IotHubProtocol);
ExponentialBackoff exponentialRetryPolicy = new ExponentialBackoff(int.MaxValue, TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(5000), TimeSpan.FromMilliseconds(100));
ExponentialBackoff exponentialRetryPolicy = new ExponentialBackoff(int.MaxValue, TimeSpan.FromMilliseconds(2), TimeSpan.FromMilliseconds(1024), TimeSpan.FromMilliseconds(3));
_iotHubClient.SetRetryPolicy(exponentialRetryPolicy);
await _iotHubClient.OpenAsync();
}
@ -184,11 +257,11 @@ namespace OpcPublisher
}
// start up task to send telemetry to IoTHub.
_dequeueAndSendTask = null;
_monitoredItemsProcessorTask = null;
_tokenSource = new CancellationTokenSource();
Trace("Creating task to send OPC UA messages in batches to IoT Hub...");
_dequeueAndSendTask = Task.Run(() => DeQueueMessagesAsync(_tokenSource.Token), _tokenSource.Token);
Trace("Creating task process and batch monitored item data updates...");
_monitoredItemsProcessorTask = Task.Run(async () => await MonitoredItemsProcessor(_tokenSource.Token), _tokenSource.Token);
}
catch (Exception e)
{
@ -218,20 +291,12 @@ namespace OpcPublisher
try
{
_tokenSource.Cancel();
await _dequeueAndSendTask;
await _monitoredItemsProcessorTask;
if (_iotHubClient != null)
{
await _iotHubClient.CloseAsync();
}
if (_sendTimer != null)
{
_sendTimer.Dispose();
}
if (_sendQueueEvent != null)
{
_sendQueueEvent.Dispose();
}
}
catch (Exception e)
{
@ -239,151 +304,191 @@ namespace OpcPublisher
}
}
//
// Enqueue a message for batch send.
//
/// <summary>
/// Enqueue a message for sending to IoTHub.
/// </summary>
/// <param name="json"></param>
public void Enqueue(string json)
{
_sendQueue.Enqueue(json);
_sendQueueEvent.Set();
// Try to add the message.
_enqueueCount++;
if (_monitoredItemsDataQueue.TryAdd(json) == false)
{
_enqueueFailureCount++;
if (_enqueueFailureCount % 10000 == 0)
{
Trace(Utils.TraceMasks.Error, $"The internal monitored item message queue is above its capacity of {_monitoredItemsDataQueue.BoundedCapacity}. We have already lost {_enqueueFailureCount} monitored item notifications:(");
}
}
}
/// <summary>
/// Dequeue telemetry messages, compose them for batch send (if needed) and prepares them for sending to IoTHub.
/// Dequeue monitored item notification messages, batch them for send (if needed) and send them to IoTHub.
/// </summary>
private async Task DeQueueMessagesAsync(CancellationToken ct)
private async Task MonitoredItemsProcessor(CancellationToken ct)
{
try
string contentPropertyKey = "content-type";
string contentPropertyValue = "application/opcua+uajson";
string devicenamePropertyKey = "devicename";
string devicenamePropertyValue = ApplicationName;
int propertyLength = contentPropertyKey.Length + contentPropertyValue.Length + devicenamePropertyKey.Length + devicenamePropertyValue.Length;
// if batching is requested the buffer will have the requested size, otherwise we reserve the max size
uint iotHubMessageBufferSize = (_iotHubMessageSize > 0 ? _iotHubMessageSize : IotHubMessageSizeMax) - (uint)propertyLength;
byte[] iotHubMessageBuffer = new byte[iotHubMessageBufferSize];
MemoryStream iotHubMessage = new MemoryStream(iotHubMessageBuffer);
DateTime nextSendTime = DateTime.UtcNow + TimeSpan.FromSeconds(_defaultSendIntervalSeconds);
int millisecondsTillNextSend = TimeSpan.FromSeconds(_defaultSendIntervalSeconds).Milliseconds;
using (iotHubMessage)
{
if (_defaultSendIntervalSeconds > 0 && _maxSizeOfIoTHubMessageBytes > 0)
try
{
// send every x seconds
Trace($"Start timer to send data to IoTHub in {_defaultSendIntervalSeconds} seconds.");
_sendTimer = new Timer(async state => await SendToIoTHubAsync(), null, TimeSpan.FromSeconds(_defaultSendIntervalSeconds), TimeSpan.FromSeconds(_defaultSendIntervalSeconds));
}
string jsonMessage = string.Empty;
bool needToBufferMessage = false;
int jsonMessageSize = 0;
WaitHandle[] waitHandles = { _sendQueueEvent, ct.WaitHandle };
while (true)
{
// wait till some work needs to be done
WaitHandle.WaitAny(waitHandles);
// do we need to stop
if (ct.IsCancellationRequested)
iotHubMessage.Position = 0;
iotHubMessage.SetLength(0);
while (true)
{
Trace($"Cancellation requested. Sending {_sendQueue.Count} remaining messages.");
await SendToIoTHubAsync();
break;
}
if (_sendQueue.Count > 0)
{
bool isPeekSuccessful = false;
bool isDequeueSuccessful = false;
string messageInJson = string.Empty;
int nextMessageSizeBytes = 0;
// perform a TryPeek to determine size of next message
// and whether it will fit. If so, dequeue message and add it to the list.
// if it cannot fit, send current message to IoTHub, reset it, and repeat.
isPeekSuccessful = _sendQueue.TryPeek(out messageInJson);
// get size of next message in the queue
if (isPeekSuccessful)
// sanity check the send interval, compute the timeout and get the next monitored item message
if (_defaultSendIntervalSeconds > 0)
{
nextMessageSizeBytes = Encoding.UTF8.GetByteCount(messageInJson);
// sanity check that the user has set a large enough IoTHub messages size.
if (nextMessageSizeBytes > _maxSizeOfIoTHubMessageBytes && _maxSizeOfIoTHubMessageBytes > 0)
millisecondsTillNextSend = nextSendTime.Subtract(DateTime.UtcNow).Milliseconds;
if (millisecondsTillNextSend < 0)
{
Trace(Utils.TraceMasks.Error, $"There is a telemetry message (size: {nextMessageSizeBytes}), which will not fit into an IoTHub message (max size: {_maxSizeOfIoTHubMessageBytes}].");
Trace(Utils.TraceMasks.Error, $"Please check your IoTHub message size settings. The telemetry message will be discarded silently. Sorry:(");
_sendQueue.TryDequeue(out messageInJson);
continue;
_missedSendIntervalCount++;
// do not wait if we missed the send interval
millisecondsTillNextSend = 0;
}
}
// determine if it will fit into remaining space of the IoTHub message or if we do not batch at all
// if so, dequeue it
if (_currentSizeOfIotHubMessageBytes + nextMessageSizeBytes < _maxSizeOfIoTHubMessageBytes || _maxSizeOfIoTHubMessageBytes == 0)
else
{
isDequeueSuccessful = _sendQueue.TryDequeue(out messageInJson);
// if we are in shutdown do not wait, else wait infinite if send interval is not set
millisecondsTillNextSend = ct.IsCancellationRequested ? 0 : Timeout.Infinite;
}
bool gotItem = _monitoredItemsDataQueue.TryTake(out jsonMessage, millisecondsTillNextSend);
// add dequeued message to list
if (isDequeueSuccessful)
// the two commandline parameter --ms (message size) and --si (send interval) control when data is sent to IoTHub
// pls see detailed comments on performance and memory consumption at https://github.com/Azure/iot-edge-opc-publisher
// check if we got an item or if we hit the timeout or got canceled
if (gotItem)
{
_dequeueCount++;
jsonMessageSize = Encoding.UTF8.GetByteCount(jsonMessage.ToString());
// sanity check that the user has set a large enough IoTHub messages size
if ((_iotHubMessageSize > 0 && jsonMessageSize > _iotHubMessageSize) || (_iotHubMessageSize == 0 && jsonMessageSize > iotHubMessageBufferSize))
{
OpcUaMessage msgPayload = JsonConvert.DeserializeObject<OpcUaMessage>(messageInJson);
await _messageListSemaphore.WaitAsync();
_messageList.Add(msgPayload);
_messageListSemaphore.Release();
_currentSizeOfIotHubMessageBytes = _currentSizeOfIotHubMessageBytes + nextMessageSizeBytes;
Trace(Utils.TraceMasks.OperationDetail, $"Added new message with size {nextMessageSizeBytes} to IoTHub message (size is now {_currentSizeOfIotHubMessageBytes}). {_sendQueue.Count} message(s) in send queue.");
Trace(Utils.TraceMasks.Error, $"There is a telemetry message (size: {jsonMessageSize}), which will not fit into an IoTHub message (max size: {_iotHubMessageSize}].");
Trace(Utils.TraceMasks.Error, $"Please check your IoTHub message size settings. The telemetry message will be discarded silently. Sorry:(");
_tooLargeCount++;
continue;
}
// fall through, if we should send immediately
if (_maxSizeOfIoTHubMessageBytes != 0)
// if batching is requested or we need to send at intervals, batch it otherwise send it right away
if (_iotHubMessageSize > 0 || (_iotHubMessageSize == 0 && _defaultSendIntervalSeconds > 0))
{
// if there is still space to batch, do it. otherwise send the buffer and flag the message for later buffering
if (iotHubMessage.Position + jsonMessageSize <= iotHubMessage.Capacity)
{
iotHubMessage.Write(Encoding.UTF8.GetBytes(jsonMessage.ToString()), 0, jsonMessageSize);
Trace(Utils.TraceMasks.OperationDetail, $"Added new message with size {jsonMessageSize} to IoTHub message (size is now {iotHubMessage.Position}).");
continue;
}
else
{
needToBufferMessage = true;
}
}
}
else
{
// if we got no message, we either reached the interval or we are in shutdown and have processed all messages
if (ct.IsCancellationRequested)
{
Trace($"Cancellation requested.");
_monitoredItemsDataQueue.CompleteAdding();
_monitoredItemsDataQueue.Dispose();
break;
}
}
// the message needs to be sent now.
Trace(Utils.TraceMasks.OperationDetail, $"IoTHub message complete. Trigger send of message with size {_currentSizeOfIotHubMessageBytes} to IoTHub.");
await SendToIoTHubAsync();
}
}
}
catch (Exception e)
{
Trace(e, "Error while dequeuing messages.");
}
}
// the batching is completed or we reached the send interval or got a cancelation request
try
{
Microsoft.Azure.Devices.Client.Message encodedIotHubMessage = null;
/// <summary>
/// Send messages to IoT Hub
/// </summary>
private async Task SendToIoTHubAsync()
{
if (_messageList.Count > 0)
{
// process all queued messages
await _messageListSemaphore.WaitAsync();
string msgListInJson = JsonConvert.SerializeObject(_messageList);
var encodedMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(msgListInJson));
_currentSizeOfIotHubMessageBytes = 0;
_messageList.Clear();
_messageListSemaphore.Release();
// if we reached the send interval, but have nothing to send, we continue
if (!gotItem && iotHubMessage.Position == 0)
{
nextSendTime += TimeSpan.FromSeconds(_defaultSendIntervalSeconds);
iotHubMessage.Position = 0;
iotHubMessage.SetLength(0);
continue;
}
// publish
encodedMessage.Properties.Add("content-type", "application/opcua+uajson");
encodedMessage.Properties.Add("deviceName", ApplicationName);
// if there is no batching and not interval configured, we send the JSON message we just got, otherwise we send the buffer
if (_iotHubMessageSize == 0 && _defaultSendIntervalSeconds == 0)
{
encodedIotHubMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(jsonMessage.ToString()));
}
else
{
encodedIotHubMessage = new Microsoft.Azure.Devices.Client.Message(iotHubMessage.ToArray());
}
encodedIotHubMessage.Properties.Add(contentPropertyKey, contentPropertyValue);
encodedIotHubMessage.Properties.Add(devicenamePropertyKey, devicenamePropertyValue);
if (_iotHubClient != null)
{
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
nextSendTime += TimeSpan.FromSeconds(_defaultSendIntervalSeconds);
try
{
_sentBytes += encodedIotHubMessage.GetBytes().Length;
await _iotHubClient.SendEventAsync(encodedIotHubMessage);
stopwatch.Stop();
_sentMessages++;
_sentTime += stopwatch.ElapsedMilliseconds;
_maxSentTime = Math.Max(_maxSentTime, stopwatch.ElapsedMilliseconds);
_minSentTime = Math.Min(_minSentTime, stopwatch.ElapsedMilliseconds);
Trace(Utils.TraceMasks.OperationDetail, $"Sending {encodedIotHubMessage.BodyStream.Length} bytes to IoTHub took {stopwatch.ElapsedMilliseconds} ms.");
}
catch
{
stopwatch.Stop();
_failedMessages++;
_failedTime += stopwatch.ElapsedMilliseconds;
}
try
{
if (_iotHubClient != null)
{
Trace(Utils.TraceMasks.OperationDetail, "Send data to IoTHub.");
await _iotHubClient.SendEventAsync(encodedMessage);
}
else
{
Trace("No IoTHub client available. Dropping messages...");
// reset the messaage
iotHubMessage.Position = 0;
iotHubMessage.SetLength(0);
// if we had not yet buffered the last message because there was not enough space, buffer it now
if (needToBufferMessage)
{
iotHubMessage.Write(Encoding.UTF8.GetBytes(jsonMessage.ToString()), 0, jsonMessageSize);
}
}
else
{
Trace("No IoTHub client available. Dropping messages...");
}
}
catch (Exception e)
{
Trace(e, "Exception while sending message to IoTHub. Dropping message...");
}
}
}
catch (Exception e)
{
Trace(e, "Exception while sending message to IoTHub. Dropping messages...");
Trace(e, "Error while processing monitored item messages.");
}
}
// Restart timer
if (_sendTimer != null)
{
// send in x seconds
Trace(Utils.TraceMasks.OperationDetail, $"Restart timer to send data to IoTHub in {_defaultSendIntervalSeconds} second(s).");
_sendTimer.Change(TimeSpan.FromSeconds(_defaultSendIntervalSeconds), TimeSpan.FromSeconds(_defaultSendIntervalSeconds));
}
}
}
}

Просмотреть файл

@ -196,7 +196,6 @@ namespace OpcPublisher
Trace(Utils.TraceMasks.OperationDetail, $" DisplayName: {monitoredItem.DisplayName}");
Trace(Utils.TraceMasks.OperationDetail, $" Value: {value}");
IotHubCommunication.Enqueue(json);
}
catch (Exception e)
{
@ -249,6 +248,40 @@ namespace OpcPublisher
}
private static string _shopfloorDomain;
public int GetNumberOfOpcSubscriptions()
{
int result = 0;
try
{
_opcSessionSemaphore.WaitAsync();
result = OpcSubscriptions.Count();
}
finally
{
_opcSessionSemaphore.Release();
}
return result;
}
public int GetNumberOfOpcMonitoredItems()
{
int result = 0;
try
{
_opcSessionSemaphore.WaitAsync();
var subscriptions = OpcSessions.SelectMany(s => s.OpcSubscriptions);
foreach (var subscription in subscriptions)
{
result += subscription.OpcMonitoredItems.Count(i => i.State == OpcMonitoredItemState.Monitored);
}
}
finally
{
_opcSessionSemaphore.Release();
}
return result;
}
public Uri EndpointUri;
public Session OpcUaClientSession;
public SessionState State;
@ -286,20 +319,20 @@ namespace OpcPublisher
/// - unused subscriptions (without any nodes to monitor) are removed.
/// - sessions with out subscriptions are removed.
/// </summary>
public async Task ConnectAndMonitorAsync()
public async Task ConnectAndMonitorAsync(CancellationToken ct)
{
bool updateConfigFileRequired = false;
try
{
await ConnectSession();
await ConnectSession(ct);
updateConfigFileRequired = await MonitorNodes();
updateConfigFileRequired = await MonitorNodes(ct);
updateConfigFileRequired |= await StopMonitoringNodes();
updateConfigFileRequired |= await StopMonitoringNodes(ct);
await RemoveUnusedSubscriptions();
await RemoveUnusedSubscriptions(ct);
await RemoveUnusedSessions();
await RemoveUnusedSessions(ct);
// update the config file if required
if (updateConfigFileRequired)
@ -316,14 +349,14 @@ namespace OpcPublisher
/// <summary>
/// Connects the session if it is disconnected.
/// </summary>
public async Task ConnectSession()
public async Task ConnectSession(CancellationToken ct)
{
try
{
await _opcSessionSemaphore.WaitAsync();
// if the session is not disconnected, return
if (State != SessionState.Disconnected)
// if the session is already connected or shutdown in progress, return
if (State == SessionState.Connected || ct.IsCancellationRequested)
{
return;
}
@ -415,15 +448,15 @@ namespace OpcPublisher
/// <summary>
/// Monitoring for a node starts if it is required.
/// </summary>
public async Task<bool> MonitorNodes()
public async Task<bool> MonitorNodes(CancellationToken ct)
{
bool requestConfigFileUpdate = false;
try
{
await _opcSessionSemaphore.WaitAsync();
// if session is not connected, return
if (State != SessionState.Connected)
// if the session is not connected or shutdown in progress, return
if (State != SessionState.Connected || ct.IsCancellationRequested)
{
return requestConfigFileUpdate;
}
@ -443,11 +476,19 @@ namespace OpcPublisher
// process all unmonitored items.
var unmonitoredItems = opcSubscription.OpcMonitoredItems.Where(i => (i.State == OpcMonitoredItemState.Unmonitored || i.State == OpcMonitoredItemState.UnmonitoredNamespaceUpdateRequested));
Trace($"Start monitoring nodes on endpoint '{EndpointUri.AbsoluteUri}'.");
int additionalMonitoredItemsCount = 0;
int monitoredItemsCount = 0;
bool haveUnmonitoredItems = false;
if (unmonitoredItems.Count() != 0)
{
haveUnmonitoredItems = true;
monitoredItemsCount = opcSubscription.OpcMonitoredItems.Count(i => (i.State == OpcMonitoredItemState.Monitored));
Trace($"Start monitoring items on endpoint '{EndpointUri.AbsoluteUri}'. Currently monitoring {monitoredItemsCount} items.");
}
foreach (var item in unmonitoredItems)
{
// if the session is disconnected, we stop trying and wait for the next cycle
if (State == SessionState.Disconnected)
// if the session is disconnected or a shutdown is in progress, we stop trying and wait for the next cycle
if (State == SessionState.Disconnected || ct.IsCancellationRequested)
{
break;
}
@ -536,13 +577,16 @@ namespace OpcPublisher
item.OpcUaClientMonitoredItem = monitoredItem;
item.State = OpcMonitoredItemState.Monitored;
item.EndpointUri = EndpointUri;
Trace($"Created monitored item for node '{currentNodeId.ToString()}' in subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'");
Trace($"Created monitored item for node '{currentNodeId.ToString()}' in subscription with id '{opcSubscription.OpcUaClientSubscription.Id}' on endpoint '{EndpointUri.AbsoluteUri}'");
if (item.RequestedSamplingInterval != monitoredItem.SamplingInterval)
{
Trace($"Sampling interval: requested: {item.RequestedSamplingInterval}; revised: {monitoredItem.SamplingInterval}");
item.SamplingInterval = monitoredItem.SamplingInterval;
}
requestConfigFileUpdate = true;
if (additionalMonitoredItemsCount++ % 50 == 0)
{
Trace($"Now monitoring {monitoredItemsCount + additionalMonitoredItemsCount} items in subscription with id '{opcSubscription.OpcUaClientSubscription.Id}'");
}
}
catch (Exception e) when (e.GetType() == typeof(ServiceResultException))
{
@ -575,7 +619,14 @@ namespace OpcPublisher
Trace(e, $"Failed to monitor node '{currentNodeId.Identifier}' on endpoint '{EndpointUri}'");
}
}
if (haveUnmonitoredItems == true)
{
monitoredItemsCount = opcSubscription.OpcMonitoredItems.Count(i => (i.State == OpcMonitoredItemState.Monitored));
Trace($"Done processing unmonitored items on endpoint '{EndpointUri.AbsoluteUri}'. Now monitoring {monitoredItemsCount} items in subscription with id '{opcSubscription.OpcUaClientSubscription.Id}'.");
}
}
// request a config file update, if everything is successfully monitored
requestConfigFileUpdate = true;
}
catch (Exception e)
{
@ -591,15 +642,15 @@ namespace OpcPublisher
/// <summary>
/// Checks if there are monitored nodes tagged to stop monitoring.
/// </summary>
public async Task<bool> StopMonitoringNodes()
public async Task<bool> StopMonitoringNodes(CancellationToken ct)
{
bool requestConfigFileUpdate = false;
try
{
await _opcSessionSemaphore.WaitAsync();
// if session is not connected, return
if (State != SessionState.Connected)
// if session is not connected or shutdown is in progress, return
if (State != SessionState.Connected || ct.IsCancellationRequested)
{
return requestConfigFileUpdate;
}
@ -637,14 +688,14 @@ namespace OpcPublisher
/// <summary>
/// Checks if there are subscriptions without any monitored items and remove them.
/// </summary>
public async Task RemoveUnusedSubscriptions()
public async Task RemoveUnusedSubscriptions(CancellationToken ct)
{
try
{
await _opcSessionSemaphore.WaitAsync();
// if session is not connected, return
if (State != SessionState.Connected)
// if session is not connected or shutdown is in progress, return
if (State != SessionState.Connected || ct.IsCancellationRequested)
{
return;
}
@ -670,14 +721,14 @@ namespace OpcPublisher
/// <summary>
/// Checks if there are session without any subscriptions and remove them.
/// </summary>
public async Task RemoveUnusedSessions()
public async Task RemoveUnusedSessions(CancellationToken ct)
{
try
{
await OpcSessionsListSemaphore.WaitAsync();
// if session is not connected, return
if (State != SessionState.Connected)
// if session is not connected or shutdown is in progress, return
if (State != SessionState.Connected || ct.IsCancellationRequested)
{
return;
}
@ -805,13 +856,13 @@ namespace OpcPublisher
/// Adds a node to be monitored. If there is no subscription with the requested publishing interval,
/// one is created.
/// </summary>
public async Task AddNodeForMonitoring(NodeId nodeId, ExpandedNodeId expandedNodeId, int opcPublishingInterval, int opcSamplingInterval)
public async Task AddNodeForMonitoring(NodeId nodeId, ExpandedNodeId expandedNodeId, int opcPublishingInterval, int opcSamplingInterval, CancellationToken ct)
{
try
{
await _opcSessionSemaphore.WaitAsync();
if (ShutdownTokenSource.IsCancellationRequested)
if (ct.IsCancellationRequested)
{
return;
}
@ -846,7 +897,7 @@ namespace OpcPublisher
// update the publishing data
// Start publishing.
Task.Run(async () => await ConnectAndMonitorAsync());
Task.Run(async () => await ConnectAndMonitorAsync(ct));
}
}
catch (Exception e)
@ -862,13 +913,13 @@ namespace OpcPublisher
/// <summary>
/// Tags a monitored node to stop monitoring and remove it.
/// </summary>
public async Task RequestMonitorItemRemoval(NodeId nodeId, ExpandedNodeId expandedNodeId)
public async Task RequestMonitorItemRemoval(NodeId nodeId, ExpandedNodeId expandedNodeId, CancellationToken ct)
{
try
{
await _opcSessionSemaphore.WaitAsync();
if (ShutdownTokenSource.IsCancellationRequested)
if (ct.IsCancellationRequested)
{
return;
}
@ -889,7 +940,7 @@ namespace OpcPublisher
}
// Stop publishing.
Task.Run(async () => await ConnectAndMonitorAsync());
Task.Run(async () => await ConnectAndMonitorAsync(ct));
}
catch (Exception e)
{
@ -977,7 +1028,7 @@ namespace OpcPublisher
{
foreach (var opcSubscription in OpcSubscriptions)
{
Trace($"Removing {opcSubscription.OpcUaClientSubscription.MonitoredItemCount} monitored items from subscription with id {opcSubscription.OpcUaClientSubscription.Id}.");
Trace($"Removing {opcSubscription.OpcUaClientSubscription.MonitoredItemCount} monitored items from subscription with id '{opcSubscription.OpcUaClientSubscription.Id}'.");
opcSubscription.OpcUaClientSubscription.RemoveItems(opcSubscription.OpcUaClientSubscription.MonitoredItems);
}
Trace($"Removing {OpcUaClientSession.SubscriptionCount} subscriptions from session.");

Просмотреть файл

@ -19,6 +19,7 @@ namespace OpcPublisher
using static OpcStackConfiguration;
using static PublisherNodeConfiguration;
using static System.Console;
using static Diagnostics;
public class Program
{
@ -115,16 +116,29 @@ namespace OpcPublisher
}
}
},
{ "vc|verboseconsole=", $"the output of publisher is shown on the console.\nDefault: {VerboseConsole}", (bool b) => VerboseConsole = b },
{ "mq|monitoreditemqueuecapacity=", $"specify how many notifications of monitored items could be stored in the internal queue, if the data could not be sent quick enough to IoTHub\nMin: 1024\nDefault: {MonitoredItemsQueueCapacity}", (int i) => {
if (i >= 1024)
{
MonitoredItemsQueueCapacity = i;
}
else
{
throw new OptionException("The monitoreditemqueueitems must be greater than 1024", "monitoreditemqueueitems");
}
}
},
{ "di|diagnosticsinterval=", $"shows publisher diagnostic info at the specified interval in seconds. 0 disables diagnostic output.\nDefault: {DiagnosticsInterval}", (uint u) => DiagnosticsInterval = u },
{ "vc|verboseconsole=", $"the output of publisher is shown on the console.\nDefault: {VerboseConsole}", (bool b) => VerboseConsole = b },
// IoTHub specific options
{ "ih|iothubprotocol=", $"the protocol to use for communication with Azure IoTHub (allowed values: {string.Join(", ", Enum.GetNames(IotHubProtocol.GetType()))}).\nDefault: {Enum.GetName(IotHubProtocol.GetType(), IotHubProtocol)}",
(Microsoft.Azure.Devices.Client.TransportType p) => IotHubProtocol = p
},
{ "ms|iothubmessagesize=", $"the max size of a message which can be send to IoTHub. when telemetry of this size is available it will be sent.\n0 will enforce immediate send when telemetry is available\nMin: 0\nMax: 256 * 1024\nDefault: {MaxSizeOfIoTHubMessageBytes}", (uint u) => {
if (u >= 0 && u <= 256 * 1024)
{ "ms|iothubmessagesize=", $"the max size of a message which can be send to IoTHub. when telemetry of this size is available it will be sent.\n0 will enforce immediate send when telemetry is available\nMin: 0\nMax: {IotHubMessageSizeMax}\nDefault: {IotHubMessageSize}", (uint u) => {
if (u >= 0 && u <= IotHubMessageSizeMax)
{
MaxSizeOfIoTHubMessageBytes = u;
IotHubMessageSize = u;
}
else
{
@ -381,7 +395,10 @@ namespace OpcPublisher
WriteLine("Publisher is starting up...");
// Init shutdown token source.
// initialize publisher diagnostics
Diagnostics.Init();
// Shutdown token sources.
ShutdownTokenSource = new CancellationTokenSource();
// init OPC configuration and tracing
@ -474,6 +491,9 @@ namespace OpcPublisher
// shutdown the IoTHub messaging
await IotHubCommunication.Shutdown();
// shutdown diagnostics
await Diagnostics.Shutdown();
}
catch (Exception e)
{
@ -515,7 +535,7 @@ namespace OpcPublisher
{
// get tasks for all disconnected sessions and start them
await OpcSessionsListSemaphore.WaitAsync();
var singleSessionHandlerTaskList = OpcSessions.Select(s => s.ConnectAndMonitorAsync());
var singleSessionHandlerTaskList = OpcSessions.Select(s => s.ConnectAndMonitorAsync(cancellationtoken));
OpcSessionsListSemaphore.Release();
await Task.WhenAll(singleSessionHandlerTaskList);
}

Просмотреть файл

@ -27,6 +27,86 @@ namespace OpcPublisher
}
private static string _publisherNodeConfigurationFilename = $"{System.IO.Directory.GetCurrentDirectory()}{Path.DirectorySeparatorChar}publishednodes.json";
public static int NumberOfOpcSessions
{
get
{
int result = 0;
try
{
OpcSessionsListSemaphore.WaitAsync();
result = OpcSessions.Count();
}
finally
{
OpcSessionsListSemaphore.Release();
}
return result;
}
}
public static int NumberOfConnectedOpcSessions
{
get
{
int result = 0;
try
{
OpcSessionsListSemaphore.WaitAsync();
result = OpcSessions.Count(s => s.State == OpcSession.SessionState.Connected);
}
finally
{
OpcSessionsListSemaphore.Release();
}
return result;
}
}
public static int NumberOfConnectedOpcSubscriptions
{
get
{
int result = 0;
try
{
OpcSessionsListSemaphore.WaitAsync();
var opcSessions = OpcSessions.Where(s => s.State == OpcSession.SessionState.Connected);
foreach (var opcSession in opcSessions)
{
result += opcSession.GetNumberOfOpcSubscriptions();
}
}
finally
{
OpcSessionsListSemaphore.Release();
}
return result;
}
}
public static int NumberOfMonitoredItems
{
get
{
int result = 0;
try
{
OpcSessionsListSemaphore.WaitAsync();
var opcSessions = OpcSessions.Where(s => s.State == OpcSession.SessionState.Connected);
foreach (var opcSession in opcSessions)
{
result += opcSession.GetNumberOfOpcMonitoredItems();
}
}
finally
{
OpcSessionsListSemaphore.Release();
}
return result;
}
}
private List<NodePublishingConfiguration> _nodePublishingConfiguration;
private static List<PublisherConfigurationFileEntry> _configurationFileEntries = new List<PublisherConfigurationFileEntry>();

Просмотреть файл

@ -493,7 +493,7 @@ namespace OpcPublisher
}
// add the node info to the subscription with the default publishing interval, execute syncronously
opcSession.AddNodeForMonitoring(nodeId, expandedNodeId, OpcPublishingInterval, OpcSamplingInterval).Wait();
opcSession.AddNodeForMonitoring(nodeId, expandedNodeId, OpcPublishingInterval, OpcSamplingInterval, ShutdownTokenSource.Token).Wait();
Trace($"PublishNode: Requested to monitor item with NodeId '{nodeId.ToString()}' (PublishingInterval: {OpcPublishingInterval}, SamplingInterval: {OpcSamplingInterval})");
}
catch (Exception e)
@ -582,7 +582,7 @@ namespace OpcPublisher
}
// remove the node from the sessions monitored items list.
opcSession.RequestMonitorItemRemoval(nodeId, expandedNodeId).Wait();
opcSession.RequestMonitorItemRemoval(nodeId, expandedNodeId, ShutdownTokenSource.Token).Wait();
Trace("UnpublishNode: Requested to stop monitoring of node.");
}
catch (Exception e)