Always call handler added to IDiscoverySubscription.Update (#135)
Fixes #39.
This commit is contained in:
Родитель
6bcbdaede3
Коммит
03add9f910
|
@ -37,20 +37,21 @@ A resource has:
|
|||
string category = "myapp/session";
|
||||
IDiscoverySubscription sessionSubscription = agent.Subscribe(category);
|
||||
IDiscoveryResource foundSession = null;
|
||||
while (foundSession == null)
|
||||
{
|
||||
foreach (IDiscoveryResource res in sessionSubscription.Resources)
|
||||
sessionSubscription.Updated +=
|
||||
subscription =>
|
||||
{
|
||||
if (res.Attributes["environment"] == "house")
|
||||
foreach (IDiscoveryResource res in sessionSubscription.Resources)
|
||||
{
|
||||
Console.WriteLine("Discovered session at " + res.Connection);
|
||||
foundSession = res;
|
||||
break;
|
||||
if (res.Attributes["environment"] == "house")
|
||||
{
|
||||
Console.WriteLine("Discovered session at " + res.Connection);
|
||||
foundSession = res;
|
||||
|
||||
// Unsubscribe from further updates.
|
||||
sessionSubscription.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Unsubscribe from further updates.
|
||||
sessionSubscription.Dispose();
|
||||
};
|
||||
```
|
||||
|
||||
The [IDiscoveryResource](xref:Microsoft.MixedReality.Sharing.Matchmaking.IDiscoveryResource) interface gives read access to resources published and discovered. In general, the publisher of a resource can edit its attribute after publishing by calling [IDiscoveryResource.RequestEdit](xref:Microsoft.MixedReality.Sharing.Matchmaking.IDiscoveryResource.RequestEdit) and using the obtained [IDiscoveryResourceEditor](xref:Microsoft.MixedReality.Sharing.Matchmaking.IDiscoveryResourceEditor).
|
||||
|
|
|
@ -68,38 +68,35 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Example
|
|||
|
||||
// Subscribe to other participant resources.
|
||||
_discoverySubscription = _discoveryAgent.Subscribe(ParticipantCategory);
|
||||
Action<IDiscoverySubscription> onUpdateCallback = (IDiscoverySubscription subscription) =>
|
||||
{
|
||||
// Parse discovered resources.
|
||||
var activePeers = new Dictionary<IPAddress, string>();
|
||||
foreach (var res in subscription.Resources)
|
||||
_discoverySubscription.Updated +=
|
||||
(IDiscoverySubscription subscription) =>
|
||||
{
|
||||
if (res.Connection == _localAddrString)
|
||||
// Parse discovered resources.
|
||||
var activePeers = new Dictionary<IPAddress, string>();
|
||||
foreach (var res in subscription.Resources)
|
||||
{
|
||||
// Exclude the local resource.
|
||||
continue;
|
||||
if (res.Connection == _localAddrString)
|
||||
{
|
||||
// Exclude the local resource.
|
||||
continue;
|
||||
}
|
||||
try
|
||||
{
|
||||
var address = IPAddress.Parse(res.Connection);
|
||||
var name = res.Attributes[NameKey];
|
||||
activePeers.Add(address, name);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// Invalid resource format, or multiple resources per host.
|
||||
Debug.WriteLine($"Invalid resource: {e}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
try
|
||||
{
|
||||
var address = IPAddress.Parse(res.Connection);
|
||||
var name = res.Attributes[NameKey];
|
||||
activePeers.Add(address, name);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// Invalid resource format, or multiple resources per host.
|
||||
Debug.WriteLine($"Invalid resource: {e}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Create reader connections to the active peers.
|
||||
RefreshReaderConnections(activePeers);
|
||||
};
|
||||
_discoverySubscription.Updated += onUpdateCallback;
|
||||
|
||||
// Initialize the readers list.
|
||||
onUpdateCallback(_discoverySubscription);
|
||||
// Create reader connections to the active peers.
|
||||
RefreshReaderConnections(activePeers);
|
||||
};
|
||||
|
||||
// Loop waiting for input.
|
||||
Console.CursorTop = Console.WindowHeight;
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
using System;
|
||||
using System.Collections;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
|
@ -575,6 +576,17 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking
|
|||
CancellationTokenSource updateCts_ = new CancellationTokenSource();
|
||||
AutoResetEvent updateAvailable_ = new AutoResetEvent(false);
|
||||
|
||||
// Handler for IDiscoverySubscription.Updated
|
||||
private class NewHandler
|
||||
{
|
||||
public CategoryInfo Category;
|
||||
public IDiscoverySubscription Subscription;
|
||||
public Action<IDiscoverySubscription> Handler;
|
||||
}
|
||||
|
||||
// Handlers added since the last update.
|
||||
private ConcurrentQueue<NewHandler> newHandlers_ = new ConcurrentQueue<NewHandler>();
|
||||
|
||||
internal Client(IPeerDiscoveryTransport net)
|
||||
{
|
||||
proto_ = new Proto(net);
|
||||
|
@ -588,15 +600,31 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking
|
|||
{
|
||||
var handles = new WaitHandle[] { token.WaitHandle, updateAvailable_ };
|
||||
var updatedSubscriptions = new List<DiscoverySubscription>();
|
||||
var currNewHandlers = new List<NewHandler>();
|
||||
while (true)
|
||||
{
|
||||
// Wait for either update or cancellation.
|
||||
WaitHandle.WaitAny(handles);
|
||||
token.ThrowIfCancellationRequested();
|
||||
|
||||
// There has been an update, collect the dirty tasks.
|
||||
int currNewHandlersNum = newHandlers_.Count;
|
||||
currNewHandlers.Capacity = Math.Max(currNewHandlers.Capacity, currNewHandlersNum);
|
||||
|
||||
lock (this)
|
||||
{
|
||||
// Invoke any newly added handler, even if resources haven't changed.
|
||||
// Run under lock because it reads some CategoryInfos.
|
||||
for (int i = 0; i < currNewHandlersNum; ++i)
|
||||
{
|
||||
newHandlers_.TryDequeue(out NewHandler categoryAndHandler);
|
||||
// Prevent duplicate invocations by skipping the handler if its category is already dirty.
|
||||
if (!categoryAndHandler.Category.IsDirty)
|
||||
{
|
||||
currNewHandlers.Add(categoryAndHandler);
|
||||
}
|
||||
}
|
||||
|
||||
// Collect the subscriptions whose rooms have changed.
|
||||
foreach (var info in infoFromCategory_.Values)
|
||||
{
|
||||
if (info.IsDirty)
|
||||
|
@ -609,6 +637,10 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking
|
|||
// Outside the lock.
|
||||
try
|
||||
{
|
||||
foreach (var handler in currNewHandlers)
|
||||
{
|
||||
handler.Handler(handler.Subscription);
|
||||
}
|
||||
foreach (var t in updatedSubscriptions)
|
||||
{
|
||||
t.FireUpdated();
|
||||
|
@ -618,6 +650,8 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking
|
|||
{
|
||||
Log.Error(e, "Error while firing update");
|
||||
}
|
||||
|
||||
currNewHandlers.Clear();
|
||||
updatedSubscriptions.Clear();
|
||||
}
|
||||
}, token).ContinueWith(_ =>
|
||||
|
@ -755,12 +789,29 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking
|
|||
}
|
||||
}
|
||||
|
||||
public event Action<IDiscoverySubscription> Updated;
|
||||
public event Action<IDiscoverySubscription> Updated
|
||||
{
|
||||
add
|
||||
{
|
||||
updated_ += value;
|
||||
|
||||
// Add the new handler to the pending queue.
|
||||
client_.newHandlers_.Enqueue(new NewHandler { Category = info_, Subscription = this, Handler = value });
|
||||
// Raise the event so that the handler is called.
|
||||
client_.updateAvailable_.Set();
|
||||
}
|
||||
remove
|
||||
{
|
||||
updated_ -= value;
|
||||
}
|
||||
}
|
||||
private Action<IDiscoverySubscription> updated_;
|
||||
|
||||
public event Action<IDiscoverySubscription> Disposed;
|
||||
|
||||
public void FireUpdated()
|
||||
{
|
||||
Updated?.Invoke(this);
|
||||
updated_?.Invoke(this);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
|
|
|
@ -136,6 +136,41 @@ namespace Microsoft.MixedReality.Sharing.Matchmaking.Test
|
|||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void UpdateEventIsCalled()
|
||||
{
|
||||
using (var cts = new CancellationTokenSource(Utils.TestTimeoutMs))
|
||||
using (var svc1 = MakeAgent(1))
|
||||
using (var svc2 = MakeAgent(2))
|
||||
{
|
||||
const string category = "FindResourcesFromAnnouncement";
|
||||
|
||||
using (var task1 = svc1.Subscribe(category))
|
||||
using (var task2 = svc2.Subscribe(category))
|
||||
{
|
||||
Assert.Empty(task1.Resources);
|
||||
Assert.Empty(task2.Resources);
|
||||
|
||||
var updateCalled = new AutoResetEvent(false);
|
||||
|
||||
var subscription = svc2.Subscribe(category);
|
||||
subscription.Updated +=
|
||||
s =>
|
||||
{
|
||||
updateCalled.Set();
|
||||
};
|
||||
|
||||
// Updated is called even if there are no resources.
|
||||
WaitWithCancellation(updateCalled, cts.Token);
|
||||
|
||||
var resource1 = svc1.PublishAsync(category, "foo1", null, cts.Token).Result;
|
||||
|
||||
// Update is called at least once after resources change.
|
||||
WaitWithCancellation(updateCalled, cts.Token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void AgentShutdownRemovesResources()
|
||||
{
|
||||
|
|
Загрузка…
Ссылка в новой задаче