diff --git a/src/IotHubMessaging.cs b/src/IotHubMessaging.cs index ea5930f..d9b4eed 100644 --- a/src/IotHubMessaging.cs +++ b/src/IotHubMessaging.cs @@ -34,6 +34,7 @@ namespace Opc.Ua.Publisher private ConcurrentQueue _sendQueue; private int _currentSizeOfIotHubMessageBytes; private List _messageList; + private SemaphoreSlim _messageListSemaphore; private uint _maxSizeOfIoTHubMessageBytes; private int _defaultSendIntervalSeconds; private CancellationTokenSource _tokenSource; @@ -47,6 +48,7 @@ namespace Opc.Ua.Publisher _sendQueue = new ConcurrentQueue(); _sendQueueEvent = new AutoResetEvent(false); _messageList = new List(); + _messageListSemaphore = new SemaphoreSlim(1); _currentSizeOfIotHubMessageBytes = 0; } @@ -141,7 +143,7 @@ namespace Opc.Ua.Publisher return true; } - public void UpdateConnectionString(string iotHubOwnerConnectionString) + public void ConnectionStringWrite(string iotHubOwnerConnectionString) { DeviceClient newClient = DeviceClient.CreateFromConnectionString(iotHubOwnerConnectionString, IotHubProtocol); newClient.RetryPolicy = RetryPolicyType.Exponential_Backoff_With_Jitter; @@ -252,7 +254,9 @@ namespace Opc.Ua.Publisher if (isDequeueSuccessful) { OpcUaMessage msgPayload = JsonConvert.DeserializeObject(messageInJson); + _messageListSemaphore.Wait(); _messageList.Add(msgPayload); + _messageListSemaphore.Release(); _currentSizeOfIotHubMessageBytes = _currentSizeOfIotHubMessageBytes + nextMessageSizeBytes; Trace(Utils.TraceMasks.OperationDetail, $"Added new message with size {nextMessageSizeBytes} to IoTHub message (size is now {_currentSizeOfIotHubMessageBytes}). {_sendQueue.Count} message(s) in send queue."); @@ -283,9 +287,13 @@ namespace Opc.Ua.Publisher { if (_messageList.Count > 0) { + // process all queued messages + _messageListSemaphore.Wait(); string msgListInJson = JsonConvert.SerializeObject(_messageList); - var encodedMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(msgListInJson)); + _currentSizeOfIotHubMessageBytes = 0; + _messageList.Clear(); + _messageListSemaphore.Release(); // publish encodedMessage.Properties.Add("content-type", "application/opcua+uajson"); @@ -300,17 +308,13 @@ namespace Opc.Ua.Publisher } else { - Trace("No IoTHub client available "); + Trace("No IoTHub client available. Dropping messages..."); } } catch (Exception e) { - Trace(e, "Exception while sending message to IoTHub. Dropping..."); + Trace(e, "Exception while sending message to IoTHub. Dropping messages..."); } - - // reset IoTHub message size - _currentSizeOfIotHubMessageBytes = 0; - _messageList.Clear(); } // Restart timer diff --git a/src/ModuleConfiguration.cs b/src/ModuleConfiguration.cs index 9c6d5f9..a482622 100644 --- a/src/ModuleConfiguration.cs +++ b/src/ModuleConfiguration.cs @@ -27,13 +27,11 @@ namespace Opc.Ua.Publisher Configuration.ClientConfiguration = new ClientConfiguration(); Configuration.ServerConfiguration = new ServerConfiguration(); - // enable logging and enforce information flag - Program.OpcStackTraceMask |= Utils.TraceMasks.Information; + // initialize stack tracing Configuration.TraceConfiguration = new TraceConfiguration() { TraceMasks = Program.OpcStackTraceMask }; - // StdOutAndFile is not working correct, due to a bug in the stack. Need to workaround with own Trace for now. Utils.SetTraceOutput(Utils.TraceOutput.FileOnly); if (string.IsNullOrEmpty(Program.LogFileName)) { diff --git a/src/OpcSession.cs b/src/OpcSession.cs index 7a59c59..1e9ef16 100644 --- a/src/OpcSession.cs +++ b/src/OpcSession.cs @@ -143,7 +143,7 @@ namespace Opc.Ua.Publisher _namespaceTable = new NamespaceTable(); } - public async Task ConnectAndOrMonitor() + public async Task ConnectAndMonitor() { _opcSessionSemaphore.Wait(); Trace($"Connect and monitor session and nodes on endpoint '{EndpointUri.AbsoluteUri}'."); diff --git a/src/Program.cs b/src/Program.cs index c87dd92..4c7a272 100644 --- a/src/Program.cs +++ b/src/Program.cs @@ -31,6 +31,7 @@ namespace Opc.Ua.Publisher public static string NodesToPublishAbsFilenameDefault = $"{System.IO.Directory.GetCurrentDirectory()}{Path.DirectorySeparatorChar}publishednodes.json"; public static string NodesToPublishAbsFilename { get; set; } public static string ShopfloorDomain { get; set; } + public static bool VerboseConsole { get; set; } // // IoTHub related @@ -58,8 +59,8 @@ namespace Opc.Ua.Publisher public static int OpcOperationTimeout = 120000; public static bool TrustMyself = true; // Enable Utils.TraceMasks.OperationDetail to get output for IoTHub telemetry operations. Current: 0x287 (647), with OperationDetail: 0x2C7 (711) - public static int OpcStackTraceMask = Utils.TraceMasks.Error | Utils.TraceMasks.Security | Utils.TraceMasks.StackTrace | Utils.TraceMasks.StartStop | Utils.TraceMasks.Information; - public static bool OpcPublisherAutoAccept = false; + public static int OpcStackTraceMask = Utils.TraceMasks.Error | Utils.TraceMasks.Security | Utils.TraceMasks.StackTrace | Utils.TraceMasks.StartStop; + public static bool OpcPublisherAutoTrustServerCerts = false; public static uint OpcSessionCreationTimeout = 10; public static uint OpcSessionCreationBackoffMax = 5; public static uint OpcKeepAliveDisconnectThreshold = 10; @@ -132,12 +133,12 @@ namespace Opc.Ua.Publisher { var shouldShowHelp = false; - // these are the available options, not that they set the variables + // command line options configuration Mono.Options.OptionSet options = new Mono.Options.OptionSet { // Publishing configuration options { "pf|publishfile=", $"the filename to configure the nodes to publish.\nDefault: '{NodesToPublishAbsFilenameDefault}'", (string p) => NodesToPublishAbsFilename = p }, { "sd|shopfloordomain=", $"the domain of the shopfloor. if specified this domain is appended (delimited by a ':' to the 'ApplicationURI' property when telemetry is ingested to IoTHub.\n" + - "The value must follw the syntactical rules of a DNS hostname.\nDefault: not set", (string s) => { + "The value must follow the syntactical rules of a DNS hostname.\nDefault: not set", (string s) => { Regex domainNameRegex = new Regex("^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\\-]*[A-Za-z0-9])$"); if (domainNameRegex.IsMatch(s)) { @@ -149,7 +150,7 @@ namespace Opc.Ua.Publisher } } }, - { "sw|sessionconnectwait=", $"specify the wait time in seconds publisher is trying to connect to disconnected endpoints\nMin: 10\nDefault: {PublisherSessionConnectWaitSec}", (int i) => { + { "sw|sessionconnectwait=", $"specify the wait time in seconds publisher is trying to connect to disconnected endpoints and starts monitoring unmonitored items\nMin: 10\nDefault: {PublisherSessionConnectWaitSec}", (int i) => { if (i > 10) { PublisherSessionConnectWaitSec = i; @@ -160,6 +161,7 @@ namespace Opc.Ua.Publisher } } }, + { "vc|verboseconsole=", $"the output of publisher is shown on the console.\nDefault: {VerboseConsole}", (bool b) => VerboseConsole = b }, // IoTHub specific options { "ih|iothubprotocol=", $"the protocol to use for communication with Azure IoTHub (allowed values: {string.Join(", ", Enum.GetNames(IotHubProtocol.GetType()))}).\nDefault: {Enum.GetName(IotHubProtocol.GetType(), IotHubProtocol)}", @@ -214,7 +216,8 @@ namespace Opc.Ua.Publisher } } }, - { "ds|defaultsamplingrate=", $"the sampling rate in millisecond for which monitored nodes should be queried.\nMin: 100\nDefault: {OpcSamplingRateMillisec}", (int i) => { + { "ds|defaultsamplingrate=", $"the sampling interval in milliseconds, the OPC UA servers should use to sample the values of the nodes to publish.\n" + + $"Please check the OPC UA spec for more details.\nMin: 100\nDefault: {OpcSamplingRateMillisec}", (int i) => { if (i >= 100) { OpcSamplingRateMillisec = i; @@ -258,7 +261,7 @@ namespace Opc.Ua.Publisher } } }, - { "st|opcstacktracemask=", $"the trace mask for the OPC stack. See github OPC .NET stack for definitions.\nTo enable IoTHub telemetry tracing set it to 711. Information mask 0x2 is enforced.\nDefault: {OpcStackTraceMask:X} ({Program.OpcStackTraceMask})", (int i) => { + { "st|opcstacktracemask=", $"the trace mask for the OPC stack. See github OPC .NET stack for definitions.\nTo enable IoTHub telemetry tracing set it to 711.\nDefault: {OpcStackTraceMask:X} ({Program.OpcStackTraceMask})", (int i) => { if (i >= 0) { OpcStackTraceMask = i; @@ -269,7 +272,7 @@ namespace Opc.Ua.Publisher } } }, - { "aa|autoaccept=", $"the publisher accept all servers it is connecting to.\nDefault: {OpcPublisherAutoAccept}", (bool b) => OpcPublisherAutoAccept = b }, + { "as|autotrustservercerts=", $"the publisher trusts all servers it is establishing a connection to.\nDefault: {OpcPublisherAutoTrustServerCerts}", (bool b) => OpcPublisherAutoTrustServerCerts = b }, // trust own public cert option { "tm|trustmyself=", $"the publisher certificate is put into the trusted certificate store automatically.\nDefault: {TrustMyself}", (bool b) => TrustMyself = b }, @@ -418,14 +421,14 @@ namespace Opc.Ua.Publisher } // Set certificate validator. - if (OpcPublisherAutoAccept) + if (OpcPublisherAutoTrustServerCerts) { - Trace("Publisher configured to auto trust server certificates."); - OpcConfiguration.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_AutoAccept); + Trace("Publisher configured to auto trust server certificates of the servers it is connecting to."); + OpcConfiguration.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_AutoTrustServerCerts); } else { - Trace("Publisher configured to not auto trust server certificates, but use certificate stores."); + Trace("Publisher configured to not auto trust server certificates. When connecting to servers, you need to manually copy the rejected server certs to the trusted store to trust them."); OpcConfiguration.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_Default); } @@ -542,11 +545,11 @@ namespace Opc.Ua.Publisher Task.Run( async () => await SessionConnector(cts.Token)); // stop on user request - Trace(""); - Trace(""); - Trace("Publisher is running. Press ENTER to quit."); - Trace(""); - Trace(""); + WriteLine(""); + WriteLine(""); + WriteLine("Publisher is running. Press ENTER to quit."); + WriteLine(""); + WriteLine(""); ReadLine(); cts.Cancel(); @@ -595,7 +598,7 @@ namespace Opc.Ua.Publisher try { // get tasks for all disconnected sessions and start them - var singleSessionHandlerTaskList = OpcSessions.Where(p => p.State == OpcSession.SessionState.Disconnected).Select(s => s.ConnectAndOrMonitor()); + var singleSessionHandlerTaskList = OpcSessions.Where(p => p.State == OpcSession.SessionState.Disconnected).Select(s => s.ConnectAndMonitor()); await Task.WhenAll(singleSessionHandlerTaskList); } catch (Exception e) @@ -643,13 +646,13 @@ namespace Opc.Ua.Publisher } /// - /// Default certificate validation callback + /// Auto trust server certificate validation callback /// - private static void CertificateValidator_AutoAccept(CertificateValidator validator, CertificateValidationEventArgs e) + private static void CertificateValidator_AutoTrustServerCerts(CertificateValidator validator, CertificateValidationEventArgs e) { if (e.Error.StatusCode == StatusCodes.BadCertificateUntrusted) { - Trace($"Certificate '{e.Certificate.Subject}' will be auto accepted."); + Trace($"Certificate '{e.Certificate.Subject}' will be trusted, since the autotrustservercerts options was specified."); e.Accept = true; return; } diff --git a/src/PublisherState.cs b/src/PublisherState.cs index 924e4df..9d590c5 100644 --- a/src/PublisherState.cs +++ b/src/PublisherState.cs @@ -83,8 +83,8 @@ namespace Publisher opcSession.AddNodeForMonitoring(nodeToPublish.NodeId); Trace("DoPublish: Requested to monitor item."); - // do whatever is needed. - Task monitorTask = Task.Run(async () => await opcSession.ConnectAndOrMonitor()); + // start monitoring the node + Task monitorTask = Task.Run(async () => await opcSession.ConnectAndMonitor()); monitorTask.Wait(); Trace("DoPublish: Session processing completed."); @@ -134,7 +134,7 @@ namespace Publisher return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide a valid OPC UA endpoint URL as second argument!"); } - // Find the session and stop monitoring the item. + // Find the session and stop monitoring the node. try { // find the session we need to monitor the node @@ -154,8 +154,8 @@ namespace Publisher opcSession.TagNodeForMonitoringStop(nodeToUnpublish.NodeId); Trace("UnPublishNodeMethod: Requested to stop monitoring of node."); - // do whatever is needed. - Task monitorTask = Task.Run(async () => await opcSession.ConnectAndOrMonitor()); + // stop monitoring the node + Task monitorTask = Task.Run(async () => await opcSession.ConnectAndMonitor()); monitorTask.Wait(); Trace("UnPublishNodeMethod: Session processing completed."); @@ -213,7 +213,7 @@ namespace Publisher // configure publisher and write connection string try { - IotHubMessaging.UpdateConnectionString(connectionString); + IotHubMessaging.ConnectionStringWrite(connectionString); } catch (Exception e) { diff --git a/src/SecureIoTHubToken.cs b/src/SecureIoTHubToken.cs index fbf8ec2..575033b 100644 --- a/src/SecureIoTHubToken.cs +++ b/src/SecureIoTHubToken.cs @@ -21,14 +21,42 @@ namespace IoTHubCredentialTools { public class SecureIoTHubToken { + private static string CheckForToken(X509Certificate2 cert, string name) + { + if ((cert.SubjectName.Decode(X500DistinguishedNameFlags.None | X500DistinguishedNameFlags.DoNotUseQuotes).Equals("CN=" + name, StringComparison.OrdinalIgnoreCase)) && + (DateTime.Now < cert.NotAfter)) + { + using (RSA rsa = cert.GetRSAPrivateKey()) + { + if (rsa != null) + { + foreach (System.Security.Cryptography.X509Certificates.X509Extension extension in cert.Extensions) + { + // check for instruction code extension + if ((extension.Oid.Value == "2.5.29.23") && (extension.RawData.Length >= 4)) + { + byte[] bytes = new byte[extension.RawData.Length - 4]; + Array.Copy(extension.RawData, 4, bytes, 0, bytes.Length); + byte[] token = rsa.Decrypt(bytes, RSAEncryptionPadding.OaepSHA1); + return Encoding.ASCII.GetString(token); + } + } + } + } + } + return null; + } + public static string Read(string name, string storeType, string storePath) { + string token = null; + // handle each store type differently switch (storeType) { case CertificateStoreType.Directory: { - // load an existing key from a no-expired cert with the subject name passed in from the OS-provided X509Store + // search a non expired cert with the given subject in the directory cert store and return the token using (DirectoryCertificateStore store = new DirectoryCertificateStore()) { store.Open(storePath); @@ -36,64 +64,32 @@ namespace IoTHubCredentialTools foreach (X509Certificate2 cert in certificates) { - if ((cert.SubjectName.Decode(X500DistinguishedNameFlags.None | X500DistinguishedNameFlags.DoNotUseQuotes).Equals("CN=" + name, StringComparison.OrdinalIgnoreCase)) && - (DateTime.Now < cert.NotAfter)) + if ((token = CheckForToken(cert, name)) != null) { - using (RSA rsa = cert.GetRSAPrivateKey()) - { - if (rsa != null) - { - foreach (System.Security.Cryptography.X509Certificates.X509Extension extension in cert.Extensions) - { - // check for instruction code extension - if ((extension.Oid.Value == "2.5.29.23") && (extension.RawData.Length >= 4)) - { - byte[] bytes = new byte[extension.RawData.Length - 4]; - Array.Copy(extension.RawData, 4, bytes, 0, bytes.Length); - byte[] token = rsa.Decrypt(bytes, RSAEncryptionPadding.OaepSHA1); - return Encoding.ASCII.GetString(token); - } - } - } - } + return token; } } } break; } + case CertificateStoreType.X509Store: { - // load an existing key from a no-expired cert with the subject name passed in from the OS-provided X509Store + // search a non expired cert with the given subject in the X509 cert store and return the token using (X509Store store = new X509Store(storePath, StoreLocation.CurrentUser)) { store.Open(OpenFlags.ReadOnly); foreach (X509Certificate2 cert in store.Certificates) { - if ((cert.SubjectName.Decode(X500DistinguishedNameFlags.None | X500DistinguishedNameFlags.DoNotUseQuotes).Equals("CN=" + name, StringComparison.OrdinalIgnoreCase)) && - (DateTime.Now < cert.NotAfter)) + if ((token = CheckForToken(cert, name)) != null) { - using (RSA rsa = cert.GetRSAPrivateKey()) - { - if (rsa != null) - { - foreach (System.Security.Cryptography.X509Certificates.X509Extension extension in cert.Extensions) - { - // check for instruction code extension - if ((extension.Oid.Value == "2.5.29.23") && (extension.RawData.Length >= 4)) - { - byte[] bytes = new byte[extension.RawData.Length - 4]; - Array.Copy(extension.RawData, 4, bytes, 0, bytes.Length); - byte[] token = rsa.Decrypt(bytes, RSAEncryptionPadding.OaepSHA1); - return Encoding.ASCII.GetString(token); - } - } - } - } + return token; } } } break; } + default: { throw new Exception($"The requested store type '{storeType}' is not supported. Please change."); diff --git a/src/TraceWorkaround.cs b/src/TraceWorkaround.cs index c1f8819..d2ca6a3 100644 --- a/src/TraceWorkaround.cs +++ b/src/TraceWorkaround.cs @@ -13,14 +13,17 @@ namespace Opc.Ua.Workarounds /// public static void Trace(string message, params object[] args) { - Utils.Trace(Utils.TraceMasks.Information, message, args); - WriteLine(DateTime.Now.ToString() + ": " + message, args); + Utils.Trace(Utils.TraceMasks.Error, message, args); + if (VerboseConsole) + { + WriteLine(DateTime.Now.ToString() + ": " + message, args); + } } public static void Trace(int traceMask, string format, params object[] args) { Utils.Trace(traceMask, format, args); - if ((OpcStackTraceMask & traceMask) != 0) + if (VerboseConsole && (OpcStackTraceMask & traceMask) != 0) { WriteLine(DateTime.Now.ToString() + ": " + format, args); }