Refactored transport tests into their own class

Reduced a bit the number of equivalent tests run.
This commit is contained in:
Filippo Banno 2020-01-24 16:22:37 +00:00 коммит произвёл Filippo Bannò
Родитель 12621d553d
Коммит 3df626f098
4 изменённых файлов: 437 добавлений и 210 удалений

Просмотреть файл

@ -0,0 +1,206 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
{
// Utility for making multiple instances of IPeerDiscoveryTransport communicating with each other.
internal interface ITransportBuilder
{
IPeerDiscoveryTransport MakeTransport(int userIndex);
bool SimulatesPacketLoss { get; }
}
internal class MemoryTransportBuilder : ITransportBuilder
{
public IPeerDiscoveryTransport MakeTransport(int userIndex)
{
return new MemoryPeerDiscoveryTransport(userIndex);
}
public bool SimulatesPacketLoss => false;
}
internal class UdpTransportBuilder : ITransportBuilder
{
private readonly ushort port_ = Utils.NewPortNumber.Calculate();
public IPeerDiscoveryTransport MakeTransport(int userIndex)
{
return new UdpPeerDiscoveryTransport(new IPAddress(0xffffff7f), port_, new IPAddress(0x0000007f + (userIndex << 24)));
}
public bool SimulatesPacketLoss => false;
}
internal class UdpMulticastTransportBuilder : ITransportBuilder
{
private readonly ushort port_ = Utils.NewPortNumber.Calculate();
public IPeerDiscoveryTransport MakeTransport(int userIndex)
{
return new UdpPeerDiscoveryTransport(new IPAddress(0x000000e0), port_, new IPAddress(0x0000007f + (userIndex << 24)));
}
public bool SimulatesPacketLoss => false;
}
internal class UdpReorderedTransportBuilder : ITransportBuilder, IDisposable
{
public const int MaxDelayMs = 25;
public const int MaxRetries = 3;
private readonly Socket relay_;
private readonly ushort port_ = Utils.NewPortNumber.Calculate();
private readonly List<IPEndPoint> recipients_ = new List<IPEndPoint>();
private readonly Random random_;
// Wraps a packet for use in a map.
private class Packet
{
public IPEndPoint EndPoint;
public byte[] Contents;
public Packet(IPEndPoint endPoint, ArraySegment<byte> contents)
{
EndPoint = endPoint;
Contents = new byte[contents.Count];
for (int i = 0; i < contents.Count; ++i)
{
Contents[i] = contents[i];
}
}
public override bool Equals(object other)
{
if (other is Packet rhs)
{
return EndPoint.Equals(rhs.EndPoint) && Contents.SequenceEqual(rhs.Contents);
}
return false;
}
public override int GetHashCode()
{
// Not a great hash but it shouldn't matter in this case.
var result = 0;
foreach (byte b in Contents)
{
result = (result * 31) ^ b;
}
return EndPoint.GetHashCode() ^ result;
}
}
// Keeps track of each packet that goes through the relay and counts its repetitions.
// See receive loop for usage.
private readonly Dictionary<Packet, int> packetCounters;
public bool SimulatesPacketLoss => (packetCounters != null);
public IPeerDiscoveryTransport MakeTransport(int userIndex)
{
// Peers all send packets to the relay.
var address = new IPAddress(0x0000007f + (userIndex << 24));
lock (recipients_)
{
var endpoint = new IPEndPoint(address, port_);
// Remove first so the same agent can be re-created multiple times
recipients_.Remove(endpoint);
recipients_.Add(endpoint);
}
return new UdpPeerDiscoveryTransport(new IPAddress(0xfeffff7f), port_, address,
new UdpPeerDiscoveryTransport.Options { MaxRetries = MaxRetries, MaxRetryDelayMs = 100 });
}
public UdpReorderedTransportBuilder(Random random, bool packetLoss)
{
random_ = random;
relay_ = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
relay_.Bind(new IPEndPoint(new IPAddress(0xfeffff7f), port_));
// Disable exception on UDP connection reset (don't care).
uint IOC_IN = 0x80000000;
uint IOC_VENDOR = 0x18000000;
uint SIO_UDP_CONNRESET = IOC_IN | IOC_VENDOR | 12;
relay_.IOControl((int)SIO_UDP_CONNRESET, new byte[] { Convert.ToByte(false) }, null);
if (packetLoss)
{
packetCounters = new Dictionary<Packet, int>();
}
Task.Run(async () =>
{
try
{
while (true)
{
byte[] buf_ = new byte[1024];
var result = await relay_.ReceiveFromAsync(new ArraySegment<byte>(buf_), SocketFlags.None, new IPEndPoint(IPAddress.Any, 0));
if (packetCounters != null)
{
// Increase the probability of delivery of a packet with retries, up to 100% on the last retry.
// This simulates heavy packet loss while still guaranteeing that everything works.
// Note that this logic is very naive, but should be fine for small tests.
var packet = new Packet((IPEndPoint)result.RemoteEndPoint, new ArraySegment<byte>(buf_, 0, result.ReceivedBytes));
if (!packetCounters.TryGetValue(packet, out int counter))
{
counter = 0;
}
if (counter == MaxRetries - 1)
{
// Last retry, always send and forget the packet.
packetCounters.Remove(packet);
}
else
{
packetCounters[packet] = counter + 1;
// Drop with decreasing probability.
if (random_.Next(0, MaxRetries) > counter)
{
continue;
}
}
}
// The relay sends the packets to all peers with a random delay.
IPEndPoint[] curRecipients;
lock (recipients_)
{
curRecipients = recipients_.ToArray();
}
foreach (var rec in curRecipients)
{
var delay = random_.Next(MaxDelayMs);
_ = Task.Delay(delay).ContinueWith(t =>
{
try
{
relay_.SendToAsync(new ArraySegment<byte>(buf_, 0, result.ReceivedBytes), SocketFlags.None, rec);
}
catch (ObjectDisposedException) { }
catch (SocketException e) when (e.SocketErrorCode == SocketError.NotSocket) { }
});
}
}
}
catch (ObjectDisposedException) { }
catch (SocketException e) when (e.SocketErrorCode == SocketError.NotSocket) { }
});
}
public void Dispose()
{
relay_.Dispose();
}
}
}

