Refactor publishing path, add diagnostics, update documentation.
This commit is contained in:
Родитель
958f3ea2aa
Коммит
194f30c8b8
526
README.md
526
README.md
|
@ -88,9 +88,10 @@ The syntax of the configuration file is as follows:
|
|||
## Command line options
|
||||
The complete usage of the application can be shown using the `--help` command line option and is as follows:
|
||||
|
||||
OpcPublisher.exe <applicationname> [<iothubconnectionstring>] [<options>]
|
||||
Usage: OpcPublisher.exe <applicationname> [<iothubconnectionstring>] [<options>]
|
||||
|
||||
with:
|
||||
OPC Edge Publisher to subscribe to configured OPC UA servers and send telemetry to Azure IoTHub.
|
||||
To exit the application, just press ENTER while it is running.
|
||||
|
||||
applicationname: the OPC UA application name to use, required
|
||||
The application name is also used to register the publisher under this name in the
|
||||
|
@ -98,175 +99,195 @@ with:
|
|||
|
||||
iothubconnectionstring: the IoTHub owner connectionstring, optional
|
||||
|
||||
The following options are supported:
|
||||
There are a couple of environment variables which can be used to control the application:
|
||||
_HUB_CS: sets the IoTHub owner connectionstring
|
||||
_GW_LOGP: sets the filename of the log file to use
|
||||
_TPC_SP: sets the path to store certificates of trusted stations
|
||||
_GW_PNFP: sets the filename of the publishing configuration file
|
||||
|
||||
--pf, --publishfile=VALUE
|
||||
the filename to configure the nodes to publish.
|
||||
Default: './publishednodes.json'
|
||||
--sd, --shopfloordomain=VALUE
|
||||
the domain of the shopfloor. if specified this
|
||||
domain is appended (delimited by a ':' to the '
|
||||
ApplicationURI' property when telemetry is
|
||||
sent to IoTHub.
|
||||
The value must follow the syntactical rules of a
|
||||
DNS hostname.
|
||||
Default: not set
|
||||
--sw, --sessionconnectwait=VALUE
|
||||
specify the wait time in seconds publisher is
|
||||
trying to connect to disconnected endpoints and
|
||||
starts monitoring unmonitored items
|
||||
Min: 10
|
||||
Default: 10
|
||||
--vc, --verboseconsole=VALUE
|
||||
the output of publisher is shown on the console.
|
||||
Default: False
|
||||
--ih, --iothubprotocol=VALUE
|
||||
the protocol to use for communication with Azure
|
||||
IoTHub (allowed values: Amqp, Http1, Amqp_
|
||||
WebSocket_Only, Amqp_Tcp_Only, Mqtt, Mqtt_
|
||||
WebSocket_Only, Mqtt_Tcp_Only).
|
||||
Default: Mqtt
|
||||
--ms, --iothubmessagesize=VALUE
|
||||
the max size of a message which can be send to
|
||||
IoTHub. when telemetry of this size is available
|
||||
it will be sent.
|
||||
0 will enforce immediate send when telemetry is
|
||||
available
|
||||
Min: 0
|
||||
Max: 256 * 1024
|
||||
Default: 4096
|
||||
--si, --iothubsendinterval=VALUE
|
||||
the interval in seconds when telemetry should be
|
||||
send to IoTHub. If 0, then only the
|
||||
iothubmessagesize parameter controls when
|
||||
telemetry is sent.
|
||||
Default: '1'
|
||||
--lf, --logfile=VALUE the filename of the logfile to use.
|
||||
Default: './Logs/<applicationname>.log.txt'
|
||||
--pn, --portnum=VALUE the server port of the publisher OPC server
|
||||
endpoint.
|
||||
Default: 62222
|
||||
--pa, --path=VALUE the enpoint URL path part of the publisher OPC
|
||||
server endpoint.
|
||||
Default: '/UA/Publisher'
|
||||
--lr, --ldsreginterval=VALUE
|
||||
the LDS(-ME) registration interval in ms. If 0,
|
||||
then the registration is disabled.
|
||||
Default: 0
|
||||
--ot, --operationtimeout=VALUE
|
||||
the operation timeout of the publisher OPC UA
|
||||
client in ms.
|
||||
Default: 120000
|
||||
--oi, --opcsamplinginterval=VALUE
|
||||
the publisher is using this as default value in
|
||||
milliseconds to request the servers to sample
|
||||
the nodes with this interval
|
||||
this value might be revised by the OPC UA
|
||||
servers to a supported sampling interval.
|
||||
please check the OPC UA specification for
|
||||
details how this is handled by the OPC UA stack.
|
||||
a negative value will set the sampling interval
|
||||
to the publishing interval of the subscription
|
||||
this node is on.
|
||||
0 will configure the OPC UA server to sample in
|
||||
the highest possible resolution and should be
|
||||
taken with care.
|
||||
Default: 1000
|
||||
--op, --opcpublishinginterval=VALUE
|
||||
the publisher is using this as default value in
|
||||
milliseconds for the publishing interval setting
|
||||
of the subscriptions established to the OPC UA
|
||||
servers.
|
||||
please check the OPC UA specification for
|
||||
details how this is handled by the OPC UA stack.
|
||||
a value less than or equal zero will let the
|
||||
server revise the publishing interval.
|
||||
Default: 0
|
||||
--ct, --createsessiontimeout=VALUE
|
||||
specify the timeout in seconds used when creating
|
||||
a session to an endpoint. On unsuccessful
|
||||
connection attemps a backoff up to 5 times the
|
||||
specified timeout value is used.
|
||||
Min: 1
|
||||
Default: 10
|
||||
--ki, --keepaliveinterval=VALUE
|
||||
specify the interval in seconds the publisher is
|
||||
sending keep alive messages to the OPC servers
|
||||
on the endpoints it is connected to.
|
||||
Min: 2
|
||||
Default: 2
|
||||
--kt, --keepalivethreshold=VALUE
|
||||
specify the number of keep alive packets a server
|
||||
can miss, before the session is disconneced
|
||||
Min: 1
|
||||
Default: 5
|
||||
--st, --opcstacktracemask=VALUE
|
||||
the trace mask for the OPC stack. See github OPC .
|
||||
NET stack for definitions.
|
||||
To enable IoTHub telemetry tracing set it to 711.
|
||||
Command line arguments overrule environment variable settings.
|
||||
|
||||
Default: 285 (645)
|
||||
--as, --autotrustservercerts=VALUE
|
||||
the publisher trusts all servers it is
|
||||
establishing a connection to.
|
||||
Default: False
|
||||
--tm, --trustmyself=VALUE
|
||||
the publisher certificate is put into the trusted
|
||||
certificate store automatically.
|
||||
Default: True
|
||||
--fd, --fetchdisplayname=VALUE
|
||||
enable to read the display name of a published
|
||||
node from the server. this will increase the
|
||||
runtime.
|
||||
Default: False
|
||||
--at, --appcertstoretype=VALUE
|
||||
the own application cert store type.
|
||||
(allowed values: Directory, X509Store)
|
||||
Default: 'X509Store'
|
||||
--ap, --appcertstorepath=VALUE
|
||||
the path where the own application cert should be
|
||||
stored
|
||||
Default (depends on store type):
|
||||
X509Store: 'CurrentUser\UA_MachineDefault'
|
||||
Directory: 'CertificateStores/own'
|
||||
--tt, --trustedcertstoretype=VALUE
|
||||
the trusted cert store type.
|
||||
(allowed values: Directory, X509Store)
|
||||
Default: Directory
|
||||
--tp, --trustedcertstorepath=VALUE
|
||||
the path of the trusted cert store
|
||||
Default (depends on store type):
|
||||
X509Store: 'CurrentUser\UA_MachineDefault'
|
||||
Directory: 'CertificateStores/trusted'
|
||||
--rt, --rejectedcertstoretype=VALUE
|
||||
the rejected cert store type.
|
||||
(allowed values: Directory, X509Store)
|
||||
Default: Directory
|
||||
--rp, --rejectedcertstorepath=VALUE
|
||||
the path of the rejected cert store
|
||||
Default (depends on store type):
|
||||
X509Store: 'CurrentUser\UA_MachineDefault'
|
||||
Directory: 'CertificateStores/rejected'
|
||||
--it, --issuercertstoretype=VALUE
|
||||
the trusted issuer cert store type.
|
||||
(allowed values: Directory, X509Store)
|
||||
Default: Directory
|
||||
--ip, --issuercertstorepath=VALUE
|
||||
the path of the trusted issuer cert store
|
||||
Default (depends on store type):
|
||||
X509Store: 'CurrentUser\UA_MachineDefault'
|
||||
Directory: 'CertificateStores/issuers'
|
||||
--dt, --devicecertstoretype=VALUE
|
||||
the iothub device cert store type.
|
||||
(allowed values: Directory, X509Store)
|
||||
Default: X509Store
|
||||
--dp, --devicecertstorepath=VALUE
|
||||
the path of the iot device cert store
|
||||
Default Default (depends on store type):
|
||||
X509Store: 'My'
|
||||
Directory: 'CertificateStores/IoTHub'
|
||||
Options:
|
||||
--pf, --publishfile=VALUE
|
||||
the filename to configure the nodes to publish.
|
||||
Default: 'D:\Repos\hg\iot-edge-opc-publisher\src\
|
||||
publishednodes.json'
|
||||
--sd, --shopfloordomain=VALUE
|
||||
the domain of the shopfloor. if specified this
|
||||
domain is appended (delimited by a ':' to the '
|
||||
ApplicationURI' property when telemetry is sent
|
||||
to IoTHub.
|
||||
The value must follow the syntactical rules of a
|
||||
DNS hostname.
|
||||
Default: not set
|
||||
--sw, --sessionconnectwait=VALUE
|
||||
specify the wait time in seconds publisher is
|
||||
trying to connect to disconnected endpoints and
|
||||
starts monitoring unmonitored items
|
||||
Min: 10
|
||||
Default: 10
|
||||
--mq, --monitoreditemqueuecapacity=VALUE
|
||||
specify how many notifications of monitored items
|
||||
could be stored in the internal queue, if the
|
||||
data could not be sent quick enough to IoTHub
|
||||
Min: 1024
|
||||
Default: 8192
|
||||
--di, --diagnosticsinterval=VALUE
|
||||
shows publisher diagnostic info at the specified
|
||||
interval in seconds. 0 disables diagnostic
|
||||
output.
|
||||
Default: 0
|
||||
--vc, --verboseconsole=VALUE
|
||||
the output of publisher is shown on the console.
|
||||
Default: False
|
||||
--ih, --iothubprotocol=VALUE
|
||||
the protocol to use for communication with Azure
|
||||
IoTHub (allowed values: Amqp, Http1, Amqp_
|
||||
WebSocket_Only, Amqp_Tcp_Only, Mqtt, Mqtt_
|
||||
WebSocket_Only, Mqtt_Tcp_Only).
|
||||
Default: Mqtt
|
||||
--ms, --iothubmessagesize=VALUE
|
||||
the max size of a message which can be send to
|
||||
IoTHub. when telemetry of this size is available
|
||||
it will be sent.
|
||||
0 will enforce immediate send when telemetry is
|
||||
available
|
||||
Min: 0
|
||||
Max: 262144
|
||||
Default: 4096
|
||||
--si, --iothubsendinterval=VALUE
|
||||
the interval in seconds when telemetry should be
|
||||
send to IoTHub. If 0, then only the
|
||||
iothubmessagesize parameter controls when
|
||||
telemetry is sent.
|
||||
Default: '1'
|
||||
--lf, --logfile=VALUE the filename of the logfile to use.
|
||||
Default: './Logs/<applicationname>.log.txt'
|
||||
--pn, --portnum=VALUE the server port of the publisher OPC server
|
||||
endpoint.
|
||||
Default: 62222
|
||||
--pa, --path=VALUE the enpoint URL path part of the publisher OPC
|
||||
server endpoint.
|
||||
Default: '/UA/Publisher'
|
||||
--lr, --ldsreginterval=VALUE
|
||||
the LDS(-ME) registration interval in ms. If 0,
|
||||
then the registration is disabled.
|
||||
Default: 0
|
||||
--ot, --operationtimeout=VALUE
|
||||
the operation timeout of the publisher OPC UA
|
||||
client in ms.
|
||||
Default: 120000
|
||||
--oi, --opcsamplinginterval=VALUE
|
||||
the publisher is using this as default value in
|
||||
milliseconds to request the servers to sample
|
||||
the nodes with this interval
|
||||
this value might be revised by the OPC UA
|
||||
servers to a supported sampling interval.
|
||||
please check the OPC UA specification for
|
||||
details how this is handled by the OPC UA stack.
|
||||
a negative value will set the sampling interval
|
||||
to the publishing interval of the subscription
|
||||
this node is on.
|
||||
0 will configure the OPC UA server to sample in
|
||||
the highest possible resolution and should be
|
||||
taken with care.
|
||||
Default: 1000
|
||||
--op, --opcpublishinginterval=VALUE
|
||||
the publisher is using this as default value in
|
||||
milliseconds for the publishing interval setting
|
||||
of the subscriptions established to the OPC UA
|
||||
servers.
|
||||
please check the OPC UA specification for
|
||||
details how this is handled by the OPC UA stack.
|
||||
a value less than or equal zero will let the
|
||||
server revise the publishing interval.
|
||||
Default: 0
|
||||
--ct, --createsessiontimeout=VALUE
|
||||
specify the timeout in seconds used when creating
|
||||
a session to an endpoint. On unsuccessful
|
||||
connection attemps a backoff up to 5 times the
|
||||
specified timeout value is used.
|
||||
Min: 1
|
||||
Default: 10
|
||||
--ki, --keepaliveinterval=VALUE
|
||||
specify the interval in seconds the publisher is
|
||||
sending keep alive messages to the OPC servers
|
||||
on the endpoints it is connected to.
|
||||
Min: 2
|
||||
Default: 2
|
||||
--kt, --keepalivethreshold=VALUE
|
||||
specify the number of keep alive packets a server
|
||||
can miss, before the session is disconneced
|
||||
Min: 1
|
||||
Default: 5
|
||||
--st, --opcstacktracemask=VALUE
|
||||
the trace mask for the OPC stack. See github OPC .
|
||||
NET stack for definitions.
|
||||
To enable IoTHub telemetry tracing set it to 711.
|
||||
|
||||
Default: 285 (645)
|
||||
--as, --autotrustservercerts=VALUE
|
||||
the publisher trusts all servers it is
|
||||
establishing a connection to.
|
||||
Default: False
|
||||
--tm, --trustmyself=VALUE
|
||||
the publisher certificate is put into the trusted
|
||||
certificate store automatically.
|
||||
Default: True
|
||||
--fd, --fetchdisplayname=VALUE
|
||||
enable to read the display name of a published
|
||||
node from the server. this will increase the
|
||||
runtime.
|
||||
Default: False
|
||||
--at, --appcertstoretype=VALUE
|
||||
the own application cert store type.
|
||||
(allowed values: Directory, X509Store)
|
||||
Default: 'X509Store'
|
||||
--ap, --appcertstorepath=VALUE
|
||||
the path where the own application cert should be
|
||||
stored
|
||||
Default (depends on store type):
|
||||
X509Store: 'CurrentUser\UA_MachineDefault'
|
||||
Directory: 'CertificateStores/own'
|
||||
--tt, --trustedcertstoretype=VALUE
|
||||
the trusted cert store type.
|
||||
(allowed values: Directory, X509Store)
|
||||
Default: Directory
|
||||
--tp, --trustedcertstorepath=VALUE
|
||||
the path of the trusted cert store
|
||||
Default (depends on store type):
|
||||
X509Store: 'CurrentUser\UA_MachineDefault'
|
||||
Directory: 'CertificateStores/trusted'
|
||||
--rt, --rejectedcertstoretype=VALUE
|
||||
the rejected cert store type.
|
||||
(allowed values: Directory, X509Store)
|
||||
Default: Directory
|
||||
--rp, --rejectedcertstorepath=VALUE
|
||||
the path of the rejected cert store
|
||||
Default (depends on store type):
|
||||
X509Store: 'CurrentUser\UA_MachineDefault'
|
||||
Directory: 'CertificateStores/rejected'
|
||||
--it, --issuercertstoretype=VALUE
|
||||
the trusted issuer cert store type.
|
||||
(allowed values: Directory, X509Store)
|
||||
Default: Directory
|
||||
--ip, --issuercertstorepath=VALUE
|
||||
the path of the trusted issuer cert store
|
||||
Default (depends on store type):
|
||||
X509Store: 'CurrentUser\UA_MachineDefault'
|
||||
Directory: 'CertificateStores/issuers'
|
||||
--dt, --devicecertstoretype=VALUE
|
||||
the iothub device cert store type.
|
||||
(allowed values: Directory, X509Store)
|
||||
Default: X509Store
|
||||
--dp, --devicecertstorepath=VALUE
|
||||
the path of the iot device cert store
|
||||
Default Default (depends on store type):
|
||||
X509Store: 'My'
|
||||
Directory: 'CertificateStores/IoTHub'
|
||||
-h, --help show this message and exit
|
||||
|
||||
|
||||
There are a couple of environment variables which can be used to control the application:
|
||||
* _HUB_CS: sets the IoTHub owner connectionstring
|
||||
* _GW_LOGP: sets the filename of the log file to use
|
||||
|
@ -329,6 +350,177 @@ In certain use cases it may make sense to read configuration information from or
|
|||
### Store for X509 certificates
|
||||
Storing X509 certificates does not work with bind mounts, since the permissions of the path to the store need to be `rw` for the owner. Instead you need to use the `-v` option of `docker run` in the volume mode.
|
||||
|
||||
## Performance and memory considerations
|
||||
### Commandline parameters contolling performance and memory
|
||||
When running Publisher you need to be aware of your performance requirements and the memory resources you have available on your platform.
|
||||
Since both are interdependent and both depend on the configuration of how many nodes are configured to publish, you should ensure that the parameters you are using for:
|
||||
* IoTHub send interval (`--si`)
|
||||
* IoTHub message size (`--ms`)
|
||||
* Monitored Items queue capacity (`--mq`)
|
||||
do meet your requirements.
|
||||
|
||||
The `--mq` parameter controls the upper bound of the capacity of the internal queue, which buffers all notifications if a value of an OPC node changes. If Publisher is not able to send messages to IoTHub fast enough,
|
||||
then this queue buffers those notifications. The parameter sets the number of notifications which could be buffered. If you seen the number of items in this queue increasing in your test runs, you need to:
|
||||
* decrease the IoTHub send interval (`--si`)
|
||||
* increase the IoTHub message size (`--ms`)
|
||||
otherwise you will loose the data values of those OPC node changes. The `--mq` parameter at the same time allows to prevent controlling the upper bound of the memory resources used by Publisher.
|
||||
|
||||
The `--si` parameter enforces Publisher to send messages to IoTHub as the specified interval. If there is an IoTHub message size specified via the `--ms` parameter (or by the default value for it),
|
||||
then a message will be sent either when the message size is reached (in this case the interval is restarted) or when the specified interval time has passed. If you disable the message size by `--ms 0`, Publisher
|
||||
uses the maximal possible IoTHub message size of 256 kB to batch data.
|
||||
|
||||
The `--ms` parameter allows you to enable batching of messages sent to IoTHub. Depending on the protocol you are using, the overhead to send a message to IoTHub is high compared to the actual time of sending the payload.
|
||||
If your scenario allows latency for the data ingested, you should configure Publisher to use the maximal message size of 256 kB.
|
||||
|
||||
Before you use Publisher in production scenarios, you need to test the performance and memory under production conditions. You could use the `--di` commandline parameter to specify a interval in seconds,
|
||||
which will trigger the output of diagnostic information at this interval.
|
||||
|
||||
### Test measurements
|
||||
Here are some measurements with different values for `--si` and `--ms` parameters publishing 497 nodes with an OPC publishing interval of 1 second.
|
||||
Publisher was used as debug build on Windows 10 natively for 120 seconds. The IoTHub protocol was configured to use HTTP (`--ih Http1`).
|
||||
|
||||
#### Default configuration (--si 1 --ms 4096)
|
||||
|
||||
======================================================================
|
||||
OpcPublisher status @ 25.10.2017 11:08:25
|
||||
---------------------------------
|
||||
OPC sessions: 1
|
||||
connected OPC sessions: 1
|
||||
connected OPC subscriptions: 1
|
||||
OPC monitored items: 497
|
||||
---------------------------------
|
||||
monitored items queue bounded capacity: 8192
|
||||
monitored items queue current items: 8107
|
||||
monitored item notifications enqueued: 57076
|
||||
monitored item notifications enqueue failure: 28551
|
||||
monitored item notifications dequeued: 20418
|
||||
---------------------------------
|
||||
messages sent to IoTHub: 1200
|
||||
bytes sent to IoTHub: 4772773
|
||||
avg msg size: 3977
|
||||
time in ms for sent msgs: 113744
|
||||
min time in ms for msg: 84
|
||||
max time in ms for msg: 441
|
||||
avg time in ms for msg: 94
|
||||
msg send failures: 0
|
||||
time in ms for failed msgs: 0
|
||||
avg time in ms for failed msg: 0
|
||||
messages too large to sent to IoTHub: 0
|
||||
times we missed send interval: 0
|
||||
---------------------------------
|
||||
current working set in MB: 90
|
||||
======================================================================
|
||||
|
||||
The default configuration sends data to IoTHub each second or when 4kB of data to ingest is available. In this configuration we loose 28551 OPC node value updates (`monitored item notifications enqueue failure`).
|
||||
|
||||
#### Constant send inverval (--si 1 --ms 0)
|
||||
|
||||
======================================================================
|
||||
OpcPublisher status @ 25.10.2017 11:18:20
|
||||
---------------------------------
|
||||
OPC sessions: 1
|
||||
connected OPC sessions: 1
|
||||
connected OPC subscriptions: 1
|
||||
OPC monitored items: 497
|
||||
---------------------------------
|
||||
monitored items queue bounded capacity: 8192
|
||||
monitored items queue current items: 0
|
||||
monitored item notifications enqueued: 56682
|
||||
monitored item notifications enqueue failure: 0
|
||||
monitored item notifications dequeued: 56682
|
||||
---------------------------------
|
||||
messages sent to IoTHub: 114
|
||||
bytes sent to IoTHub: 13130523
|
||||
avg msg size: 115180
|
||||
time in ms for sent msgs: 14454
|
||||
min time in ms for msg: 100
|
||||
max time in ms for msg: 705
|
||||
avg time in ms for msg: 126
|
||||
msg send failures: 0
|
||||
time in ms for failed msgs: 0
|
||||
avg time in ms for failed msg: 0
|
||||
messages too large to sent to IoTHub: 0
|
||||
times we missed send interval: 0
|
||||
---------------------------------
|
||||
current working set in MB: 87
|
||||
======================================================================
|
||||
|
||||
When the message size is set to 0 and there is a send interval configured (or the default of 1 second is used), then Publisher does use internally batch data using the maximal supported IoTHub message size, which is 256 kB. As you see in the diagnostic output,
|
||||
the average message size is 115180 byte. In this configuration we do not loose any OPC node value udpates. The average time to send a message to IoTHub (`avg time in ms for msg`) was only 22ms higher than in the
|
||||
default configuration, which has a average message size of 3877 byte.
|
||||
|
||||
#### Send each OPC node value update (--si 0 --ms 0)
|
||||
|
||||
======================================================================
|
||||
OpcPublisher status @ 25.10.2017 10:08:03
|
||||
---------------------------------
|
||||
OPC sessions: 1
|
||||
connected OPC sessions: 1
|
||||
connected OPC subscriptions: 1
|
||||
OPC monitored items: 497
|
||||
---------------------------------
|
||||
monitored items queue bounded capacity: 8192
|
||||
monitored items queue current items: 8192
|
||||
monitored item notifications enqueued: 57323
|
||||
monitored item notifications enqueue failure: 48116
|
||||
monitored item notifications dequeued: 1015
|
||||
---------------------------------
|
||||
messages sent to IoTHub: 1014
|
||||
bytes sent to IoTHub: 237292
|
||||
avg msg size: 234
|
||||
time in ms for sent msgs: 114268
|
||||
min time in ms for msg: 84
|
||||
max time in ms for msg: 330
|
||||
avg time in ms for msg: 112
|
||||
msg send failures: 0
|
||||
time in ms for failed msgs: 0
|
||||
avg time in ms for failed msg: 0
|
||||
messages too large to sent to IoTHub: 0
|
||||
times we missed send interval: 0
|
||||
---------------------------------
|
||||
current working set in MB: 92
|
||||
======================================================================
|
||||
|
||||
This configuration sends for each OPC node value change a message to IoTHub. You see the average message size of 234 byte is pretty small and the average time required to send such a message was
|
||||
112 ms - which is compared to larger message sizes - a high value. The advantage of this configuration is that Publisher does not add any latency to the ingest data path. The number of
|
||||
lost OPC node value updates (`monitored item notifications enqueue failure: 48116`) is the highest of all compared configurations.
|
||||
|
||||
#### Maximum batching (--si 0 --ms 262144)
|
||||
|
||||
======================================================================
|
||||
OpcPublisher status @ 25.10.2017 10:05:23
|
||||
---------------------------------
|
||||
OPC sessions: 1
|
||||
connected OPC sessions: 1
|
||||
connected OPC subscriptions: 1
|
||||
OPC monitored items: 497
|
||||
---------------------------------
|
||||
monitored items queue bounded capacity: 8192
|
||||
monitored items queue current items: 0
|
||||
monitored item notifications enqueued: 57086
|
||||
monitored item notifications enqueue failure: 0
|
||||
monitored item notifications dequeued: 57086
|
||||
---------------------------------
|
||||
messages sent to IoTHub: 50
|
||||
bytes sent to IoTHub: 13096746
|
||||
avg msg size: 261934
|
||||
time in ms for sent msgs: 8648
|
||||
min time in ms for msg: 136
|
||||
max time in ms for msg: 596
|
||||
avg time in ms for msg: 172
|
||||
msg send failures: 0
|
||||
time in ms for failed msgs: 0
|
||||
avg time in ms for failed msg: 0
|
||||
messages too large to sent to IoTHub: 0
|
||||
times we missed send interval: 0
|
||||
---------------------------------
|
||||
current working set in MB: 89
|
||||
======================================================================
|
||||
|
||||
This configuration batches as much OPC node value udpates as possible. The maximum IoTHub message size is 256 kB, which is configured here. There is no send interval requested, which makes the time when data is ingested
|
||||
completly controlled by the data itself. This configuration has the least probability of loosing any OPC node values and could be used for publishing a high number of nodes.
|
||||
When using this configuration you need to ensure, that your scenario does not have conditions where high latency is introduced (because the message size of 256 kB is not reached).
|
||||
|
||||
# Debugging the Application
|
||||
|
||||
## Native on Winodws
|
||||
|
|
|
@ -1,8 +1,5 @@
|
|||
|
||||
using Newtonsoft.Json;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
@ -13,6 +10,9 @@ namespace OpcPublisher
|
|||
using Microsoft.Azure.Devices;
|
||||
using Microsoft.Azure.Devices.Client;
|
||||
using Opc.Ua;
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using static Opc.Ua.CertificateStoreType;
|
||||
using static OpcPublisher.Workarounds.TraceWorkaround;
|
||||
using static OpcStackConfiguration;
|
||||
|
@ -36,12 +36,13 @@ namespace OpcPublisher
|
|||
}
|
||||
private static Microsoft.Azure.Devices.Client.TransportType _iotHubProtocol = Microsoft.Azure.Devices.Client.TransportType.Mqtt;
|
||||
|
||||
public static uint MaxSizeOfIoTHubMessageBytes
|
||||
public const uint IotHubMessageSizeMax = (256 * 1024);
|
||||
public static uint IotHubMessageSize
|
||||
{
|
||||
get => _maxSizeOfIoTHubMessageBytes;
|
||||
set => _maxSizeOfIoTHubMessageBytes = value;
|
||||
get => _iotHubMessageSize;
|
||||
set => _iotHubMessageSize = value;
|
||||
}
|
||||
private static uint _maxSizeOfIoTHubMessageBytes = 4096;
|
||||
private static uint _iotHubMessageSize = 4096;
|
||||
|
||||
public static int DefaultSendIntervalSeconds
|
||||
{
|
||||
|
@ -66,6 +67,88 @@ namespace OpcPublisher
|
|||
}
|
||||
private static string _iotDeviceCertStorePath = IotDeviceCertX509StorePathDefault;
|
||||
|
||||
public static int MonitoredItemsQueueCapacity
|
||||
{
|
||||
get => _monitoredItemsQueueCapacity;
|
||||
set => _monitoredItemsQueueCapacity = value;
|
||||
}
|
||||
private static int _monitoredItemsQueueCapacity = 8192;
|
||||
|
||||
public static long MonitoredItemsQueueCount => _monitoredItemsDataQueue.Count;
|
||||
|
||||
public static long EnqueueCount
|
||||
{
|
||||
get => _enqueueCount;
|
||||
}
|
||||
private static long _enqueueCount;
|
||||
|
||||
public static long EnqueueFailureCount
|
||||
{
|
||||
get => _enqueueFailureCount;
|
||||
}
|
||||
private static long _enqueueFailureCount;
|
||||
|
||||
public static long DequeueCount
|
||||
{
|
||||
get => _dequeueCount;
|
||||
}
|
||||
private static long _dequeueCount;
|
||||
|
||||
|
||||
public static long MissedSendIntervalCount
|
||||
{
|
||||
get => _missedSendIntervalCount;
|
||||
}
|
||||
private static long _missedSendIntervalCount;
|
||||
|
||||
public static long TooLargeCount
|
||||
{
|
||||
get => _tooLargeCount;
|
||||
}
|
||||
private static long _tooLargeCount;
|
||||
|
||||
public static long SentBytes
|
||||
{
|
||||
get => _sentBytes;
|
||||
}
|
||||
private static long _sentBytes;
|
||||
|
||||
public static long SentMessages
|
||||
{
|
||||
get => _sentMessages;
|
||||
}
|
||||
private static long _sentMessages;
|
||||
|
||||
public static long SentTime
|
||||
{
|
||||
get => _sentTime;
|
||||
}
|
||||
private static long _sentTime;
|
||||
|
||||
public static long MinSentTime
|
||||
{
|
||||
get => _minSentTime;
|
||||
}
|
||||
private static long _minSentTime = long.MaxValue;
|
||||
|
||||
public static long MaxSentTime
|
||||
{
|
||||
get => _maxSentTime;
|
||||
}
|
||||
private static long _maxSentTime;
|
||||
|
||||
public static long FailedMessages
|
||||
{
|
||||
get => _failedMessages;
|
||||
}
|
||||
private static long _failedMessages;
|
||||
|
||||
public static long FailedTime
|
||||
{
|
||||
get => _failedTime;
|
||||
}
|
||||
private static long _failedTime;
|
||||
|
||||
/// <summary>
|
||||
/// Classes for the telemetry message sent to IoTHub.
|
||||
/// </summary>
|
||||
|
@ -83,26 +166,16 @@ namespace OpcPublisher
|
|||
public string SourceTimestamp;
|
||||
}
|
||||
|
||||
private ConcurrentQueue<string> _sendQueue;
|
||||
private int _currentSizeOfIotHubMessageBytes;
|
||||
private List<OpcUaMessage> _messageList;
|
||||
private SemaphoreSlim _messageListSemaphore;
|
||||
private CancellationTokenSource _tokenSource;
|
||||
private Task _dequeueAndSendTask;
|
||||
private Timer _sendTimer;
|
||||
private AutoResetEvent _sendQueueEvent;
|
||||
private DeviceClient _iotHubClient;
|
||||
|
||||
private static BlockingCollection<string> _monitoredItemsDataQueue;
|
||||
private static CancellationTokenSource _tokenSource;
|
||||
private static Task _monitoredItemsProcessorTask;
|
||||
private static DeviceClient _iotHubClient;
|
||||
/// <summary>
|
||||
/// Ctor for the class.
|
||||
/// </summary>
|
||||
public IotHubMessaging()
|
||||
{
|
||||
_sendQueue = new ConcurrentQueue<string>();
|
||||
_sendQueueEvent = new AutoResetEvent(false);
|
||||
_messageList = new List<OpcUaMessage>();
|
||||
_messageListSemaphore = new SemaphoreSlim(1);
|
||||
_currentSizeOfIotHubMessageBytes = 0;
|
||||
_monitoredItemsDataQueue = new BlockingCollection<string>(_monitoredItemsQueueCapacity);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -172,7 +245,7 @@ namespace OpcPublisher
|
|||
{
|
||||
Trace($"Create Publisher IoTHub client with device connection string: '{deviceConnectionString}' using '{IotHubProtocol}' for communication.");
|
||||
_iotHubClient = DeviceClient.CreateFromConnectionString(deviceConnectionString, IotHubProtocol);
|
||||
ExponentialBackoff exponentialRetryPolicy = new ExponentialBackoff(int.MaxValue, TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(5000), TimeSpan.FromMilliseconds(100));
|
||||
ExponentialBackoff exponentialRetryPolicy = new ExponentialBackoff(int.MaxValue, TimeSpan.FromMilliseconds(2), TimeSpan.FromMilliseconds(1024), TimeSpan.FromMilliseconds(3));
|
||||
_iotHubClient.SetRetryPolicy(exponentialRetryPolicy);
|
||||
await _iotHubClient.OpenAsync();
|
||||
}
|
||||
|
@ -184,11 +257,11 @@ namespace OpcPublisher
|
|||
}
|
||||
|
||||
// start up task to send telemetry to IoTHub.
|
||||
_dequeueAndSendTask = null;
|
||||
_monitoredItemsProcessorTask = null;
|
||||
_tokenSource = new CancellationTokenSource();
|
||||
|
||||
Trace("Creating task to send OPC UA messages in batches to IoT Hub...");
|
||||
_dequeueAndSendTask = Task.Run(() => DeQueueMessagesAsync(_tokenSource.Token), _tokenSource.Token);
|
||||
Trace("Creating task process and batch monitored item data updates...");
|
||||
_monitoredItemsProcessorTask = Task.Run(async () => await MonitoredItemsProcessor(_tokenSource.Token), _tokenSource.Token);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
|
@ -218,20 +291,12 @@ namespace OpcPublisher
|
|||
try
|
||||
{
|
||||
_tokenSource.Cancel();
|
||||
await _dequeueAndSendTask;
|
||||
await _monitoredItemsProcessorTask;
|
||||
|
||||
if (_iotHubClient != null)
|
||||
{
|
||||
await _iotHubClient.CloseAsync();
|
||||
}
|
||||
if (_sendTimer != null)
|
||||
{
|
||||
_sendTimer.Dispose();
|
||||
}
|
||||
if (_sendQueueEvent != null)
|
||||
{
|
||||
_sendQueueEvent.Dispose();
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
|
@ -239,151 +304,191 @@ namespace OpcPublisher
|
|||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Enqueue a message for batch send.
|
||||
//
|
||||
/// <summary>
|
||||
/// Enqueue a message for sending to IoTHub.
|
||||
/// </summary>
|
||||
/// <param name="json"></param>
|
||||
public void Enqueue(string json)
|
||||
{
|
||||
_sendQueue.Enqueue(json);
|
||||
_sendQueueEvent.Set();
|
||||
// Try to add the message.
|
||||
_enqueueCount++;
|
||||
if (_monitoredItemsDataQueue.TryAdd(json) == false)
|
||||
{
|
||||
_enqueueFailureCount++;
|
||||
if (_enqueueFailureCount % 10000 == 0)
|
||||
{
|
||||
Trace(Utils.TraceMasks.Error, $"The internal monitored item message queue is above its capacity of {_monitoredItemsDataQueue.BoundedCapacity}. We have already lost {_enqueueFailureCount} monitored item notifications:(");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Dequeue telemetry messages, compose them for batch send (if needed) and prepares them for sending to IoTHub.
|
||||
/// Dequeue monitored item notification messages, batch them for send (if needed) and send them to IoTHub.
|
||||
/// </summary>
|
||||
private async Task DeQueueMessagesAsync(CancellationToken ct)
|
||||
private async Task MonitoredItemsProcessor(CancellationToken ct)
|
||||
{
|
||||
try
|
||||
string contentPropertyKey = "content-type";
|
||||
string contentPropertyValue = "application/opcua+uajson";
|
||||
string devicenamePropertyKey = "devicename";
|
||||
string devicenamePropertyValue = ApplicationName;
|
||||
int propertyLength = contentPropertyKey.Length + contentPropertyValue.Length + devicenamePropertyKey.Length + devicenamePropertyValue.Length;
|
||||
// if batching is requested the buffer will have the requested size, otherwise we reserve the max size
|
||||
uint iotHubMessageBufferSize = (_iotHubMessageSize > 0 ? _iotHubMessageSize : IotHubMessageSizeMax) - (uint)propertyLength;
|
||||
byte[] iotHubMessageBuffer = new byte[iotHubMessageBufferSize];
|
||||
MemoryStream iotHubMessage = new MemoryStream(iotHubMessageBuffer);
|
||||
DateTime nextSendTime = DateTime.UtcNow + TimeSpan.FromSeconds(_defaultSendIntervalSeconds);
|
||||
int millisecondsTillNextSend = TimeSpan.FromSeconds(_defaultSendIntervalSeconds).Milliseconds;
|
||||
|
||||
using (iotHubMessage)
|
||||
{
|
||||
if (_defaultSendIntervalSeconds > 0 && _maxSizeOfIoTHubMessageBytes > 0)
|
||||
try
|
||||
{
|
||||
// send every x seconds
|
||||
Trace($"Start timer to send data to IoTHub in {_defaultSendIntervalSeconds} seconds.");
|
||||
_sendTimer = new Timer(async state => await SendToIoTHubAsync(), null, TimeSpan.FromSeconds(_defaultSendIntervalSeconds), TimeSpan.FromSeconds(_defaultSendIntervalSeconds));
|
||||
}
|
||||
string jsonMessage = string.Empty;
|
||||
bool needToBufferMessage = false;
|
||||
int jsonMessageSize = 0;
|
||||
|
||||
WaitHandle[] waitHandles = { _sendQueueEvent, ct.WaitHandle };
|
||||
while (true)
|
||||
{
|
||||
// wait till some work needs to be done
|
||||
WaitHandle.WaitAny(waitHandles);
|
||||
|
||||
// do we need to stop
|
||||
if (ct.IsCancellationRequested)
|
||||
iotHubMessage.Position = 0;
|
||||
iotHubMessage.SetLength(0);
|
||||
while (true)
|
||||
{
|
||||
Trace($"Cancellation requested. Sending {_sendQueue.Count} remaining messages.");
|
||||
await SendToIoTHubAsync();
|
||||
break;
|
||||
}
|
||||
|
||||
if (_sendQueue.Count > 0)
|
||||
{
|
||||
bool isPeekSuccessful = false;
|
||||
bool isDequeueSuccessful = false;
|
||||
string messageInJson = string.Empty;
|
||||
int nextMessageSizeBytes = 0;
|
||||
|
||||
// perform a TryPeek to determine size of next message
|
||||
// and whether it will fit. If so, dequeue message and add it to the list.
|
||||
// if it cannot fit, send current message to IoTHub, reset it, and repeat.
|
||||
|
||||
isPeekSuccessful = _sendQueue.TryPeek(out messageInJson);
|
||||
|
||||
// get size of next message in the queue
|
||||
if (isPeekSuccessful)
|
||||
// sanity check the send interval, compute the timeout and get the next monitored item message
|
||||
if (_defaultSendIntervalSeconds > 0)
|
||||
{
|
||||
nextMessageSizeBytes = Encoding.UTF8.GetByteCount(messageInJson);
|
||||
|
||||
// sanity check that the user has set a large enough IoTHub messages size.
|
||||
if (nextMessageSizeBytes > _maxSizeOfIoTHubMessageBytes && _maxSizeOfIoTHubMessageBytes > 0)
|
||||
millisecondsTillNextSend = nextSendTime.Subtract(DateTime.UtcNow).Milliseconds;
|
||||
if (millisecondsTillNextSend < 0)
|
||||
{
|
||||
Trace(Utils.TraceMasks.Error, $"There is a telemetry message (size: {nextMessageSizeBytes}), which will not fit into an IoTHub message (max size: {_maxSizeOfIoTHubMessageBytes}].");
|
||||
Trace(Utils.TraceMasks.Error, $"Please check your IoTHub message size settings. The telemetry message will be discarded silently. Sorry:(");
|
||||
_sendQueue.TryDequeue(out messageInJson);
|
||||
continue;
|
||||
_missedSendIntervalCount++;
|
||||
// do not wait if we missed the send interval
|
||||
millisecondsTillNextSend = 0;
|
||||
}
|
||||
}
|
||||
|
||||
// determine if it will fit into remaining space of the IoTHub message or if we do not batch at all
|
||||
// if so, dequeue it
|
||||
if (_currentSizeOfIotHubMessageBytes + nextMessageSizeBytes < _maxSizeOfIoTHubMessageBytes || _maxSizeOfIoTHubMessageBytes == 0)
|
||||
else
|
||||
{
|
||||
isDequeueSuccessful = _sendQueue.TryDequeue(out messageInJson);
|
||||
// if we are in shutdown do not wait, else wait infinite if send interval is not set
|
||||
millisecondsTillNextSend = ct.IsCancellationRequested ? 0 : Timeout.Infinite;
|
||||
}
|
||||
bool gotItem = _monitoredItemsDataQueue.TryTake(out jsonMessage, millisecondsTillNextSend);
|
||||
|
||||
// add dequeued message to list
|
||||
if (isDequeueSuccessful)
|
||||
// the two commandline parameter --ms (message size) and --si (send interval) control when data is sent to IoTHub
|
||||
// pls see detailed comments on performance and memory consumption at https://github.com/Azure/iot-edge-opc-publisher
|
||||
|
||||
// check if we got an item or if we hit the timeout or got canceled
|
||||
if (gotItem)
|
||||
{
|
||||
_dequeueCount++;
|
||||
jsonMessageSize = Encoding.UTF8.GetByteCount(jsonMessage.ToString());
|
||||
|
||||
// sanity check that the user has set a large enough IoTHub messages size
|
||||
if ((_iotHubMessageSize > 0 && jsonMessageSize > _iotHubMessageSize) || (_iotHubMessageSize == 0 && jsonMessageSize > iotHubMessageBufferSize))
|
||||
{
|
||||
OpcUaMessage msgPayload = JsonConvert.DeserializeObject<OpcUaMessage>(messageInJson);
|
||||
await _messageListSemaphore.WaitAsync();
|
||||
_messageList.Add(msgPayload);
|
||||
_messageListSemaphore.Release();
|
||||
_currentSizeOfIotHubMessageBytes = _currentSizeOfIotHubMessageBytes + nextMessageSizeBytes;
|
||||
Trace(Utils.TraceMasks.OperationDetail, $"Added new message with size {nextMessageSizeBytes} to IoTHub message (size is now {_currentSizeOfIotHubMessageBytes}). {_sendQueue.Count} message(s) in send queue.");
|
||||
Trace(Utils.TraceMasks.Error, $"There is a telemetry message (size: {jsonMessageSize}), which will not fit into an IoTHub message (max size: {_iotHubMessageSize}].");
|
||||
Trace(Utils.TraceMasks.Error, $"Please check your IoTHub message size settings. The telemetry message will be discarded silently. Sorry:(");
|
||||
_tooLargeCount++;
|
||||
continue;
|
||||
}
|
||||
|
||||
// fall through, if we should send immediately
|
||||
if (_maxSizeOfIoTHubMessageBytes != 0)
|
||||
// if batching is requested or we need to send at intervals, batch it otherwise send it right away
|
||||
if (_iotHubMessageSize > 0 || (_iotHubMessageSize == 0 && _defaultSendIntervalSeconds > 0))
|
||||
{
|
||||
// if there is still space to batch, do it. otherwise send the buffer and flag the message for later buffering
|
||||
if (iotHubMessage.Position + jsonMessageSize <= iotHubMessage.Capacity)
|
||||
{
|
||||
iotHubMessage.Write(Encoding.UTF8.GetBytes(jsonMessage.ToString()), 0, jsonMessageSize);
|
||||
Trace(Utils.TraceMasks.OperationDetail, $"Added new message with size {jsonMessageSize} to IoTHub message (size is now {iotHubMessage.Position}).");
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
needToBufferMessage = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// if we got no message, we either reached the interval or we are in shutdown and have processed all messages
|
||||
if (ct.IsCancellationRequested)
|
||||
{
|
||||
Trace($"Cancellation requested.");
|
||||
_monitoredItemsDataQueue.CompleteAdding();
|
||||
_monitoredItemsDataQueue.Dispose();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// the message needs to be sent now.
|
||||
Trace(Utils.TraceMasks.OperationDetail, $"IoTHub message complete. Trigger send of message with size {_currentSizeOfIotHubMessageBytes} to IoTHub.");
|
||||
await SendToIoTHubAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Trace(e, "Error while dequeuing messages.");
|
||||
}
|
||||
}
|
||||
// the batching is completed or we reached the send interval or got a cancelation request
|
||||
try
|
||||
{
|
||||
Microsoft.Azure.Devices.Client.Message encodedIotHubMessage = null;
|
||||
|
||||
/// <summary>
|
||||
/// Send messages to IoT Hub
|
||||
/// </summary>
|
||||
private async Task SendToIoTHubAsync()
|
||||
{
|
||||
if (_messageList.Count > 0)
|
||||
{
|
||||
// process all queued messages
|
||||
await _messageListSemaphore.WaitAsync();
|
||||
string msgListInJson = JsonConvert.SerializeObject(_messageList);
|
||||
var encodedMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(msgListInJson));
|
||||
_currentSizeOfIotHubMessageBytes = 0;
|
||||
_messageList.Clear();
|
||||
_messageListSemaphore.Release();
|
||||
// if we reached the send interval, but have nothing to send, we continue
|
||||
if (!gotItem && iotHubMessage.Position == 0)
|
||||
{
|
||||
nextSendTime += TimeSpan.FromSeconds(_defaultSendIntervalSeconds);
|
||||
iotHubMessage.Position = 0;
|
||||
iotHubMessage.SetLength(0);
|
||||
continue;
|
||||
}
|
||||
|
||||
// publish
|
||||
encodedMessage.Properties.Add("content-type", "application/opcua+uajson");
|
||||
encodedMessage.Properties.Add("deviceName", ApplicationName);
|
||||
// if there is no batching and not interval configured, we send the JSON message we just got, otherwise we send the buffer
|
||||
if (_iotHubMessageSize == 0 && _defaultSendIntervalSeconds == 0)
|
||||
{
|
||||
encodedIotHubMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(jsonMessage.ToString()));
|
||||
}
|
||||
else
|
||||
{
|
||||
encodedIotHubMessage = new Microsoft.Azure.Devices.Client.Message(iotHubMessage.ToArray());
|
||||
}
|
||||
encodedIotHubMessage.Properties.Add(contentPropertyKey, contentPropertyValue);
|
||||
encodedIotHubMessage.Properties.Add(devicenamePropertyKey, devicenamePropertyValue);
|
||||
if (_iotHubClient != null)
|
||||
{
|
||||
Stopwatch stopwatch = new Stopwatch();
|
||||
stopwatch.Start();
|
||||
nextSendTime += TimeSpan.FromSeconds(_defaultSendIntervalSeconds);
|
||||
try
|
||||
{
|
||||
_sentBytes += encodedIotHubMessage.GetBytes().Length;
|
||||
await _iotHubClient.SendEventAsync(encodedIotHubMessage);
|
||||
stopwatch.Stop();
|
||||
_sentMessages++;
|
||||
_sentTime += stopwatch.ElapsedMilliseconds;
|
||||
_maxSentTime = Math.Max(_maxSentTime, stopwatch.ElapsedMilliseconds);
|
||||
_minSentTime = Math.Min(_minSentTime, stopwatch.ElapsedMilliseconds);
|
||||
Trace(Utils.TraceMasks.OperationDetail, $"Sending {encodedIotHubMessage.BodyStream.Length} bytes to IoTHub took {stopwatch.ElapsedMilliseconds} ms.");
|
||||
}
|
||||
catch
|
||||
{
|
||||
stopwatch.Stop();
|
||||
_failedMessages++;
|
||||
_failedTime += stopwatch.ElapsedMilliseconds;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if (_iotHubClient != null)
|
||||
{
|
||||
Trace(Utils.TraceMasks.OperationDetail, "Send data to IoTHub.");
|
||||
await _iotHubClient.SendEventAsync(encodedMessage);
|
||||
}
|
||||
else
|
||||
{
|
||||
Trace("No IoTHub client available. Dropping messages...");
|
||||
// reset the messaage
|
||||
iotHubMessage.Position = 0;
|
||||
iotHubMessage.SetLength(0);
|
||||
|
||||
// if we had not yet buffered the last message because there was not enough space, buffer it now
|
||||
if (needToBufferMessage)
|
||||
{
|
||||
iotHubMessage.Write(Encoding.UTF8.GetBytes(jsonMessage.ToString()), 0, jsonMessageSize);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
Trace("No IoTHub client available. Dropping messages...");
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Trace(e, "Exception while sending message to IoTHub. Dropping message...");
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Trace(e, "Exception while sending message to IoTHub. Dropping messages...");
|
||||
Trace(e, "Error while processing monitored item messages.");
|
||||
}
|
||||
}
|
||||
|
||||
// Restart timer
|
||||
if (_sendTimer != null)
|
||||
{
|
||||
// send in x seconds
|
||||
Trace(Utils.TraceMasks.OperationDetail, $"Restart timer to send data to IoTHub in {_defaultSendIntervalSeconds} second(s).");
|
||||
_sendTimer.Change(TimeSpan.FromSeconds(_defaultSendIntervalSeconds), TimeSpan.FromSeconds(_defaultSendIntervalSeconds));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -196,7 +196,6 @@ namespace OpcPublisher
|
|||
Trace(Utils.TraceMasks.OperationDetail, $" DisplayName: {monitoredItem.DisplayName}");
|
||||
Trace(Utils.TraceMasks.OperationDetail, $" Value: {value}");
|
||||
IotHubCommunication.Enqueue(json);
|
||||
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
|
@ -249,6 +248,40 @@ namespace OpcPublisher
|
|||
}
|
||||
private static string _shopfloorDomain;
|
||||
|
||||
public int GetNumberOfOpcSubscriptions()
|
||||
{
|
||||
int result = 0;
|
||||
try
|
||||
{
|
||||
_opcSessionSemaphore.WaitAsync();
|
||||
result = OpcSubscriptions.Count();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_opcSessionSemaphore.Release();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public int GetNumberOfOpcMonitoredItems()
|
||||
{
|
||||
int result = 0;
|
||||
try
|
||||
{
|
||||
_opcSessionSemaphore.WaitAsync();
|
||||
var subscriptions = OpcSessions.SelectMany(s => s.OpcSubscriptions);
|
||||
foreach (var subscription in subscriptions)
|
||||
{
|
||||
result += subscription.OpcMonitoredItems.Count(i => i.State == OpcMonitoredItemState.Monitored);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_opcSessionSemaphore.Release();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public Uri EndpointUri;
|
||||
public Session OpcUaClientSession;
|
||||
public SessionState State;
|
||||
|
@ -286,20 +319,20 @@ namespace OpcPublisher
|
|||
/// - unused subscriptions (without any nodes to monitor) are removed.
|
||||
/// - sessions with out subscriptions are removed.
|
||||
/// </summary>
|
||||
public async Task ConnectAndMonitorAsync()
|
||||
public async Task ConnectAndMonitorAsync(CancellationToken ct)
|
||||
{
|
||||
bool updateConfigFileRequired = false;
|
||||
try
|
||||
{
|
||||
await ConnectSession();
|
||||
await ConnectSession(ct);
|
||||
|
||||
updateConfigFileRequired = await MonitorNodes();
|
||||
updateConfigFileRequired = await MonitorNodes(ct);
|
||||
|
||||
updateConfigFileRequired |= await StopMonitoringNodes();
|
||||
updateConfigFileRequired |= await StopMonitoringNodes(ct);
|
||||
|
||||
await RemoveUnusedSubscriptions();
|
||||
await RemoveUnusedSubscriptions(ct);
|
||||
|
||||
await RemoveUnusedSessions();
|
||||
await RemoveUnusedSessions(ct);
|
||||
|
||||
// update the config file if required
|
||||
if (updateConfigFileRequired)
|
||||
|
@ -316,14 +349,14 @@ namespace OpcPublisher
|
|||
/// <summary>
|
||||
/// Connects the session if it is disconnected.
|
||||
/// </summary>
|
||||
public async Task ConnectSession()
|
||||
public async Task ConnectSession(CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _opcSessionSemaphore.WaitAsync();
|
||||
|
||||
// if the session is not disconnected, return
|
||||
if (State != SessionState.Disconnected)
|
||||
// if the session is already connected or shutdown in progress, return
|
||||
if (State == SessionState.Connected || ct.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
@ -415,15 +448,15 @@ namespace OpcPublisher
|
|||
/// <summary>
|
||||
/// Monitoring for a node starts if it is required.
|
||||
/// </summary>
|
||||
public async Task<bool> MonitorNodes()
|
||||
public async Task<bool> MonitorNodes(CancellationToken ct)
|
||||
{
|
||||
bool requestConfigFileUpdate = false;
|
||||
try
|
||||
{
|
||||
await _opcSessionSemaphore.WaitAsync();
|
||||
|
||||
// if session is not connected, return
|
||||
if (State != SessionState.Connected)
|
||||
// if the session is not connected or shutdown in progress, return
|
||||
if (State != SessionState.Connected || ct.IsCancellationRequested)
|
||||
{
|
||||
return requestConfigFileUpdate;
|
||||
}
|
||||
|
@ -443,11 +476,19 @@ namespace OpcPublisher
|
|||
// process all unmonitored items.
|
||||
var unmonitoredItems = opcSubscription.OpcMonitoredItems.Where(i => (i.State == OpcMonitoredItemState.Unmonitored || i.State == OpcMonitoredItemState.UnmonitoredNamespaceUpdateRequested));
|
||||
|
||||
Trace($"Start monitoring nodes on endpoint '{EndpointUri.AbsoluteUri}'.");
|
||||
int additionalMonitoredItemsCount = 0;
|
||||
int monitoredItemsCount = 0;
|
||||
bool haveUnmonitoredItems = false;
|
||||
if (unmonitoredItems.Count() != 0)
|
||||
{
|
||||
haveUnmonitoredItems = true;
|
||||
monitoredItemsCount = opcSubscription.OpcMonitoredItems.Count(i => (i.State == OpcMonitoredItemState.Monitored));
|
||||
Trace($"Start monitoring items on endpoint '{EndpointUri.AbsoluteUri}'. Currently monitoring {monitoredItemsCount} items.");
|
||||
}
|
||||
foreach (var item in unmonitoredItems)
|
||||
{
|
||||
// if the session is disconnected, we stop trying and wait for the next cycle
|
||||
if (State == SessionState.Disconnected)
|
||||
// if the session is disconnected or a shutdown is in progress, we stop trying and wait for the next cycle
|
||||
if (State == SessionState.Disconnected || ct.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
@ -536,13 +577,16 @@ namespace OpcPublisher
|
|||
item.OpcUaClientMonitoredItem = monitoredItem;
|
||||
item.State = OpcMonitoredItemState.Monitored;
|
||||
item.EndpointUri = EndpointUri;
|
||||
Trace($"Created monitored item for node '{currentNodeId.ToString()}' in subscription with id {opcSubscription.OpcUaClientSubscription.Id} on endpoint '{EndpointUri.AbsoluteUri}'");
|
||||
Trace($"Created monitored item for node '{currentNodeId.ToString()}' in subscription with id '{opcSubscription.OpcUaClientSubscription.Id}' on endpoint '{EndpointUri.AbsoluteUri}'");
|
||||
if (item.RequestedSamplingInterval != monitoredItem.SamplingInterval)
|
||||
{
|
||||
Trace($"Sampling interval: requested: {item.RequestedSamplingInterval}; revised: {monitoredItem.SamplingInterval}");
|
||||
item.SamplingInterval = monitoredItem.SamplingInterval;
|
||||
}
|
||||
requestConfigFileUpdate = true;
|
||||
if (additionalMonitoredItemsCount++ % 50 == 0)
|
||||
{
|
||||
Trace($"Now monitoring {monitoredItemsCount + additionalMonitoredItemsCount} items in subscription with id '{opcSubscription.OpcUaClientSubscription.Id}'");
|
||||
}
|
||||
}
|
||||
catch (Exception e) when (e.GetType() == typeof(ServiceResultException))
|
||||
{
|
||||
|
@ -575,7 +619,14 @@ namespace OpcPublisher
|
|||
Trace(e, $"Failed to monitor node '{currentNodeId.Identifier}' on endpoint '{EndpointUri}'");
|
||||
}
|
||||
}
|
||||
if (haveUnmonitoredItems == true)
|
||||
{
|
||||
monitoredItemsCount = opcSubscription.OpcMonitoredItems.Count(i => (i.State == OpcMonitoredItemState.Monitored));
|
||||
Trace($"Done processing unmonitored items on endpoint '{EndpointUri.AbsoluteUri}'. Now monitoring {monitoredItemsCount} items in subscription with id '{opcSubscription.OpcUaClientSubscription.Id}'.");
|
||||
}
|
||||
}
|
||||
// request a config file update, if everything is successfully monitored
|
||||
requestConfigFileUpdate = true;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
|
@ -591,15 +642,15 @@ namespace OpcPublisher
|
|||
/// <summary>
|
||||
/// Checks if there are monitored nodes tagged to stop monitoring and stop monitoring.
|
||||
/// </summary>
|
||||
public async Task<bool> StopMonitoringNodes()
|
||||
public async Task<bool> StopMonitoringNodes(CancellationToken ct)
|
||||
{
|
||||
bool requestConfigFileUpdate = false;
|
||||
try
|
||||
{
|
||||
await _opcSessionSemaphore.WaitAsync();
|
||||
|
||||
// if session is not connected, return
|
||||
if (State != SessionState.Connected)
|
||||
// if session is not connected or shutdown is in progress, return
|
||||
if (State != SessionState.Connected || ct.IsCancellationRequested)
|
||||
{
|
||||
return requestConfigFileUpdate;
|
||||
}
|
||||
|
@ -637,14 +688,14 @@ namespace OpcPublisher
|
|||
/// <summary>
|
||||
/// Checks if there are subscriptions without any monitored items and remove them.
|
||||
/// </summary>
|
||||
public async Task RemoveUnusedSubscriptions()
|
||||
public async Task RemoveUnusedSubscriptions(CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _opcSessionSemaphore.WaitAsync();
|
||||
|
||||
// if session is not connected, return
|
||||
if (State != SessionState.Connected)
|
||||
// if session is not connected or shutdown is in progress, return
|
||||
if (State != SessionState.Connected || ct.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
@ -670,14 +721,14 @@ namespace OpcPublisher
|
|||
/// <summary>
|
||||
/// Checks if there are session without any subscriptions and remove them.
|
||||
/// </summary>
|
||||
public async Task RemoveUnusedSessions()
|
||||
public async Task RemoveUnusedSessions(CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await OpcSessionsListSemaphore.WaitAsync();
|
||||
|
||||
// if session is not connected, return
|
||||
if (State != SessionState.Connected)
|
||||
// if session is not connected or shutdown is in progress, return
|
||||
if (State != SessionState.Connected || ct.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
@ -805,13 +856,13 @@ namespace OpcPublisher
|
|||
/// Adds a node to be monitored. If there is no subscription with the requested publishing interval,
|
||||
/// one is created.
|
||||
/// </summary>
|
||||
public async Task AddNodeForMonitoring(NodeId nodeId, ExpandedNodeId expandedNodeId, int opcPublishingInterval, int opcSamplingInterval)
|
||||
public async Task AddNodeForMonitoring(NodeId nodeId, ExpandedNodeId expandedNodeId, int opcPublishingInterval, int opcSamplingInterval, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _opcSessionSemaphore.WaitAsync();
|
||||
|
||||
if (ShutdownTokenSource.IsCancellationRequested)
|
||||
if (ct.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
@ -846,7 +897,7 @@ namespace OpcPublisher
|
|||
|
||||
// update the publishing data
|
||||
// Start publishing.
|
||||
Task.Run(async () => await ConnectAndMonitorAsync());
|
||||
Task.Run(async () => await ConnectAndMonitorAsync(ct));
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
|
@ -862,13 +913,13 @@ namespace OpcPublisher
|
|||
/// <summary>
|
||||
/// Tags a monitored node to stop monitoring and remove it.
|
||||
/// </summary>
|
||||
public async Task RequestMonitorItemRemoval(NodeId nodeId, ExpandedNodeId expandedNodeId)
|
||||
public async Task RequestMonitorItemRemoval(NodeId nodeId, ExpandedNodeId expandedNodeId, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _opcSessionSemaphore.WaitAsync();
|
||||
|
||||
if (ShutdownTokenSource.IsCancellationRequested)
|
||||
if (ct.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
@ -889,7 +940,7 @@ namespace OpcPublisher
|
|||
}
|
||||
|
||||
// Stop publishing.
|
||||
Task.Run(async () => await ConnectAndMonitorAsync());
|
||||
Task.Run(async () => await ConnectAndMonitorAsync(ct));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
|
@ -977,7 +1028,7 @@ namespace OpcPublisher
|
|||
{
|
||||
foreach (var opcSubscription in OpcSubscriptions)
|
||||
{
|
||||
Trace($"Removing {opcSubscription.OpcUaClientSubscription.MonitoredItemCount} monitored items from subscription with id {opcSubscription.OpcUaClientSubscription.Id}.");
|
||||
Trace($"Removing {opcSubscription.OpcUaClientSubscription.MonitoredItemCount} monitored items from subscription with id '{opcSubscription.OpcUaClientSubscription.Id}'.");
|
||||
opcSubscription.OpcUaClientSubscription.RemoveItems(opcSubscription.OpcUaClientSubscription.MonitoredItems);
|
||||
}
|
||||
Trace($"Removing {OpcUaClientSession.SubscriptionCount} subscriptions from session.");
|
||||
|
|
|
@ -19,6 +19,7 @@ namespace OpcPublisher
|
|||
using static OpcStackConfiguration;
|
||||
using static PublisherNodeConfiguration;
|
||||
using static System.Console;
|
||||
using static Diagnostics;
|
||||
|
||||
public class Program
|
||||
{
|
||||
|
@ -115,16 +116,29 @@ namespace OpcPublisher
|
|||
}
|
||||
}
|
||||
},
|
||||
{ "vc|verboseconsole=", $"the output of publisher is shown on the console.\nDefault: {VerboseConsole}", (bool b) => VerboseConsole = b },
|
||||
{ "mq|monitoreditemqueuecapacity=", $"specify how many notifications of monitored items could be stored in the internal queue, if the data could not be sent quick enough to IoTHub\nMin: 1024\nDefault: {MonitoredItemsQueueCapacity}", (int i) => {
|
||||
if (i >= 1024)
|
||||
{
|
||||
MonitoredItemsQueueCapacity = i;
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new OptionException("The monitoreditemqueueitems must be greater than 1024", "monitoreditemqueueitems");
|
||||
}
|
||||
}
|
||||
},
|
||||
{ "di|diagnosticsinterval=", $"shows publisher diagnostic info at the specified interval in seconds. 0 disables diagnostic output.\nDefault: {DiagnosticsInterval}", (uint u) => DiagnosticsInterval = u },
|
||||
|
||||
{ "vc|verboseconsole=", $"the output of publisher is shown on the console.\nDefault: {VerboseConsole}", (bool b) => VerboseConsole = b },
|
||||
|
||||
// IoTHub specific options
|
||||
{ "ih|iothubprotocol=", $"the protocol to use for communication with Azure IoTHub (allowed values: {string.Join(", ", Enum.GetNames(IotHubProtocol.GetType()))}).\nDefault: {Enum.GetName(IotHubProtocol.GetType(), IotHubProtocol)}",
|
||||
(Microsoft.Azure.Devices.Client.TransportType p) => IotHubProtocol = p
|
||||
},
|
||||
{ "ms|iothubmessagesize=", $"the max size of a message which can be send to IoTHub. when telemetry of this size is available it will be sent.\n0 will enforce immediate send when telemetry is available\nMin: 0\nMax: 256 * 1024\nDefault: {MaxSizeOfIoTHubMessageBytes}", (uint u) => {
|
||||
if (u >= 0 && u <= 256 * 1024)
|
||||
{ "ms|iothubmessagesize=", $"the max size of a message which can be send to IoTHub. when telemetry of this size is available it will be sent.\n0 will enforce immediate send when telemetry is available\nMin: 0\nMax: {IotHubMessageSizeMax}\nDefault: {IotHubMessageSize}", (uint u) => {
|
||||
if (u >= 0 && u <= IotHubMessageSizeMax)
|
||||
{
|
||||
MaxSizeOfIoTHubMessageBytes = u;
|
||||
IotHubMessageSize = u;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -381,7 +395,10 @@ namespace OpcPublisher
|
|||
|
||||
WriteLine("Publisher is starting up...");
|
||||
|
||||
// Init shutdown token source.
|
||||
// initialize publisher diagnostics
|
||||
Diagnostics.Init();
|
||||
|
||||
// Shutdown token sources.
|
||||
ShutdownTokenSource = new CancellationTokenSource();
|
||||
|
||||
// init OPC configuration and tracing
|
||||
|
@ -474,6 +491,9 @@ namespace OpcPublisher
|
|||
|
||||
// shutdown the IoTHub messaging
|
||||
await IotHubCommunication.Shutdown();
|
||||
|
||||
// shutdown diagnostics
|
||||
await Diagnostics.Shutdown();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
|
@ -515,7 +535,7 @@ namespace OpcPublisher
|
|||
{
|
||||
// get tasks for all disconnected sessions and start them
|
||||
await OpcSessionsListSemaphore.WaitAsync();
|
||||
var singleSessionHandlerTaskList = OpcSessions.Select(s => s.ConnectAndMonitorAsync());
|
||||
var singleSessionHandlerTaskList = OpcSessions.Select(s => s.ConnectAndMonitorAsync(cancellationtoken));
|
||||
OpcSessionsListSemaphore.Release();
|
||||
await Task.WhenAll(singleSessionHandlerTaskList);
|
||||
}
|
||||
|
|
|
@ -27,6 +27,86 @@ namespace OpcPublisher
|
|||
}
|
||||
private static string _publisherNodeConfigurationFilename = $"{System.IO.Directory.GetCurrentDirectory()}{Path.DirectorySeparatorChar}publishednodes.json";
|
||||
|
||||
public static int NumberOfOpcSessions
|
||||
{
|
||||
get
|
||||
{
|
||||
int result = 0;
|
||||
try
|
||||
{
|
||||
OpcSessionsListSemaphore.WaitAsync();
|
||||
result = OpcSessions.Count();
|
||||
}
|
||||
finally
|
||||
{
|
||||
OpcSessionsListSemaphore.Release();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
public static int NumberOfConnectedOpcSessions
|
||||
{
|
||||
get
|
||||
{
|
||||
int result = 0;
|
||||
try
|
||||
{
|
||||
OpcSessionsListSemaphore.WaitAsync();
|
||||
result = OpcSessions.Count(s => s.State == OpcSession.SessionState.Connected);
|
||||
}
|
||||
finally
|
||||
{
|
||||
OpcSessionsListSemaphore.Release();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
public static int NumberOfConnectedOpcSubscriptions
|
||||
{
|
||||
get
|
||||
{
|
||||
int result = 0;
|
||||
try
|
||||
{
|
||||
OpcSessionsListSemaphore.WaitAsync();
|
||||
var opcSessions = OpcSessions.Where(s => s.State == OpcSession.SessionState.Connected);
|
||||
foreach (var opcSession in opcSessions)
|
||||
{
|
||||
result += opcSession.GetNumberOfOpcSubscriptions();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
OpcSessionsListSemaphore.Release();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
public static int NumberOfMonitoredItems
|
||||
{
|
||||
get
|
||||
{
|
||||
int result = 0;
|
||||
try
|
||||
{
|
||||
OpcSessionsListSemaphore.WaitAsync();
|
||||
var opcSessions = OpcSessions.Where(s => s.State == OpcSession.SessionState.Connected);
|
||||
foreach (var opcSession in opcSessions)
|
||||
{
|
||||
result += opcSession.GetNumberOfOpcMonitoredItems();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
OpcSessionsListSemaphore.Release();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
private List<NodePublishingConfiguration> _nodePublishingConfiguration;
|
||||
private static List<PublisherConfigurationFileEntry> _configurationFileEntries = new List<PublisherConfigurationFileEntry>();
|
||||
|
||||
|
|
|
@ -493,7 +493,7 @@ namespace OpcPublisher
|
|||
}
|
||||
|
||||
// add the node info to the subscription with the default publishing interval, execute syncronously
|
||||
opcSession.AddNodeForMonitoring(nodeId, expandedNodeId, OpcPublishingInterval, OpcSamplingInterval).Wait();
|
||||
opcSession.AddNodeForMonitoring(nodeId, expandedNodeId, OpcPublishingInterval, OpcSamplingInterval, ShutdownTokenSource.Token).Wait();
|
||||
Trace($"PublishNode: Requested to monitor item with NodeId '{nodeId.ToString()}' (PublishingInterval: {OpcPublishingInterval}, SamplingInterval: {OpcSamplingInterval})");
|
||||
}
|
||||
catch (Exception e)
|
||||
|
@ -582,7 +582,7 @@ namespace OpcPublisher
|
|||
}
|
||||
|
||||
// remove the node from the sessions monitored items list.
|
||||
opcSession.RequestMonitorItemRemoval(nodeId, expandedNodeId).Wait();
|
||||
opcSession.RequestMonitorItemRemoval(nodeId, expandedNodeId, ShutdownTokenSource.Token).Wait();
|
||||
Trace("UnpublishNode: Requested to stop monitoring of node.");
|
||||
}
|
||||
catch (Exception e)
|
||||
|
|
Загрузка…
Ссылка в новой задаче