Change the way how the worker task for session creation and node monitoring is started and signaled. This fixes an issue if a high number of nodes is published via IoTHub direct methods, which earlier has blocked execution of the app.

This commit is contained in:
Hans Gschossmann 2018-09-05 19:35:16 +02:00
Родитель b1aab476f0
Коммит e143a65c92
3 изменённых файлов: 81 добавлений и 69 удалений

Просмотреть файл

@ -306,6 +306,12 @@ namespace OpcPublisher
Logger.Error(e, $"{logPrefix} Exception while trying to configure publishing node '{(isNodeIdFormat ? nodeId.ToString() : expandedNodeId.ToString())}'");
return (new MethodResponse((int)HttpStatusCode.InternalServerError));
}
// stop when we encounter a problem
if (statusCode != HttpStatusCode.OK && statusCode != HttpStatusCode.Accepted)
{
break;
}
}
}
catch (AggregateException e)
@ -406,13 +412,13 @@ namespace OpcPublisher
if (isNodeIdFormat)
{
// stop monitoring the node, execute syncronously
// stop monitoring the node, execute synchronously
Logger.Information($"{logPrefix} Request to stop monitoring item with NodeId '{nodeId.ToString()}')");
statusCode = await opcSession.RequestMonitorItemRemovalAsync(nodeId, null, ShutdownTokenSource.Token);
}
else
{
// stop monitoring the node, execute syncronously
// stop monitoring the node, execute synchronously
Logger.Information($"{logPrefix} Request to stop monitoring item with ExpandedNodeId '{expandedNodeId.ToString()}')");
statusCode = await opcSession.RequestMonitorItemRemovalAsync(null, expandedNodeId, ShutdownTokenSource.Token);
}

Просмотреть файл

