Added timer to send every 1 second regardless if IoT Hub message is full.

This commit is contained in:
Kevin Hilscher 2017-08-02 20:23:48 -06:00
Родитель f9dce41075
Коммит 4d7170609f
2 изменённых файлов: 29 добавлений и 20 удалений

Просмотреть файл

@ -40,6 +40,7 @@
<PackageReference Include="Microsoft.Azure.Devices" Version="1.2.9" />
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.3.0" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua" Version="0.4.0" />
<PackageReference Include="System.Threading.Timer" Version="4.3.0" />
</ItemGroup>
</Project>

Просмотреть файл

@ -13,6 +13,7 @@ using System.Text;
using System.Threading.Tasks;
using System.Threading;
namespace Opc.Ua.Publisher
{
public class Program
@ -25,6 +26,7 @@ namespace Opc.Ua.Publisher
public static DeviceClient m_deviceClient = null;
private static ConcurrentQueue<string> m_sendQueue = new ConcurrentQueue<string>();
private const uint m_maxSizeOfIoTHubMessageBytes = 4096;
private const int m_defaultSendIntervalInMilliSeconds = 1000;
private static int m_currentSizeOfIoTHubMessageBytes = 0;
private static List<OpcUaMessage> m_messageList = new List<OpcUaMessage>();
private static PublisherServer m_server = new PublisherServer();
@ -391,12 +393,15 @@ namespace Opc.Ua.Publisher
{
try
{
//Send every x seconds, regardless if IoT Hub message is full.
Timer sendTimer = new Timer(async state => await SendToIoTHubAsync(), null, 0, m_defaultSendIntervalInMilliSeconds);
while (true)
{
if (ct.IsCancellationRequested)
{
Trace("Cancellation requested. Sending remaining messages.");
sendTimer.Dispose();
await SendToIoTHubAsync();
break;
}
@ -459,30 +464,33 @@ namespace Opc.Ua.Publisher
/// </summary>
private static async Task SendToIoTHubAsync()
{
string msgListInJson = JsonConvert.SerializeObject(m_messageList);
var encodedMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(msgListInJson));
// publish
encodedMessage.Properties.Add("content-type", "application/opcua+uajson");
encodedMessage.Properties.Add("deviceName", m_applicationName);
try
if (m_messageList.Count > 0)
{
if (m_deviceClient != null)
string msgListInJson = JsonConvert.SerializeObject(m_messageList);
var encodedMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(msgListInJson));
// publish
encodedMessage.Properties.Add("content-type", "application/opcua+uajson");
encodedMessage.Properties.Add("deviceName", m_applicationName);
try
{
await m_deviceClient.SendEventAsync(encodedMessage);
if (m_deviceClient != null)
{
await m_deviceClient.SendEventAsync(encodedMessage);
}
}
catch (Exception ex)
{
Trace("Failed to publish message, dropping...");
Trace(ex.ToString());
}
}
catch (Exception ex)
{
Trace("Failed to publish message, dropping...");
Trace(ex.ToString());
}
//Reset IoT Hub message size
m_currentSizeOfIoTHubMessageBytes = 0;
m_messageList.Clear();
//Reset IoT Hub message size
m_currentSizeOfIoTHubMessageBytes = 0;
m_messageList.Clear();
}
}
private class OpcUaMessage