split modules, fix smaller bugs, add display name to configuration
This commit is contained in:
Родитель
9820fcaf2e
Коммит
cb8a1b3cc1
|
@ -43,6 +43,42 @@ namespace OpcPublisher
|
|||
_showDiagnosticsInfoTask = null;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Fetch diagnostic data.
|
||||
/// </summary>
|
||||
public static DiagnosticInfoModel GetDiagnosticInfo()
|
||||
{
|
||||
DiagnosticInfoModel diagnosticInfo = new DiagnosticInfoModel();
|
||||
|
||||
try
|
||||
{
|
||||
diagnosticInfo.PublisherStartTime = PublisherStartTime;
|
||||
diagnosticInfo.NumberOfOpcSessions = NumberOfOpcSessions;
|
||||
diagnosticInfo.NumberOfConnectedOpcSessions = NumberOfConnectedOpcSessions;
|
||||
diagnosticInfo.NumberOfConnectedOpcSubscriptions = NumberOfConnectedOpcSubscriptions;
|
||||
diagnosticInfo.NumberOfMonitoredItems = NumberOfMonitoredItems;
|
||||
diagnosticInfo.MonitoredItemsQueueCapacity = MonitoredItemsQueueCapacity;
|
||||
diagnosticInfo.MonitoredItemsQueueCount = MonitoredItemsQueueCount;
|
||||
diagnosticInfo.EnqueueCount = EnqueueCount;
|
||||
diagnosticInfo.EnqueueFailureCount = EnqueueFailureCount;
|
||||
diagnosticInfo.NumberOfEvents = NumberOfEvents;
|
||||
diagnosticInfo.SentMessages = SentMessages;
|
||||
diagnosticInfo.SentLastTime = SentLastTime;
|
||||
diagnosticInfo.SentBytes = SentBytes;
|
||||
diagnosticInfo.FailedMessages = FailedMessages;
|
||||
diagnosticInfo.TooLargeCount = TooLargeCount;
|
||||
diagnosticInfo.MissedSendIntervalCount = MissedSendIntervalCount;
|
||||
diagnosticInfo.WorkingSetMB = Process.GetCurrentProcess().WorkingSet64 / (1024 * 1024);
|
||||
diagnosticInfo.DefaultSendIntervalSeconds = DefaultSendIntervalSeconds;
|
||||
diagnosticInfo.HubMessageSize = HubMessageSize;
|
||||
diagnosticInfo.HubProtocol = HubProtocol;
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
return diagnosticInfo;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Kicks of the task to show diagnostic information each 30 seconds.
|
||||
/// </summary>
|
||||
|
@ -59,32 +95,33 @@ namespace OpcPublisher
|
|||
{
|
||||
await Task.Delay((int)DiagnosticsInterval * 1000, ct);
|
||||
|
||||
DiagnosticInfoModel diagnosticInfo = GetDiagnosticInfo();
|
||||
Logger.Information("==========================================================================");
|
||||
Logger.Information($"OpcPublisher status @ {System.DateTime.UtcNow} (started @ {PublisherStartTime})");
|
||||
Logger.Information($"OpcPublisher status @ {System.DateTime.UtcNow} (started @ {diagnosticInfo.PublisherStartTime})");
|
||||
Logger.Information("---------------------------------");
|
||||
Logger.Information($"OPC sessions: {NumberOfOpcSessions}");
|
||||
Logger.Information($"connected OPC sessions: {NumberOfConnectedOpcSessions}");
|
||||
Logger.Information($"connected OPC subscriptions: {NumberOfConnectedOpcSubscriptions}");
|
||||
Logger.Information($"OPC monitored items: {NumberOfMonitoredItems}");
|
||||
Logger.Information($"OPC sessions: {diagnosticInfo.NumberOfOpcSessions}");
|
||||
Logger.Information($"connected OPC sessions: {diagnosticInfo.NumberOfConnectedOpcSessions}");
|
||||
Logger.Information($"connected OPC subscriptions: {diagnosticInfo.NumberOfConnectedOpcSubscriptions}");
|
||||
Logger.Information($"OPC monitored items: {diagnosticInfo.NumberOfMonitoredItems}");
|
||||
Logger.Information("---------------------------------");
|
||||
Logger.Information($"monitored items queue bounded capacity: {MonitoredItemsQueueCapacity}");
|
||||
Logger.Information($"monitored items queue current items: {MonitoredItemsQueueCount}");
|
||||
Logger.Information($"monitored item notifications enqueued: {EnqueueCount}");
|
||||
Logger.Information($"monitored item notifications enqueue failure: {EnqueueFailureCount}");
|
||||
Logger.Information($"monitored item notifications dequeued: {DequeueCount}");
|
||||
Logger.Information($"monitored items queue bounded capacity: {diagnosticInfo.MonitoredItemsQueueCapacity}");
|
||||
Logger.Information($"monitored items queue current items: {diagnosticInfo.MonitoredItemsQueueCount}");
|
||||
Logger.Information($"monitored item notifications enqueued: {diagnosticInfo.EnqueueCount}");
|
||||
Logger.Information($"monitored item notifications enqueue failure: {diagnosticInfo.EnqueueFailureCount}");
|
||||
Logger.Information("---------------------------------");
|
||||
Logger.Information($"messages sent to IoTHub: {SentMessages}");
|
||||
Logger.Information($"last successful msg sent @: {SentLastTime}");
|
||||
Logger.Information($"bytes sent to IoTHub: {SentBytes}");
|
||||
Logger.Information($"avg msg size: {SentBytes / (SentMessages == 0 ? 1 : SentMessages)}");
|
||||
Logger.Information($"msg send failures: {FailedMessages}");
|
||||
Logger.Information($"messages too large to sent to IoTHub: {TooLargeCount}");
|
||||
Logger.Information($"times we missed send interval: {MissedSendIntervalCount}");
|
||||
Logger.Information($"messages sent to IoTHub: {diagnosticInfo.SentMessages}");
|
||||
Logger.Information($"last successful msg sent @: {diagnosticInfo.SentLastTime}");
|
||||
Logger.Information($"bytes sent to IoTHub: {diagnosticInfo.SentBytes}");
|
||||
Logger.Information($"avg msg size: {diagnosticInfo.SentBytes / (diagnosticInfo.SentMessages == 0 ? 1 : diagnosticInfo.SentMessages)}");
|
||||
Logger.Information($"msg send failures: {diagnosticInfo.FailedMessages}");
|
||||
Logger.Information($"messages too large to sent to IoTHub: {diagnosticInfo.TooLargeCount}");
|
||||
Logger.Information($"times we missed send interval: {diagnosticInfo.MissedSendIntervalCount}");
|
||||
Logger.Information($"number of events: {diagnosticInfo.NumberOfEvents}");
|
||||
Logger.Information("---------------------------------");
|
||||
Logger.Information($"current working set in MB: {Process.GetCurrentProcess().WorkingSet64 / (1024 * 1024)}");
|
||||
Logger.Information($"--si setting: {DefaultSendIntervalSeconds}");
|
||||
Logger.Information($"--ms setting: {HubMessageSize}");
|
||||
Logger.Information($"--ih setting: {HubProtocol}");
|
||||
Logger.Information($"current working set in MB: {diagnosticInfo.WorkingSetMB}");
|
||||
Logger.Information($"--si setting: {diagnosticInfo.DefaultSendIntervalSeconds}");
|
||||
Logger.Information($"--ms setting: {diagnosticInfo.HubMessageSize}");
|
||||
Logger.Information($"--ih setting: {diagnosticInfo.HubProtocol}");
|
||||
Logger.Information("==========================================================================");
|
||||
}
|
||||
catch
|
||||
|
|
|
@ -15,6 +15,7 @@ namespace OpcPublisher
|
|||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using static Diagnostics;
|
||||
using static OpcApplicationConfiguration;
|
||||
using static OpcPublisher.OpcMonitoredItem;
|
||||
using static OpcPublisher.PublisherNodeConfiguration;
|
||||
|
@ -40,7 +41,7 @@ namespace OpcPublisher
|
|||
|
||||
public static long MonitoredItemsQueueCount => _monitoredItemsDataQueue.Count;
|
||||
|
||||
public static long DequeueCount { get; private set; }
|
||||
public static long NumberOfEvents { get; private set; }
|
||||
|
||||
public static long MissedSendIntervalCount { get; private set; }
|
||||
|
||||
|
@ -72,7 +73,6 @@ namespace OpcPublisher
|
|||
|
||||
public static bool IotCentralMode { get; set; } = false;
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Ctor for the class.
|
||||
/// </summary>
|
||||
|
@ -292,13 +292,13 @@ namespace OpcPublisher
|
|||
{
|
||||
// add the node info to the subscription with the default publishing interval, execute syncronously
|
||||
Logger.Debug($"{logPrefix} Request to monitor item with NodeId '{nodeId.ToString()}' (PublishingInterval: {publishingInterval}, SamplingInterval: {samplingInterval})");
|
||||
statusCode = await opcSession.AddNodeForMonitoringAsync(nodeId, null, publishingInterval, samplingInterval, ShutdownTokenSource.Token);
|
||||
statusCode = await opcSession.AddNodeForMonitoringAsync(nodeId, null, publishingInterval, samplingInterval, node.DisplayName, ShutdownTokenSource.Token);
|
||||
}
|
||||
else
|
||||
{
|
||||
// add the node info to the subscription with the default publishing interval, execute syncronously
|
||||
Logger.Debug($"{logPrefix} Request to monitor item with ExpandedNodeId '{expandedNodeId.ToString()}' (PublishingInterval: {publishingInterval}, SamplingInterval: {samplingInterval})");
|
||||
statusCode = await opcSession.AddNodeForMonitoringAsync(null, expandedNodeId, publishingInterval, samplingInterval, ShutdownTokenSource.Token);
|
||||
statusCode = await opcSession.AddNodeForMonitoringAsync(null, expandedNodeId, publishingInterval, samplingInterval, node.DisplayName, ShutdownTokenSource.Token);
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
|
@ -330,7 +330,6 @@ namespace OpcPublisher
|
|||
return (new MethodResponse((int)statusCode));
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Handle unpublish node method call.
|
||||
/// </summary>
|
||||
|
@ -437,8 +436,6 @@ namespace OpcPublisher
|
|||
return (new MethodResponse((int)statusCode));
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Handle unpublish all nodes method call.
|
||||
/// </summary>
|
||||
|
@ -497,9 +494,10 @@ namespace OpcPublisher
|
|||
foreach (var subscription in subscriptionsToCleanup)
|
||||
{
|
||||
// loop through all monitored items
|
||||
var monitoredItemsToCleanup = subscription.OpcMonitoredItems;
|
||||
foreach (var monitoredItem in monitoredItemsToCleanup)
|
||||
var monitoredItemsToCleanup = subscription.OpcMonitoredItems.ToArray();
|
||||
for (var i = 0; i < monitoredItemsToCleanup.Length; i++)
|
||||
{
|
||||
var monitoredItem = monitoredItemsToCleanup[i];
|
||||
if (monitoredItem.ConfigType == OpcMonitoredItemConfigurationType.NodeId)
|
||||
{
|
||||
await session.RequestMonitorItemRemovalAsync(monitoredItem.ConfigNodeId, null, ShutdownTokenSource.Token);
|
||||
|
@ -689,7 +687,7 @@ namespace OpcPublisher
|
|||
{
|
||||
getConfiguredNodesOnEndpointMethodResponse.ContinuationToken = (ulong)nodeConfigVersion << 32 | actualNodeCount + startIndex;
|
||||
}
|
||||
getConfiguredNodesOnEndpointMethodResponse.Nodes = opcNodes.GetRange((int)startIndex, (int)actualNodeCount).Select(n => new NodeModel(n.Id, n.OpcPublishingInterval, n.OpcSamplingInterval)).ToList();
|
||||
getConfiguredNodesOnEndpointMethodResponse.Nodes = opcNodes.GetRange((int)startIndex, (int)actualNodeCount).Select(n => new NodeModel(n.Id, n.OpcPublishingInterval, n.OpcSamplingInterval, n.DisplayName)).ToList();
|
||||
string resultString = JsonConvert.SerializeObject(getConfiguredNodesOnEndpointMethodResponse);
|
||||
byte[] result = Encoding.UTF8.GetBytes(resultString);
|
||||
MethodResponse methodResponse = new MethodResponse(result, (int)HttpStatusCode.OK);
|
||||
|
@ -697,6 +695,33 @@ namespace OpcPublisher
|
|||
return methodResponse;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handle method call to get diagnostic information.
|
||||
/// </summary>
|
||||
static async Task<MethodResponse> HandleGetDiagnosticInfoMethodAsync(MethodRequest methodRequest, object userContext)
|
||||
{
|
||||
string logPrefix = "HandleGetDiagnosticInfoMethodAsync:";
|
||||
|
||||
// get the diagnostic info
|
||||
DiagnosticInfoModel diagnosticInfo = new DiagnosticInfoModel();
|
||||
try
|
||||
{
|
||||
diagnosticInfo = GetDiagnosticInfo();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Logger.Error(e, $"{logPrefix} Exception");
|
||||
return (new MethodResponse((int)HttpStatusCode.InternalServerError));
|
||||
}
|
||||
|
||||
// build response
|
||||
string resultString = JsonConvert.SerializeObject(diagnosticInfo);
|
||||
byte[] result = Encoding.UTF8.GetBytes(resultString);
|
||||
MethodResponse methodResponse = new MethodResponse(result, (int)HttpStatusCode.OK);
|
||||
Logger.Information($"{logPrefix} Success returning diagnostic info!");
|
||||
return methodResponse;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Initializes internal message processing.
|
||||
/// </summary>
|
||||
|
@ -994,7 +1019,7 @@ namespace OpcPublisher
|
|||
jsonMessage = await CreateJsonMessageAsync(messageData);
|
||||
}
|
||||
|
||||
DequeueCount++;
|
||||
NumberOfEvents++;
|
||||
jsonMessageSize = Encoding.UTF8.GetByteCount(jsonMessage.ToString());
|
||||
|
||||
// sanity check that the user has set a large enough messages size
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
using Newtonsoft.Json;
|
||||
using Microsoft.Azure.Devices.Client;
|
||||
using Newtonsoft.Json;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace OpcPublisher
|
||||
|
@ -15,11 +17,12 @@ namespace OpcPublisher
|
|||
|
||||
public class NodeModel
|
||||
{
|
||||
public NodeModel(string id, int? opcPublishingInterval = null, int? opcSamplingInterval = null)
|
||||
public NodeModel(string id, int? opcPublishingInterval = null, int? opcSamplingInterval = null, string displayName = null)
|
||||
{
|
||||
Id = id;
|
||||
OpcPublishingInterval = opcPublishingInterval;
|
||||
OpcSamplingInterval = opcSamplingInterval;
|
||||
DisplayName = displayName;
|
||||
}
|
||||
|
||||
public string Id { get; set; }
|
||||
|
@ -29,6 +32,76 @@ namespace OpcPublisher
|
|||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public int? OpcSamplingInterval { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public string DisplayName { get; set; }
|
||||
}
|
||||
|
||||
public class DiagnosticInfoModel
|
||||
{
|
||||
public DiagnosticInfoModel()
|
||||
{
|
||||
}
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public DateTime PublisherStartTime { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public int NumberOfOpcSessions { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public int NumberOfConnectedOpcSessions { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public int NumberOfConnectedOpcSubscriptions { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public int NumberOfMonitoredItems { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public int MonitoredItemsQueueCapacity { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public long MonitoredItemsQueueCount { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public long EnqueueCount { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public long EnqueueFailureCount { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public long NumberOfEvents { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public long SentMessages { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public DateTime SentLastTime { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public long SentBytes { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public long FailedMessages { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public long TooLargeCount { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public long MissedSendIntervalCount { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public long WorkingSetMB { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public int DefaultSendIntervalSeconds { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public uint HubMessageSize { get; set; }
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Include)]
|
||||
public TransportType HubProtocol { get; set; }
|
||||
}
|
||||
|
||||
public class PublishNodesMethodRequestModel
|
||||
|
@ -77,7 +150,6 @@ namespace OpcPublisher
|
|||
public string EndpointUrl { get; set; }
|
||||
}
|
||||
|
||||
|
||||
public class GetConfiguredEndpointsMethodRequestModel
|
||||
{
|
||||
public GetConfiguredEndpointsMethodRequestModel(ulong? continuationToken = null)
|
||||
|
|
|
@ -0,0 +1,388 @@
|
|||
|
||||
using Opc.Ua.Client;
|
||||
using System;
|
||||
using System.Linq;
|
||||
|
||||
namespace OpcPublisher
|
||||
{
|
||||
using Opc.Ua;
|
||||
using static HubCommunication;
|
||||
using static OpcApplicationConfiguration;
|
||||
using static OpcPublisher.PublisherTelemetryConfiguration;
|
||||
using static Program;
|
||||
|
||||
/// <summary>
|
||||
/// Class to manage the OPC monitored items, which are the nodes we need to publish.
|
||||
/// </summary>
|
||||
public class OpcMonitoredItem
|
||||
{
|
||||
public enum OpcMonitoredItemState
|
||||
{
|
||||
Unmonitored = 0,
|
||||
UnmonitoredNamespaceUpdateRequested,
|
||||
Monitored,
|
||||
RemovalRequested,
|
||||
}
|
||||
|
||||
public enum OpcMonitoredItemConfigurationType
|
||||
{
|
||||
NodeId = 0,
|
||||
ExpandedNodeId
|
||||
}
|
||||
|
||||
public string DisplayName { get; set; }
|
||||
public OpcMonitoredItemState State { get; set; }
|
||||
public uint AttributeId { get; set; }
|
||||
public MonitoringMode MonitoringMode { get; set; }
|
||||
public int RequestedSamplingInterval { get; set; }
|
||||
public int SamplingInterval { get; set; }
|
||||
public uint QueueSize { get; set; }
|
||||
public bool DiscardOldest { get; set; }
|
||||
public MonitoredItemNotificationEventHandler Notification { get; set; }
|
||||
public Uri EndpointUrl { get; set; }
|
||||
public MonitoredItem OpcUaClientMonitoredItem { get; set; }
|
||||
public NodeId ConfigNodeId { get; set; }
|
||||
public ExpandedNodeId ConfigExpandedNodeId { get; set; }
|
||||
public string OriginalId { get; set; }
|
||||
public OpcMonitoredItemConfigurationType ConfigType { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Ctor using NodeId (ns syntax for namespace).
|
||||
/// </summary>
|
||||
public OpcMonitoredItem(NodeId nodeId, Uri sessionEndpointUrl, string displayName)
|
||||
{
|
||||
ConfigNodeId = nodeId;
|
||||
ConfigExpandedNodeId = null;
|
||||
OriginalId = nodeId.ToString();
|
||||
ConfigType = OpcMonitoredItemConfigurationType.NodeId;
|
||||
Init(sessionEndpointUrl);
|
||||
State = OpcMonitoredItemState.Unmonitored;
|
||||
DisplayName = displayName;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Ctor using ExpandedNodeId (nsu syntax for namespace).
|
||||
/// </summary>
|
||||
public OpcMonitoredItem(ExpandedNodeId expandedNodeId, Uri sessionEndpointUrl, string displayName)
|
||||
{
|
||||
ConfigNodeId = null;
|
||||
ConfigExpandedNodeId = expandedNodeId;
|
||||
OriginalId = expandedNodeId.ToString();
|
||||
ConfigType = OpcMonitoredItemConfigurationType.ExpandedNodeId;
|
||||
Init(sessionEndpointUrl);
|
||||
State = OpcMonitoredItemState.UnmonitoredNamespaceUpdateRequested;
|
||||
DisplayName = displayName;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Checks if the monitored item does monitor the node described by the given objects.
|
||||
/// </summary>
|
||||
public bool IsMonitoringThisNode(NodeId nodeId, ExpandedNodeId expandedNodeId, NamespaceTable namespaceTable)
|
||||
{
|
||||
if (State == OpcMonitoredItemState.RemovalRequested)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (ConfigType == OpcMonitoredItemConfigurationType.NodeId)
|
||||
{
|
||||
if (nodeId != null)
|
||||
{
|
||||
if (ConfigNodeId == nodeId)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if (expandedNodeId != null)
|
||||
{
|
||||
string namespaceUri = namespaceTable.ToArray().ElementAtOrDefault(ConfigNodeId.NamespaceIndex);
|
||||
if (expandedNodeId.NamespaceUri != null && expandedNodeId.NamespaceUri.Equals(namespaceUri, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
if (expandedNodeId.Identifier.ToString().Equals(ConfigNodeId.Identifier.ToString(), StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (ConfigType == OpcMonitoredItemConfigurationType.ExpandedNodeId)
|
||||
{
|
||||
if (nodeId != null)
|
||||
{
|
||||
int namespaceIndex = namespaceTable.GetIndex(ConfigExpandedNodeId?.NamespaceUri);
|
||||
if (nodeId.NamespaceIndex == namespaceIndex)
|
||||
{
|
||||
if (nodeId.Identifier.ToString().Equals(ConfigExpandedNodeId.Identifier.ToString(), StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (expandedNodeId != null)
|
||||
{
|
||||
if (ConfigExpandedNodeId.NamespaceUri != null &&
|
||||
ConfigExpandedNodeId.NamespaceUri.Equals(expandedNodeId.NamespaceUri, StringComparison.OrdinalIgnoreCase) &&
|
||||
ConfigExpandedNodeId.Identifier.ToString().Equals(expandedNodeId.Identifier.ToString(), StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Class used to pass data from the MonitoredItem notification to the hub message processing.
|
||||
/// </summary>
|
||||
public class MessageData
|
||||
{
|
||||
public string EndpointUrl;
|
||||
public string NodeId;
|
||||
public string ApplicationUri;
|
||||
public string DisplayName;
|
||||
public string Value;
|
||||
public string SourceTimestamp;
|
||||
public uint? StatusCode;
|
||||
public string Status;
|
||||
public bool PreserveValueQuotes;
|
||||
|
||||
public MessageData()
|
||||
{
|
||||
EndpointUrl = null;
|
||||
NodeId = null;
|
||||
ApplicationUri = null;
|
||||
DisplayName = null;
|
||||
Value = null;
|
||||
StatusCode = null;
|
||||
SourceTimestamp = null;
|
||||
Status = null;
|
||||
PreserveValueQuotes = false;
|
||||
}
|
||||
|
||||
public void ApplyPatterns(EndpointTelemetryConfiguration telemetryConfiguration)
|
||||
{
|
||||
if (telemetryConfiguration.EndpointUrl.Publish == true)
|
||||
{
|
||||
EndpointUrl = telemetryConfiguration.EndpointUrl.PatternMatch(EndpointUrl);
|
||||
}
|
||||
if (telemetryConfiguration.NodeId.Publish == true)
|
||||
{
|
||||
NodeId = telemetryConfiguration.NodeId.PatternMatch(NodeId);
|
||||
}
|
||||
if (telemetryConfiguration.MonitoredItem.ApplicationUri.Publish == true)
|
||||
{
|
||||
ApplicationUri = telemetryConfiguration.MonitoredItem.ApplicationUri.PatternMatch(ApplicationUri);
|
||||
}
|
||||
if (telemetryConfiguration.MonitoredItem.DisplayName.Publish == true)
|
||||
{
|
||||
DisplayName = telemetryConfiguration.MonitoredItem.DisplayName.PatternMatch(DisplayName);
|
||||
}
|
||||
if (telemetryConfiguration.Value.Value.Publish == true)
|
||||
{
|
||||
Value = telemetryConfiguration.Value.Value.PatternMatch(Value);
|
||||
}
|
||||
if (telemetryConfiguration.Value.SourceTimestamp.Publish == true)
|
||||
{
|
||||
SourceTimestamp = telemetryConfiguration.Value.SourceTimestamp.PatternMatch(SourceTimestamp);
|
||||
}
|
||||
if (telemetryConfiguration.Value.StatusCode.Publish == true && StatusCode != null)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(telemetryConfiguration.Value.StatusCode.Pattern))
|
||||
{
|
||||
Logger.Information($"'Pattern' settngs for StatusCode are ignored.");
|
||||
}
|
||||
}
|
||||
if (telemetryConfiguration.Value.Status.Publish == true)
|
||||
{
|
||||
Status = telemetryConfiguration.Value.Status.PatternMatch(Status);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// The notification that the data for a monitored item has changed on an OPC UA server.
|
||||
/// </summary>
|
||||
public void MonitoredItem_Notification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs args)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (args == null || args.NotificationValue == null || monitoredItem == null || monitoredItem.Subscription == null || monitoredItem.Subscription.Session == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
MonitoredItemNotification notification = args.NotificationValue as MonitoredItemNotification;
|
||||
if (notification == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
DataValue value = notification.Value as DataValue;
|
||||
if (value == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
MessageData messageData = new MessageData();
|
||||
if (IotCentralMode)
|
||||
{
|
||||
// for IoTCentral we use the DisplayName as the key in the telemetry and the Value as the value.
|
||||
if (monitoredItem.DisplayName != null)
|
||||
{
|
||||
// use the DisplayName as reported in the MonitoredItem
|
||||
messageData.DisplayName = monitoredItem.DisplayName;
|
||||
}
|
||||
if (value.Value != null)
|
||||
{
|
||||
// use the Value as reported in the notification event argument encoded with the OPC UA JSON endcoder
|
||||
JsonEncoder encoder = new JsonEncoder(monitoredItem.Subscription.Session.MessageContext, false);
|
||||
value.ServerTimestamp = DateTime.MinValue;
|
||||
value.SourceTimestamp = DateTime.MinValue;
|
||||
value.StatusCode = StatusCodes.Good;
|
||||
encoder.WriteDataValue("Value", value);
|
||||
string valueString = encoder.CloseAndReturnText();
|
||||
// we only want the value string, search for everything till the real value starts
|
||||
// and get it
|
||||
string marker = "{\"Value\":{\"Value\":";
|
||||
int markerStart = valueString.IndexOf(marker);
|
||||
messageData.PreserveValueQuotes = true;
|
||||
if (markerStart >= 0)
|
||||
{
|
||||
// we either have a value in quotes or just a value
|
||||
int valueLength;
|
||||
int valueStart = marker.Length;
|
||||
if (valueString.IndexOf("\"", valueStart) >= 0)
|
||||
{
|
||||
// value is in quotes and two closing curly brackets at the end
|
||||
valueStart++;
|
||||
valueLength = valueString.Length - valueStart - 3;
|
||||
}
|
||||
else
|
||||
{
|
||||
// value is without quotes with two curly brackets at the end
|
||||
valueLength = valueString.Length - marker.Length - 2;
|
||||
messageData.PreserveValueQuotes = false;
|
||||
}
|
||||
messageData.Value = valueString.Substring(valueStart, valueLength);
|
||||
}
|
||||
Logger.Debug($" IoTCentral key: {messageData.DisplayName}");
|
||||
Logger.Debug($" IoTCentral values: {messageData.Value}");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// update the required message data to pass only the required data to HubCommunication
|
||||
EndpointTelemetryConfiguration telemetryConfiguration = GetEndpointTelemetryConfiguration(EndpointUrl.AbsoluteUri);
|
||||
|
||||
// the endpoint URL is required to allow HubCommunication lookup the telemetry configuration
|
||||
messageData.EndpointUrl = EndpointUrl.AbsoluteUri;
|
||||
if (telemetryConfiguration.NodeId.Publish == true)
|
||||
{
|
||||
messageData.NodeId = OriginalId;
|
||||
}
|
||||
if (telemetryConfiguration.MonitoredItem.ApplicationUri.Publish == true)
|
||||
{
|
||||
messageData.ApplicationUri = (monitoredItem.Subscription.Session.Endpoint.Server.ApplicationUri + (string.IsNullOrEmpty(OpcSession.PublisherSite) ? "" : $":{OpcSession.PublisherSite}"));
|
||||
}
|
||||
if (telemetryConfiguration.MonitoredItem.DisplayName.Publish == true && monitoredItem.DisplayName != null)
|
||||
{
|
||||
// use the DisplayName as reported in the MonitoredItem
|
||||
messageData.DisplayName = monitoredItem.DisplayName;
|
||||
}
|
||||
if (telemetryConfiguration.Value.SourceTimestamp.Publish == true && value.SourceTimestamp != null)
|
||||
{
|
||||
// use the SourceTimestamp as reported in the notification event argument in ISO8601 format
|
||||
messageData.SourceTimestamp = value.SourceTimestamp.ToString("o");
|
||||
}
|
||||
if (telemetryConfiguration.Value.StatusCode.Publish == true && value.StatusCode != null)
|
||||
{
|
||||
// use the StatusCode as reported in the notification event argument
|
||||
messageData.StatusCode = value.StatusCode.Code;
|
||||
}
|
||||
if (telemetryConfiguration.Value.Status.Publish == true && value.StatusCode != null)
|
||||
{
|
||||
// use the StatusCode as reported in the notification event argument to lookup the symbolic name
|
||||
messageData.Status = StatusCode.LookupSymbolicId(value.StatusCode.Code);
|
||||
}
|
||||
if (telemetryConfiguration.Value.Value.Publish == true && value.Value != null)
|
||||
{
|
||||
// use the Value as reported in the notification event argument encoded with the OPC UA JSON endcoder
|
||||
JsonEncoder encoder = new JsonEncoder(monitoredItem.Subscription.Session.MessageContext, false);
|
||||
value.ServerTimestamp = DateTime.MinValue;
|
||||
value.SourceTimestamp = DateTime.MinValue;
|
||||
value.StatusCode = StatusCodes.Good;
|
||||
encoder.WriteDataValue("Value", value);
|
||||
string valueString = encoder.CloseAndReturnText();
|
||||
// we only want the value string, search for everything till the real value starts
|
||||
// and get it
|
||||
string marker = "{\"Value\":{\"Value\":";
|
||||
int markerStart = valueString.IndexOf(marker);
|
||||
messageData.PreserveValueQuotes = true;
|
||||
if (markerStart >= 0)
|
||||
{
|
||||
// we either have a value in quotes or just a value
|
||||
int valueLength;
|
||||
int valueStart = marker.Length;
|
||||
if (valueString.IndexOf("\"", valueStart) >= 0)
|
||||
{
|
||||
// value is in quotes and two closing curly brackets at the end
|
||||
valueStart++;
|
||||
valueLength = valueString.Length - valueStart - 3;
|
||||
}
|
||||
else
|
||||
{
|
||||
// value is without quotes with two curly brackets at the end
|
||||
valueLength = valueString.Length - marker.Length - 2;
|
||||
messageData.PreserveValueQuotes = false;
|
||||
}
|
||||
messageData.Value = valueString.Substring(valueStart, valueLength);
|
||||
}
|
||||
}
|
||||
|
||||
// currently the pattern processing is done here, which adds runtime to the notification processing.
|
||||
// In case of perf issues it can be also done in CreateJsonMessageAsync of IoTHubMessaging.cs.
|
||||
|
||||
// apply patterns
|
||||
messageData.ApplyPatterns(telemetryConfiguration);
|
||||
|
||||
Logger.Debug($" ApplicationUri: {messageData.ApplicationUri}");
|
||||
Logger.Debug($" EndpointUrl: {messageData.EndpointUrl}");
|
||||
Logger.Debug($" DisplayName: {messageData.DisplayName}");
|
||||
Logger.Debug($" Value: {messageData.Value}");
|
||||
}
|
||||
|
||||
// add message to fifo send queue
|
||||
if (monitoredItem.Subscription == null)
|
||||
{
|
||||
Logger.Debug($"Subscription already removed. No more details available.");
|
||||
}
|
||||
else
|
||||
{
|
||||
Logger.Debug($"Enqueue a new message from subscription {(monitoredItem.Subscription == null ? "removed" : monitoredItem.Subscription.Id.ToString())}");
|
||||
Logger.Debug($" with publishing interval: {monitoredItem.Subscription.PublishingInterval} and sampling interval: {monitoredItem.SamplingInterval}):");
|
||||
}
|
||||
HubCommunication.Enqueue(messageData);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Logger.Error(e, "Error processing monitored item notification");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Init instance variables.
|
||||
/// </summary>
|
||||
private void Init(Uri sessionEndpointUrl)
|
||||
{
|
||||
State = OpcMonitoredItemState.Unmonitored;
|
||||
DisplayName = string.Empty;
|
||||
AttributeId = Attributes.Value;
|
||||
MonitoringMode = MonitoringMode.Reporting;
|
||||
RequestedSamplingInterval = OpcSamplingInterval;
|
||||
QueueSize = 0;
|
||||
DiscardOldest = true;
|
||||
Notification = new MonitoredItemNotificationEventHandler(MonitoredItem_Notification);
|
||||
EndpointUrl = sessionEndpointUrl;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -11,405 +11,12 @@ namespace OpcPublisher
|
|||
using System.Net;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using static HubCommunication;
|
||||
using static OpcApplicationConfiguration;
|
||||
using static OpcPublisher.OpcMonitoredItem;
|
||||
using static OpcPublisher.PublisherTelemetryConfiguration;
|
||||
using static OpcApplicationConfiguration;
|
||||
using static Program;
|
||||
using static PublisherNodeConfiguration;
|
||||
|
||||
/// <summary>
|
||||
/// Class to manage the OPC monitored items, which are the nodes we need to publish.
|
||||
/// </summary>
|
||||
public class OpcMonitoredItem
|
||||
{
|
||||
public enum OpcMonitoredItemState
|
||||
{
|
||||
Unmonitored = 0,
|
||||
UnmonitoredNamespaceUpdateRequested,
|
||||
Monitored,
|
||||
RemovalRequested,
|
||||
}
|
||||
|
||||
public enum OpcMonitoredItemConfigurationType
|
||||
{
|
||||
NodeId = 0,
|
||||
ExpandedNodeId
|
||||
}
|
||||
|
||||
public string DisplayName { get; set; }
|
||||
public OpcMonitoredItemState State { get; set; }
|
||||
public uint AttributeId { get; set; }
|
||||
public MonitoringMode MonitoringMode { get; set; }
|
||||
public int RequestedSamplingInterval { get; set; }
|
||||
public int SamplingInterval { get; set; }
|
||||
public uint QueueSize { get; set; }
|
||||
public bool DiscardOldest { get; set; }
|
||||
public MonitoredItemNotificationEventHandler Notification { get; set; }
|
||||
public Uri EndpointUrl { get; set; }
|
||||
public MonitoredItem OpcUaClientMonitoredItem { get; set; }
|
||||
public NodeId ConfigNodeId { get; set; }
|
||||
public ExpandedNodeId ConfigExpandedNodeId { get; set; }
|
||||
public string OriginalId { get; set; }
|
||||
public OpcMonitoredItemConfigurationType ConfigType { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Ctor using NodeId (ns syntax for namespace).
|
||||
/// </summary>
|
||||
public OpcMonitoredItem(NodeId nodeId, Uri sessionEndpointUrl)
|
||||
{
|
||||
ConfigNodeId = nodeId;
|
||||
ConfigExpandedNodeId = null;
|
||||
OriginalId = nodeId.ToString();
|
||||
ConfigType = OpcMonitoredItemConfigurationType.NodeId;
|
||||
Init(sessionEndpointUrl);
|
||||
State = OpcMonitoredItemState.Unmonitored;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Ctor using ExpandedNodeId (nsu syntax for namespace).
|
||||
/// </summary>
|
||||
public OpcMonitoredItem(ExpandedNodeId expandedNodeId, Uri sessionEndpointUrl)
|
||||
{
|
||||
ConfigNodeId = null;
|
||||
ConfigExpandedNodeId = expandedNodeId;
|
||||
OriginalId = expandedNodeId.ToString();
|
||||
ConfigType = OpcMonitoredItemConfigurationType.ExpandedNodeId;
|
||||
Init(sessionEndpointUrl);
|
||||
State = OpcMonitoredItemState.UnmonitoredNamespaceUpdateRequested;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Checks if the monitored item does monitor the node described by the given objects.
|
||||
/// </summary>
|
||||
public bool IsMonitoringThisNode(NodeId nodeId, ExpandedNodeId expandedNodeId, NamespaceTable namespaceTable)
|
||||
{
|
||||
if (State == OpcMonitoredItemState.RemovalRequested)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (ConfigType == OpcMonitoredItemConfigurationType.NodeId)
|
||||
{
|
||||
if (nodeId != null)
|
||||
{
|
||||
if (ConfigNodeId == nodeId)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if (expandedNodeId != null)
|
||||
{
|
||||
string namespaceUri = namespaceTable.ToArray().ElementAtOrDefault(ConfigNodeId.NamespaceIndex);
|
||||
if (expandedNodeId.NamespaceUri != null && expandedNodeId.NamespaceUri.Equals(namespaceUri, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
if (expandedNodeId.Identifier.ToString().Equals(ConfigNodeId.Identifier.ToString(), StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (ConfigType == OpcMonitoredItemConfigurationType.ExpandedNodeId)
|
||||
{
|
||||
if (nodeId != null)
|
||||
{
|
||||
int namespaceIndex = namespaceTable.GetIndex(ConfigExpandedNodeId?.NamespaceUri);
|
||||
if (nodeId.NamespaceIndex == namespaceIndex)
|
||||
{
|
||||
if (nodeId.Identifier.ToString().Equals(ConfigExpandedNodeId.Identifier.ToString(), StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (expandedNodeId != null)
|
||||
{
|
||||
if (ConfigExpandedNodeId.NamespaceUri != null &&
|
||||
ConfigExpandedNodeId.NamespaceUri.Equals(expandedNodeId.NamespaceUri, StringComparison.OrdinalIgnoreCase) &&
|
||||
ConfigExpandedNodeId.Identifier.ToString().Equals(expandedNodeId.Identifier.ToString(), StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Class used to pass data from the MonitoredItem notification to the hub message processing.
|
||||
/// </summary>
|
||||
public class MessageData
|
||||
{
|
||||
public string EndpointUrl;
|
||||
public string NodeId;
|
||||
public string ApplicationUri;
|
||||
public string DisplayName;
|
||||
public string Value;
|
||||
public string SourceTimestamp;
|
||||
public uint? StatusCode;
|
||||
public string Status;
|
||||
public bool PreserveValueQuotes;
|
||||
|
||||
public MessageData()
|
||||
{
|
||||
EndpointUrl = null;
|
||||
NodeId = null;
|
||||
ApplicationUri = null;
|
||||
DisplayName = null;
|
||||
Value = null;
|
||||
StatusCode = null;
|
||||
SourceTimestamp = null;
|
||||
Status = null;
|
||||
PreserveValueQuotes = false;
|
||||
}
|
||||
|
||||
public void ApplyPatterns(EndpointTelemetryConfiguration telemetryConfiguration)
|
||||
{
|
||||
if (telemetryConfiguration.EndpointUrl.Publish == true)
|
||||
{
|
||||
EndpointUrl = telemetryConfiguration.EndpointUrl.PatternMatch(EndpointUrl);
|
||||
}
|
||||
if (telemetryConfiguration.NodeId.Publish == true)
|
||||
{
|
||||
NodeId = telemetryConfiguration.NodeId.PatternMatch(NodeId);
|
||||
}
|
||||
if (telemetryConfiguration.MonitoredItem.ApplicationUri.Publish == true)
|
||||
{
|
||||
ApplicationUri = telemetryConfiguration.MonitoredItem.ApplicationUri.PatternMatch(ApplicationUri);
|
||||
}
|
||||
if (telemetryConfiguration.MonitoredItem.DisplayName.Publish == true)
|
||||
{
|
||||
DisplayName = telemetryConfiguration.MonitoredItem.DisplayName.PatternMatch(DisplayName);
|
||||
}
|
||||
if (telemetryConfiguration.Value.Value.Publish == true)
|
||||
{
|
||||
Value = telemetryConfiguration.Value.Value.PatternMatch(Value);
|
||||
}
|
||||
if (telemetryConfiguration.Value.SourceTimestamp.Publish == true)
|
||||
{
|
||||
SourceTimestamp = telemetryConfiguration.Value.SourceTimestamp.PatternMatch(SourceTimestamp);
|
||||
}
|
||||
if (telemetryConfiguration.Value.StatusCode.Publish == true && StatusCode != null)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(telemetryConfiguration.Value.StatusCode.Pattern))
|
||||
{
|
||||
Logger.Information($"'Pattern' settngs for StatusCode are ignored.");
|
||||
}
|
||||
}
|
||||
if (telemetryConfiguration.Value.Status.Publish == true)
|
||||
{
|
||||
Status = telemetryConfiguration.Value.Status.PatternMatch(Status);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// The notification that the data for a monitored item has changed on an OPC UA server.
|
||||
/// </summary>
|
||||
public void MonitoredItem_Notification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs args)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (args == null || args.NotificationValue == null || monitoredItem == null || monitoredItem.Subscription == null || monitoredItem.Subscription.Session == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
MonitoredItemNotification notification = args.NotificationValue as MonitoredItemNotification;
|
||||
if (notification == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
DataValue value = notification.Value as DataValue;
|
||||
if (value == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
MessageData messageData = new MessageData();
|
||||
if (IotCentralMode)
|
||||
{
|
||||
// for IoTCentral we use the DisplayName as the key in the telemetry and the Value as the value.
|
||||
if (monitoredItem.DisplayName != null)
|
||||
{
|
||||
// use the DisplayName as reported in the MonitoredItem
|
||||
messageData.DisplayName = monitoredItem.DisplayName;
|
||||
}
|
||||
if (value.Value != null)
|
||||
{
|
||||
// use the Value as reported in the notification event argument encoded with the OPC UA JSON endcoder
|
||||
JsonEncoder encoder = new JsonEncoder(monitoredItem.Subscription.Session.MessageContext, false);
|
||||
value.ServerTimestamp = DateTime.MinValue;
|
||||
value.SourceTimestamp = DateTime.MinValue;
|
||||
value.StatusCode = StatusCodes.Good;
|
||||
encoder.WriteDataValue("Value", value);
|
||||
string valueString = encoder.CloseAndReturnText();
|
||||
// we only want the value string, search for everything till the real value starts
|
||||
// and get it
|
||||
string marker = "{\"Value\":{\"Value\":";
|
||||
int markerStart = valueString.IndexOf(marker);
|
||||
messageData.PreserveValueQuotes = true;
|
||||
if (markerStart >= 0)
|
||||
{
|
||||
// we either have a value in quotes or just a value
|
||||
int valueLength;
|
||||
int valueStart = marker.Length;
|
||||
if (valueString.IndexOf("\"", valueStart) >= 0)
|
||||
{
|
||||
// value is in quotes and two closing curly brackets at the end
|
||||
valueStart++;
|
||||
valueLength = valueString.Length - valueStart - 3;
|
||||
}
|
||||
else
|
||||
{
|
||||
// value is without quotes with two curly brackets at the end
|
||||
valueLength = valueString.Length - marker.Length - 2;
|
||||
messageData.PreserveValueQuotes = false;
|
||||
}
|
||||
messageData.Value = valueString.Substring(valueStart, valueLength);
|
||||
}
|
||||
Logger.Debug($" IoTCentral key: {messageData.DisplayName}");
|
||||
Logger.Debug($" IoTCentral values: {messageData.Value}");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// update the required message data to pass only the required data to HubCommunication
|
||||
EndpointTelemetryConfiguration telemetryConfiguration = GetEndpointTelemetryConfiguration(EndpointUrl.AbsoluteUri);
|
||||
|
||||
// the endpoint URL is required to allow HubCommunication lookup the telemetry configuration
|
||||
messageData.EndpointUrl = EndpointUrl.AbsoluteUri;
|
||||
if (telemetryConfiguration.NodeId.Publish == true)
|
||||
{
|
||||
messageData.NodeId = OriginalId;
|
||||
}
|
||||
if (telemetryConfiguration.MonitoredItem.ApplicationUri.Publish == true)
|
||||
{
|
||||
messageData.ApplicationUri = (monitoredItem.Subscription.Session.Endpoint.Server.ApplicationUri + (string.IsNullOrEmpty(OpcSession.PublisherSite) ? "" : $":{OpcSession.PublisherSite}"));
|
||||
}
|
||||
if (telemetryConfiguration.MonitoredItem.DisplayName.Publish == true && monitoredItem.DisplayName != null)
|
||||
{
|
||||
// use the DisplayName as reported in the MonitoredItem
|
||||
messageData.DisplayName = monitoredItem.DisplayName;
|
||||
}
|
||||
if (telemetryConfiguration.Value.SourceTimestamp.Publish == true && value.SourceTimestamp != null)
|
||||
{
|
||||
// use the SourceTimestamp as reported in the notification event argument in ISO8601 format
|
||||
messageData.SourceTimestamp = value.SourceTimestamp.ToString("o");
|
||||
}
|
||||
if (telemetryConfiguration.Value.StatusCode.Publish == true && value.StatusCode != null)
|
||||
{
|
||||
// use the StatusCode as reported in the notification event argument
|
||||
messageData.StatusCode = value.StatusCode.Code;
|
||||
}
|
||||
if (telemetryConfiguration.Value.Status.Publish == true && value.StatusCode != null)
|
||||
{
|
||||
// use the StatusCode as reported in the notification event argument to lookup the symbolic name
|
||||
messageData.Status = StatusCode.LookupSymbolicId(value.StatusCode.Code);
|
||||
}
|
||||
if (telemetryConfiguration.Value.Value.Publish == true && value.Value != null)
|
||||
{
|
||||
// use the Value as reported in the notification event argument encoded with the OPC UA JSON endcoder
|
||||
JsonEncoder encoder = new JsonEncoder(monitoredItem.Subscription.Session.MessageContext, false);
|
||||
value.ServerTimestamp = DateTime.MinValue;
|
||||
value.SourceTimestamp = DateTime.MinValue;
|
||||
value.StatusCode = StatusCodes.Good;
|
||||
encoder.WriteDataValue("Value", value);
|
||||
string valueString = encoder.CloseAndReturnText();
|
||||
// we only want the value string, search for everything till the real value starts
|
||||
// and get it
|
||||
string marker = "{\"Value\":{\"Value\":";
|
||||
int markerStart = valueString.IndexOf(marker);
|
||||
messageData.PreserveValueQuotes = true;
|
||||
if (markerStart >= 0)
|
||||
{
|
||||
// we either have a value in quotes or just a value
|
||||
int valueLength;
|
||||
int valueStart = marker.Length;
|
||||
if (valueString.IndexOf("\"", valueStart) >= 0)
|
||||
{
|
||||
// value is in quotes and two closing curly brackets at the end
|
||||
valueStart++;
|
||||
valueLength = valueString.Length - valueStart - 3;
|
||||
}
|
||||
else
|
||||
{
|
||||
// value is without quotes with two curly brackets at the end
|
||||
valueLength = valueString.Length - marker.Length - 2;
|
||||
messageData.PreserveValueQuotes = false;
|
||||
}
|
||||
messageData.Value = valueString.Substring(valueStart, valueLength);
|
||||
}
|
||||
}
|
||||
|
||||
// currently the pattern processing is done here, which adds runtime to the notification processing.
|
||||
// In case of perf issues it can be also done in CreateJsonMessageAsync of IoTHubMessaging.cs.
|
||||
|
||||
// apply patterns
|
||||
messageData.ApplyPatterns(telemetryConfiguration);
|
||||
|
||||
Logger.Debug($" ApplicationUri: {messageData.ApplicationUri}");
|
||||
Logger.Debug($" EndpointUrl: {messageData.EndpointUrl}");
|
||||
Logger.Debug($" DisplayName: {messageData.DisplayName}");
|
||||
Logger.Debug($" Value: {messageData.Value}");
|
||||
}
|
||||
|
||||
// add message to fifo send queue
|
||||
if (monitoredItem.Subscription == null)
|
||||
{
|
||||
Logger.Debug($"Subscription already removed. No more details available.");
|
||||
}
|
||||
else
|
||||
{
|
||||
Logger.Debug($"Enqueue a new message from subscription {(monitoredItem.Subscription == null ? "removed" : monitoredItem.Subscription.Id.ToString())}");
|
||||
Logger.Debug($" with publishing interval: {monitoredItem.Subscription.PublishingInterval} and sampling interval: {monitoredItem.SamplingInterval}):");
|
||||
}
|
||||
HubCommunication.Enqueue(messageData);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Logger.Error(e, "Error processing monitored item notification");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Init instance variables.
|
||||
/// </summary>
|
||||
private void Init(Uri sessionEndpointUrl)
|
||||
{
|
||||
State = OpcMonitoredItemState.Unmonitored;
|
||||
DisplayName = string.Empty;
|
||||
AttributeId = Attributes.Value;
|
||||
MonitoringMode = MonitoringMode.Reporting;
|
||||
RequestedSamplingInterval = OpcSamplingInterval;
|
||||
QueueSize = 0;
|
||||
DiscardOldest = true;
|
||||
Notification = new MonitoredItemNotificationEventHandler(MonitoredItem_Notification);
|
||||
EndpointUrl = sessionEndpointUrl;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Class to manage OPC subscriptions. We create a subscription for each different publishing interval
|
||||
/// on an Endpoint.
|
||||
/// </summary>
|
||||
public class OpcSubscription
|
||||
{
|
||||
public List<OpcMonitoredItem> OpcMonitoredItems;
|
||||
public int RequestedPublishingInterval;
|
||||
public double PublishingInterval;
|
||||
public Subscription OpcUaClientSubscription;
|
||||
|
||||
public OpcSubscription(int? publishingInterval)
|
||||
{
|
||||
RequestedPublishingInterval = publishingInterval ?? OpcPublishingInterval;
|
||||
PublishingInterval = RequestedPublishingInterval;
|
||||
OpcMonitoredItems = new List<OpcMonitoredItem>();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Class to manage OPC sessions.
|
||||
/// </summary>
|
||||
|
@ -481,8 +88,7 @@ namespace OpcPublisher
|
|||
sessionLocked = LockSessionAsync().Result;
|
||||
if (sessionLocked)
|
||||
{
|
||||
var subscriptions = OpcSessions.SelectMany(s => s.OpcSubscriptions);
|
||||
foreach (var subscription in subscriptions)
|
||||
foreach (var subscription in OpcSubscriptions)
|
||||
{
|
||||
result += subscription.OpcMonitoredItems.Count(i => i.State == OpcMonitoredItemState.Monitored);
|
||||
}
|
||||
|
@ -789,7 +395,9 @@ namespace OpcPublisher
|
|||
}
|
||||
|
||||
// if configured, get the DisplayName for the node, otherwise use the nodeId
|
||||
Opc.Ua.Node node;
|
||||
Node node;
|
||||
if (string.IsNullOrEmpty(item.DisplayName))
|
||||
{
|
||||
if (FetchOpcNodeDisplayName == true)
|
||||
{
|
||||
node = OpcUaClientSession.ReadNode(currentNodeId);
|
||||
|
@ -799,6 +407,7 @@ namespace OpcPublisher
|
|||
{
|
||||
item.DisplayName = currentNodeId.ToString();
|
||||
}
|
||||
}
|
||||
|
||||
// add the new monitored item.
|
||||
MonitoredItem monitoredItem = new MonitoredItem()
|
||||
|
@ -1096,7 +705,7 @@ namespace OpcPublisher
|
|||
/// Adds a node to be monitored. If there is no subscription with the requested publishing interval,
|
||||
/// one is created.
|
||||
/// </summary>
|
||||
public async Task<HttpStatusCode> AddNodeForMonitoringAsync(NodeId nodeId, ExpandedNodeId expandedNodeId, int opcPublishingInterval, int opcSamplingInterval, CancellationToken ct)
|
||||
public async Task<HttpStatusCode> AddNodeForMonitoringAsync(NodeId nodeId, ExpandedNodeId expandedNodeId, int opcPublishingInterval, int opcSamplingInterval, string displayName, CancellationToken ct)
|
||||
{
|
||||
string logPrefix = "AddNodeForMonitoringAsync:";
|
||||
bool sessionLocked = false;
|
||||
|
@ -1149,11 +758,11 @@ namespace OpcPublisher
|
|||
// add a new item to monitor
|
||||
if (expandedNodeId == null)
|
||||
{
|
||||
opcMonitoredItem = new OpcMonitoredItem(nodeId, EndpointUrl);
|
||||
opcMonitoredItem = new OpcMonitoredItem(nodeId, EndpointUrl, displayName);
|
||||
}
|
||||
else
|
||||
{
|
||||
opcMonitoredItem = new OpcMonitoredItem(expandedNodeId, EndpointUrl);
|
||||
opcMonitoredItem = new OpcMonitoredItem(expandedNodeId, EndpointUrl, displayName);
|
||||
}
|
||||
opcMonitoredItem.RequestedSamplingInterval = opcSamplingInterval;
|
||||
opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem);
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
|
||||
using Opc.Ua.Client;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace OpcPublisher
|
||||
{
|
||||
using static OpcApplicationConfiguration;
|
||||
|
||||
/// <summary>
|
||||
/// Class to manage OPC subscriptions. We create a subscription for each different publishing interval
|
||||
/// on an Endpoint.
|
||||
/// </summary>
|
||||
public class OpcSubscription
|
||||
{
|
||||
public List<OpcMonitoredItem> OpcMonitoredItems;
|
||||
public int RequestedPublishingInterval;
|
||||
public double PublishingInterval;
|
||||
public Subscription OpcUaClientSubscription;
|
||||
|
||||
public OpcSubscription(int? publishingInterval)
|
||||
{
|
||||
RequestedPublishingInterval = publishingInterval ?? OpcPublishingInterval;
|
||||
PublishingInterval = RequestedPublishingInterval;
|
||||
OpcMonitoredItems = new List<OpcMonitoredItem>();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -176,7 +176,7 @@ namespace OpcPublisher
|
|||
if (opcNode.ExpandedNodeId != null)
|
||||
{
|
||||
ExpandedNodeId expandedNodeId = ExpandedNodeId.Parse(opcNode.ExpandedNodeId);
|
||||
_nodePublishingConfiguration.Add(new NodePublishingConfigurationModel(expandedNodeId, opcNode.ExpandedNodeId, publisherConfigFileEntryLegacy.EndpointUrl, publisherConfigFileEntryLegacy.UseSecurity, opcNode.OpcSamplingInterval ?? OpcSamplingInterval, opcNode.OpcPublishingInterval ?? OpcPublishingInterval));
|
||||
_nodePublishingConfiguration.Add(new NodePublishingConfigurationModel(expandedNodeId, opcNode.ExpandedNodeId, publisherConfigFileEntryLegacy.EndpointUrl, publisherConfigFileEntryLegacy.UseSecurity, opcNode.OpcSamplingInterval ?? OpcSamplingInterval, opcNode.OpcPublishingInterval ?? OpcPublishingInterval, opcNode.DisplayName));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -185,13 +185,13 @@ namespace OpcPublisher
|
|||
{
|
||||
// ExpandedNodeId format
|
||||
ExpandedNodeId expandedNodeId = ExpandedNodeId.Parse(opcNode.Id);
|
||||
_nodePublishingConfiguration.Add(new NodePublishingConfigurationModel(expandedNodeId, opcNode.Id, publisherConfigFileEntryLegacy.EndpointUrl, publisherConfigFileEntryLegacy.UseSecurity, opcNode.OpcSamplingInterval ?? OpcSamplingInterval, opcNode.OpcPublishingInterval ?? OpcPublishingInterval));
|
||||
_nodePublishingConfiguration.Add(new NodePublishingConfigurationModel(expandedNodeId, opcNode.Id, publisherConfigFileEntryLegacy.EndpointUrl, publisherConfigFileEntryLegacy.UseSecurity, opcNode.OpcSamplingInterval ?? OpcSamplingInterval, opcNode.OpcPublishingInterval ?? OpcPublishingInterval, opcNode.DisplayName));
|
||||
}
|
||||
else
|
||||
{
|
||||
// NodeId format
|
||||
NodeId nodeId = NodeId.Parse(opcNode.Id);
|
||||
_nodePublishingConfiguration.Add(new NodePublishingConfigurationModel(nodeId, opcNode.Id, publisherConfigFileEntryLegacy.EndpointUrl, publisherConfigFileEntryLegacy.UseSecurity, opcNode.OpcSamplingInterval ?? OpcSamplingInterval, opcNode.OpcPublishingInterval ?? OpcPublishingInterval));
|
||||
_nodePublishingConfiguration.Add(new NodePublishingConfigurationModel(nodeId, opcNode.Id, publisherConfigFileEntryLegacy.EndpointUrl, publisherConfigFileEntryLegacy.UseSecurity, opcNode.DisplayName, opcNode.OpcSamplingInterval ?? OpcSamplingInterval, opcNode.OpcPublishingInterval ?? OpcPublishingInterval));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -199,7 +199,7 @@ namespace OpcPublisher
|
|||
else
|
||||
{
|
||||
// NodeId (ns=) format node configuration syntax using default sampling and publishing interval.
|
||||
_nodePublishingConfiguration.Add(new NodePublishingConfigurationModel(publisherConfigFileEntryLegacy.NodeId, publisherConfigFileEntryLegacy.NodeId.ToString(), publisherConfigFileEntryLegacy.EndpointUrl, publisherConfigFileEntryLegacy.UseSecurity, OpcSamplingInterval, OpcPublishingInterval));
|
||||
_nodePublishingConfiguration.Add(new NodePublishingConfigurationModel(publisherConfigFileEntryLegacy.NodeId, publisherConfigFileEntryLegacy.NodeId.ToString(), publisherConfigFileEntryLegacy.EndpointUrl, publisherConfigFileEntryLegacy.UseSecurity, null, OpcSamplingInterval, OpcPublishingInterval));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -256,7 +256,7 @@ namespace OpcPublisher
|
|||
{
|
||||
// create a monitored item for the node, we do not have the namespace index without a connected session.
|
||||
// so request a namespace update.
|
||||
OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.ExpandedNodeId, opcSession.EndpointUrl)
|
||||
OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.ExpandedNodeId, opcSession.EndpointUrl, nodeInfo.DisplayName)
|
||||
{
|
||||
RequestedSamplingInterval = nodeInfo.OpcSamplingInterval,
|
||||
SamplingInterval = nodeInfo.OpcSamplingInterval
|
||||
|
@ -267,7 +267,7 @@ namespace OpcPublisher
|
|||
else if (nodeInfo.NodeId != null)
|
||||
{
|
||||
// create a monitored item for the node with the configured or default sampling interval
|
||||
OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.NodeId, opcSession.EndpointUrl)
|
||||
OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.NodeId, opcSession.EndpointUrl, nodeInfo.DisplayName)
|
||||
{
|
||||
RequestedSamplingInterval = nodeInfo.OpcSamplingInterval,
|
||||
SamplingInterval = nodeInfo.OpcSamplingInterval
|
||||
|
|
|
@ -23,6 +23,9 @@ namespace OpcPublisher
|
|||
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
|
||||
public string ExpandedNodeId;
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
|
||||
public string DisplayName;
|
||||
|
||||
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
|
||||
public int? OpcSamplingInterval;
|
||||
|
||||
|
@ -65,27 +68,30 @@ namespace OpcPublisher
|
|||
public NodeId NodeId;
|
||||
public ExpandedNodeId ExpandedNodeId;
|
||||
public string OriginalId;
|
||||
public string DisplayName;
|
||||
public int OpcSamplingInterval;
|
||||
public int OpcPublishingInterval;
|
||||
|
||||
public NodePublishingConfigurationModel(ExpandedNodeId expandedNodeId, string originalId, Uri endpointUrl, bool? useSecurity, int opcSamplingInterval, int opcPublishingInterval)
|
||||
public NodePublishingConfigurationModel(ExpandedNodeId expandedNodeId, string originalId, Uri endpointUrl, bool? useSecurity, int opcSamplingInterval, int opcPublishingInterval, string displayName)
|
||||
{
|
||||
NodeId = null;
|
||||
ExpandedNodeId = expandedNodeId;
|
||||
OriginalId = originalId;
|
||||
EndpointUrl = endpointUrl;
|
||||
UseSecurity = useSecurity ?? true;
|
||||
DisplayName = displayName;
|
||||
OpcSamplingInterval = opcSamplingInterval;
|
||||
OpcPublishingInterval = opcPublishingInterval;
|
||||
}
|
||||
|
||||
public NodePublishingConfigurationModel(NodeId nodeId, string originalId, Uri endpointUrl, bool? useSecurity, int opcSamplingInterval, int opcPublishingInterval)
|
||||
public NodePublishingConfigurationModel(NodeId nodeId, string originalId, Uri endpointUrl, bool? useSecurity, string displayName, int opcSamplingInterval, int opcPublishingInterval)
|
||||
{
|
||||
NodeId = nodeId;
|
||||
ExpandedNodeId = null;
|
||||
OriginalId = originalId;
|
||||
EndpointUrl = endpointUrl;
|
||||
UseSecurity = useSecurity ?? true;
|
||||
DisplayName = displayName;
|
||||
OpcSamplingInterval = opcSamplingInterval;
|
||||
OpcPublishingInterval = opcPublishingInterval;
|
||||
}
|
||||
|
|
|
@ -493,13 +493,13 @@ namespace OpcPublisher
|
|||
{
|
||||
// add the node info to the subscription with the default publishing interval, execute syncronously
|
||||
Logger.Debug($"{logPrefix} Request to monitor item with NodeId '{nodeId.ToString()}' (PublishingInterval: {OpcPublishingInterval}, SamplingInterval: {OpcSamplingInterval})");
|
||||
statusCode = opcSession.AddNodeForMonitoringAsync(nodeId, null, OpcPublishingInterval, OpcSamplingInterval, ShutdownTokenSource.Token).Result;
|
||||
statusCode = opcSession.AddNodeForMonitoringAsync(nodeId, null, OpcPublishingInterval, OpcSamplingInterval, null, ShutdownTokenSource.Token).Result;
|
||||
}
|
||||
else
|
||||
{
|
||||
// add the node info to the subscription with the default publishing interval, execute syncronously
|
||||
Logger.Debug($"{logPrefix} Request to monitor item with ExpandedNodeId '{expandedNodeId.ToString()}' (PublishingInterval: {OpcPublishingInterval}, SamplingInterval: {OpcSamplingInterval})");
|
||||
statusCode = opcSession.AddNodeForMonitoringAsync(null, expandedNodeId, OpcPublishingInterval, OpcSamplingInterval, ShutdownTokenSource.Token).Result;
|
||||
statusCode = opcSession.AddNodeForMonitoringAsync(null, expandedNodeId, OpcPublishingInterval, OpcSamplingInterval, null, ShutdownTokenSource.Token).Result;
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
|
|
Загрузка…
Ссылка в новой задаче