@ -428,6 +428,8 @@ namespace OpcPublisher
public static Int32 NodeConfigVersion = 0;
public static int SessionConnectWaitSec { get; set; } = 10;
public Uri EndpointUrl;
public Session OpcUaClientSession;
@ -446,6 +448,8 @@ namespace OpcPublisher
public bool UseSecurity { get; set; } = true;
public AutoResetEvent ConnectAndMonitorSession;
public int GetNumberOfOpcSubscriptions()
{
int result = 0;
@ -507,46 +511,70 @@ namespace OpcPublisher
MissedKeepAlives = 0;
PublishingInterval = OpcPublishingInterval;
UseSecurity = useSecurity;
ConnectAndMonitorSession = new AutoResetEvent(false);
_sessionCancelationTokenSource = new CancellationTokenSource();
_sessionCancelationToken = _sessionCancelationTokenSource.Token;
_opcSessionSemaphore = new SemaphoreSlim(1);
_namespaceTable = new NamespaceTable();
_telemetryConfiguration = GetEndpointTelemetryConfiguration(endpointUrl.AbsoluteUri);
Task.Run(ConnectAndMonitorAsync);
}
/// <summary>
/// This task is executed regularily and ensures:
/// This task is started when a session is configured and is running till session shutdown and ensures:
/// - disconnected sessions are reconnected.
/// - monitored nodes are no longer monitored if requested to do so.
/// - monitoring for a node starts if it is required.
/// - unused subscriptions (without any nodes to monitor) are removed.
/// - sessions with out subscriptions are removed.
/// </summary>
public async Task ConnectAndMonitorAsync(CancellationToken ct)
public async Task ConnectAndMonitorAsync()
{
uint lastNodeConfigVersion = 0;
try
WaitHandle[] connectAndMonitorEvents = new WaitHandle[]
{
await ConnectSessionAsync(ct);
_sessionCancelationToken.WaitHandle,
ConnectAndMonitorSession
};
await MonitorNodesAsync(ct);
await StopMonitoringNodesAsync(ct);
await RemoveUnusedSubscriptionsAsync(ct);
await RemoveUnusedSessionsAsync(ct);
// update the config file if required
if (NodeConfigVersion != lastNodeConfigVersion)
// run till session is closed
while (!_sessionCancelationToken.IsCancellationRequested)
{
try
{
lastNodeConfigVersion = (uint)NodeConfigVersion;
await UpdateNodeConfigurationFileAsync();
// wait till:
// - cancelation is requested
// - got signaled because we need to check for pending session activity
// - timeout to try to reestablish any disconnected sessions
WaitHandle.WaitAny(connectAndMonitorEvents, SessionConnectWaitSec * 1000);
// step out on cancel
if (_sessionCancelationToken.IsCancellationRequested)
{
break;
}
await ConnectSessionAsync(_sessionCancelationToken);
await MonitorNodesAsync(_sessionCancelationToken);
await StopMonitoringNodesAsync(_sessionCancelationToken);
await RemoveUnusedSubscriptionsAsync(_sessionCancelationToken);
await RemoveUnusedSessionsAsync(_sessionCancelationToken);
// update the config file if required
if (NodeConfigVersion != lastNodeConfigVersion)
{
lastNodeConfigVersion = (uint)NodeConfigVersion;
await UpdateNodeConfigurationFileAsync();
}
}
catch (Exception e)
{
Logger.Error(e, "Error in ConnectAndMonitorAsync.");
}
}
catch (Exception e)
{
Logger.Error(e, "Error in ConnectAndMonitorAsync.");
}
}
@ -821,20 +849,20 @@ namespace OpcPublisher
case StatusCodes.BadNodeIdInvalid:
case StatusCodes.BadNodeIdUnknown:
{
Logger.Error($"Failed to monitor node '{currentNodeId.Identifier}' on endpoint '{EndpointUrl}'.");
Logger.Error($"Failed to monitor node '{currentNodeId}' on endpoint '{EndpointUrl}'.");
Logger.Error($"OPC UA ServiceResultException is '{sre.Result}'. Please check your publisher configuration for this node.");
break;
}
default:
{
Logger.Error($"Unhandled OPC UA ServiceResultException '{sre.Result}' when monitoring node '{currentNodeId.Identifier}' on endpoint '{EndpointUrl}'. Continue.");
Logger.Error($"Unhandled OPC UA ServiceResultException '{sre.Result}' when monitoring node '{currentNodeId}' on endpoint '{EndpointUrl}'. Continue.");
break;
}
}
}
catch (Exception e)
{
Logger.Error(e, $"Failed to monitor node '{currentNodeId.Identifier}' on endpoint '{EndpointUrl}'");
Logger.Error(e, $"Failed to monitor node '{currentNodeId}' on endpoint '{EndpointUrl}'");
}
}
opcSubscription.OpcUaClientSubscription.SetPublishingMode(true);
@ -1133,7 +1161,7 @@ namespace OpcPublisher
Logger.Debug($"{logPrefix} Added item with nodeId '{(expandedNodeId == null ? nodeId.ToString() : expandedNodeId.ToString())}' for monitoring.");
// trigger the actual OPC communication with the server to be done
Task t = Task.Run(async () => await ConnectAndMonitorAsync(ct));
ConnectAndMonitorSession.Set();
return HttpStatusCode.Accepted;
}
else
@ -1211,7 +1239,7 @@ namespace OpcPublisher
}
// trigger the actual OPC communication with the server to be done
Task t = Task.Run(async () => await ConnectAndMonitorAsync(ct));
ConnectAndMonitorSession.Set();
}
catch (Exception e)
{

Просмотреть файл

@ -105,10 +105,10 @@ namespace OpcPublisher
}
},
{ "ic|iotcentral", $"publisher will send OPC UA data in IoTCentral compatible format (DisplayName of a node is used as key, this key is the Field name in IoTCentral). you need to ensure that all DisplayName's are unique. (Auto enables fetch display name)\nDefault: {IotCentralMode}", b => IotCentralMode = FetchOpcNodeDisplayName = b != null },
{ "sw|sessionconnectwait=", $"specify the wait time in seconds publisher is trying to connect to disconnected endpoints and starts monitoring unmonitored items\nMin: 10\nDefault: {_publisherSessionConnectWaitSec}", (int i) => {
{ "sw|sessionconnectwait=", $"specify the wait time in seconds publisher is trying to connect to disconnected endpoints and starts monitoring unmonitored items\nMin: 10\nDefault: {SessionConnectWaitSec}", (int i) => {
if (i > 10)
{
_publisherSessionConnectWaitSec = i;
SessionConnectWaitSec = i;
}
else
{
@ -587,8 +587,8 @@ namespace OpcPublisher
return;
}
// kick off the task to maintain all sessions
Task sessionConnectorAsync = Task.Run(async () => await SessionConnectorAsync(ShutdownTokenSource.Token));
// kick off OPC session creation and node monitoring
await SessionStartAsync();
// Show notification on session events
_publisherServer.CurrentInstance.SessionManager.SessionActivated += ServerEventStatus;
@ -620,13 +620,10 @@ namespace OpcPublisher
ShutdownTokenSource.Cancel();
Logger.Information("Publisher is shutting down...");
// Wait for session connector completion
await sessionConnectorAsync;
// stop the server
_publisherServer.Stop();
// Clean up Publisher sessions
// shutdown all OPC sessions
await SessionShutdownAsync();
// shutdown the IoTHub messaging
@ -655,40 +652,22 @@ namespace OpcPublisher
}
/// <summary>
/// Kicks of the work horse of the publisher regularily for all sessions.
/// Start all sessions.
/// </summary>
public static async Task SessionConnectorAsync(CancellationToken ct)
public async static Task SessionStartAsync()
{
while (true)
try
{
try
{
// get tasks for all disconnected sessions and start them
Task[] singleSessionHandlerTaskList;
try
{
await OpcSessionsListSemaphore.WaitAsync();
singleSessionHandlerTaskList = OpcSessions.Select(s => s.ConnectAndMonitorAsync(ct)).ToArray();
}
finally
{
OpcSessionsListSemaphore.Release();
}
Task.WaitAll(singleSessionHandlerTaskList);
}
catch (Exception e)
{
Logger.Error(e, $"Failed to connect and monitor a disconnected server. {(e.InnerException != null ? e.InnerException.Message : "")}");
}
try
{
await Task.Delay(_publisherSessionConnectWaitSec * 1000, ct);
}
catch { }
if (ct.IsCancellationRequested)
{
return;
}
await OpcSessionsListSemaphore.WaitAsync();
OpcSessions.ForEach(s => s.ConnectAndMonitorSession.Set());
}
catch (Exception e)
{
Logger.Fatal(e, "Failed to start all sessions.");
}
finally
{
OpcSessionsListSemaphore.Release();
}
}
@ -734,8 +713,8 @@ namespace OpcPublisher
Logger.Information($"There are still {sessionCount} sessions alive. Ignore and continue shutdown.");
return;
}
Logger.Information($"Publisher is shutting down. Wait {_publisherSessionConnectWaitSec} seconds, since there are stil {sessionCount} sessions alive...");
await Task.Delay(_publisherSessionConnectWaitSec * 1000);
Logger.Information($"Publisher is shutting down. Wait {SessionConnectWaitSec} seconds, since there are stil {sessionCount} sessions alive...");
await Task.Delay(SessionConnectWaitSec * 1000);
}
}
@ -893,7 +872,6 @@ namespace OpcPublisher
}
private static PublisherServer _publisherServer;
private static int _publisherSessionConnectWaitSec = 10;
private static bool _noShutdown = false;
private static bool _installOnly = false;
private static string _logFileName = $"{Utils.GetHostName()}-publisher.log";