Merge remote-tracking branch 'pubgit/master'

This commit is contained in:
Hans Gschossmann 2017-08-21 15:49:34 +02:00
Родитель 618c7e8967 e1da0adae3
Коммит 77183f413c
10 изменённых файлов: 4790 добавлений и 240 удалений

5
.gitignore поставляемый
Просмотреть файл

@ -17,7 +17,10 @@
*.dll
*.so
*.pdb
/src/GatewayApp.NetCore/Logs
*.der
*.pfx
*.exe
/src/out
/src/Logs

Просмотреть файл

@ -0,0 +1,819 @@
/* ========================================================================
* Copyright (c) 2005-2016 The OPC Foundation, Inc. All rights reserved.
*
* OPC Foundation MIT License 1.00
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*
* The complete license agreement can be found here:
* http://opcfoundation.org/License/MIT/1.00/
* ======================================================================*/
using System;
using System.Collections.Generic;
using System.Text;
using Opc.Ua.Server;
namespace Opc.Ua.Sample
{
/// <summary>
/// Provides a basic monitored item implementation which does not support queuing.
/// </summary>
public class DataChangeMonitoredItem : IDataChangeMonitoredItem
{
#region Constructors
/// <summary>
/// Constructs a new instance.
/// </summary>
public DataChangeMonitoredItem(
MonitoredNode source,
uint id,
uint attributeId,
NumericRange indexRange,
QualifiedName dataEncoding,
DiagnosticsMasks diagnosticsMasks,
TimestampsToReturn timestampsToReturn,
MonitoringMode monitoringMode,
uint clientHandle,
double samplingInterval,
bool alwaysReportUpdates)
{
m_source = source;
m_id = id;
m_attributeId = attributeId;
m_indexRange = indexRange;
m_dataEncoding = dataEncoding;
m_timestampsToReturn = timestampsToReturn;
m_diagnosticsMasks = diagnosticsMasks;
m_monitoringMode = monitoringMode;
m_clientHandle = clientHandle;
m_samplingInterval = samplingInterval;
m_nextSampleTime = DateTime.UtcNow.Ticks;
m_readyToPublish = false;
m_readyToTrigger = false;
m_alwaysReportUpdates = alwaysReportUpdates;
}
/// <summary>
/// Constructs a new instance.
/// </summary>
public DataChangeMonitoredItem(
MonitoredNode source,
uint id,
uint attributeId,
NumericRange indexRange,
QualifiedName dataEncoding,
DiagnosticsMasks diagnosticsMasks,
TimestampsToReturn timestampsToReturn,
MonitoringMode monitoringMode,
uint clientHandle,
double samplingInterval,
uint queueSize,
bool discardOldest,
DataChangeFilter filter,
Range range,
bool alwaysReportUpdates)
{
m_source = source;
m_id = id;
m_attributeId = attributeId;
m_indexRange = indexRange;
m_dataEncoding = dataEncoding;
m_timestampsToReturn = timestampsToReturn;
m_diagnosticsMasks = diagnosticsMasks;
m_monitoringMode = monitoringMode;
m_clientHandle = clientHandle;
m_samplingInterval = samplingInterval;
m_nextSampleTime = DateTime.UtcNow.Ticks;
m_readyToPublish = false;
m_readyToTrigger = false;
m_queue = null;
m_filter = filter;
m_range = 0;
m_alwaysReportUpdates = alwaysReportUpdates;
if (range != null)
{
m_range = range.High - range.Low;
}
if (queueSize > 1)
{
m_queue = new MonitoredItemQueue();
m_queue.SetQueueSize(queueSize, discardOldest, diagnosticsMasks);
m_queue.SetSamplingInterval(samplingInterval);
}
}
#endregion
#region Public Members
/// <summary>
/// Gets the id for the attribute being monitored.
/// </summary>
public uint AttributeId
{
get { return m_attributeId; }
}
/// <summary>
/// Gets the index range used to selected a subset of the value.
/// </summary>
public NumericRange IndexRange
{
get { return m_indexRange; }
}
/// <summary>
/// Gets the data encoding to use when returning the value.
/// </summary>
public QualifiedName DataEncoding
{
get { return m_dataEncoding; }
}
/// <summary>
/// Whether the monitored item should report a value without checking if it was changed.
/// </summary>
public bool AlwaysReportUpdates
{
get { return m_alwaysReportUpdates; }
set { m_alwaysReportUpdates = value; }
}
/// <summary>
/// The number of milliseconds until the next sample.
/// </summary>
public int TimeToNextSample
{
get
{
lock (m_lock)
{
if (m_monitoringMode == MonitoringMode.Disabled)
{
return Int32.MaxValue;
}
DateTime now = DateTime.UtcNow;
if (m_nextSampleTime <= now.Ticks)
{
return 0;
}
return (int)((m_nextSampleTime - now.Ticks)/TimeSpan.TicksPerMillisecond);
}
}
}
/// <summary>
/// The monitoring mode.
/// </summary>
public MonitoringMode MonitoringMode
{
get
{
return m_monitoringMode;
}
}
/// <summary>
/// The sampling interval.
/// </summary>
public double SamplingInterval
{
get
{
lock (m_lock)
{
return m_samplingInterval;
}
}
}
/// <summary>
/// Modifies the monitored item parameters,
/// </summary>
public ServiceResult Modify(
DiagnosticsMasks diagnosticsMasks,
TimestampsToReturn timestampsToReturn,
uint clientHandle,
double samplingInterval)
{
return Modify(diagnosticsMasks, timestampsToReturn, clientHandle, samplingInterval, 0, false, null, null);
}
/// <summary>
/// Modifies the monitored item parameters,
/// </summary>
public ServiceResult Modify(
DiagnosticsMasks diagnosticsMasks,
TimestampsToReturn timestampsToReturn,
uint clientHandle,
double samplingInterval,
uint queueSize,
bool discardOldest,
DataChangeFilter filter,
Range range)
{
lock (m_lock)
{
m_diagnosticsMasks = diagnosticsMasks;
m_timestampsToReturn = timestampsToReturn;
m_clientHandle = clientHandle;
// subtract the previous sampling interval.
long oldSamplingInterval = (long)(m_samplingInterval*TimeSpan.TicksPerMillisecond);
if (oldSamplingInterval < m_nextSampleTime)
{
m_nextSampleTime -= oldSamplingInterval;
}
m_samplingInterval = samplingInterval;
// calculate the next sampling interval.
long newSamplingInterval = (long)(m_samplingInterval*TimeSpan.TicksPerMillisecond);
if (m_samplingInterval > 0)
{
m_nextSampleTime += newSamplingInterval;
}
else
{
m_nextSampleTime = 0;
}
// update the filter and the range.
m_filter = filter;
m_range = 0;
if (range != null)
{
m_range = range.High - range.Low;
}
// update the queue size.
if (queueSize > 1)
{
if (m_queue == null)
{
m_queue = new MonitoredItemQueue();
}
m_queue.SetQueueSize(queueSize, discardOldest, diagnosticsMasks);
m_queue.SetSamplingInterval(samplingInterval);
}
else
{
m_queue = null;
}
return ServiceResult.Good;
}
}
/// <summary>
/// Called when the attribute being monitored changed. Reads and queues the value.
/// </summary>
public void ValueChanged(ISystemContext context)
{
DataValue value = new DataValue();
ServiceResult error = m_source.Node.ReadAttribute(context, m_attributeId, NumericRange.Empty, null, value);
if (ServiceResult.IsBad(error))
{
value = new DataValue(error.StatusCode);
}
value.ServerTimestamp = DateTime.UtcNow;
QueueValue(value, error);
}
#endregion
#region IMonitoredItem Members
/// <summary>
/// The node manager for the monitored item.
/// </summary>
public INodeManager NodeManager
{
get { return m_source.NodeManager; }
}
/// <summary>
/// The session for the monitored item.
/// </summary>
public Session Session
{
get
{
ISubscription subscription = m_subscription;
if (subscription != null)
{
return subscription.Session;
}
return null;
}
}
/// <summary>
/// The identifier for the subscription that the monitored item belongs to.
/// </summary>
public uint SubscriptionId
{
get
{
ISubscription subscription = m_subscription;
if (subscription != null)
{
return subscription.Id;
}
return 0;
}
}
/// <summary>
/// The unique identifier for the monitored item.
/// </summary>
public uint Id
{
get { return m_id; }
}
/// <summary>
/// The client handle.
/// </summary>
public uint ClientHandle
{
get { return m_clientHandle; }
}
/// <summary>
/// The callback to use to notify the subscription when values are ready to publish.
/// </summary>
public ISubscription SubscriptionCallback
{
get
{
return m_subscription;
}
set
{
m_subscription = value;
}
}
/// <summary>
/// The handle assigned to the monitored item by the node manager.
/// </summary>
public object ManagerHandle
{
get { return m_source; }
}
/// <summary>
/// The type of monitor item.
/// </summary>
public int MonitoredItemType
{
get { return MonitoredItemTypeMask.DataChange; }
}
/// <summary>
/// Returns true if the item is ready to publish.
/// </summary>
public bool IsReadyToPublish
{
get
{
lock (m_lock)
{
// check if not ready to publish.
if (!m_readyToPublish)
{
return false;
}
// check if monitoring was turned off.
if (m_monitoringMode != MonitoringMode.Reporting)
{
return false;
}
// re-queue if too little time has passed since the last publish.
long now = DateTime.UtcNow.Ticks;
if (m_nextSampleTime > now)
{
return false;
}
return true;
}
}
}
/// <summary>
/// Gets or Sets a value indicating whether the item is ready to trigger in case it has some linked items.
/// </summary>
public bool IsReadyToTrigger
{
get
{
lock (m_lock)
{
// only allow to trigger if sampling or reporting.
if (m_monitoringMode == MonitoringMode.Disabled)
{
return false;
}
return m_readyToTrigger;
}
}
set
{
lock (m_lock)
{
m_readyToTrigger = value;
}
}
}
/// <summary>
/// Returns the results for the create request.
/// </summary>
public ServiceResult GetCreateResult(out MonitoredItemCreateResult result)
{
lock (m_lock)
{
result = new MonitoredItemCreateResult();
result.MonitoredItemId = m_id;
result.StatusCode = StatusCodes.Good;
result.RevisedSamplingInterval = m_samplingInterval;
result.RevisedQueueSize = 0;
result.FilterResult = null;
if (m_queue != null)
{
result.RevisedQueueSize = m_queue.QueueSize;
}
return ServiceResult.Good;
}
}
/// <summary>
/// Returns the results for the modify request.
/// </summary>
public ServiceResult GetModifyResult(out MonitoredItemModifyResult result)
{
lock (m_lock)
{
result = new MonitoredItemModifyResult();
result.StatusCode = StatusCodes.Good;
result.RevisedSamplingInterval = m_samplingInterval;
result.RevisedQueueSize = 0;
result.FilterResult = null;
if (m_queue != null)
{
result.RevisedQueueSize = m_queue.QueueSize;
}
return ServiceResult.Good;
}
}
#endregion
#region IDataChangeMonitoredItem Members
/// <summary>
/// Queues a new data change.
/// </summary>
public void QueueValue(DataValue value, ServiceResult error)
{
lock (m_lock)
{
// check if value has changed.
if (!m_alwaysReportUpdates)
{
if (!Opc.Ua.Server.MonitoredItem.ValueChanged(value, error, m_lastValue, m_lastError, m_filter, m_range))
{
return;
}
}
// make a shallow copy of the value.
if (value != null)
{
DataValue copy = new DataValue();
copy.WrappedValue = value.WrappedValue;
copy.StatusCode = value.StatusCode;
copy.SourceTimestamp = value.SourceTimestamp;
copy.SourcePicoseconds = value.SourcePicoseconds;
copy.ServerTimestamp = value.ServerTimestamp;
copy.ServerPicoseconds = value.ServerPicoseconds;
value = copy;
// ensure the data value matches the error status code.
if (error != null && error.StatusCode.Code != 0)
{
value.StatusCode = error.StatusCode;
}
}
m_lastValue = value;
m_lastError = error;
// queue value.
if (m_queue != null)
{
m_queue.QueueValue(value, error);
}
// flag the item as ready to publish.
m_readyToPublish = true;
m_readyToTrigger = true;
}
}
/// <summary>
/// Sets a flag indicating that the semantics for the monitored node have changed.
/// </summary>
/// <remarks>
/// The StatusCode for next value reported by the monitored item will have the SemanticsChanged bit set.
/// </remarks>
public void SetSemanticsChanged()
{
lock (m_lock)
{
m_semanticsChanged = true;
}
}
/// <summary>
/// Sets a flag indicating that the structure of the monitored node has changed.
/// </summary>
/// <remarks>
/// The StatusCode for next value reported by the monitored item will have the StructureChanged bit set.
/// </remarks>
public void SetStructureChanged()
{
lock (m_lock)
{
m_structureChanged = true;
}
}
/// <summary>
/// Changes the monitoring mode.
/// </summary>
public MonitoringMode SetMonitoringMode(MonitoringMode monitoringMode)
{
lock (m_lock)
{
MonitoringMode previousMode = m_monitoringMode;
if (previousMode == monitoringMode)
{
return previousMode;
}
if (previousMode == MonitoringMode.Disabled)
{
m_nextSampleTime = DateTime.UtcNow.Ticks;
m_lastError = null;
m_lastValue = null;
}
m_monitoringMode = monitoringMode;
if (monitoringMode == MonitoringMode.Disabled)
{
m_readyToPublish = false;
m_readyToTrigger = false;
}
return previousMode;
}
}
/// <summary>
/// No filters supported.
/// </summary>
public DataChangeFilter DataChangeFilter
{
get { return m_filter; }
}
/// <summary>
/// Increments the sample time to the next interval.
/// </summary>
private void IncrementSampleTime()
{
// update next sample time.
long now = DateTime.UtcNow.Ticks;
long samplingInterval = (long)(m_samplingInterval*TimeSpan.TicksPerMillisecond);
if (m_nextSampleTime > 0)
{
long delta = now - m_nextSampleTime;
if (samplingInterval > 0 && delta >= 0)
{
m_nextSampleTime += ((delta/samplingInterval)+1)*samplingInterval;
}
}
// set sampling time based on current time.
else
{
m_nextSampleTime = now + samplingInterval;
}
}
/// <summary>
/// Called by the subscription to publish any notification.
/// </summary>
public bool Publish(OperationContext context, Queue<MonitoredItemNotification> notifications, Queue<DiagnosticInfo> diagnostics)
{
lock (m_lock)
{
// check if not ready to publish.
if (!IsReadyToPublish)
{
return false;
}
// update sample time.
IncrementSampleTime();
// update publish flag.
m_readyToPublish = false;
m_readyToTrigger = false;
// check if queuing is enabled.
if (m_queue == null)
{
Publish(context, m_lastValue, m_lastError, notifications, diagnostics);
}
else
{
DataValue value = null;
ServiceResult error = null;
while (m_queue.Publish(out value, out error))
{
Publish(context, value, error, notifications, diagnostics);
}
}
return true;
}
}
/// <summary>
/// Publishes a value.
/// </summary>
private void Publish(
OperationContext context,
DataValue value,
ServiceResult error,
Queue<MonitoredItemNotification> notifications,
Queue<DiagnosticInfo> diagnostics)
{
// set semantics changed bit.
if (m_semanticsChanged)
{
if (value != null)
{
value.StatusCode = value.StatusCode.SetSemanticsChanged(true);
}
if (error != null)
{
error = new ServiceResult(
error.StatusCode.SetSemanticsChanged(true),
error.SymbolicId,
error.NamespaceUri,
error.LocalizedText,
error.AdditionalInfo,
error.InnerResult);
}
m_semanticsChanged = false;
}
// set structure changed bit.
if (m_structureChanged)
{
if (value != null)
{
value.StatusCode = value.StatusCode.SetStructureChanged(true);
}
if (error != null)
{
error = new ServiceResult(
error.StatusCode.SetStructureChanged(true),
error.SymbolicId,
error.NamespaceUri,
error.LocalizedText,
error.AdditionalInfo,
error.InnerResult);
}
m_structureChanged = false;
}
// copy data value.
MonitoredItemNotification item = new MonitoredItemNotification();
item.ClientHandle = m_clientHandle;
item.Value = value;
// apply timestamp filter.
if (m_timestampsToReturn != TimestampsToReturn.Server && m_timestampsToReturn != TimestampsToReturn.Both)
{
item.Value.ServerTimestamp = DateTime.MinValue;
}
if (m_timestampsToReturn != TimestampsToReturn.Source && m_timestampsToReturn != TimestampsToReturn.Both)
{
item.Value.SourceTimestamp = DateTime.MinValue;
}
notifications.Enqueue(item);
// update diagnostic info.
DiagnosticInfo diagnosticInfo = null;
if (m_lastError != null)
{
if ((m_diagnosticsMasks & DiagnosticsMasks.OperationAll) != 0)
{
diagnosticInfo = ServerUtils.CreateDiagnosticInfo(m_source.Server, context, m_lastError);
}
}
diagnostics.Enqueue(diagnosticInfo);
}
#endregion
#region Private Fields
private object m_lock = new object();
private MonitoredNode m_source;
private ISubscription m_subscription;
private uint m_id;
private DataValue m_lastValue;
private ServiceResult m_lastError;
private uint m_attributeId;
private NumericRange m_indexRange;
private QualifiedName m_dataEncoding;
private TimestampsToReturn m_timestampsToReturn;
private DiagnosticsMasks m_diagnosticsMasks;
private uint m_clientHandle;
private double m_samplingInterval;
private MonitoredItemQueue m_queue;
private DataChangeFilter m_filter;
private double m_range;
private MonitoringMode m_monitoringMode;
private long m_nextSampleTime;
private bool m_readyToPublish;
private bool m_readyToTrigger;
private bool m_alwaysReportUpdates;
private bool m_semanticsChanged;
private bool m_structureChanged;
#endregion
}
}

