Fix publishing and unpublishing OPC UA server methods, protect and

persist publishednodes.json, smaller fixes.
This commit is contained in:
Hans Gschossmann 2017-09-18 22:18:37 +02:00
Родитель 26c301250a
Коммит 253a76f539
4 изменённых файлов: 185 добавлений и 161 удалений

Просмотреть файл

@ -128,41 +128,6 @@ namespace Opc.Ua.Publisher
null
);
Configuration.SecurityConfiguration.ApplicationCertificate.Certificate = certificate ?? throw new Exception("OPC UA application certificate could not be created! Cannot continue without it!");
// Trust myself if requested.
if (Program.TrustMyself)
{
// Ensure it is trusted
try
{
ICertificateStore store = Configuration.SecurityConfiguration.TrustedPeerCertificates.OpenStore();
if (store == null)
{
Trace($"Could not open trusted peer store. StorePath={Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
}
else
{
try
{
Trace($"Adding publisher certificate to trusted peer store. StorePath={Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
X509Certificate2 publicKey = new X509Certificate2(certificate.RawData);
store.Add(publicKey).Wait();
}
finally
{
store.Close();
}
}
}
catch (Exception e)
{
Trace(e, $"Could not add publisher certificate to trusted peer store. StorePath={Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
}
}
else
{
Trace("Publisher certificate is not added to trusted peer store.");
}
}
else
{
@ -171,10 +136,45 @@ namespace Opc.Ua.Publisher
Configuration.ApplicationUri = Utils.GetApplicationUriFromCertificate(certificate);
Trace($"Application certificate is for Application URI: {Configuration.ApplicationUri}");
// Trust myself if requested.
if (Program.TrustMyself)
{
// Ensure it is trusted
try
{
ICertificateStore store = Configuration.SecurityConfiguration.TrustedPeerCertificates.OpenStore();
if (store == null)
{
Trace($"Could not open trusted peer store. StorePath={Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
}
else
{
try
{
Trace($"Adding publisher certificate to trusted peer store. StorePath={Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
X509Certificate2 publicKey = new X509Certificate2(certificate.RawData);
store.Add(publicKey).Wait();
}
finally
{
store.Close();
}
}
}
catch (Exception e)
{
Trace(e, $"Could not add publisher certificate to trusted peer store. StorePath={Configuration.SecurityConfiguration.TrustedPeerCertificates.StorePath}");
}
}
else
{
Trace("Publisher certificate is not added to trusted peer store.");
}
// patch our base address
if (Configuration.ServerConfiguration.BaseAddresses.Count == 0)
{
Configuration.ServerConfiguration.BaseAddresses.Add($"opc.tcp://{Configuration.ApplicationName.ToLowerInvariant()}:{Program.PublisherServerPort}{Program.PublisherServerPath}");
Configuration.ServerConfiguration.BaseAddresses.Add($"opc.tcp://{Utils.GetHostName().ToLowerInvariant()}:{Program.PublisherServerPort}{Program.PublisherServerPath}");
}
foreach (var endpoint in Configuration.ServerConfiguration.BaseAddresses)
{

Просмотреть файл

@ -207,8 +207,7 @@ namespace Opc.Ua.Publisher
{
Trace($"Connect and monitor session and nodes on endpoint '{EndpointUri.AbsoluteUri}'.");
EndpointDescription selectedEndpoint = CoreClientUtils.SelectEndpoint(EndpointUri.AbsoluteUri, true);
ConfiguredEndpoint configuredEndpoint = new ConfiguredEndpoint(selectedEndpoint.Server, EndpointConfiguration.Create(OpcConfiguration));
configuredEndpoint.Update(selectedEndpoint);
ConfiguredEndpoint configuredEndpoint = new ConfiguredEndpoint(null, selectedEndpoint, EndpointConfiguration.Create(OpcConfiguration));
try
{
@ -388,14 +387,20 @@ namespace Opc.Ua.Publisher
}
// shutdown unused sessions.
OpcSessionsSemaphore.Wait();
var unusedSessions = OpcSessions.Where(s => s.OpcSubscriptions.Count == 0);
foreach (var unusedSession in unusedSessions)
try
{
await unusedSession.Shutdown();
OpcSessions.Remove(unusedSession);
OpcSessionsSemaphore.Wait();
var unusedSessions = OpcSessions.Where(s => s.OpcSubscriptions.Count == 0);
foreach (var unusedSession in unusedSessions)
{
OpcSessions.Remove(unusedSession);
await unusedSession.Shutdown();
}
}
finally
{
OpcSessionsSemaphore.Release();
}
OpcSessionsSemaphore.Release();
}
catch (Exception e)
{
@ -468,49 +473,37 @@ namespace Opc.Ua.Publisher
/// If there is no spubscription with the requested publishing interval, one is created.
/// </summary>
/// <param name="publishingInterval"></param>
/// <param name="samplingInterval"></param>
/// <param name="nodeId"></param>
public void AddNodeForMonitoring(int publishingInterval, NodeId nodeId)
public void AddNodeForMonitoring(int publishingInterval, int samplingInterval, NodeId nodeId)
{
_opcSessionSemaphore.Wait();
OpcSubscription opcSubscription = null;
try
{
// find a subscription we could the node monitor on
try
{
opcSubscription = OpcSubscriptions.FirstOrDefault(s => s.RequestedPublishingInterval == publishingInterval);
}
catch
{
opcSubscription = null;
}
OpcSubscription opcSubscription = OpcSubscriptions.FirstOrDefault(s => s.RequestedPublishingInterval == publishingInterval);
// if there was none found, create one
if (opcSubscription == null)
{
int revisedPublishingInterval;
opcSubscription = new OpcSubscription(publishingInterval)
{
Subscription = CreateSubscription(publishingInterval, out revisedPublishingInterval),
PublishingInterval = revisedPublishingInterval
};
opcSubscription = new OpcSubscription(publishingInterval);
OpcSubscriptions.Add(opcSubscription);
Trace($"AddNodeForMonitoring: No matching subscription with publishing interval of {publishingInterval} found'. Requested to create a new one.");
}
// if it is already there, we just ignore it, otherwise we add a new item to monitor.
OpcMonitoredItem opcMonitoredItem = null;
try
{
opcMonitoredItem = opcSubscription.OpcMonitoredItems.FirstOrDefault(m => m.StartNodeId == nodeId);
}
catch
{
opcMonitoredItem = null;
}
OpcMonitoredItem opcMonitoredItem = opcSubscription.OpcMonitoredItems.FirstOrDefault(m => m.StartNodeId == nodeId);
// if there was none found, create one
if (opcMonitoredItem == null)
{
// add a new item to monitor
opcMonitoredItem = new OpcMonitoredItem(nodeId, EndpointUri);
opcMonitoredItem = new OpcMonitoredItem(nodeId, EndpointUri)
{
RequestedSamplingInterval = samplingInterval
};
opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem);
Trace($"AddNodeForMonitoring: Added item with nodeId '{nodeId.ToString()}' for monitoring.");
}
}
finally
@ -585,10 +578,7 @@ namespace Opc.Ua.Publisher
finally
{
_opcSessionSemaphore.Release();
if (OpcSessions.Count(s => s.State == SessionState.Connected) == 0)
{
_opcSessionSemaphore.Dispose();
}
_opcSessionSemaphore.Dispose();
}
}
@ -630,10 +620,6 @@ namespace Opc.Ua.Publisher
var opcSessions = OpcSessions.Where(s => s.Session != null);
opcSession = opcSessions.Where(s => s.Session.ConfiguredEndpoint.EndpointUrl.Equals(session.ConfiguredEndpoint.EndpointUrl)).FirstOrDefault();
}
catch
{
opcSession = null;
}
finally
{
OpcSessionsSemaphore.Release();

Просмотреть файл

@ -28,6 +28,7 @@ namespace Opc.Ua.Publisher
public static List<PublishNodeConfig> PublishConfig = new List<PublishNodeConfig>();
public static string NodesToPublishAbsFilenameDefault = $"{System.IO.Directory.GetCurrentDirectory()}{Path.DirectorySeparatorChar}publishednodes.json";
public static string NodesToPublishAbsFilename { get; set; }
public static SemaphoreSlim PublishDataSemaphore = new SemaphoreSlim(1);
public static string ShopfloorDomain { get; set; }
public static bool VerboseConsole { get; set; }
@ -68,7 +69,7 @@ namespace Opc.Ua.Publisher
public static string PublisherServerSecurityPolicy = SecurityPolicies.Basic128Rsa15;
public static string OpcOwnCertStoreType = X509Store;
public static string OpcOwnCertStoreType = Directory;
private const string _opcOwnCertDirectoryStorePathDefault = "CertificateStores/own";
private const string _opcOwnCertX509StorePathDefault = "CurrentUser\\UA_MachineDefault";
public static string OpcOwnCertStorePath = _opcOwnCertX509StorePathDefault;
@ -417,7 +418,7 @@ namespace Opc.Ua.Publisher
}
WriteLine("Publisher is starting up...");
// init OPC configuration and tracing
ModuleConfiguration moduleConfiguration = new ModuleConfiguration(ApplicationName);
Init(OpcStackTraceMask, VerboseConsole);
@ -464,6 +465,7 @@ namespace Opc.Ua.Publisher
// get information on the nodes to publish and validate the json by deserializing it.
try
{
PublishDataSemaphore.Wait();
if (string.IsNullOrEmpty(NodesToPublishAbsFilename))
{
// check if we have an env variable specifying the published nodes path, otherwise use the default
@ -502,6 +504,10 @@ namespace Opc.Ua.Publisher
Trace("exiting...");
return;
}
finally
{
PublishDataSemaphore.Release();
}
Trace($"There are {PublishConfig.Count.ToString()} nodes to publish.");
// initialize and start IoTHub messaging
@ -512,27 +518,30 @@ namespace Opc.Ua.Publisher
}
// create a list to manage sessions, subscriptions and monitored items.
OpcSessionsSemaphore.Wait();
var uniqueEndpointUrls = PublishConfig.Select(n => n.EndpointUri).Distinct();
foreach (var endpointUrl in uniqueEndpointUrls)
try
{
// create new session info.
OpcSession opcSession = new OpcSession(endpointUrl, OpcSessionCreationTimeout);
// create for all different OPC publishing intervals to this endpoint separate subscriptions
var nodesDistinctPublishingInterval = PublishConfig.Where(n => n.EndpointUri.Equals(endpointUrl)).Select( c => c.OpcPublishingInterval).Distinct();
foreach (var nodeDistinctPublishingInterval in nodesDistinctPublishingInterval)
PublishDataSemaphore.Wait();
OpcSessionsSemaphore.Wait();
var uniqueEndpointUrls = PublishConfig.Select(n => n.EndpointUri).Distinct();
foreach (var endpointUrl in uniqueEndpointUrls)
{
// create a subscription for the publishing interval and add it to the session.
OpcSubscription opcSubscription = new OpcSubscription(nodeDistinctPublishingInterval);
// create new session info.
OpcSession opcSession = new OpcSession(endpointUrl, OpcSessionCreationTimeout);
// add all nodes with this OPC publishing interval to this subscription.
var nodesWithSamePublishingInterval = PublishConfig.Where(n => n.EndpointUri.Equals(endpointUrl)).Where(n => n.OpcPublishingInterval == nodeDistinctPublishingInterval);
foreach (var nodeInfo in nodesWithSamePublishingInterval)
// create for all different OPC publishing intervals to this endpoint separate subscriptions
var nodesDistinctPublishingInterval = PublishConfig.Where(n => n.EndpointUri.Equals(endpointUrl)).Select(c => c.OpcPublishingInterval).Distinct();
foreach (var nodeDistinctPublishingInterval in nodesDistinctPublishingInterval)
{
// differentiate if legacy (using ns=) or new syntax (using nsu=) is used
if (nodeInfo.NodeId == null)
// create a subscription for the publishing interval and add it to the session.
OpcSubscription opcSubscription = new OpcSubscription(nodeDistinctPublishingInterval);
// add all nodes with this OPC publishing interval to this subscription.
var nodesWithSamePublishingInterval = PublishConfig.Where(n => n.EndpointUri.Equals(endpointUrl)).Where(n => n.OpcPublishingInterval == nodeDistinctPublishingInterval);
foreach (var nodeInfo in nodesWithSamePublishingInterval)
{
// differentiate if legacy (using ns=) or new syntax (using nsu=) is used
if (nodeInfo.NodeId == null)
{
// create a monitored item for the node
OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.ExpandedNodeId, opcSession.EndpointUri)
{
@ -540,30 +549,35 @@ namespace Opc.Ua.Publisher
SamplingInterval = nodeInfo.OpcSamplingInterval
};
opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem);
}
else
{
// give user a warning that the syntax is obsolete
Trace($"Please update the syntax of the configuration file and use ExpandedNodeId instead of NodeId property name for node with identifier '{nodeInfo.NodeId.ToString()}' on EndpointUrl '{nodeInfo.EndpointUri.AbsolutePath}'.");
// create a monitored item for the node with the configured or default sampling interval
OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.NodeId, opcSession.EndpointUri)
}
else
{
RequestedSamplingInterval = nodeInfo.OpcSamplingInterval,
SamplingInterval = nodeInfo.OpcSamplingInterval
};
opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem);
// give user a warning that the syntax is obsolete
Trace($"Please update the syntax of the configuration file and use ExpandedNodeId instead of NodeId property name for node with identifier '{nodeInfo.NodeId.ToString()}' on EndpointUrl '{nodeInfo.EndpointUri.AbsolutePath}'.");
// create a monitored item for the node with the configured or default sampling interval
OpcMonitoredItem opcMonitoredItem = new OpcMonitoredItem(nodeInfo.NodeId, opcSession.EndpointUri)
{
RequestedSamplingInterval = nodeInfo.OpcSamplingInterval,
SamplingInterval = nodeInfo.OpcSamplingInterval
};
opcSubscription.OpcMonitoredItems.Add(opcMonitoredItem);
}
}
// add subscription to session.
opcSession.OpcSubscriptions.Add(opcSubscription);
}
// add subscription to session.
opcSession.OpcSubscriptions.Add(opcSubscription);
// add session.
OpcSessions.Add(opcSession);
}
// add session.
OpcSessions.Add(opcSession);
}
OpcSessionsSemaphore.Release();
finally
{
OpcSessionsSemaphore.Release();
PublishDataSemaphore.Release();
}
// kick off the task to maintain all sessions
var cts = new CancellationTokenSource();
@ -644,11 +658,19 @@ namespace Opc.Ua.Publisher
try
{
// get tasks for all disconnected sessions and start them
var shutdownSessionTaskList = OpcSessions.Select(s => s.Shutdown());
if (shutdownSessionTaskList.GetEnumerator().MoveNext())
try
{
shutdownSessionTaskList.GetEnumerator().Reset();
await Task.WhenAll(shutdownSessionTaskList);
OpcSessionsSemaphore.Wait();
var shutdownSessionTaskList = OpcSessions.Select(s => s.Shutdown());
if (shutdownSessionTaskList.GetEnumerator().MoveNext())
{
shutdownSessionTaskList.GetEnumerator().Reset();
await Task.WhenAll(shutdownSessionTaskList);
}
}
finally
{
OpcSessionsSemaphore.Release();
}
}
catch (Exception e)

