Merge pull request #67 from khilscher/master

Added message batching to OpcPublisher
This commit is contained in:
Hans Gschossmann 2017-08-22 13:52:44 +02:00 коммит произвёл GitHub
Родитель e1da0adae3 3e4098ecc1
Коммит ff2d01dae4
2 изменённых файлов: 152 добавлений и 9 удалений

Просмотреть файл

@ -40,6 +40,8 @@
<PackageReference Include="Microsoft.Azure.Devices" Version="1.3.0" />
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.4.0" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua" Version="0.4.1" />
<PackageReference Include="System.Threading.Timer" Version="4.3.0" />
</ItemGroup>
<ItemGroup>

Просмотреть файл

@ -6,10 +6,13 @@ using Newtonsoft.Json;
using Opc.Ua.Client;
using Publisher;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
namespace Opc.Ua.Publisher
{
@ -21,7 +24,11 @@ namespace Opc.Ua.Publisher
public static List<Uri> m_endpointUrls = new List<Uri>();
public static string m_applicationName = string.Empty;
public static DeviceClient m_deviceClient = null;
private static ConcurrentQueue<string> m_sendQueue = new ConcurrentQueue<string>();
private const uint m_maxSizeOfIoTHubMessageBytes = 4096;
private const int m_defaultSendIntervalInMilliSeconds = 1000;
private static int m_currentSizeOfIoTHubMessageBytes = 0;
private static List<OpcUaMessage> m_messageList = new List<OpcUaMessage>();
private static PublisherServer m_server = new PublisherServer();
/// <summary>
@ -201,6 +208,21 @@ namespace Opc.Ua.Publisher
}
}
Task dequeueAndSendTask = null;
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
Trace("Creating task to send OPC UA messages in batches to IoT Hub...");
try
{
dequeueAndSendTask = Task.Run(() => DeQueueMessagesAsync(token),token);
}
catch (Exception ex)
{
Trace("Exception: " + ex.ToString());
}
Trace("Publisher is running. Press enter to quit.");
Console.ReadLine();
@ -209,10 +231,22 @@ namespace Opc.Ua.Publisher
session.Close();
}
//Send cancellation token and wait for last IoT Hub message to be sent.
try
{
tokenSource.Cancel();
dequeueAndSendTask.Wait();
}
catch (Exception ex)
{
Trace("Exception: " + ex.ToString());
}
if (m_deviceClient != null)
{
m_deviceClient.CloseAsync().Wait();
}
}
catch (Exception e)
{
@ -340,17 +374,110 @@ namespace Opc.Ua.Publisher
encoder.WriteDataValue("Value", value);
string json = encoder.CloseAndReturnText();
var eventMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(json));
// add message to fifo send queue
m_sendQueue.Enqueue(json);
}
catch (Exception exception)
{
Trace("Error processing monitored item notification: " + exception.ToString());
}
}
/// <summary>
/// Dequeue messages
/// </summary>
private static async Task DeQueueMessagesAsync(CancellationToken ct)
{
try
{
//Send every x seconds, regardless if IoT Hub message is full.
Timer sendTimer = new Timer(async state => await SendToIoTHubAsync(), null, 0, m_defaultSendIntervalInMilliSeconds);
while (true)
{
if (ct.IsCancellationRequested)
{
Trace("Cancellation requested. Sending remaining messages.");
sendTimer.Dispose();
await SendToIoTHubAsync();
break;
}
if (m_sendQueue.Count > 0)
{
bool isPeekSuccessful = false;
bool isDequeueSuccessful = false;
string messageInJson = string.Empty;
int nextMessageSizeBytes = 0;
//Perform a TryPeek to determine size of next message
//and whether it will fit. If so, dequeue message and add it to the list.
//If it cannot fit, send current message to IoT Hub, reset it, and repeat.
isPeekSuccessful = m_sendQueue.TryPeek(out messageInJson);
//Get size of next message in the queue
if (isPeekSuccessful)
{
nextMessageSizeBytes = System.Text.Encoding.UTF8.GetByteCount(messageInJson);
}
//Determine if it will fit into remaining space of the IoT Hub message.
//If so, dequeue it
if (m_currentSizeOfIoTHubMessageBytes + nextMessageSizeBytes < m_maxSizeOfIoTHubMessageBytes)
{
isDequeueSuccessful = m_sendQueue.TryDequeue(out messageInJson);
//Add dequeued message to list
if (isDequeueSuccessful)
{
OpcUaMessage msgPayload = JsonConvert.DeserializeObject<OpcUaMessage>(messageInJson);
m_messageList.Add(msgPayload);
m_currentSizeOfIoTHubMessageBytes = m_currentSizeOfIoTHubMessageBytes + nextMessageSizeBytes;
}
}
else
{
//Message is full. Send it to IoT Hub
await SendToIoTHubAsync();
}
}
}
}
catch (Exception exception)
{
Console.WriteLine("Error dequeuing message: " + exception.ToString());
}
}
/// <summary>
/// Send dequeued messages to IoT Hub
/// </summary>
private static async Task SendToIoTHubAsync()
{
if (m_messageList.Count > 0)
{
string msgListInJson = JsonConvert.SerializeObject(m_messageList);
var encodedMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(msgListInJson));
// publish
eventMessage.Properties.Add("content-type", "application/opcua+uajson");
eventMessage.Properties.Add("deviceName", m_applicationName);
encodedMessage.Properties.Add("content-type", "application/opcua+uajson");
encodedMessage.Properties.Add("deviceName", m_applicationName);
try
{
if (m_deviceClient != null)
{
m_deviceClient.SendEventAsync(eventMessage).Wait();
await m_deviceClient.SendEventAsync(encodedMessage);
}
}
catch (Exception ex)
@ -358,11 +485,25 @@ namespace Opc.Ua.Publisher
Trace("Failed to publish message, dropping...");
Trace(ex.ToString());
}
//Reset IoT Hub message size
m_currentSizeOfIoTHubMessageBytes = 0;
m_messageList.Clear();
}
catch (Exception exception)
{
Trace("Error processing monitored item notification: " + exception.ToString());
}
}
private class OpcUaMessage
{
public string ApplicationUri { get; set; }
public string DisplayName { get; set; }
public string NodeId { get; set; }
public OpcUaValue Value { get; set; }
}
private class OpcUaValue
{
public string Value { get; set; }
public string SourceTimestamp { get; set; }
}
/// <summary>