Просмотреть файл

@ -22,6 +22,19 @@ namespace Opc.Ua.Publisher
Configuration.ClientConfiguration = new ClientConfiguration();
Configuration.ServerConfiguration = new ServerConfiguration();
// enable logging
Configuration.TraceConfiguration = new TraceConfiguration();
Configuration.TraceConfiguration.TraceMasks = Utils.TraceMasks.Error | Utils.TraceMasks.Security | Utils.TraceMasks.StackTrace | Utils.TraceMasks.StartStop;
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_GW_LOGP")))
{
Configuration.TraceConfiguration.OutputFilePath = Environment.GetEnvironmentVariable("_GW_LOGP");
}
else
{
Configuration.TraceConfiguration.OutputFilePath = "./Logs/" + Configuration.ApplicationName + ".log.txt";
}
Configuration.TraceConfiguration.ApplySettings();
if (Configuration.SecurityConfiguration == null)
{
Configuration.SecurityConfiguration = new SecurityConfiguration();
@ -87,10 +100,18 @@ namespace Opc.Ua.Publisher
certificate = CertificateFactory.CreateCertificate(
Configuration.SecurityConfiguration.ApplicationCertificate.StoreType,
Configuration.SecurityConfiguration.ApplicationCertificate.StorePath,
null,
Configuration.ApplicationUri,
Configuration.ApplicationName,
Configuration.ApplicationName,
new List<string>(){ Configuration.ApplicationName }
null,
CertificateFactory.defaultKeySize,
DateTime.UtcNow - TimeSpan.FromDays(1),
CertificateFactory.defaultLifeTime,
CertificateFactory.defaultHashSize,
false,
null,
null
);
}
if (certificate == null)
@ -153,16 +174,15 @@ namespace Opc.Ua.Publisher
newPolicy.SecurityPolicyUri = SecurityPolicies.Basic128Rsa15;
Configuration.ServerConfiguration.SecurityPolicies.Add(newPolicy);
// enable logging
Configuration.TraceConfiguration = new TraceConfiguration();
Configuration.TraceConfiguration.DeleteOnLoad = true;
Configuration.TraceConfiguration.TraceMasks = 519;
Configuration.TraceConfiguration.OutputFilePath = "./Logs/" + Configuration.ApplicationName + ".log.txt";
Configuration.TraceConfiguration.ApplySettings();
// the OperationTimeout should be twice the minimum value for PublishingInterval * KeepAliveCount, so set to 120s
Configuration.TransportQuotas.OperationTimeout = 120000;
// allow SHA1 certificates for now as many OPC Servers still use them
Configuration.SecurityConfiguration.RejectSHA1SignedCertificates = false;
// allow 1024 minimum key size as many OPC Servers still use them
Configuration.SecurityConfiguration.MinimumCertificateKeySize = 1024;
// validate the configuration now
Configuration.Validate(Configuration.ApplicationType).Wait();
}

