catch exceptions in event hub trigger if orchestration service has not started

This commit is contained in:
Sebastian Burckhardt 2021-01-26 10:02:40 -08:00
Родитель bf793347f5
Коммит 5b825a265f
2 изменённых файлов: 62 добавлений и 37 удалений

Просмотреть файл

@ -3,6 +3,7 @@
namespace EventConsumer
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
@ -20,48 +21,66 @@ namespace EventConsumer
[DurableClient] IDurableClient client,
ILogger log)
{
if (TestConstants.BatchSignals)
int numTries = 0;
while (numTries < 10)
{
// we pack all the events into a byte sequence, and then
// send those bytes as a signals to the entity
numTries++;
var stream = new MemoryStream();
using var streamWriter = new BinaryWriter(stream);
for (int i = 0; i < eventDataSet.Length; i++)
try
{
var eventData = eventDataSet[i];
var sp = eventData.SystemProperties;
streamWriter.Write((int) eventData.Body[0]);
streamWriter.Write(eventData.SystemProperties.SequenceNumber);
streamWriter.Write(eventData.Body.Count);
streamWriter.Write(eventData.Body);
if (TestConstants.BatchSignals)
{
// we pack all the events into a byte sequence, and then
// send those bytes as a signals to the entity
var stream = new MemoryStream();
using var streamWriter = new BinaryWriter(stream);
for (int i = 0; i < eventDataSet.Length; i++)
{
var eventData = eventDataSet[i];
var sp = eventData.SystemProperties;
streamWriter.Write((int)eventData.Body[0]);
streamWriter.Write(eventData.SystemProperties.SequenceNumber);
streamWriter.Write(eventData.Body.Count);
streamWriter.Write(eventData.Body);
}
streamWriter.Flush();
await client.SignalEntityAsync(new EntityId(nameof(ReceiverEntity), "0"), nameof(ReceiverEntity.ReceiveBatch), stream.ToArray());
}
else
{
var signalTasks = new List<Task>();
// send one signal for each packet
for (int i = 0; i < eventDataSet.Length; i++)
{
var eventData = eventDataSet[i];
var sp = eventData.SystemProperties;
byte[] payload = eventData.Body.ToArray();
var evt = new Event()
{
Partition = payload[0],
SeqNo = eventData.SystemProperties.SequenceNumber,
Payload = payload
};
log.LogInformation($"Sending signal for {evt}");
signalTasks.Add(client.SignalEntityAsync(new EntityId(nameof(ReceiverEntity), "0"), nameof(ReceiverEntity.Receive), evt));
}
await Task.WhenAll(signalTasks);
}
break;
}
streamWriter.Flush();
await client.SignalEntityAsync(new EntityId(nameof(ReceiverEntity), "0"), nameof(ReceiverEntity.ReceiveBatch), stream.ToArray());
}
else
{
var signalTasks = new List<Task>();
// send one signal for each packet
for (int i = 0; i < eventDataSet.Length; i++)
catch (NullReferenceException)
{
var eventData = eventDataSet[i];
var sp = eventData.SystemProperties;
byte[] payload = eventData.Body.ToArray();
var evt = new Event()
{
Partition = payload[0],
SeqNo = eventData.SystemProperties.SequenceNumber,
Payload = payload
};
log.LogInformation($"Sending signal for {evt}");
signalTasks.Add(client.SignalEntityAsync(new EntityId(nameof(ReceiverEntity), "0"), nameof(ReceiverEntity.Receive), evt));
// during startup we may get these exceptions if the trigger goes off before the
// Orchestration service has been started
await Task.Delay(TimeSpan.FromSeconds(1));
}
await Task.WhenAll(signalTasks);
}
}
}

Просмотреть файл

@ -20,7 +20,13 @@
"UseGracefulShutdown": "true",
"storageProvider": {
"StorageConnectionString": "$AzureWebJobsStorage",
"EventHubsConnectionString": "$EventHubsConnection"
"EventHubsConnectionString": "$EventHubsConnection",
"LogLevelLimit": "Information",
"StorageLogLevelLimit": "Debug",
"TransportLogLevelLimit": "Information",
"EventLogLevelLimit": "Information",
"WorkItemLogLevelLimit": "Information",
"TraceToBlob": "true"
}
}
}