This commit is contained in:
Hans Gschossmann 2017-09-04 22:43:00 +02:00
Родитель 26147c856b
Коммит 880f0dea00
7 изменённых файлов: 86 добавлений и 82 удалений

Просмотреть файл

@ -34,6 +34,7 @@ namespace Opc.Ua.Publisher
private ConcurrentQueue<string> _sendQueue;
private int _currentSizeOfIotHubMessageBytes;
private List<OpcUaMessage> _messageList;
private SemaphoreSlim _messageListSemaphore;
private uint _maxSizeOfIoTHubMessageBytes;
private int _defaultSendIntervalSeconds;
private CancellationTokenSource _tokenSource;
@ -47,6 +48,7 @@ namespace Opc.Ua.Publisher
_sendQueue = new ConcurrentQueue<string>();
_sendQueueEvent = new AutoResetEvent(false);
_messageList = new List<OpcUaMessage>();
_messageListSemaphore = new SemaphoreSlim(1);
_currentSizeOfIotHubMessageBytes = 0;
}
@ -141,7 +143,7 @@ namespace Opc.Ua.Publisher
return true;
}
public void UpdateConnectionString(string iotHubOwnerConnectionString)
public void ConnectionStringWrite(string iotHubOwnerConnectionString)
{
DeviceClient newClient = DeviceClient.CreateFromConnectionString(iotHubOwnerConnectionString, IotHubProtocol);
newClient.RetryPolicy = RetryPolicyType.Exponential_Backoff_With_Jitter;
@ -252,7 +254,9 @@ namespace Opc.Ua.Publisher
if (isDequeueSuccessful)
{
OpcUaMessage msgPayload = JsonConvert.DeserializeObject<OpcUaMessage>(messageInJson);
_messageListSemaphore.Wait();
_messageList.Add(msgPayload);
_messageListSemaphore.Release();
_currentSizeOfIotHubMessageBytes = _currentSizeOfIotHubMessageBytes + nextMessageSizeBytes;
Trace(Utils.TraceMasks.OperationDetail, $"Added new message with size {nextMessageSizeBytes} to IoTHub message (size is now {_currentSizeOfIotHubMessageBytes}). {_sendQueue.Count} message(s) in send queue.");
@ -283,9 +287,13 @@ namespace Opc.Ua.Publisher
{
if (_messageList.Count > 0)
{
// process all queued messages
_messageListSemaphore.Wait();
string msgListInJson = JsonConvert.SerializeObject(_messageList);
var encodedMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(msgListInJson));
_currentSizeOfIotHubMessageBytes = 0;
_messageList.Clear();
_messageListSemaphore.Release();
// publish
encodedMessage.Properties.Add("content-type", "application/opcua+uajson");
@ -300,17 +308,13 @@ namespace Opc.Ua.Publisher
}
else
{
Trace("No IoTHub client available ");
Trace("No IoTHub client available. Dropping messages...");
}
}
catch (Exception e)
{
Trace(e, "Exception while sending message to IoTHub. Dropping...");
Trace(e, "Exception while sending message to IoTHub. Dropping messages...");
}
// reset IoTHub message size
_currentSizeOfIotHubMessageBytes = 0;
_messageList.Clear();
}
// Restart timer

Просмотреть файл