386
src/MonitoredItemQueue.cs Normal file
Просмотреть файл

@ -0,0 +1,386 @@
/* ========================================================================
* Copyright (c) 2005-2016 The OPC Foundation, Inc. All rights reserved.
*
* OPC Foundation MIT License 1.00
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*
* The complete license agreement can be found here:
* http://opcfoundation.org/License/MIT/1.00/
* ======================================================================*/
using System;
using System.Collections.Generic;
using System.Text;
namespace Opc.Ua.Sample
{
/// <summary>
/// Provides a queue for data changes.
/// </summary>
public class MonitoredItemQueue
{
/// <summary>
/// Creates an empty queue.
/// </summary>
public MonitoredItemQueue()
{
m_values = null;
m_errors = null;
m_start = -1;
m_end = -1;
m_overflow = -1;
m_discardOldest = false;
m_nextSampleTime = 0;
m_samplingInterval = 0;
}
#region Public Methods
/// <summary>
/// Gets the current queue size.
/// </summary>
public uint QueueSize
{
get
{
if (m_values == null)
{
return 0;
}
return (uint)m_values.Length;
}
}
/// <summary>
/// Sets the sampling interval used when queuing values.
/// </summary>
/// <param name="samplingInterval">The new sampling interval.</param>
public void SetSamplingInterval(double samplingInterval)
{
// substract the previous sampling interval.
if (m_samplingInterval < m_nextSampleTime)
{
m_nextSampleTime -= m_samplingInterval;
}
// calculate the next sampling interval.
m_samplingInterval = (long)(samplingInterval*TimeSpan.TicksPerMillisecond);
if (m_samplingInterval > 0)
{
m_nextSampleTime += m_samplingInterval;
}
else
{
m_nextSampleTime = 0;
}
}
/// <summary>
/// Sets the queue size.
/// </summary>
/// <param name="queueSize">The new queue size.</param>
/// <param name="discardOldest">Whether to discard the oldest values if the queue overflows.</param>
/// <param name="diagnosticsMasks">Specifies which diagnostics which should be kept in the queue.</param>
public void SetQueueSize(uint queueSize, bool discardOldest, DiagnosticsMasks diagnosticsMasks)
{
int length = (int)queueSize;
if (length < 1)
{
length = 1;
}
int start = m_start;
int end = m_end;
// create new queue.
DataValue[] values = new DataValue[length];
ServiceResult[] errors = null;
if ((diagnosticsMasks & DiagnosticsMasks.OperationAll) != 0)
{
errors = new ServiceResult[length];
}
// copy existing values.
List<DataValue> existingValues = null;
List<ServiceResult> existingErrors = null;
if (m_start >= 0)
{
existingValues = new List<DataValue>();
existingErrors = new List<ServiceResult>();
DataValue value = null;
ServiceResult error = null;
while (Dequeue(out value, out error))
{
existingValues.Add(value);
existingErrors.Add(error);
}
}
// update internals.
m_values = values;
m_errors = errors;
m_start = -1;
m_end = 0;
m_overflow = -1;
m_discardOldest = discardOldest;
// requeue the data.
if (existingValues != null)
{
for (int ii = 0; ii < existingValues.Count; ii++)
{
Enqueue(existingValues[ii], existingErrors[ii]);
}
}
}
/// <summary>
/// Adds the value to the queue.
/// </summary>
/// <param name="value">The value to queue.</param>
/// <param name="error">The error to queue.</param>
public void QueueValue(DataValue value, ServiceResult error)
{
long now = DateTime.UtcNow.Ticks;
if (m_start >= 0)
{
// check if too soon for another sample.
if (now < m_nextSampleTime)
{
int last = m_end-1;
if (last < 0)
{
last = m_values.Length-1;
}
// replace last value and error.
m_values[last] = value;
if (m_errors != null)
{
m_errors[last] = error;
}
return;
}
}
// update next sample time.
if (m_nextSampleTime > 0)
{
long delta = now - m_nextSampleTime;
if (m_samplingInterval > 0 && delta >= 0)
{
m_nextSampleTime += ((delta/m_samplingInterval)+1)*m_samplingInterval;
}
}
else
{
m_nextSampleTime = now + m_samplingInterval;
}
// queue next value.
Enqueue(value, error);
}
/// <summary>
/// Publishes the oldest value in the queue.
/// </summary>
/// <param name="value">The value.</param>
/// <param name="error">The error associated with the value.</param>
/// <returns>True if a value was found. False if the queue is empty.</returns>
public bool Publish(out DataValue value, out ServiceResult error)
{
return Dequeue(out value, out error);
}
#endregion
#region Private Methods
/// <summary>
/// Adds the value to the queue. Discards values if the queue is full.
/// </summary>
/// <param name="value">The value to add.</param>
/// <param name="error">The error to add.</param>
private void Enqueue(DataValue value, ServiceResult error)
{
// check for empty queue.
if (m_start < 0)
{
m_start = 0;
m_end = 1;
m_overflow = -1;
m_values[m_start] = value;
if (m_errors != null)
{
m_errors[m_start] = error;
}
return;
}
int next = m_end;
// check for wrap around.
if (next >= m_values.Length)
{
next = 0;
}
// check if queue is full.
if (m_start == next)
{
if (!m_discardOldest)
{
m_overflow = m_end-1;
return;
}
// remove oldest value.
m_start++;
if (m_start >= m_values.Length)
{
m_start = 0;
}
// set overflow bit.
m_overflow = m_start;
}
// add value.
m_values[next] = value;
if (m_errors != null)
{
m_errors[next] = error;
}
m_end = next+1;
}
/// <summary>
/// Removes a value and an error from the queue.
/// </summary>
/// <param name="value">The value removed from the queue.</param>
/// <param name="error">The error removed from the queue.</param>
/// <returns>True if a value was found. False if the queue is empty.</returns>
private bool Dequeue(out DataValue value, out ServiceResult error)
{
value = null;
error = null;
// check for empty queue.
if (m_start < 0)
{
return false;
}
value = m_values[m_start];
m_values[m_start] = null;
if (m_errors != null)
{
error = m_errors[m_start];
m_errors[m_start] = null;
}
// set the overflow bit.
if (m_overflow == m_start)
{
SetOverflowBit(ref value, ref error);
m_overflow = -1;
}
m_start++;
// check if queue has been emptied.
if (m_start == m_end)
{
m_start = -1;
m_end = 0;
}
// check for wrap around.
else if (m_start >= m_values.Length)
{
m_start = 0;
}
return true;
}
/// <summary>
/// Sets the overflow bit in the value and error.
/// </summary>
/// <param name="value">The value to update.</param>
/// <param name="error">The error to update.</param>
private void SetOverflowBit(ref DataValue value, ref ServiceResult error)
{
if (value != null)
{
StatusCode status = value.StatusCode;
status.Overflow = true;
value.StatusCode = status;
}
if (error != null)
{
StatusCode status = error.StatusCode;
status.Overflow = true;
// have to copy before updating because the ServiceResult is invariant.
ServiceResult copy = new ServiceResult(
status,
error.SymbolicId,
error.NamespaceUri,
error.LocalizedText,
error.AdditionalInfo,
error.InnerResult);
error = copy;
}
}
#endregion
#region Private Fields
private DataValue[] m_values;
private ServiceResult[] m_errors;
private int m_start;
private int m_end;
private int m_overflow;
private bool m_discardOldest;
private long m_nextSampleTime;
private long m_samplingInterval;
#endregion
}
}