Просмотреть файл

@ -4,10 +4,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Net;
using System.Net.Sockets;
using System.Reflection.Metadata;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
@ -17,13 +15,13 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
{
public abstract class PeerDiscoveryTest
{
protected Func<int, IDiscoveryAgent> discoveryAgentFactory_;
private readonly ITestOutputHelper output_;
internal ITransportBuilder transportBuilder_;
protected readonly ITestOutputHelper output_;
protected readonly Random random_;
protected PeerDiscoveryTest(Func<int, IDiscoveryAgent> factory, ITestOutputHelper output)
internal PeerDiscoveryTest(ITransportBuilder transportBuilder, ITestOutputHelper output)
{
discoveryAgentFactory_ = factory;
transportBuilder_ = transportBuilder;
output_ = output;
var seed = new Random().Next();
@ -31,13 +29,17 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
random_ = new Random(seed);
}
protected abstract bool SimulatesPacketLoss { get; }
protected virtual IDiscoveryAgent MakeAgent(int userIndex)
{
var transport = transportBuilder_.MakeTransport(userIndex);
return new PeerDiscoveryAgent(transport, new PeerDiscoveryAgent.Options { ResourceExpirySec = int.MaxValue });
}
[Fact]
public void CreateResource()
{
using (var cts = new CancellationTokenSource(Utils.TestTimeoutMs))
using (var svc1 = discoveryAgentFactory_(1))
using (var svc1 = MakeAgent(1))
{
var resource1 = svc1.PublishAsync("CreateResource", "http://resource1", null, cts.Token).Result;
@ -57,8 +59,8 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
public void FindResourcesLocalAndRemote()
{
using (var cts = new CancellationTokenSource(Utils.TestTimeoutMs))
using (var svc1 = discoveryAgentFactory_(1))
using (var svc2 = discoveryAgentFactory_(2))
using (var svc1 = MakeAgent(1))
using (var svc2 = MakeAgent(2))
{
// Create some resources in the first one
const string category = "FindResourcesLocalAndRemote";
@ -109,8 +111,8 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
// start discovery, then start services afterwards
using (var cts = new CancellationTokenSource(Utils.TestTimeoutMs))
using (var svc1 = discoveryAgentFactory_(1))
using (var svc2 = discoveryAgentFactory_(2))
using (var svc1 = MakeAgent(1))
using (var svc2 = MakeAgent(2))
{
const string category = "FindResourcesFromAnnouncement";
@ -137,7 +139,7 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
[Fact]
public void AgentShutdownRemovesResources()
{
if (SimulatesPacketLoss)
if (transportBuilder_.SimulatesPacketLoss)
{
// Shutdown message might not be retried and its loss will make this test fail. Skip it.
return;
@ -148,7 +150,7 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
// start discovery, then start agents afterwards
using (var cts = new CancellationTokenSource(Utils.TestTimeoutMs))
using (var svc1 = discoveryAgentFactory_(1))
using (var svc1 = MakeAgent(1))
using (var resources1 = svc1.Subscribe(category1))
using (var resources2 = svc1.Subscribe(category2))
{
@ -156,8 +158,8 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
// These are disposed manually, but keep in a using block so that they are disposed even
// if the test throws.
using (var svc2 = discoveryAgentFactory_(2))
using (var svc3 = discoveryAgentFactory_(3))
using (var svc2 = MakeAgent(2))
using (var svc3 = MakeAgent(3))
{
// Create resources from svc2 and svc3
var resource2_1 = svc2.PublishAsync(category1, "conn1", null, cts.Token).Result;
@ -199,13 +201,13 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
public void CanEditResourceAttributes()
{
using (var cts = new CancellationTokenSource(Utils.TestTimeoutMs))
using (var svc1 = discoveryAgentFactory_(1))
using (var svc1 = MakeAgent(1))
{
const string category = "CanEditResourceAttributes";
var resources1 = svc1.Subscribe(category);
Assert.Empty(resources1.Resources);
using (var svc2 = discoveryAgentFactory_(2))
using (var svc2 = MakeAgent(2))
{
// Create resources from svc2
var origAttrs = new Dictionary<string, string> { { "keyA", "valA" }, { "keyB", "valB" } };
@ -269,11 +271,11 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
var delayedException = new Utils.DelayedException();
using (var cts = new CancellationTokenSource(Utils.TestTimeoutMs))
using (var publishingAgent = discoveryAgentFactory_(1))
using (var publishingAgent = MakeAgent(1))
{
publishingAgent.PublishAsync(category1, "", null, cts.Token).Wait();
publishingAgent.PublishAsync(category2, "", null, cts.Token).Wait();
var agent = discoveryAgentFactory_(2);
var agent = MakeAgent(2);
{
bool subIsDisposed = false;
@ -351,8 +353,8 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
using (var cts = new CancellationTokenSource(Utils.TestTimeoutMs))
{
var delayedException = new Utils.DelayedException();
using (var svc1 = discoveryAgentFactory_(1))
using (var svc2 = discoveryAgentFactory_(2))
using (var svc1 = MakeAgent(1))
using (var svc2 = MakeAgent(2))
{
for (int i = 0; i < 1000; ++i)
{
@ -392,195 +394,26 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
public class PeerDiscoveryTestUdp : PeerDiscoveryTest
{
static private IDiscoveryAgent MakeDiscoveryAgent(int userIndex)
{
var net = new UdpPeerDiscoveryTransport(new IPAddress(0xffffff7f), 45277, new IPAddress(0x0000007f + (userIndex << 24)));
return new PeerDiscoveryAgent(net, new PeerDiscoveryAgent.Options { ResourceExpirySec = int.MaxValue });
}
public PeerDiscoveryTestUdp(ITestOutputHelper output) : base(MakeDiscoveryAgent, output) { }
protected override bool SimulatesPacketLoss => false;
}
public class PeerDiscoveryTestUdpMulticast : PeerDiscoveryTest
{
static private IDiscoveryAgent MakeDiscoveryAgent(int userIndex)
{
var net = new UdpPeerDiscoveryTransport(new IPAddress(0x000000e0), 45278, new IPAddress(0x0000007f + (userIndex << 24)));
return new PeerDiscoveryAgent(net, new PeerDiscoveryAgent.Options { ResourceExpirySec = int.MaxValue });
}
public PeerDiscoveryTestUdpMulticast(ITestOutputHelper output) : base(MakeDiscoveryAgent, output) { }
protected override bool SimulatesPacketLoss => false;
public PeerDiscoveryTestUdp(ITestOutputHelper output) : base(new UdpTransportBuilder(), output) { }
}
public class PeerDiscoveryTestMemory : PeerDiscoveryTest
{
static private IDiscoveryAgent MakeDiscoveryAgent(int userIndex)
{
var net = new MemoryPeerDiscoveryTransport(userIndex);
return new PeerDiscoveryAgent(net, new PeerDiscoveryAgent.Options { ResourceExpirySec = int.MaxValue });
}
public PeerDiscoveryTestMemory(ITestOutputHelper output) : base(MakeDiscoveryAgent, output) { }
protected override bool SimulatesPacketLoss => false;
public PeerDiscoveryTestMemory(ITestOutputHelper output) : base(new MemoryTransportBuilder(), output) { }
}
// Uses relay socket to reorder packets delivery.
public abstract class PeerDiscoveryTestReordered : PeerDiscoveryTest, IDisposable
public class PeerDiscoveryTestUdpUnreliable : PeerDiscoveryTest, IDisposable
{
private const int MaxDelayMs = 25;
private const int MaxRetries = 3;
private readonly Socket relay_;
private readonly ushort port_;
private readonly List<IPEndPoint> recipients_ = new List<IPEndPoint>();
// Wraps a packet for use in a map.
private class Packet
public PeerDiscoveryTestUdpUnreliable(ITestOutputHelper output)
: base(null, output)
{
public IPEndPoint EndPoint;
public byte[] Contents;
public Packet(IPEndPoint endPoint, ArraySegment<byte> contents)
{
EndPoint = endPoint;
Contents = new byte[contents.Count];
for (int i = 0; i < contents.Count; ++i)
{
Contents[i] = contents[i];
}
}
public override bool Equals(object other)
{
if (other is Packet rhs)
{
return EndPoint.Equals(rhs.EndPoint) && Contents.SequenceEqual(rhs.Contents);
}
return false;
}
public override int GetHashCode()
{
// Not a great hash but it shouldn't matter in this case.
var result = 0;
foreach (byte b in Contents)
{
result = (result * 31) ^ b;
}
return EndPoint.GetHashCode() ^ result;
}
}
// Keeps track of each packet that goes through the relay and counts its repetitions.
// See receive loop for usage.
private readonly Dictionary<Packet, int> packetCounters;
protected override bool SimulatesPacketLoss => (packetCounters != null);
private IDiscoveryAgent MakeDiscoveryAgent(int userIndex)
{
// Peers all send packets to the relay.
var address = new IPAddress(0x0000007f + (userIndex << 24));
var net = new UdpPeerDiscoveryTransport(new IPAddress(0xfeffff7f), port_, address,
new UdpPeerDiscoveryTransport.Options { MaxRetries = MaxRetries, MaxRetryDelayMs = 100 });
lock (recipients_)
{
var endpoint = new IPEndPoint(address, port_);
// Remove first so the same agent can be re-created multiple times
recipients_.Remove(endpoint);
recipients_.Add(endpoint);
}
return new PeerDiscoveryAgent(net, new PeerDiscoveryAgent.Options { ResourceExpirySec = int.MaxValue });
}
public PeerDiscoveryTestReordered(ITestOutputHelper output, bool packetLoss, ushort port) : base(null, output)
{
discoveryAgentFactory_ = MakeDiscoveryAgent;
port_ = port;
relay_ = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
relay_.Bind(new IPEndPoint(new IPAddress(0xfeffff7f), port_));
// Disable exception on UDP connection reset (don't care).
uint IOC_IN = 0x80000000;
uint IOC_VENDOR = 0x18000000;
uint SIO_UDP_CONNRESET = IOC_IN | IOC_VENDOR | 12;
relay_.IOControl((int)SIO_UDP_CONNRESET, new byte[] { Convert.ToByte(false) }, null);
if (packetLoss)
{
packetCounters = new Dictionary<Packet, int>();
}
Task.Run(async () =>
{
try
{
while (true)
{
byte[] buf_ = new byte[1024];
var result = await relay_.ReceiveFromAsync(new ArraySegment<byte>(buf_), SocketFlags.None, new IPEndPoint(IPAddress.Any, 0));
if (packetCounters != null)
{
// Increase the probability of delivery of a packet with retries, up to 100% on the last retry.
// This simulates heavy packet loss while still guaranteeing that everything works.
// Note that this logic is very naive, but should be fine for small tests.
var packet = new Packet((IPEndPoint)result.RemoteEndPoint, new ArraySegment<byte>(buf_, 0, result.ReceivedBytes));
if (!packetCounters.TryGetValue(packet, out int counter))
{
counter = 0;
}
if (counter == MaxRetries - 1)
{
// Last retry, always send and forget the packet.
packetCounters.Remove(packet);
}
else
{
packetCounters[packet] = counter + 1;
// Drop with decreasing probability.
if (random_.Next(0, MaxRetries) > counter)
{
continue;
}
}
}
// The relay sends the packets to all peers with a random delay.
IPEndPoint[] curRecipients;
lock (recipients_)
{
curRecipients = recipients_.ToArray();
}
foreach (var rec in curRecipients)
{
var delay = random_.Next(MaxDelayMs);
_ = Task.Delay(delay).ContinueWith(t =>
{
try
{
relay_.SendToAsync(new ArraySegment<byte>(buf_, 0, result.ReceivedBytes), SocketFlags.None, rec);
}
catch (ObjectDisposedException) { }
catch (SocketException e) when (e.SocketErrorCode == SocketError.NotSocket) { }
});
}
}
}
catch (ObjectDisposedException) { }
catch (SocketException e) when (e.SocketErrorCode == SocketError.NotSocket) { }
});
transportBuilder_ = new UdpReorderedTransportBuilder(random_, packetLoss: true);
}
public void Dispose()
{
relay_.Dispose();
((IDisposable)transportBuilder_).Dispose();
}
[Fact]
@ -588,9 +421,9 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
{
const string category = "AttributeEditsAreInOrder";
using (var cts = new CancellationTokenSource(Utils.TestTimeoutMs))
using (var svc1 = discoveryAgentFactory_(1))
using (var svc1 = MakeAgent(1))
using (var discovery = svc1.Subscribe(category))
using (var svc2 = discoveryAgentFactory_(2))
using (var svc2 = MakeAgent(2))
{
// Create resource from svc2
var origAttrs = new Dictionary<string, string> { { "value", "0" } };
@ -615,7 +448,7 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
}
// Give some time to the messages to reach the peers.
Task.Delay(MaxDelayMs).Wait();
Task.Delay(UdpReorderedTransportBuilder.MaxDelayMs).Wait();
// Edits should show up in svc1 in order
{
@ -650,19 +483,34 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
}
}
public class PeerDiscoveryTestReorderedReliable : PeerDiscoveryTestReordered
// This test needs reordering but does not work with packet loss, so we use a separate class.
public class PeerDiscoveryTestReorderedReliable
{
public PeerDiscoveryTestReorderedReliable(ITestOutputHelper output) : base(output, packetLoss: false, 45279) { }
private readonly ITransportBuilder transportBuilder_;
public PeerDiscoveryTestReorderedReliable(ITestOutputHelper output)
{
var seed = new Random().Next();
output.WriteLine($"Seed for lossy network: {seed}");
var random = new Random(seed);
transportBuilder_ = new UdpReorderedTransportBuilder(random, packetLoss: false);
}
protected virtual IDiscoveryAgent MakeAgent(int userIndex)
{
var transport = transportBuilder_.MakeTransport(userIndex);
return new PeerDiscoveryAgent(transport, new PeerDiscoveryAgent.Options { ResourceExpirySec = int.MaxValue });
}
[Fact]
public void NoAnnouncementsAfterDispose()
{
const string category = "NoAnnouncementsAfterDispose";
using (var cts = new CancellationTokenSource(Utils.TestTimeoutMs))
using (var svc1 = discoveryAgentFactory_(1))
using (var svc1 = MakeAgent(1))
using (var discovery = svc1.Subscribe(category))
{
using (var svc2 = discoveryAgentFactory_(2))
using (var svc2 = MakeAgent(2))
{
// Create a lot of resources.
var tasks = new Task<IDiscoveryResource>[100];
@ -685,8 +533,4 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
}
}
}
public class PeerDiscoveryTestReorderedUnreliable : PeerDiscoveryTestReordered
{
public PeerDiscoveryTestReorderedUnreliable(ITestOutputHelper output) : base(output, packetLoss: true, 45280) { }
}
}

Просмотреть файл

@ -0,0 +1,171 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
{
public class TransportTest
{
private readonly Random random_;
public TransportTest(ITestOutputHelper output)
{
var seed = new Random().Next();
output.WriteLine($"Seed for lossy network: {seed}");
random_ = new Random(seed);
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public void TestMemory(bool useMultiThread) { SendReceive(new MemoryTransportBuilder(), useMultiThread); }
[Theory]
[InlineData(true)]
[InlineData(false)]
public void TestUdp(bool useMultiThread) { SendReceive(new UdpTransportBuilder(), useMultiThread); }
[Theory]
[InlineData(true)]
[InlineData(false)]
public void TestUdpMulticast(bool useMultiThread) { SendReceive(new UdpMulticastTransportBuilder(), useMultiThread); }
[Theory]
[InlineData(true)]
[InlineData(false)]
public void TestUdpReordered(bool useMultiThread)
{
SendReceive(new UdpReorderedTransportBuilder(random_, false), useMultiThread);
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public void TestUdpReorderedUnreliable(bool useMultiThread)
{
SendReceive(new UdpReorderedTransportBuilder(random_, true), useMultiThread);
}
private void SendReceive(ITransportBuilder transportBuilder, bool useMultiThread)
{
using(var cts = new CancellationTokenSource(Utils.TestTimeoutMs))
{
const int lastId = 50;
const int guidsPerTransport = 3;
// Make a few transports in the same broadcast domain.
var transports = new IPeerDiscoveryTransport[5];
// Associate a few stream IDs to each transport.
int streamsNum = guidsPerTransport * transports.Length;
var streams = new Guid[streamsNum];
// Keep track of the messages received by each transport.
var lastIdFromGuid = new Dictionary<Guid, int>[transports.Length];
var lastMessageReceived = new Dictionary<Guid, ManualResetEvent>[transports.Length];
for (int transportIdx = 0; transportIdx < transports.Length; ++transportIdx)
{
// Initialize transport and associated data.
lastMessageReceived[transportIdx] = new Dictionary<Guid, ManualResetEvent>();
lastIdFromGuid[transportIdx] = new Dictionary<Guid, int>();
for (int guidIdx = 0; guidIdx < guidsPerTransport; ++guidIdx)
{
var streamId = Guid.NewGuid();
streams[transportIdx * guidsPerTransport + guidIdx] = streamId;
lastMessageReceived[transportIdx][streamId] = new ManualResetEvent(false);
}
transports[transportIdx] = transportBuilder.MakeTransport(transportIdx + 1);
// Handle received messages.
// Note: must copy into local variable or the lambda will capture the wrong value.
int captureTransportIdx = transportIdx;
transports[transportIdx].Message +=
(IPeerDiscoveryTransport _, IPeerDiscoveryMessage message) =>
{
// Check that messages are received in send order.
lastIdFromGuid[captureTransportIdx].TryGetValue(message.StreamId, out int lastReceivedId);
int currentId = BitConverter.ToInt32(message.Contents);
Assert.True(currentId > lastReceivedId);
// Notify that the last message hasn't been dropped.
if (currentId == lastId)
{
lastMessageReceived[captureTransportIdx][message.StreamId].Set();
}
};
transports[transportIdx].Start();
}
try
{
// Send a sequence of messages from every transport for every stream.
var sendTasks = new List<Task>();
for (int transportIdx = 0; transportIdx < transports.Length; ++transportIdx)
{
for (int guidIdx = 0; guidIdx < guidsPerTransport; ++guidIdx)
{
var guid = streams[transportIdx * guidsPerTransport + guidIdx];
var transport = transports[transportIdx];
Action sendMessages = () =>
{
for (int msgIdx = 1; msgIdx <= lastId; ++msgIdx)
{
byte[] bytes = BitConverter.GetBytes(msgIdx);
transport.Broadcast(guid, bytes);
}
};
if (useMultiThread)
{
sendTasks.Add(Task.Run(sendMessages));
}
else
{
sendMessages();
}
}
}
// Wait until all messages have been sent.
Task.WaitAll(sendTasks.ToArray(), cts.Token);
// The last message has been received for every stream by every transport.
var allHandles = new List<WaitHandle>(streamsNum);
foreach (var transportEvents in lastMessageReceived)
{
foreach(var ev in transportEvents.Values)
{
allHandles.Add(ev);
}
}
// Workaround to WaitHandle.WaitAll not taking a CancellationToken.
Task.Run(() => WaitHandle.WaitAll(allHandles.ToArray())).Wait(cts.Token);
}
finally
{
Exception anyException = null;
foreach (var transport in transports)
{
try
{
transport.Stop();
}
catch (Exception e)
{
anyException = e;
}
}
if (anyException != null)
{
throw anyException;
}
}
}
}
}
}

Просмотреть файл

@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using Microsoft.MixedReality.Sharing.Matchmaking;
using System;
using System.Collections.Generic;
using System.Diagnostics;
@ -45,6 +44,13 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
}
}
public static class NewPortNumber
{
private static volatile ushort current_ = 40000;
public static ushort Calculate() { return ++current_; }
}
// Run a query and wait for the predicate to be satisfied.
// Return the list of resources which satisfied the predicate or null if canceled before the predicate was satisfied.
public static IEnumerable<IDiscoveryResource> QueryAndWaitForResourcesPredicate(