Added support for interfaces in GetObservable<>() as well as deep

inheritence. Previous implementation only supported the requested concrete
type and the immediate base type.
This commit is contained in:
TristanBithell 2016-08-30 14:18:59 -07:00
Родитель 5750abdda4
Коммит add121ebe5
2 изменённых файлов: 118 добавлений и 10 удалений

Просмотреть файл

@ -16,6 +16,8 @@ namespace System.Reactive
{ {
private readonly Dictionary<Type, IObserver<object>> _outputs = new Dictionary<Type, IObserver<object>>(); private readonly Dictionary<Type, IObserver<object>> _outputs = new Dictionary<Type, IObserver<object>>();
private readonly Dictionary<Type, List<Type>> _knownOutputMappings = new Dictionary<Type, List<Type>>();
public void OnCompleted() public void OnCompleted()
{ {
foreach (var output in _outputs.Values.ToArray()) foreach (var output in _outputs.Values.ToArray())
@ -32,17 +34,22 @@ namespace System.Reactive
} }
} }
public void OnNext(object value) public void OnNext(object inputObject)
{ {
IObserver<object> output; var inputObjectType = inputObject.GetType();
if (_outputs.TryGetValue(value.GetType(), out output))
if (!_knownOutputMappings.ContainsKey(inputObjectType))
{ {
output.OnNext(value); _knownOutputMappings.Add(inputObjectType, new List<Type>());
foreach (var type in GetTypes(inputObjectType).Where(type => _outputs.ContainsKey(type)))
{
_knownOutputMappings[inputObjectType].Add(type);
}
} }
if (_outputs.TryGetValue(value.GetType().BaseType, out output)) foreach (var keyType in _knownOutputMappings[inputObjectType])
{ {
output.OnNext(value); _outputs[keyType].OnNext(inputObject);
} }
} }
@ -54,16 +61,41 @@ namespace System.Reactive
public IObservable<TOutput> GetObservable<TOutput>() public IObservable<TOutput> GetObservable<TOutput>()
{ {
IObserver<object> o; IObserver<object> o;
if (!_outputs.TryGetValue(typeof (TOutput), out o)) if (!_outputs.TryGetValue(typeof(TOutput), out o))
{ {
o = new OutputSubject<TOutput>(); o = new OutputSubject<TOutput>();
_outputs.Add(typeof (TOutput), o); _outputs.Add(typeof(TOutput), o);
RefreshKnownOutputMappings(typeof(TOutput));
} }
var output = (IObservable<TOutput>) o; var output = (IObservable<TOutput>)o;
return output; return output;
} }
private List<Type> GetTypes(Type inputType)
{
var typeList = new List<Type>();
var temp = inputType;
while (temp.FullName != typeof(object).FullName)
{
typeList.Add(temp);
temp = temp.BaseType;
}
typeList.AddRange(inputType.GetInterfaces());
return typeList;
}
private void RefreshKnownOutputMappings(Type outputType)
{
foreach (var knownMappings in _knownOutputMappings)
{
if (GetTypes(knownMappings.Key).Contains(outputType) && !knownMappings.Value.Contains(outputType))
{
knownMappings.Value.Add(outputType);
}
}
}
private class OutputSubject<T> : ISubject<object, T>, IDisposable private class OutputSubject<T> : ISubject<object, T>, IDisposable
{ {
private readonly Subject<T> _subject; private readonly Subject<T> _subject;
@ -95,7 +127,7 @@ namespace System.Reactive
public void OnNext(object value) public void OnNext(object value)
{ {
_subject.OnNext((T) value); _subject.OnNext((T)value);
} }
public IDisposable Subscribe(IObserver<T> observer) public IDisposable Subscribe(IObserver<T> observer)

Просмотреть файл

@ -29,6 +29,82 @@ namespace Tests.Tx
Assert.AreEqual(2, stringObserver.Count); Assert.AreEqual(2, stringObserver.Count);
} }
[TestMethod]
public void GetObservableInheritenceTest1()
{
var itemA = new TestClassA();
var itemB = new TestClassB();
var itemC = new TestClassC();
var interfaceObserver = new CountObserver<ITestClassA>();
var itemAObserver = new CountObserver<TestClassA>();
var itemBObserver = new CountObserver<TestClassB>();
var itemCObserver = new CountObserver<TestClassC>();
var demux = new Demultiplexor();
demux.GetObservable<ITestClassA>().Subscribe(interfaceObserver);
demux.GetObservable<TestClassA>().Subscribe(itemAObserver);
demux.GetObservable<TestClassB>().Subscribe(itemBObserver);
demux.GetObservable<TestClassC>().Subscribe(itemCObserver);
demux.OnNext(itemA);
demux.OnNext(itemB);
demux.OnNext(itemC);
Assert.AreEqual(3, interfaceObserver.Count);
Assert.AreEqual(3, itemAObserver.Count);
Assert.AreEqual(2, itemBObserver.Count);
Assert.AreEqual(1, itemCObserver.Count);
}
[TestMethod]
public void TestLateGetObservableRefreshesCache()
{
var itemA = new TestClassA();
var itemB = new TestClassB();
var itemC = new TestClassC();
var interfaceObserver = new CountObserver<ITestClassA>();
var itemAObserver = new CountObserver<TestClassA>();
var itemBObserver = new CountObserver<TestClassB>();
var itemCObserver = new CountObserver<TestClassC>();
var demux = new Demultiplexor();
demux.GetObservable<TestClassA>().Subscribe(itemAObserver);
demux.GetObservable<TestClassB>().Subscribe(itemBObserver);
demux.GetObservable<TestClassC>().Subscribe(itemCObserver);
demux.OnNext(itemA);
demux.GetObservable<ITestClassA>().Subscribe(interfaceObserver);
demux.OnNext(itemB);
demux.OnNext(itemC);
Assert.AreEqual(2, interfaceObserver.Count);
Assert.AreEqual(3, itemAObserver.Count);
Assert.AreEqual(2, itemBObserver.Count);
Assert.AreEqual(1, itemCObserver.Count);
}
interface ITestClassA
{
int ValueA { get; set; }
}
class TestClassA : ITestClassA
{
public int ValueA { get; set; }
}
class TestClassB : TestClassA
{
public string ValueB { get; set; }
}
class TestClassC : TestClassB
{
public double ValueC { get; set; }
}
class CountObserver<T> : IObserver<T> class CountObserver<T> : IObserver<T>
{ {
int _count; int _count;