382
src/MonitoredNode.cs Normal file
Просмотреть файл

@ -0,0 +1,382 @@
/* ========================================================================
* Copyright (c) 2005-2016 The OPC Foundation, Inc. All rights reserved.
*
* OPC Foundation MIT License 1.00
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*
* The complete license agreement can be found here:
* http://opcfoundation.org/License/MIT/1.00/
* ======================================================================*/
using System;
using System.Collections.Generic;
using System.Text;
using Opc.Ua.Server;
namespace Opc.Ua.Sample
{
/// <summary>
/// Keeps track of the monitored items for a single node.
/// </summary>
public class MonitoredNode
{
#region Constructors
/// <summary>
/// Initializes the instance with the context for the node being monitored.
/// </summary>
public MonitoredNode(
IServerInternal server,
INodeManager nodeManager,
NodeState node)
{
m_server = server;
m_nodeManager = nodeManager;
m_node = node;
}
#endregion
#region Public Properties
/// <summary>
/// The server that the node belongs to.
/// </summary>
public IServerInternal Server
{
get { return m_server; }
}
/// <summary>
/// The node manager that the node belongs to.
/// </summary>
public INodeManager NodeManager
{
get { return m_nodeManager; }
}
/// <summary>
/// The node being monitored.
/// </summary>
public NodeState Node
{
get { return m_node; }
}
/// <summary>
/// Whether the node has any active monitored items for the specified attribute.
/// </summary>
public bool IsMonitoringRequired(uint attributeId)
{
if (m_monitoredItems != null)
{
for (int ii = 0; ii < m_monitoredItems.Count; ii++)
{
DataChangeMonitoredItem monitoredItem = m_monitoredItems[ii];
if (monitoredItem.AttributeId == attributeId && monitoredItem.MonitoringMode != MonitoringMode.Disabled)
{
return true;
}
}
}
return false;
}
#endregion
#region Public Methods
/// <summary>
/// Creates a new data change monitored item.
/// </summary>
/// <param name="context">The system context.</param>
/// <param name="monitoredItemId">The unique identifier for the monitiored item.</param>
/// <param name="attributeId">The attribute to monitor.</param>
/// <param name="indexRange">The index range to use for array values.</param>
/// <param name="dataEncoding">The data encoding to return for structured values.</param>
/// <param name="diagnosticsMasks">The diagnostics masks to use.</param>
/// <param name="timestampsToReturn">The timestamps to return.</param>
/// <param name="monitoringMode">The initial monitoring mode.</param>
/// <param name="clientHandle">The handle assigned by the client.</param>
/// <param name="samplingInterval">The sampling interval.</param>
/// <param name="queueSize">The queue size.</param>
/// <param name="discardOldest">Whether to discard the oldest values when the queue overflows.</param>
/// <param name="filter">The data change filter to use.</param>
/// <param name="range">The range to use when evaluating a percentage deadband filter.</param>
/// <param name="alwaysReportUpdates">Whether the monitored item should skip the check for a change in value.</param>
/// <returns>The new monitored item.</returns>
public DataChangeMonitoredItem CreateDataChangeItem(
ISystemContext context,
uint monitoredItemId,
uint attributeId,
NumericRange indexRange,
QualifiedName dataEncoding,
DiagnosticsMasks diagnosticsMasks,
TimestampsToReturn timestampsToReturn,
MonitoringMode monitoringMode,
uint clientHandle,
double samplingInterval,
uint queueSize,
bool discardOldest,
DataChangeFilter filter,
Range range,
bool alwaysReportUpdates)
{
DataChangeMonitoredItem monitoredItem = new DataChangeMonitoredItem(
this,
monitoredItemId,
attributeId,
indexRange,
dataEncoding,
diagnosticsMasks,
timestampsToReturn,
monitoringMode,
clientHandle,
samplingInterval,
queueSize,
discardOldest,
filter,
range,
alwaysReportUpdates);
if (m_monitoredItems == null)
{
m_monitoredItems = new List<DataChangeMonitoredItem>();
m_node.OnStateChanged = OnNodeChange;
}
m_monitoredItems.Add(monitoredItem);
return monitoredItem;
}
/// <summary>
/// Creates a new data change monitored item.
/// </summary>
/// <param name="context">The system context.</param>
/// <param name="monitoredItemId">The unique identifier for the monitiored item.</param>
/// <param name="attributeId">The attribute to monitor.</param>
/// <param name="indexRange">The index range to use for array values.</param>
/// <param name="dataEncoding">The data encoding to return for structured values.</param>
/// <param name="diagnosticsMasks">The diagnostics masks to use.</param>
/// <param name="timestampsToReturn">The timestamps to return.</param>
/// <param name="monitoringMode">The initial monitoring mode.</param>
/// <param name="clientHandle">The handle assigned by the client.</param>
/// <param name="samplingInterval">The sampling interval.</param>
/// <param name="alwaysReportUpdates">Whether the monitored item should skip the check for a change in value.</param>
/// <returns>The new monitored item.</returns>
public DataChangeMonitoredItem CreateDataChangeItem(
ISystemContext context,
uint monitoredItemId,
uint attributeId,
NumericRange indexRange,
QualifiedName dataEncoding,
DiagnosticsMasks diagnosticsMasks,
TimestampsToReturn timestampsToReturn,
MonitoringMode monitoringMode,
uint clientHandle,
double samplingInterval,
bool alwaysReportUpdates)
{
return CreateDataChangeItem(
context,
monitoredItemId,
attributeId,
indexRange,
dataEncoding,
diagnosticsMasks,
timestampsToReturn,
monitoringMode,
clientHandle,
samplingInterval,
0,
false,
null,
null,
alwaysReportUpdates);
}
/// <summary>
/// Deletes the monitored item.
/// </summary>
public void DeleteItem(IMonitoredItem monitoredItem)
{
if (m_monitoredItems != null)
{
for (int ii = 0; ii < m_monitoredItems.Count; ii++)
{
if (Object.ReferenceEquals(monitoredItem, m_monitoredItems[ii]))
{
m_monitoredItems.RemoveAt(ii);
break;
}
}
}
}
/// <summary>
/// Handles change events raised by the node.
/// </summary>
/// <param name="context">The system context.</param>
/// <param name="state">The node that raised the event.</param>
/// <param name="masks">What caused the event to be raised</param>
public void OnNodeChange(ISystemContext context, NodeState state, NodeStateChangeMasks masks)
{
if (m_monitoredItems != null)
{
for (int ii = 0; ii < m_monitoredItems.Count; ii++)
{
DataChangeMonitoredItem monitoredItem = m_monitoredItems[ii];
// check if the node has been deleted.
if ((masks & NodeStateChangeMasks.Deleted) != 0)
{
monitoredItem.QueueValue(null, StatusCodes.BadNodeIdUnknown);
continue;
}
if (monitoredItem.AttributeId == Attributes.Value)
{
if ((masks & NodeStateChangeMasks.Value) != 0)
{
monitoredItem.ValueChanged(context);
}
}
else
{
if ((masks & NodeStateChangeMasks.NonValue) != 0)
{
monitoredItem.ValueChanged(context);
}
}
}
}
}
/// <summary>
/// Subscribes to events produced by the node.
/// </summary>
public void SubscribeToEvents(ISystemContext context, IEventMonitoredItem eventSubscription)
{
if (m_eventSubscriptions == null)
{
m_eventSubscriptions = new List<IEventMonitoredItem>();
}
if (m_eventSubscriptions.Count == 0)
{
m_node.OnReportEvent = OnReportEvent;
m_node.SetAreEventsMonitored(context, true, true);
}
for (int ii = 0; ii < m_eventSubscriptions.Count; ii++)
{
if (Object.ReferenceEquals(eventSubscription, m_eventSubscriptions[ii]))
{
return;
}
}
m_eventSubscriptions.Add(eventSubscription);
}
/// <summary>
/// Unsubscribes to events produced by the node.
/// </summary>
public void UnsubscribeToEvents(ISystemContext context, IEventMonitoredItem eventSubscription)
{
if (m_eventSubscriptions != null)
{
for (int ii = 0; ii < m_eventSubscriptions.Count; ii++)
{
if (Object.ReferenceEquals(eventSubscription, m_eventSubscriptions[ii]))
{
m_eventSubscriptions.RemoveAt(ii);
if (m_eventSubscriptions.Count == 0)
{
m_node.SetAreEventsMonitored(context, false, true);
m_node.OnReportEvent = null;
}
break;
}
}
}
}
/// <summary>
/// Handles events reported by the node.
/// </summary>
/// <param name="context">The system context.</param>
/// <param name="state">The node that raised the event.</param>
/// <param name="e">The event to report.</param>
public void OnReportEvent(ISystemContext context, NodeState state, IFilterTarget e)
{
if (m_eventSubscriptions != null)
{
for (int ii = 0; ii < m_eventSubscriptions.Count; ii++)
{
m_eventSubscriptions[ii].QueueEvent(e);
}
}
}
/// <summary>
/// Resends the events for any conditions belonging to the node or its children.
/// </summary>
/// <param name="context">The system context.</param>
/// <param name="monitoredItem">The item to refresh.</param>
public void ConditionRefresh(
ISystemContext context,
IEventMonitoredItem monitoredItem)
{
if (m_eventSubscriptions != null)
{
for (int ii = 0; ii < m_eventSubscriptions.Count; ii++)
{
// only process items monitoring this node.
if (!Object.ReferenceEquals(monitoredItem, m_eventSubscriptions[ii]))
{
continue;
}
// get the set of condition events for the node and its children.
List<IFilterTarget> events = new List<IFilterTarget>();
m_node.ConditionRefresh(context, events, true);
// report the events to the monitored item.
for (int jj = 0; jj < events.Count; jj++)
{
monitoredItem.QueueEvent(events[jj]);
}
}
}
}
#endregion
#region Private Fields
private IServerInternal m_server;
private INodeManager m_nodeManager;
private NodeState m_node;
private List<IEventMonitoredItem> m_eventSubscriptions;
private List<DataChangeMonitoredItem> m_monitoredItems;
#endregion
}
}

