Catch exceptions thrown by call to ReceiveFromAsync

This commit is contained in:
Filippo Banno 2019-09-24 18:37:04 +01:00
Родитель c51eea45d8
Коммит 0283a56c31
1 изменённых файлов: 45 добавлений и 53 удалений

Просмотреть файл

@ -83,71 +83,63 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking
}
}
private void HandleAsyncRead(Task<SocketReceiveFromResult> task)
private async void HandleAsyncRead()
{
if (task.IsFaulted)
try
{
task.Exception.Handle(e =>
while (true)
{
// If the socket has been closed, quit gracefully.
return e is ObjectDisposedException;
});
return;
}
Debug.Assert(readSegment_.Offset == 0);
var result = await socket_.ReceiveFromAsync(readSegment_, SocketFlags.None, anywhere_);
// Dispatch the message.
var result = task.Result;
Debug.Assert(readSegment_.Offset == 0);
Guid streamId;
int seqNum = -1; //< unordered
int payloadOffset;
using (var str = new MemoryStream(readSegment_.Array, readSegment_.Offset, readSegment_.Count, false))
using (var reader = new BinaryReader(str))
{
streamId = new Guid(reader.ReadBytes(16));
if (streamId != Guid.Empty)
{
seqNum = reader.ReadInt32();
}
payloadOffset = (int)str.Position;
}
bool handleMessage = true;
if (seqNum >= 0)
{
// Locking the whole map here just for the sake of deletion is sub-optimal, but
// we don't expect a high contention so it should be good enough.
lock(receiveStreams_)
{
if (receiveStreams_.TryGetValue(streamId, out ReceiveStream streamData))
Guid streamId;
int seqNum = -1; //< unordered
int payloadOffset;
using (var str = new MemoryStream(readSegment_.Array, readSegment_.Offset, readSegment_.Count, false))
using (var reader = new BinaryReader(str))
{
if (seqNum > streamData.SeqNum)
streamId = new Guid(reader.ReadBytes(16));
if (streamId != Guid.Empty)
{
streamData.SeqNum = seqNum;
streamData.LastHeard = DateTime.UtcNow;
seqNum = reader.ReadInt32();
}
else
payloadOffset = (int)str.Position;
}
bool handleMessage = true;
if (seqNum >= 0)
{
// Locking the whole map here just for the sake of deletion is sub-optimal, but
// we don't expect a high contention so it should be good enough.
lock (receiveStreams_)
{
handleMessage = false;
if (receiveStreams_.TryGetValue(streamId, out ReceiveStream streamData))
{
if (seqNum >= streamData.SeqNum)
{
streamData.SeqNum = seqNum;
streamData.LastHeard = DateTime.UtcNow;
}
else
{
handleMessage = false;
}
}
else
{
receiveStreams_.Add(streamId, new ReceiveStream { SeqNum = seqNum });
}
}
}
else
if (handleMessage)
{
receiveStreams_.Add(streamId, new ReceiveStream { SeqNum = seqNum });
Message?.Invoke(this, new UdpPeerNetworkMessage(result.RemoteEndPoint,
streamId, new ArraySegment<byte>(readSegment_.Array, payloadOffset, result.ReceivedBytes)));
}
}
}
if (handleMessage)
{
Message?.Invoke(this, new UdpPeerNetworkMessage(result.RemoteEndPoint,
streamId, new ArraySegment<byte>(readSegment_.Array, payloadOffset, result.ReceivedBytes)));
}
// Listen again.
socket_.ReceiveFromAsync(readSegment_, SocketFlags.None, anywhere_)
.ContinueWith(HandleAsyncRead);
catch (ObjectDisposedException) { /* Terminate the task. */}
}
private async void CleanupExpired(CancellationToken token)
@ -220,8 +212,8 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking
// Start the cleanup thread.
deleteExpiredTask_ = Task.Run(() => CleanupExpired(deleteExpiredCts_.Token), deleteExpiredCts_.Token);
socket_.ReceiveFromAsync(readSegment_, SocketFlags.None, anywhere_)
.ContinueWith(HandleAsyncRead);
// Start the receiving thread.
Task.Run(HandleAsyncRead);
}
private static bool IsMulticast(IPAddress address)