@ -27,13 +27,11 @@ namespace Opc.Ua.Publisher
Configuration.ClientConfiguration = new ClientConfiguration();
Configuration.ServerConfiguration = new ServerConfiguration();
// enable logging and enforce information flag
Program.OpcStackTraceMask |= Utils.TraceMasks.Information;
// initialize stack tracing
Configuration.TraceConfiguration = new TraceConfiguration()
{
TraceMasks = Program.OpcStackTraceMask
};
// StdOutAndFile is not working correct, due to a bug in the stack. Need to workaround with own Trace for now.
Utils.SetTraceOutput(Utils.TraceOutput.FileOnly);
if (string.IsNullOrEmpty(Program.LogFileName))
{

Просмотреть файл

@ -143,7 +143,7 @@ namespace Opc.Ua.Publisher
_namespaceTable = new NamespaceTable();
}
public async Task ConnectAndOrMonitor()
public async Task ConnectAndMonitor()
{
_opcSessionSemaphore.Wait();
Trace($"Connect and monitor session and nodes on endpoint '{EndpointUri.AbsoluteUri}'.");

Просмотреть файл

@ -31,6 +31,7 @@ namespace Opc.Ua.Publisher
public static string NodesToPublishAbsFilenameDefault = $"{System.IO.Directory.GetCurrentDirectory()}{Path.DirectorySeparatorChar}publishednodes.json";
public static string NodesToPublishAbsFilename { get; set; }
public static string ShopfloorDomain { get; set; }
public static bool VerboseConsole { get; set; }
//
// IoTHub related
@ -58,8 +59,8 @@ namespace Opc.Ua.Publisher
public static int OpcOperationTimeout = 120000;
public static bool TrustMyself = true;
// Enable Utils.TraceMasks.OperationDetail to get output for IoTHub telemetry operations. Current: 0x287 (647), with OperationDetail: 0x2C7 (711)
public static int OpcStackTraceMask = Utils.TraceMasks.Error | Utils.TraceMasks.Security | Utils.TraceMasks.StackTrace | Utils.TraceMasks.StartStop | Utils.TraceMasks.Information;
public static bool OpcPublisherAutoAccept = false;
public static int OpcStackTraceMask = Utils.TraceMasks.Error | Utils.TraceMasks.Security | Utils.TraceMasks.StackTrace | Utils.TraceMasks.StartStop;
public static bool OpcPublisherAutoTrustServerCerts = false;
public static uint OpcSessionCreationTimeout = 10;
public static uint OpcSessionCreationBackoffMax = 5;
public static uint OpcKeepAliveDisconnectThreshold = 10;
@ -132,12 +133,12 @@ namespace Opc.Ua.Publisher
{
var shouldShowHelp = false;
// these are the available options, not that they set the variables
// command line options configuration
Mono.Options.OptionSet options = new Mono.Options.OptionSet {
// Publishing configuration options
{ "pf|publishfile=", $"the filename to configure the nodes to publish.\nDefault: '{NodesToPublishAbsFilenameDefault}'", (string p) => NodesToPublishAbsFilename = p },
{ "sd|shopfloordomain=", $"the domain of the shopfloor. if specified this domain is appended (delimited by a ':' to the 'ApplicationURI' property when telemetry is ingested to IoTHub.\n" +
"The value must follw the syntactical rules of a DNS hostname.\nDefault: not set", (string s) => {
"The value must follow the syntactical rules of a DNS hostname.\nDefault: not set", (string s) => {
Regex domainNameRegex = new Regex("^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\\-]*[A-Za-z0-9])$");
if (domainNameRegex.IsMatch(s))
{
@ -149,7 +150,7 @@ namespace Opc.Ua.Publisher
}
}
},
{ "sw|sessionconnectwait=", $"specify the wait time in seconds publisher is trying to connect to disconnected endpoints\nMin: 10\nDefault: {PublisherSessionConnectWaitSec}", (int i) => {
{ "sw|sessionconnectwait=", $"specify the wait time in seconds publisher is trying to connect to disconnected endpoints and starts monitoring unmonitored items\nMin: 10\nDefault: {PublisherSessionConnectWaitSec}", (int i) => {
if (i > 10)
{
PublisherSessionConnectWaitSec = i;
@ -160,6 +161,7 @@ namespace Opc.Ua.Publisher
}
}
},
{ "vc|verboseconsole=", $"the output of publisher is shown on the console.\nDefault: {VerboseConsole}", (bool b) => VerboseConsole = b },
// IoTHub specific options
{ "ih|iothubprotocol=", $"the protocol to use for communication with Azure IoTHub (allowed values: {string.Join(", ", Enum.GetNames(IotHubProtocol.GetType()))}).\nDefault: {Enum.GetName(IotHubProtocol.GetType(), IotHubProtocol)}",
@ -214,7 +216,8 @@ namespace Opc.Ua.Publisher
}
}
},
{ "ds|defaultsamplingrate=", $"the sampling rate in millisecond for which monitored nodes should be queried.\nMin: 100\nDefault: {OpcSamplingRateMillisec}", (int i) => {
{ "ds|defaultsamplingrate=", $"the sampling interval in milliseconds, the OPC UA servers should use to sample the values of the nodes to publish.\n" +
$"Please check the OPC UA spec for more details.\nMin: 100\nDefault: {OpcSamplingRateMillisec}", (int i) => {
if (i >= 100)
{
OpcSamplingRateMillisec = i;
@ -258,7 +261,7 @@ namespace Opc.Ua.Publisher
}
}
},
{ "st|opcstacktracemask=", $"the trace mask for the OPC stack. See github OPC .NET stack for definitions.\nTo enable IoTHub telemetry tracing set it to 711. Information mask 0x2 is enforced.\nDefault: {OpcStackTraceMask:X} ({Program.OpcStackTraceMask})", (int i) => {
{ "st|opcstacktracemask=", $"the trace mask for the OPC stack. See github OPC .NET stack for definitions.\nTo enable IoTHub telemetry tracing set it to 711.\nDefault: {OpcStackTraceMask:X} ({Program.OpcStackTraceMask})", (int i) => {
if (i >= 0)
{
OpcStackTraceMask = i;
@ -269,7 +272,7 @@ namespace Opc.Ua.Publisher
}
}
},
{ "aa|autoaccept=", $"the publisher accept all servers it is connecting to.\nDefault: {OpcPublisherAutoAccept}", (bool b) => OpcPublisherAutoAccept = b },
{ "as|autotrustservercerts=", $"the publisher trusts all servers it is establishing a connection to.\nDefault: {OpcPublisherAutoTrustServerCerts}", (bool b) => OpcPublisherAutoTrustServerCerts = b },
// trust own public cert option
{ "tm|trustmyself=", $"the publisher certificate is put into the trusted certificate store automatically.\nDefault: {TrustMyself}", (bool b) => TrustMyself = b },
@ -418,14 +421,14 @@ namespace Opc.Ua.Publisher
}
// Set certificate validator.
if (OpcPublisherAutoAccept)
if (OpcPublisherAutoTrustServerCerts)
{
Trace("Publisher configured to auto trust server certificates.");
OpcConfiguration.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_AutoAccept);
Trace("Publisher configured to auto trust server certificates of the servers it is connecting to.");
OpcConfiguration.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_AutoTrustServerCerts);
}
else
{
Trace("Publisher configured to not auto trust server certificates, but use certificate stores.");
Trace("Publisher configured to not auto trust server certificates. When connecting to servers, you need to manually copy the rejected server certs to the trusted store to trust them.");
OpcConfiguration.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_Default);
}
@ -542,11 +545,11 @@ namespace Opc.Ua.Publisher
Task.Run( async () => await SessionConnector(cts.Token));
// stop on user request
Trace("");
Trace("");
Trace("Publisher is running. Press ENTER to quit.");
Trace("");
Trace("");
WriteLine("");
WriteLine("");
WriteLine("Publisher is running. Press ENTER to quit.");
WriteLine("");
WriteLine("");
ReadLine();
cts.Cancel();
@ -595,7 +598,7 @@ namespace Opc.Ua.Publisher
try
{
// get tasks for all disconnected sessions and start them
var singleSessionHandlerTaskList = OpcSessions.Where(p => p.State == OpcSession.SessionState.Disconnected).Select(s => s.ConnectAndOrMonitor());
var singleSessionHandlerTaskList = OpcSessions.Where(p => p.State == OpcSession.SessionState.Disconnected).Select(s => s.ConnectAndMonitor());
await Task.WhenAll(singleSessionHandlerTaskList);
}
catch (Exception e)
@ -643,13 +646,13 @@ namespace Opc.Ua.Publisher
}
/// <summary>
/// Default certificate validation callback
/// Auto trust server certificate validation callback
/// </summary>
private static void CertificateValidator_AutoAccept(CertificateValidator validator, CertificateValidationEventArgs e)
private static void CertificateValidator_AutoTrustServerCerts(CertificateValidator validator, CertificateValidationEventArgs e)
{
if (e.Error.StatusCode == StatusCodes.BadCertificateUntrusted)
{
Trace($"Certificate '{e.Certificate.Subject}' will be auto accepted.");
Trace($"Certificate '{e.Certificate.Subject}' will be trusted, since the autotrustservercerts options was specified.");
e.Accept = true;
return;
}

Просмотреть файл

@ -83,8 +83,8 @@ namespace Publisher
opcSession.AddNodeForMonitoring(nodeToPublish.NodeId);
Trace("DoPublish: Requested to monitor item.");
// do whatever is needed.
Task monitorTask = Task.Run(async () => await opcSession.ConnectAndOrMonitor());
// start monitoring the node
Task monitorTask = Task.Run(async () => await opcSession.ConnectAndMonitor());
monitorTask.Wait();
Trace("DoPublish: Session processing completed.");
@ -134,7 +134,7 @@ namespace Publisher
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide a valid OPC UA endpoint URL as second argument!");
}
// Find the session and stop monitoring the item.
// Find the session and stop monitoring the node.
try
{
// find the session we need to monitor the node
@ -154,8 +154,8 @@ namespace Publisher
opcSession.TagNodeForMonitoringStop(nodeToUnpublish.NodeId);
Trace("UnPublishNodeMethod: Requested to stop monitoring of node.");
// do whatever is needed.
Task monitorTask = Task.Run(async () => await opcSession.ConnectAndOrMonitor());
// stop monitoring the node
Task monitorTask = Task.Run(async () => await opcSession.ConnectAndMonitor());
monitorTask.Wait();
Trace("UnPublishNodeMethod: Session processing completed.");
@ -213,7 +213,7 @@ namespace Publisher
// configure publisher and write connection string
try
{
IotHubMessaging.UpdateConnectionString(connectionString);
IotHubMessaging.ConnectionStringWrite(connectionString);
}
catch (Exception e)
{

Просмотреть файл

@ -21,14 +21,42 @@ namespace IoTHubCredentialTools
{
public class SecureIoTHubToken
{
private static string CheckForToken(X509Certificate2 cert, string name)
{
if ((cert.SubjectName.Decode(X500DistinguishedNameFlags.None | X500DistinguishedNameFlags.DoNotUseQuotes).Equals("CN=" + name, StringComparison.OrdinalIgnoreCase)) &&
(DateTime.Now < cert.NotAfter))
{
using (RSA rsa = cert.GetRSAPrivateKey())
{
if (rsa != null)
{
foreach (System.Security.Cryptography.X509Certificates.X509Extension extension in cert.Extensions)
{
// check for instruction code extension
if ((extension.Oid.Value == "2.5.29.23") && (extension.RawData.Length >= 4))
{
byte[] bytes = new byte[extension.RawData.Length - 4];
Array.Copy(extension.RawData, 4, bytes, 0, bytes.Length);
byte[] token = rsa.Decrypt(bytes, RSAEncryptionPadding.OaepSHA1);
return Encoding.ASCII.GetString(token);
}
}
}
}
}
return null;
}
public static string Read(string name, string storeType, string storePath)
{
string token = null;
// handle each store type differently
switch (storeType)
{
case CertificateStoreType.Directory:
{
// load an existing key from a no-expired cert with the subject name passed in from the OS-provided X509Store
// search a non expired cert with the given subject in the directory cert store and return the token
using (DirectoryCertificateStore store = new DirectoryCertificateStore())
{
store.Open(storePath);
@ -36,64 +64,32 @@ namespace IoTHubCredentialTools
foreach (X509Certificate2 cert in certificates)
{
if ((cert.SubjectName.Decode(X500DistinguishedNameFlags.None | X500DistinguishedNameFlags.DoNotUseQuotes).Equals("CN=" + name, StringComparison.OrdinalIgnoreCase)) &&
(DateTime.Now < cert.NotAfter))
if ((token = CheckForToken(cert, name)) != null)
{
using (RSA rsa = cert.GetRSAPrivateKey())
{
if (rsa != null)
{
foreach (System.Security.Cryptography.X509Certificates.X509Extension extension in cert.Extensions)
{
// check for instruction code extension
if ((extension.Oid.Value == "2.5.29.23") && (extension.RawData.Length >= 4))
{
byte[] bytes = new byte[extension.RawData.Length - 4];
Array.Copy(extension.RawData, 4, bytes, 0, bytes.Length);
byte[] token = rsa.Decrypt(bytes, RSAEncryptionPadding.OaepSHA1);
return Encoding.ASCII.GetString(token);
}
}
}
}
return token;
}
}
}
break;
}
case CertificateStoreType.X509Store:
{
// load an existing key from a no-expired cert with the subject name passed in from the OS-provided X509Store
// search a non expired cert with the given subject in the X509 cert store and return the token
using (X509Store store = new X509Store(storePath, StoreLocation.CurrentUser))
{
store.Open(OpenFlags.ReadOnly);
foreach (X509Certificate2 cert in store.Certificates)
{
if ((cert.SubjectName.Decode(X500DistinguishedNameFlags.None | X500DistinguishedNameFlags.DoNotUseQuotes).Equals("CN=" + name, StringComparison.OrdinalIgnoreCase)) &&
(DateTime.Now < cert.NotAfter))
if ((token = CheckForToken(cert, name)) != null)
{
using (RSA rsa = cert.GetRSAPrivateKey())
{
if (rsa != null)
{
foreach (System.Security.Cryptography.X509Certificates.X509Extension extension in cert.Extensions)
{
// check for instruction code extension
if ((extension.Oid.Value == "2.5.29.23") && (extension.RawData.Length >= 4))
{
byte[] bytes = new byte[extension.RawData.Length - 4];
Array.Copy(extension.RawData, 4, bytes, 0, bytes.Length);
byte[] token = rsa.Decrypt(bytes, RSAEncryptionPadding.OaepSHA1);
return Encoding.ASCII.GetString(token);
}
}
}
}
return token;
}
}
}
break;
}
default:
{
throw new Exception($"The requested store type '{storeType}' is not supported. Please change.");

Просмотреть файл

@ -13,14 +13,17 @@ namespace Opc.Ua.Workarounds
/// </summary>
public static void Trace(string message, params object[] args)
{
Utils.Trace(Utils.TraceMasks.Information, message, args);
WriteLine(DateTime.Now.ToString() + ": " + message, args);
Utils.Trace(Utils.TraceMasks.Error, message, args);
if (VerboseConsole)
{
WriteLine(DateTime.Now.ToString() + ": " + message, args);
}
}
public static void Trace(int traceMask, string format, params object[] args)
{
Utils.Trace(traceMask, format, args);
if ((OpcStackTraceMask & traceMask) != 0)
if (VerboseConsole && (OpcStackTraceMask & traceMask) != 0)
{
WriteLine(DateTime.Now.ToString() + ": " + format, args);
}