Merge pull request #48 from Microsoft/DisposeNullReference

Addressing a shortcoming in how ingress methods dispose when manually…
This commit is contained in:
James Terwilliger 2019-02-05 12:37:59 -08:00 коммит произвёл GitHub
Родитель 9c80eec219 be5871f356
Коммит 492a855f7b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 66 добавлений и 37 удалений

Просмотреть файл

@ -90,6 +90,8 @@ namespace Microsoft.StreamProcessing
this.timer.Dispose();
this.timer = null;
}
base.DisposeState();
}
}

Просмотреть файл

@ -133,6 +133,8 @@ namespace Microsoft.StreamProcessing
this.timer.Dispose();
this.timer = null;
}
base.DisposeState();
}
}

Просмотреть файл

@ -100,11 +100,6 @@ namespace Microsoft.StreamProcessing.Internal
[EditorBrowsable(EditorBrowsableState.Never)]
public abstract class DisorderedSubscriptionBase<TIngressStructure, TPayload, TResult> : Pipe<Empty, TResult>, IIngressStreamObserver
{
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
protected IDisposable disposer;
private readonly string errorMessages;
private new readonly bool isColumnar;
@ -365,10 +360,10 @@ namespace Microsoft.StreamProcessing.Internal
[EditorBrowsable(EditorBrowsableState.Never)]
protected override void DisposeState()
{
this.subscription?.Dispose();
this.impatienceSorter?.Dispose();
this.currentBatch?.Free();
this.currentBatch = null;
this.impatienceSorter?.Dispose();
this.disposer?.Dispose();
}
/// <summary>
@ -595,11 +590,6 @@ namespace Microsoft.StreamProcessing.Internal
[EditorBrowsable(EditorBrowsableState.Never)]
public abstract class SubscriptionBase<TIngressStructure, TPayload, TResult> : Pipe<Empty, TResult>, IIngressStreamObserver
{
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
protected IDisposable disposer;
private readonly string errorMessages;
private new readonly bool isColumnar;
@ -860,10 +850,10 @@ namespace Microsoft.StreamProcessing.Internal
[EditorBrowsable(EditorBrowsableState.Never)]
protected override void DisposeState()
{
this.subscription?.Dispose();
this.impatienceSorter?.Dispose();
this.currentBatch?.Free();
this.currentBatch = null;
this.impatienceSorter?.Dispose();
this.disposer?.Dispose();
}
/// <summary>
@ -1095,11 +1085,6 @@ namespace Microsoft.StreamProcessing.Internal
[EditorBrowsable(EditorBrowsableState.Never)]
public abstract class DisorderedPartitionedSubscriptionBase<TKey, TIngressStructure, TPayload, TResult> : Pipe<PartitionKey<TKey>, TResult>, IIngressStreamObserver
{
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
protected IDisposable disposer;
private readonly string errorMessages;
private new readonly bool isColumnar;
@ -1406,10 +1391,10 @@ namespace Microsoft.StreamProcessing.Internal
[EditorBrowsable(EditorBrowsableState.Never)]
protected override void DisposeState()
{
this.subscription?.Dispose();
this.impatienceSorter?.Dispose();
this.currentBatch?.Free();
this.currentBatch = null;
this.impatienceSorter?.Dispose();
this.disposer?.Dispose();
}
/// <summary>
@ -1641,11 +1626,6 @@ namespace Microsoft.StreamProcessing.Internal
[EditorBrowsable(EditorBrowsableState.Never)]
public abstract class PartitionedSubscriptionBase<TKey, TIngressStructure, TPayload, TResult> : Pipe<PartitionKey<TKey>, TResult>, IIngressStreamObserver
{
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
protected IDisposable disposer;
private readonly string errorMessages;
private new readonly bool isColumnar;
@ -1952,10 +1932,10 @@ namespace Microsoft.StreamProcessing.Internal
[EditorBrowsable(EditorBrowsableState.Never)]
protected override void DisposeState()
{
this.subscription?.Dispose();
this.impatienceSorter?.Dispose();
this.currentBatch?.Free();
this.currentBatch = null;
this.impatienceSorter?.Dispose();
this.disposer?.Dispose();
}
/// <summary>

Просмотреть файл

@ -137,11 +137,6 @@ foreach (bool disordered in new[] { true, false })
[EditorBrowsable(EditorBrowsableState.Never)]
public abstract class <#= disordered ? "Disordered" : string.Empty #><#= partitionString #>SubscriptionBase<<#= genericArgument #>TIngressStructure, TPayload, TResult> : Pipe<<#= baseType #>, TResult>, IIngressStreamObserver
{
/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
protected IDisposable disposer;
private readonly string errorMessages;
private new readonly bool isColumnar;
@ -511,10 +506,10 @@ foreach (bool disordered in new[] { true, false })
[EditorBrowsable(EditorBrowsableState.Never)]
protected override void DisposeState()
{
this.subscription?.Dispose();
this.impatienceSorter?.Dispose();
this.currentBatch?.Free();
this.currentBatch = null;
this.impatienceSorter?.Dispose();
this.disposer?.Dispose();
}
/// <summary>

Просмотреть файл

@ -11,6 +11,8 @@ using System.Net;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.StreamProcessing;
using Microsoft.StreamProcessing.Internal;
using Microsoft.StreamProcessing.Serializer;
@ -116,7 +118,7 @@ namespace SimpleTesting
for (int x = 0; x < 100; x++)
{
pool.Get(out ColumnBatch<string> inputStr);
pool.Get(out var inputStr);
var toss1 = rand.NextDouble();
inputStr.UsedLength = toss1 < 0.1 ? 0 : rand.Next(Config.DataBatchSize);
@ -129,7 +131,7 @@ namespace SimpleTesting
if (x == 1) inputStr.col[i] = string.Empty;
}
StateSerializer<ColumnBatch<string>> s = StreamableSerializer.Create<ColumnBatch<string>>(new SerializerSettings { });
var s = StreamableSerializer.Create<ColumnBatch<string>>(new SerializerSettings { });
var ms = new MemoryStream
{
Position = 0
@ -585,6 +587,54 @@ namespace SimpleTesting
}
}
[TestClass]
public class AdHocWithoutMemoryLeakDetection : TestWithConfigSettingsWithoutMemoryLeakDetection
{
public AdHocWithoutMemoryLeakDetection() : base(new ConfigModifier()
.ForceRowBasedExecution(false)
.DontFallBackToRowBasedExecution(true))
{ }
[TestMethod, TestCategory("Gated")]
public async Task DisposeTest1()
{
var cancelTokenSource = new CancellationTokenSource();
var inputSubject = new Subject<int>();
var lastSeenSubscription = 0;
var observableInput = inputSubject.AsObservable();
var inputTask = new Task(() =>
{
var n = 0;
while (!cancelTokenSource.Token.IsCancellationRequested)
{
inputSubject.OnNext(++n);
Thread.Sleep(10);
}
inputSubject.OnCompleted();
});
var semaphore = new SemaphoreSlim(0, 1);
var subscription = observableInput.ToAtemporalStreamable(TimelinePolicy.Sequence(1)).Count().ToStreamEventObservable().Where(c => c.IsData).Subscribe(c =>
{
Interlocked.Exchange(ref lastSeenSubscription, (int)c.Payload);
if (semaphore.CurrentCount == 0) semaphore.Release();
});
// Start the input feed.
inputTask.Start();
// Wait until we have at least one output data event.
await semaphore.WaitAsync();
// Dispose the subscription.
subscription.Dispose();
// Keep the input feed going, before cancel. This should behave properly if the subscription is disposed of properly.
await Task.Delay(200);
cancelTokenSource.Cancel();
await inputTask;
// Make sure we really got an output data event.
Assert.IsTrue(lastSeenSubscription > 0);
}
}
[TestClass]
public class PublishTests : TestWithConfigSettingsAndMemoryLeakDetection
{