Просмотреть файл

@ -37,9 +37,13 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Devices" Version="1.2.8" />
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.2.13" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.SDK" Version="0.2.4" />
<PackageReference Include="Microsoft.Azure.Devices" Version="1.3.0" />
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.4.0" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua" Version="0.4.1" />
</ItemGroup>
<ItemGroup>
<Folder Include="Properties\" />
</ItemGroup>
</Project>

Просмотреть файл

@ -29,192 +29,194 @@ namespace Opc.Ua.Publisher
/// </summary>
public static void Trace(string message, params object[] args)
{
Utils.Trace(message, args);
Console.WriteLine(message, args);
Utils.Trace(Utils.TraceMasks.Error, message, args);
Console.WriteLine(DateTime.Now.ToString() + ": " + message, args);
}
public static void Trace(int traceMask, string format, params object[] args)
{
Utils.Trace(traceMask, format, args);
Console.WriteLine(format, args);
Console.WriteLine(DateTime.Now.ToString() + ": " + format, args);
}
public static void Trace(Exception e, string format, params object[] args)
{
Utils.Trace(e, format, args);
Console.WriteLine(e.ToString());
Console.WriteLine(format, args);
Console.WriteLine(DateTime.Now.ToString() + ": " + e.Message.ToString());
Console.WriteLine(DateTime.Now.ToString() + ": " + format, args);
}
public static void Main(string[] args)
{
if ((args.Length == 0) || string.IsNullOrEmpty(args[0]))
try
{
Console.WriteLine("Please specify an application name as argument!");
return;
}
m_applicationName = args[0];
string ownerConnectionString = string.Empty;
// check if we also received an owner connection string
if ((args.Length > 1) && !string.IsNullOrEmpty(args[1]))
{
ownerConnectionString = args[1];
}
else
{
Trace("IoT Hub owner connection string not passed as argument.");
// check if we have an environment variable to register ourselves with IoT Hub
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_HUB_CS")))
if ((args.Length == 0) || string.IsNullOrEmpty(args[0]) || args[0].Equals("localhost", StringComparison.OrdinalIgnoreCase))
{
ownerConnectionString = Environment.GetEnvironmentVariable("_HUB_CS");
}
}
// register ourselves with IoT Hub
if (ownerConnectionString != string.Empty)
{
Trace("Attemping to register ourselves with IoT Hub using owner connection string: " + ownerConnectionString);
RegistryManager manager = RegistryManager.CreateFromConnectionString(ownerConnectionString);
// remove any existing device
Device existingDevice = manager.GetDeviceAsync(m_applicationName).Result;
if (existingDevice != null)
{
manager.RemoveDeviceAsync(m_applicationName).Wait();
}
Device newDevice = manager.AddDeviceAsync(new Device(m_applicationName)).Result;
if (newDevice != null)
{
string hostname = ownerConnectionString.Substring(0, ownerConnectionString.IndexOf(";"));
string deviceConnectionString = hostname + ";DeviceId=" + m_applicationName + ";SharedAccessKey=" + newDevice.Authentication.SymmetricKey.PrimaryKey;
SecureIoTHubToken.Write(m_applicationName, deviceConnectionString);
m_applicationName = Utils.GetHostName();
}
else
{
Trace("Could not register ourselves with IoT Hub using owner connection string: " + ownerConnectionString);
m_applicationName = args[0];
}
}
else
{
Trace("IoT Hub owner connection string not found, registration with IoT Hub abandoned.");
}
// try to read connection string from secure store and open IoTHub client
Trace("Attemping to read connection string from secure store with certificate name: " + m_applicationName);
string connectionString = SecureIoTHubToken.Read(m_applicationName);
if (!string.IsNullOrEmpty(connectionString))
{
Trace("Attemping to configure publisher with connection string: " + connectionString);
m_deviceClient = DeviceClient.CreateFromConnectionString(connectionString, Microsoft.Azure.Devices.Client.TransportType.Mqtt);
m_deviceClient.RetryPolicy = RetryPolicyType.Exponential_Backoff_With_Jitter;
m_deviceClient.OpenAsync().Wait();
}
else
{
Trace("Device connection string not found in secure store.");
}
ModuleConfiguration moduleConfiguration = new ModuleConfiguration(m_applicationName);
m_configuration = moduleConfiguration.Configuration;
m_configuration.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_CertificateValidation);
Trace("Publisher is starting up...");
ModuleConfiguration moduleConfiguration = new ModuleConfiguration(m_applicationName);
m_configuration = moduleConfiguration.Configuration;
m_configuration.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_CertificateValidation);
// update log configuration, if available
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_GW_LOGP")))
{
m_configuration.TraceConfiguration.OutputFilePath = Environment.GetEnvironmentVariable("_GW_LOGP");
m_configuration.TraceConfiguration.ApplySettings();
}
// get a list of persisted endpoint URLs and create a session for each.
try
{
// check if we have an env variable specifying the published nodes path, otherwise use current directory
string publishedNodesFilePath = Directory.GetCurrentDirectory() + Path.DirectorySeparatorChar + "publishednodes.json";
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_GW_PNFP")))
// start our server interface
try
{
publishedNodesFilePath = Environment.GetEnvironmentVariable("_GW_PNFP");
Trace("Starting server on endpoint " + m_configuration.ServerConfiguration.BaseAddresses[0].ToString() + "...");
m_server.Start(m_configuration);
Trace("Server started.");
}
Trace("Attemping to load nodes file from: " + publishedNodesFilePath);
m_nodesLookups = JsonConvert.DeserializeObject<PublishedNodesCollection>(File.ReadAllText(publishedNodesFilePath));
Trace("Loaded " + m_nodesLookups.Count.ToString() + " nodes.");
}
catch (Exception ex)
{
Trace("Nodes file loading failed with: " + ex.Message);
}
foreach (NodeLookup nodeLookup in m_nodesLookups)
{
if (!m_endpointUrls.Contains(nodeLookup.EndPointURL))
catch (Exception ex)
{
m_endpointUrls.Add(nodeLookup.EndPointURL);
Trace("Starting server failed with: " + ex.Message);
}
}
// start the server
try
{
Trace("Starting server on endpoint " + m_configuration.ServerConfiguration.BaseAddresses[0].ToString() + "...");
m_server.Start(m_configuration);
Trace("Server started.");
}
catch (Exception ex)
{
Trace("Starting server failed with: " + ex.Message);
}
// connect to servers
Trace("Attemping to connect to servers...");
try
{
List<Task> connectionAttempts = new List<Task>();
foreach (Uri endpointUrl in m_endpointUrls)
// check if we also received an owner connection string
string ownerConnectionString = string.Empty;
if ((args.Length > 1) && !string.IsNullOrEmpty(args[1]))
{
Trace("Connecting to server: " + endpointUrl);
connectionAttempts.Add(EndpointConnect(endpointUrl));
ownerConnectionString = args[1];
}
else
{
Trace("IoT Hub owner connection string not passed as argument.");
// check if we have an environment variable to register ourselves with IoT Hub
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_HUB_CS")))
{
ownerConnectionString = Environment.GetEnvironmentVariable("_HUB_CS");
}
}
// Wait for all sessions to be connected
Task.WaitAll(connectionAttempts.ToArray());
}
catch (Exception ex)
{
Trace("Exception: " + ex.ToString() + "\r\n" + ex.InnerException != null ? ex.InnerException.ToString() : null);
}
// register ourselves with IoT Hub
if (ownerConnectionString != string.Empty)
{
Trace("Attemping to register ourselves with IoT Hub using owner connection string: " + ownerConnectionString);
RegistryManager manager = RegistryManager.CreateFromConnectionString(ownerConnectionString);
// remove any existing device
Device existingDevice = manager.GetDeviceAsync(m_applicationName).Result;
if (existingDevice != null)
{
manager.RemoveDeviceAsync(m_applicationName).Wait();
}
Device newDevice = manager.AddDeviceAsync(new Device(m_applicationName)).Result;
if (newDevice != null)
{
string hostname = ownerConnectionString.Substring(0, ownerConnectionString.IndexOf(";"));
string deviceConnectionString = hostname + ";DeviceId=" + m_applicationName + ";SharedAccessKey=" + newDevice.Authentication.SymmetricKey.PrimaryKey;
SecureIoTHubToken.Write(m_applicationName, deviceConnectionString);
}
else
{
Trace("Could not register ourselves with IoT Hub using owner connection string: " + ownerConnectionString);
}
}
else
{
Trace("IoT Hub owner connection string not found, registration with IoT Hub abandoned.");
}
// try to read connection string from secure store and open IoTHub client
Trace("Attemping to read connection string from secure store with certificate name: " + m_applicationName);
string connectionString = SecureIoTHubToken.Read(m_applicationName);
if (!string.IsNullOrEmpty(connectionString))
{
Trace("Attemping to configure publisher with connection string: " + connectionString);
m_deviceClient = DeviceClient.CreateFromConnectionString(connectionString, Microsoft.Azure.Devices.Client.TransportType.Mqtt);
m_deviceClient.RetryPolicy = RetryPolicyType.Exponential_Backoff_With_Jitter;
m_deviceClient.OpenAsync().Wait();
}
else
{
Trace("Device connection string not found in secure store.");
}
// get a list of persisted endpoint URLs and create a session for each.
try
{
// check if we have an env variable specifying the published nodes path, otherwise use current directory
string publishedNodesFilePath = Directory.GetCurrentDirectory() + Path.DirectorySeparatorChar + "publishednodes.json";
if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("_GW_PNFP")))
{
publishedNodesFilePath = Environment.GetEnvironmentVariable("_GW_PNFP");
}
Trace("Attemping to load nodes file from: " + publishedNodesFilePath);
m_nodesLookups = JsonConvert.DeserializeObject<PublishedNodesCollection>(File.ReadAllText(publishedNodesFilePath));
Trace("Loaded " + m_nodesLookups.Count.ToString() + " nodes.");
}
catch (Exception ex)
{
Trace("Nodes file loading failed with: " + ex.Message);
}
// subscribe to preconfigured nodes
Trace("Attemping to subscribe to published nodes...");
if (m_nodesLookups != null)
{
foreach (NodeLookup nodeLookup in m_nodesLookups)
{
try
if (!m_endpointUrls.Contains(nodeLookup.EndPointURL))
{
CreateMonitoredItem(nodeLookup);
}
catch (Exception ex)
{
Trace("Unexpected error publishing node: " + ex.Message + "\r\nIgnoring node: " + nodeLookup.EndPointURL.AbsoluteUri + ", " + nodeLookup.NodeID.ToString());
m_endpointUrls.Add(nodeLookup.EndPointURL);
}
}
// connect to the other servers
Trace("Attemping to connect to servers...");
try
{
List<Task> connectionAttempts = new List<Task>();
foreach (Uri endpointUrl in m_endpointUrls)
{
Trace("Connecting to server: " + endpointUrl);
connectionAttempts.Add(EndpointConnect(endpointUrl));
}
// Wait for all sessions to be connected
Task.WaitAll(connectionAttempts.ToArray());
}
catch (Exception ex)
{
Trace("Exception: " + ex.ToString() + "\r\n" + ex.InnerException != null ? ex.InnerException.ToString() : null);
}
// subscribe to preconfigured nodes
Trace("Attemping to subscribe to published nodes...");
if (m_nodesLookups != null)
{
foreach (NodeLookup nodeLookup in m_nodesLookups)
{
try
{
CreateMonitoredItem(nodeLookup);
}
catch (Exception ex)
{
Trace("Unexpected error publishing node: " + ex.Message + "\r\nIgnoring node: " + nodeLookup.EndPointURL.AbsoluteUri + ", " + nodeLookup.NodeID.ToString());
}
}
}
Trace("Publisher is running. Press enter to quit.");
Console.ReadLine();
foreach (Session session in m_sessions)
{
session.Close();
}
if (m_deviceClient != null)
{
m_deviceClient.CloseAsync().Wait();
}
}
Console.WriteLine("Publisher is running. Press enter to quit.");
Console.ReadLine();
foreach (Session session in m_sessions)
catch (Exception e)
{
session.Close();
}
if (m_deviceClient != null)
{
m_deviceClient.CloseAsync().Wait();
Trace(e, "Unhandled exception in Publisher, exiting!");
}
}
@ -223,7 +225,7 @@ namespace Opc.Ua.Publisher
/// </summary>
public static async Task EndpointConnect(Uri endpointUrl)
{
EndpointDescription selectedEndpoint = SelectUaTcpEndpoint(DiscoverEndpoints(m_configuration, endpointUrl, 10));
EndpointDescription selectedEndpoint = CoreClientUtils.SelectEndpoint(endpointUrl.AbsoluteUri, true);
ConfiguredEndpoint configuredEndpoint = new ConfiguredEndpoint(selectedEndpoint.Server, EndpointConfiguration.Create(m_configuration));
configuredEndpoint.Update(selectedEndpoint);
@ -381,76 +383,6 @@ namespace Opc.Ua.Publisher
}
}
/// <summary>
/// Discovers all endpoints provided by an OPC UA server using a discovery client
/// </summary>
private static EndpointDescriptionCollection DiscoverEndpoints(ApplicationConfiguration config, Uri discoveryUrl, int timeout)
{
EndpointConfiguration configuration = EndpointConfiguration.Create(config);
configuration.OperationTimeout = timeout;
using (DiscoveryClient client = DiscoveryClient.Create(
discoveryUrl,
EndpointConfiguration.Create(config)))
{
try
{
EndpointDescriptionCollection endpoints = client.GetEndpoints(null);
return ReplaceLocalHostWithRemoteHost(endpoints, discoveryUrl);
}
catch (Exception e)
{
Trace("Could not fetch endpoints from url: " + discoveryUrl.ToString());
Trace("Reason = " + e.Message);
throw e;
}
}
}
/// <summary>
/// Replaces all instances of "LocalHost" in a collection of endpoint description with the real host name
/// </summary>
private static EndpointDescriptionCollection ReplaceLocalHostWithRemoteHost(EndpointDescriptionCollection endpoints, Uri discoveryUrl)
{
EndpointDescriptionCollection updatedEndpoints = endpoints;
foreach (EndpointDescription endpoint in updatedEndpoints)
{
endpoint.EndpointUrl = Utils.ReplaceLocalhost(endpoint.EndpointUrl, discoveryUrl.DnsSafeHost);
StringCollection updatedDiscoveryUrls = new StringCollection();
foreach (string url in endpoint.Server.DiscoveryUrls)
{
updatedDiscoveryUrls.Add(Utils.ReplaceLocalhost(url, discoveryUrl.DnsSafeHost));
}
endpoint.Server.DiscoveryUrls = updatedDiscoveryUrls;
}
return updatedEndpoints;
}
/// <summary>
/// Selects the UA TCP endpoint from an endpoint collection with the highest security settings offered
/// </summary>
private static EndpointDescription SelectUaTcpEndpoint(EndpointDescriptionCollection endpointCollection)
{
EndpointDescription bestEndpoint = null;
foreach (EndpointDescription endpoint in endpointCollection)
{
if (endpoint.TransportProfileUri == Profiles.UaTcpTransport)
{
if ((bestEndpoint == null) ||
(endpoint.SecurityLevel > bestEndpoint.SecurityLevel))
{
bestEndpoint = endpoint;
}
}
}
return bestEndpoint;
}
/// <summary>
/// Standard certificate validation callback
/// </summary>

Просмотреть файл

@ -1,8 +0,0 @@
{
"profiles": {
"Opc.Ua.Publisher": {
"commandName": "Project",
"commandLineArgs": "myApp"
}
}
}

Просмотреть файл

@ -213,7 +213,8 @@ namespace Publisher
// update our data on success only
// we keep the session to the server, as there may be other nodes still published on it
Program.m_nodesLookups.Remove(lookup);
var itemToRemove = Program.m_nodesLookups.Find(l => l.NodeID == lookup.NodeID && l.EndPointURL == lookup.EndPointURL);
Program.m_nodesLookups.Remove(itemToRemove);
//serialize Program.m_nodesLookups to disk
string publishedNodesFilePath = Directory.GetCurrentDirectory() + Path.DirectorySeparatorChar + "publishednodes.json";

3011
src/SampleNodeManager.cs Normal file

Разница между файлами не показана из-за своего большого размера Загрузить разницу