Просмотреть файл

@ -45,13 +45,13 @@ namespace Publisher
Uri endpointUri = null;
try
{
nodeId = inputArguments[0] as string;
endpointUri = inputArguments[1] as Uri;
if (string.IsNullOrEmpty(inputArguments[0] as string) || string.IsNullOrEmpty(inputArguments[1] as string))
{
Trace($"PublishNodeMethod: Arguments (0 (nodeId), 1 (endpointUrl)) are not valid strings!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments as strings!");
}
nodeId = inputArguments[0] as string;
endpointUri = new Uri(inputArguments[1] as string);
publishNodeConfig = new PublishNodeConfig(nodeId, endpointUri, OpcSamplingInterval, OpcPublishingInterval);
}
catch (UriFormatException)
@ -60,6 +60,15 @@ namespace Publisher
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide a valid OPC UA endpoint URL as second argument!");
}
// Process publishing and sampling intervals if passed in.
int publishingInterval = OpcPublishingInterval;
int samplingInterval = OpcSamplingInterval;
if (inputArguments.Count > 2)
{
int.TryParse(inputArguments[2] as string, out publishingInterval);
int.TryParse(inputArguments[3] as string, out samplingInterval);
}
// find/create a session to the endpoint URL and start monitoring the node.
try
{
@ -68,14 +77,7 @@ namespace Publisher
try
{
OpcSessionsSemaphore.Wait();
try
{
opcSession = OpcSessions.FirstOrDefault(s => s.EndpointUri == publishNodeConfig.EndpointUri);
}
catch
{
opcSession = null;
}
opcSession = OpcSessions.FirstOrDefault(s => s.EndpointUri == publishNodeConfig.EndpointUri);
// add a new session.
if (opcSession == null)
@ -83,38 +85,41 @@ namespace Publisher
// create new session info.
opcSession = new OpcSession(publishNodeConfig.EndpointUri, OpcSessionCreationTimeout);
OpcSessions.Add(opcSession);
Trace($"PublishNodeMethod: No matching session found for endpoint '{publishNodeConfig.EndpointUri.AbsolutePath}'. Requested to create a new one.");
Trace($"PublishNodeMethod: No matching session found for endpoint '{publishNodeConfig.EndpointUri.OriginalString}'. Requested to create a new one.");
}
else
{
Trace($"PublishNodeMethod: Session found for endpoint '{publishNodeConfig.EndpointUri.AbsolutePath}'");
Trace($"PublishNodeMethod: Session found for endpoint '{publishNodeConfig.EndpointUri.OriginalString}'");
}
// add the node info to the subscription with the default publishing interval
opcSession.AddNodeForMonitoring(publishingInterval, samplingInterval, publishNodeConfig.NodeId);
Trace("PublishNodeMethod: Requested to monitor item.");
}
finally
{
OpcSessionsSemaphore.Release();
}
// add the node info to the subscription with the default publishing interval
opcSession.AddNodeForMonitoring(OpcPublishingInterval, publishNodeConfig.NodeId);
Trace("PublishNodeMethod: Requested to monitor item.");
// start monitoring the node
Task monitorTask = Task.Run(async () => await opcSession.ConnectAndMonitor());
monitorTask.Wait();
Trace("PublishNodeMethod: Session processing completed.");
// update our data
PublishConfig.Add(publishNodeConfig);
// add it also to the publish file
var publishConfigFileEntry = new PublishConfigFileEntry()
try
{
EndpointUri = endpointUri,
NodeId = nodeId
};
PublishConfigFileEntries.Add(publishConfigFileEntry);
File.WriteAllText(NodesToPublishAbsFilename, JsonConvert.SerializeObject(PublishConfigFileEntries));
PublishDataSemaphore.Wait();
PublishConfig.Add(publishNodeConfig);
// add it also to the publish file
var publishConfigFileEntry = new PublishConfigFileEntry()
{
EndpointUri = endpointUri,
NodeId = nodeId
};
PublishConfigFileEntries.Add(publishConfigFileEntry);
File.WriteAllText(NodesToPublishAbsFilename, JsonConvert.SerializeObject(PublishConfigFileEntries));
}
finally
{
PublishDataSemaphore.Release();
}
Trace($"PublishNodeMethod: Now publishing: {publishNodeConfig.NodeId.ToString()}");
return ServiceResult.Good;
@ -141,13 +146,13 @@ namespace Publisher
Uri endpointUri = null;
try
{
nodeId = inputArguments[0] as string;
endpointUri = inputArguments[1] as Uri;
if (string.IsNullOrEmpty(inputArguments[0] as string) || string.IsNullOrEmpty(inputArguments[1] as string))
{
Trace($"UnPublishNodeMethod: Arguments (0 (nodeId), 1 (endpointUrl)) are not valid strings!");
return ServiceResult.Create(StatusCodes.BadArgumentsMissing, "Please provide all arguments as strings!");
}
nodeId = inputArguments[0] as string;
endpointUri = new Uri(inputArguments[1] as string);
}
catch (UriFormatException)
{
@ -177,27 +182,30 @@ namespace Publisher
if (opcSession == null)
{
// do nothing if there is no session for this endpoint.
Trace($"UnPublishNodeMethod: Session for endpoint '{endpointUri.AbsolutePath}' not found.");
Trace($"UnPublishNodeMethod: Session for endpoint '{endpointUri.OriginalString}' not found.");
return ServiceResult.Create(StatusCodes.BadSessionIdInvalid, "Session for endpoint of published node not found!");
}
else
{
Trace($"UnPublishNodeMethod: Session found for endpoint '{endpointUri.AbsolutePath}'");
Trace($"UnPublishNodeMethod: Session found for endpoint '{endpointUri.OriginalString}'");
}
// remove the node from the sessions monitored items list.
opcSession.TagNodeForMonitoringStop(nodeId);
Trace("UnPublishNodeMethod: Requested to stop monitoring of node.");
// stop monitoring the node
Task monitorTask = Task.Run(async () => await opcSession.ConnectAndMonitor());
monitorTask.Wait();
Trace("UnPublishNodeMethod: Session processing completed.");
// remove node from persisted config file
var entryToRemove = PublishConfigFileEntries.Find(l => l.NodeId == nodeId && l.EndpointUri == endpointUri);
PublishConfigFileEntries.Remove(entryToRemove);
File.WriteAllText(NodesToPublishAbsFilename, JsonConvert.SerializeObject(PublishConfigFileEntries));
try
{
PublishDataSemaphore.Wait();
var entryToRemove = PublishConfigFileEntries.Find(l => l.NodeId == nodeId && l.EndpointUri == endpointUri);
PublishConfigFileEntries.Remove(entryToRemove);
File.WriteAllText(NodesToPublishAbsFilename, JsonConvert.SerializeObject(PublishConfigFileEntries));
}
finally
{
PublishDataSemaphore.Release();
}
}
catch (Exception e)
{
@ -212,7 +220,15 @@ namespace Publisher
/// </summary>
private ServiceResult GetListOfPublishedNodesMethod(ISystemContext context, MethodState method, IList<object> inputArguments, IList<object> outputArguments)
{
outputArguments[0] = JsonConvert.SerializeObject(PublishConfigFileEntries);
try
{
PublishDataSemaphore.Wait();
outputArguments[0] = JsonConvert.SerializeObject(PublishConfigFileEntries);
}
finally
{
PublishDataSemaphore.Release();
}
Trace("GetListOfPublishedNodesMethod: Success!");
return ServiceResult.Good;