// Mirror of https://github.com/microsoft/FASTER.git
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#if false

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;
using NUnit.Framework;
using static FASTER.test.TestUtils;
using static FASTER.core.Utility;

#pragma warning disable IDE0060 // Remove unused parameter; used for Setup only

namespace FASTER.test.Dispose
{
    [TestFixture]
    internal class DisposeTests
    {
        // MyKey and MyValue are classes; we want to be sure we are getting the right Keys and Values to Dispose().
        private FasterKV<MyKey, MyValue> fht;
        private IDevice log, objlog;

        // Events to coordinate forcing CAS failure (by appending a new item), etc.
        private SemaphoreSlim sutGate;      // Session Under Test
        private SemaphoreSlim otherGate;    // Other session that inserts a colliding value

        [SetUp]
        public void Setup()
        {
            DeleteDirectory(MethodTestDir, wait: true);

            sutGate = new(0);
            otherGate = new(0);

            log = Devices.CreateLogDevice(MethodTestDir + "/ObjectFASTERTests.log", deleteOnClose: true);
            objlog = Devices.CreateLogDevice(MethodTestDir + "/ObjectFASTERTests.obj.log", deleteOnClose: true);

            LogSettings logSettings = new() { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 15, PageSizeBits = 10 };
            var concurrencyControlMode = ConcurrencyControlMode.None;
            foreach (var arg in TestContext.CurrentContext.Test.Arguments)
            {
                if (arg is ReadCopyDestination dest)
                {
                    if (dest == ReadCopyDestination.ReadCache)
                        logSettings.ReadCacheSettings = new() { PageSizeBits = 12, MemorySizeBits = 22 };
                    continue;
                }
                if (arg is ConcurrencyControlMode lm)
                {
                    concurrencyControlMode = lm;
                    continue;
                }
            }

            fht = new FasterKV<MyKey, MyValue>(128, logSettings: logSettings, comparer: new MyKeyComparer(),
                serializerSettings: new SerializerSettings<MyKey, MyValue> { keySerializer = () => new MyKeySerializer(), valueSerializer = () => new MyValueSerializer() },
                concurrencyControlMode: concurrencyControlMode  // Warning: ConcurrencyControlMode.LockTable will deadlock with X locks as both keys map to the same keyHash
            );
        }

        [TearDown]
        public void TearDown()
        {
            fht?.Dispose();
            fht = null;
            log?.Dispose();
            log = null;
            objlog?.Dispose();
            objlog = null;
            DeleteDirectory(MethodTestDir);
        }

        // This is passed to the FasterKV ctor to override the default one. This lets us use a different key for the colliding
        // CAS; we can't use the same key because Readonly-region handling in the first session Seals the to-be-transferred record,
        // so the second session would loop forever while the first session waits for the collision to be written.
        class MyKeyComparer : IFasterEqualityComparer<MyKey>
        {
            public long GetHashCode64(ref MyKey key) => Utility.GetHashCode(key.key % TestKey);

            public bool Equals(ref MyKey k1, ref MyKey k2) => k1.key == k2.key;
        }
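
        // All test keys are multiples of TestKey, so MyKeyComparer.GetHashCode64 (key % TestKey) maps them to the same
        // hash bucket: 111 % 111 == 222 % 111 == 333 % 111 == 0. This is what creates the hash-chain collisions the
        // tests below rely on.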

        const int TestKey = 111;
        const int TestCollidingKey = TestKey * 2;
        const int TestCollidingKey2 = TestKey * 3;
        const int TestInitialValue = 3333;
        const int TestUpdatedValue = 5555;
        const int TestCollidingValue = 7777;
        const int TestCollidingValue2 = 9999;

        internal enum DisposeHandler
        {
            None,
            SingleWriter,
            CopyUpdater,
            InitialUpdater,
            SingleDeleter,
            DeserializedFromDisk,
        }
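
        // Each DisposeXxx callback below enqueues its DisposeHandler value, so the tests can assert exactly which
        // dispose path(s) fired, and in what order.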

        public class DisposeFunctions : FunctionsBase<MyKey, MyValue, MyInput, MyOutput, Empty>
        {
            private readonly DisposeTests tester;
            internal readonly bool isSUT; // IsSessionUnderTest
            internal Queue<DisposeHandler> handlerQueue = new();
            private bool isRetry;
            private bool isSplice;

            internal DisposeFunctions(DisposeTests tester, bool sut, bool splice = false)
            {
                this.tester = tester;
                isSUT = sut;
                isSplice = splice;
            }
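
            // Handshake between the two sessions: the Session Under Test (SUT) enters its IFunctions callback, releases
            // otherGate, and blocks on sutGate. The other session's thread (which is waiting on otherGate) then performs
            // its colliding operation and, in its own callback, calls SignalEvent to release sutGate twice: once for the
            // wait below and once for the wait on the RETRY_NOW pass after the SUT's CAS fails.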

            void WaitForEvent()
            {
                Assert.IsTrue(tester.fht.epoch.ThisInstanceProtected(), "This should only be called from IFunctions methods, which are under epoch protection");
                if (isSUT)
                {
                    MyKey key = new() { key = TestKey };
                    tester.fht.FindHashBucketEntryForKey(ref key, out var entry);
                    var address = entry.Address;
                    if (isSplice)
                    {
                        // Get the tail entry for this key's hash chain; there should be exactly one readcache entry for this test.
                        Assert.IsTrue(entry.ReadCache, "Expected readcache entry in WaitForEvent pt 1");
                        Assert.GreaterOrEqual(entry.AbsoluteAddress, tester.fht.ReadCache.HeadAddress);
                        var physicalAddress = tester.fht.readcache.GetPhysicalAddress(entry.AbsoluteAddress);
                        ref RecordInfo recordInfo = ref tester.fht.readcache.GetInfo(physicalAddress);
                        address = recordInfo.PreviousAddress;

                        // There should be only one readcache entry for this test. The address we just got may have been kTempInvalidAddress,
                        // and if not then it should have been a pre-FlushAndEvict()ed record.
                        Assert.IsFalse(IsReadCache(address));

                        // Retry will have already inserted something post-FlushAndEvict.
                        Assert.IsTrue(isRetry || address < tester.fht.hlog.HeadAddress);
                    }
                    tester.otherGate.Release();
                    tester.sutGate.Wait();

                    tester.fht.FindHashBucketEntryForKey(ref key, out entry);

                    // There's a little race where the SUT session could still beat the other session to the CAS
                    if (!isRetry)
                    {
                        if (isSplice)
                        {
                            // If this is not Standard locking, then we use detach-and-reattach logic on the hash chain. That happens after SingleWriter,
                            // so the 'other' thread may still be in progress. Wait for it.
                            while (!entry.ReadCache)
                            {
                                Assert.IsFalse(tester.fht.LockTable.IsEnabled, "Standard locking should have spliced directly");
                                Thread.Yield();
                                tester.fht.FindHashBucketEntryForKey(ref key, out entry);
                            }

                            // We're the thread awaiting the splice, so wait until the address in the last readcache record changes.
                            Assert.IsTrue(entry.ReadCache, "Expected readcache entry in WaitForEvent pt 2");
                            Assert.GreaterOrEqual(entry.AbsoluteAddress, tester.fht.ReadCache.HeadAddress);
                            var physicalAddress = tester.fht.readcache.GetPhysicalAddress(entry.AbsoluteAddress);
                            ref RecordInfo recordInfo = ref tester.fht.readcache.GetInfo(physicalAddress);
                            while (recordInfo.PreviousAddress == address)
                            {
                                // Wait for the splice to happen
                                Thread.Yield();
                            }
                            Assert.IsFalse(IsReadCache(recordInfo.PreviousAddress));
                            Assert.IsTrue(recordInfo.PreviousAddress >= tester.fht.hlog.HeadAddress);
                        }
                        else
                        {
                            // We're not the splice thread, so wait until the address in the hash entry changes.
                            while (entry.Address == address)
                            {
                                Thread.Yield();
                                tester.fht.FindHashBucketEntryForKey(ref key, out entry);
                            }
                        }
                    }
                    isRetry = true;     // the next call will be from RETRY_NOW
                }
            }

            void SignalEvent()
            {
                // Let the SUT proceed, which will trigger a RETRY_NOW due to the failed CAS, so we need to release for the second wait as well.
                // Release with a count of 2 to handle the attempt it's currently blocked on and the subsequent retry.
                if (!isSUT)
                    tester.sutGate.Release(2);
            }

            public override bool SingleWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref MyOutput output, ref UpsertInfo upsertInfo, WriteReason reason)
            {
                WaitForEvent();
                dst = src;
                SignalEvent();
                return true;
            }

            public override bool InitialUpdater(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput output, ref RMWInfo rmwInfo)
            {
                WaitForEvent();
                value = new MyValue { value = input.value };
                SignalEvent();
                return true;
            }

            public override bool CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue, ref MyOutput output, ref RMWInfo rmwInfo)
            {
                WaitForEvent();
                newValue = new MyValue { value = oldValue.value + input.value };
                SignalEvent();
                return true;
            }

            public override bool SingleDeleter(ref MyKey key, ref MyValue value, ref DeleteInfo deleteInfo)
            {
                WaitForEvent();
                base.SingleDeleter(ref key, ref value, ref deleteInfo);
                SignalEvent();
                return true;
            }

            public override bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput output, ref RMWInfo rmwInfo)
            {
                value.value += input.value;
                return true;
            }

            public override bool NeedCopyUpdate(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyOutput output, ref RMWInfo rmwInfo) => true;

            public override bool ConcurrentReader(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput dst, ref ReadInfo readInfo)
            {
                Assert.Fail("ConcurrentReader should not be called for this test");
                return true;
            }

            public override bool ConcurrentWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref MyOutput output, ref UpsertInfo upsertInfo)
            {
                dst.value = src.value;
                return true;
            }

            public override void RMWCompletionCallback(ref MyKey key, ref MyInput input, ref MyOutput output, Empty ctx, Status status, RecordMetadata recordMetadata)
            {
                if (isSUT)
                {
                    Assert.IsTrue(status.Found, status.ToString());
                    Assert.IsTrue(status.Record.CopyUpdated, status.ToString());    // The RETRY_NOW after the failed CAS creates a new record via CopyUpdater
                }
                else
                {
                    Assert.IsTrue(status.NotFound, status.ToString());
                    Assert.IsTrue(status.Record.Created, status.ToString());
                }
            }

            public override bool SingleReader(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput dst, ref ReadInfo readInfo)
            {
                dst.value = value;
                return true;
            }

            public override void DisposeSingleWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref MyOutput output, ref UpsertInfo upsertInfo, WriteReason reason)
            {
                Assert.AreEqual(TestKey, key.key);
                Assert.AreEqual(TestInitialValue, src.value);
                Assert.AreEqual(TestInitialValue, dst.value);   // dst has been populated
                handlerQueue.Enqueue(DisposeHandler.SingleWriter);
            }

            public override void DisposeCopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue, ref MyOutput output, ref RMWInfo rmwInfo)
            {
                Assert.AreEqual(TestKey, key.key);
                Assert.AreEqual(TestInitialValue, oldValue.value);
                Assert.AreEqual(TestInitialValue + TestUpdatedValue, newValue.value);
                handlerQueue.Enqueue(DisposeHandler.CopyUpdater);
            }

            public override void DisposeInitialUpdater(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput output, ref RMWInfo rmwInfo)
            {
                Assert.AreEqual(TestKey, key.key);
                Assert.AreEqual(TestInitialValue, value.value);
                handlerQueue.Enqueue(DisposeHandler.InitialUpdater);
            }

            public override void DisposeSingleDeleter(ref MyKey key, ref MyValue value, ref DeleteInfo deleteInfo)
            {
                Assert.AreEqual(TestKey, key.key);
                Assert.IsNull(value);   // This is the default value inserted for the Tombstoned record
                handlerQueue.Enqueue(DisposeHandler.SingleDeleter);
            }

            public override void DisposeDeserializedFromDisk(ref MyKey key, ref MyValue value)
            {
                VerifyKeyValueCombo(ref key, ref value);
                handlerQueue.Enqueue(DisposeHandler.DeserializedFromDisk);
            }
        }

        static void VerifyKeyValueCombo(ref MyKey key, ref MyValue value)
        {
            switch (key.key)
            {
                case TestKey:
                    Assert.AreEqual(TestInitialValue, value.value);
                    break;
                case TestCollidingKey:
                    Assert.AreEqual(TestCollidingValue, value.value);
                    break;
                case TestCollidingKey2:
                    Assert.AreEqual(TestCollidingValue2, value.value);
                    break;
                default:
                    Assert.Fail($"Unexpected key: {key.key}");
                    break;
            }
        }
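
        // DisposeFunctionsNoSync (below) does no gate coordination; it is used for the initial inserts and for the
        // single-session pending tests, where only DisposeDeserializedFromDisk is expected to fire.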

        // Override some things from MyFunctions for our purposes here
        class DisposeFunctionsNoSync : MyFunctions
        {
            internal Queue<DisposeHandler> handlerQueue = new();

            public override bool CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue, ref MyOutput output, ref RMWInfo rmwInfo)
            {
                newValue = new MyValue { value = oldValue.value + input.value };
                output.value = newValue;
                return true;
            }

            public override void ReadCompletionCallback(ref MyKey key, ref MyInput input, ref MyOutput output, Empty ctx, Status status, RecordMetadata recordMetadata)
            {
                Assert.IsTrue(status.Found);
            }

            public override void DisposeDeserializedFromDisk(ref MyKey key, ref MyValue value)
            {
                VerifyKeyValueCombo(ref key, ref value);
                handlerQueue.Enqueue(DisposeHandler.DeserializedFromDisk);
            }
        }

        void DoFlush(FlushMode flushMode)
        {
            switch (flushMode)
            {
                case FlushMode.NoFlush:
                    return;
                case FlushMode.ReadOnly:
                    fht.Log.ShiftReadOnlyAddress(fht.Log.TailAddress, wait: true);
                    return;
                case FlushMode.OnDisk:
                    fht.Log.FlushAndEvict(wait: true);
                    return;
            }
        }
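
        // FlushMode determines which update path the tests exercise: NoFlush leaves the records mutable (in-place updates),
        // ReadOnly shifts the read-only address to the tail so updates must create new records (CopyUpdater/SingleDeleter),
        // and OnDisk flushes and evicts so operations go pending and deserialize records from disk.
        //
        // The 2-thread tests below all follow the same pattern: the SUT session starts an operation whose IFunctions
        // callback blocks in WaitForEvent; the other session then inserts a record for a colliding key, so the SUT's
        // CAS fails and it retries via RETRY_NOW. The record abandoned by the failed attempt is what gets passed to the
        // corresponding DisposeXxx callback, which the test asserts on.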

        [Test]
        [Category("FasterKV")]
        [Category("Smoke")]
        public void DisposeSingleWriter2Threads([Values(ConcurrencyControlMode.RecordIsolation, ConcurrencyControlMode.None)] ConcurrencyControlMode concurrencyControlMode)
        {
            var functions1 = new DisposeFunctions(this, sut: true);
            var functions2 = new DisposeFunctions(this, sut: false);

            MyKey key = new() { key = TestKey };
            MyKey collidingKey = new() { key = TestCollidingKey };
            MyValue value = new() { value = TestInitialValue };
            MyValue collidingValue = new() { value = TestCollidingValue };

            void DoUpsert(DisposeFunctions functions)
            {
                using var innerSession = fht.NewSession(functions);
                if (functions.isSUT)
                    innerSession.Upsert(ref key, ref value);
                else
                {
                    otherGate.Wait();
                    innerSession.Upsert(ref collidingKey, ref collidingValue);
                }
            }

            var tasks = new[]
            {
                Task.Factory.StartNew(() => DoUpsert(functions1)),
                Task.Factory.StartNew(() => DoUpsert(functions2))
            };
            Task.WaitAll(tasks);

            Assert.AreEqual(DisposeHandler.SingleWriter, functions1.handlerQueue.Dequeue());
            Assert.IsEmpty(functions1.handlerQueue);
        }

        [Test]
        [Category("FasterKV")]
        [Category("Smoke")]
        public void DisposeInitialUpdater2Threads([Values(FlushMode.NoFlush, FlushMode.OnDisk)] FlushMode flushMode,
                                                  [Values(ConcurrencyControlMode.RecordIsolation, ConcurrencyControlMode.None)] ConcurrencyControlMode concurrencyControlMode)
        {
            var functions1 = new DisposeFunctions(this, sut: true);
            var functions2 = new DisposeFunctions(this, sut: false);

            MyKey key = new() { key = TestKey };
            MyKey collidingKey = new() { key = TestCollidingKey };
            MyInput input = new() { value = TestInitialValue };
            MyInput collidingInput = new() { value = TestCollidingValue };

            DoFlush(flushMode);

            void DoInsert(DisposeFunctions functions)
            {
                using var session = fht.NewSession(functions);
                if (functions.isSUT)
                    session.RMW(ref key, ref input);
                else
                {
                    otherGate.Wait();
                    session.RMW(ref collidingKey, ref collidingInput);
                }
            }

            var tasks = new[]
            {
                Task.Factory.StartNew(() => DoInsert(functions1)),
                Task.Factory.StartNew(() => DoInsert(functions2))
            };
            Task.WaitAll(tasks);

            Assert.AreEqual(DisposeHandler.InitialUpdater, functions1.handlerQueue.Dequeue());
            Assert.IsEmpty(functions1.handlerQueue);
        }

        [Test]
        [Category("FasterKV")]
        [Category("Smoke")]
        public void DisposeCopyUpdater2Threads([Values(FlushMode.ReadOnly, FlushMode.OnDisk)] FlushMode flushMode,
                                               [Values(ConcurrencyControlMode.RecordIsolation, ConcurrencyControlMode.None)] ConcurrencyControlMode concurrencyControlMode)
        {
            var functions1 = new DisposeFunctions(this, sut: true);
            var functions2 = new DisposeFunctions(this, sut: false);

            MyKey key = new() { key = TestKey };
            MyKey collidingKey = new() { key = TestCollidingKey };
            {
                using var session = fht.NewSession(new DisposeFunctionsNoSync());
                MyValue value = new() { value = TestInitialValue };
                session.Upsert(ref key, ref value);
            }

            // Make it immutable so CopyUpdater is called.
            DoFlush(flushMode);

            void DoUpdate(DisposeFunctions functions)
            {
                using var session = fht.NewSession(functions);
                MyInput input = new() { value = functions.isSUT ? TestUpdatedValue : TestCollidingValue };
                if (functions.isSUT)
                    session.RMW(ref key, ref input);
                else
                {
                    otherGate.Wait();
                    session.RMW(ref collidingKey, ref input);
                }
                session.CompletePending(true);
            }

            var tasks = new[]
            {
                Task.Factory.StartNew(() => DoUpdate(functions1)),
                Task.Factory.StartNew(() => DoUpdate(functions2))
            };
            Task.WaitAll(tasks);

            // The way this works for OnDisk is:
            //   SUT sees that the address in the hash entry is below HeadAddress (because everything has been flushed to disk)
            //   SUT records InitialEntryAddress with the original hash entry address
            //   SUT goes pending, gets to InternalContinuePendingRMW, calls CreateNewRecordRMW, which calls CopyUpdater
            //   SUT (in CopyUpdater) signals Other, then blocks
            //   Other calls InternalRMW and also sees that the address in the hash entry is below HeadAddress, so it goes pending
            //   Other gets to InternalContinuePendingRMW, sees its key does not exist, and calls InitialUpdater, which signals SUT
            //   Other returns from InternalContinuePendingRMW, which enqueues DeserializedFromDisk into functions2.handlerQueue
            //   SUT is now unblocked and returns from CopyUpdater; its CAS fails due to Other's insertion
            //   SUT does the RETRY loop in InternalContinuePendingRMW:
            //     The second loop iteration searches for the record in memory down to InitialEntryAddress and does not find it;
            //     that lower bound guarantees all in-memory records were searched.
            //     Therefore SUT calls CreateNewRecordRMW again, which succeeds.
            //   SUT returns from InternalContinuePendingRMW, which enqueues DeserializedFromDisk into functions1.handlerQueue
            Assert.AreEqual(DisposeHandler.CopyUpdater, functions1.handlerQueue.Dequeue());
            if (flushMode == FlushMode.OnDisk)
            {
                Assert.AreEqual(DisposeHandler.DeserializedFromDisk, functions1.handlerQueue.Dequeue());
                Assert.AreEqual(DisposeHandler.DeserializedFromDisk, functions2.handlerQueue.Dequeue());
            }
            Assert.IsEmpty(functions1.handlerQueue);
        }

        [Test]
        [Category("FasterKV")]
        [Category("Smoke")]
        public void DisposeSingleDeleter2Threads([Values(FlushMode.ReadOnly, FlushMode.OnDisk)] FlushMode flushMode,
                                                 [Values(ConcurrencyControlMode.RecordIsolation, ConcurrencyControlMode.None)] ConcurrencyControlMode concurrencyControlMode)
        {
            var functions1 = new DisposeFunctions(this, sut: true);
            var functions2 = new DisposeFunctions(this, sut: false);

            MyKey key = new() { key = TestKey };
            MyKey collidingKey = new() { key = TestCollidingKey };

            {
                using var session = fht.NewSession(new DisposeFunctionsNoSync());
                MyValue value = new() { value = TestInitialValue };
                session.Upsert(ref key, ref value);
                MyValue collidingValue = new() { value = TestCollidingValue };
                session.Upsert(ref collidingKey, ref collidingValue);
            }

            // Make it immutable so we don't simply set Tombstone.
            DoFlush(flushMode);

            void DoDelete(DisposeFunctions functions)
            {
                using var innerSession = fht.NewSession(functions);
                if (functions.isSUT)
                    innerSession.Delete(ref key);
                else
                {
                    otherGate.Wait();
                    innerSession.Delete(ref collidingKey);
                }
            }

            var tasks = new[]
            {
                Task.Factory.StartNew(() => DoDelete(functions1)),
                Task.Factory.StartNew(() => DoDelete(functions2))
            };
            Task.WaitAll(tasks);

            Assert.AreEqual(DisposeHandler.SingleDeleter, functions1.handlerQueue.Dequeue());
            Assert.IsEmpty(functions1.handlerQueue);
        }

        [Test]
        [Category("FasterKV")]
        [Category("Smoke")]
        public void PendingRead([Values] ReadCopyDestination copyDest, [Values(ConcurrencyControlMode.RecordIsolation, ConcurrencyControlMode.None)] ConcurrencyControlMode concurrencyControlMode)
        {
            DoPendingReadInsertTest(copyDest, initialReadCacheInsert: false);
        }

        [Test]
        [Category("FasterKV")]
        [Category("Smoke")]
        public void CopyToTailWithInitialReadCache([Values(ReadCopyDestination.ReadCache)] ReadCopyDestination copyDest,
                                                   [Values(ConcurrencyControlMode.RecordIsolation, ConcurrencyControlMode.None)] ConcurrencyControlMode concurrencyControlMode)
        {
            // The ReadCopyDestination.ReadCache parameter is used only so Setup() creates the readcache;
            // the test itself copies to the main-log tail (CopyToTail).
            DoPendingReadInsertTest(ReadCopyDestination.Tail, initialReadCacheInsert: true);
        }

        void DoPendingReadInsertTest(ReadCopyDestination copyDest, bool initialReadCacheInsert)
        {
            MyKey key = new() { key = TestKey };
            MyKey collidingKey2 = new() { key = TestCollidingKey2 };
            MyValue collidingValue2 = new() { value = TestCollidingValue2 };

            using var session = fht.NewSession(new DisposeFunctionsNoSync());

            // Do initial insert(s) to set things up
            {
                MyValue value = new() { value = TestInitialValue };
                session.Upsert(ref key, ref value);
                if (initialReadCacheInsert)
                    session.Upsert(ref collidingKey2, ref collidingValue2);
            }

            // FlushAndEvict so we go pending
            DoFlush(FlushMode.OnDisk);

            MyOutput output = new();
            MyInput input = new();

            if (initialReadCacheInsert)
            {
                Assert.IsTrue(session.Read(ref collidingKey2, ref output).IsPending);
                session.CompletePending(wait: true);
            }

            ReadOptions readOptions = new() { CopyFrom = ReadCopyFrom.Device };
            if (copyDest == ReadCopyDestination.Tail)
                readOptions.CopyOptions.CopyTo = ReadCopyTo.MainLog;
            var status = session.Read(ref key, ref input, ref output, ref readOptions, out _);
            Assert.IsTrue(status.IsPending, status.ToString());
            session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
            (status, output) = GetSinglePendingResult(completedOutputs);
            Assert.AreEqual(TestInitialValue, output.value.value);
        }

        [Test]
        [Category("FasterKV")]
        [Category("Smoke")]
        public void DisposePendingRead2Threads([Values] ReadCopyDestination copyDest, [Values] ConcurrencyControlMode concurrencyControlMode)
        {
            DoDisposePendingReadInsertTest2Threads(copyDest, initialReadCacheInsert: false);
        }

        [Test]
        [Category("FasterKV")]
        [Category("Smoke")]
        public void DisposeCopyToTailWithInitialReadCache2Threads([Values(ReadCopyDestination.ReadCache)] ReadCopyDestination copyDest, [Values] ConcurrencyControlMode concurrencyControlMode)
        {
            // The ReadCopyDestination.ReadCache parameter is used only so Setup() creates the readcache;
            // the test itself copies to the main-log tail (CopyToTail).
            DoDisposePendingReadInsertTest2Threads(ReadCopyDestination.Tail, initialReadCacheInsert: true);
        }
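
        // Both sessions issue pending reads whose completion copies the record from disk back into the store; the other
        // session's colliding copy makes the SUT's insertion fail, so the abandoned copy record goes to
        // DisposeSingleWriter (except in the case checked at the end of this method, where no conflict is generated).
        // DisposeDeserializedFromDisk is expected for the record the SUT read from disk.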

        void DoDisposePendingReadInsertTest2Threads(ReadCopyDestination copyDest, bool initialReadCacheInsert)
        {
            var functions1 = new DisposeFunctions(this, sut: true, splice: initialReadCacheInsert);
            var functions2 = new DisposeFunctions(this, sut: false);

            MyKey key = new() { key = TestKey };
            MyKey collidingKey = new() { key = TestCollidingKey };
            MyValue collidingValue = new() { value = TestCollidingValue };
            MyKey collidingKey2 = new() { key = TestCollidingKey2 };
            MyValue collidingValue2 = new() { value = TestCollidingValue2 };

            // Do initial insert(s) to set things up
            {
                using var session = fht.NewSession(new DisposeFunctionsNoSync());
                MyValue value = new() { value = TestInitialValue };
                session.Upsert(ref key, ref value);
                session.Upsert(ref collidingKey, ref collidingValue);
                if (initialReadCacheInsert)
                    session.Upsert(ref collidingKey2, ref collidingValue2);
            }

            // FlushAndEvict so we go pending
            DoFlush(FlushMode.OnDisk);

            if (initialReadCacheInsert)
            {
                using var session = fht.NewSession(new DisposeFunctionsNoSync());
                MyOutput output = new();
                var status = session.Read(ref collidingKey2, ref output);
                Assert.IsTrue(status.IsPending, status.ToString());
                session.CompletePending(wait: true);
            }

            // We use Read() only here (not Upsert()), so we have only read locks and thus do not self-deadlock with an XLock on the colliding bucket.
            void DoRead(DisposeFunctions functions)
            {
                MyOutput output = new();
                MyInput input = new();
                ReadOptions readOptions = new() { CopyFrom = ReadCopyFrom.Device };
                if (copyDest == ReadCopyDestination.Tail)
                    readOptions.CopyOptions.CopyTo = ReadCopyTo.MainLog;

                using var session = fht.NewSession(functions);
                if (functions.isSUT)
                {
                    var status = session.Read(ref key, ref input, ref output, ref readOptions, out _);
                    Assert.IsTrue(status.IsPending, status.ToString());
                    session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
                    (status, output) = GetSinglePendingResult(completedOutputs);
                    Assert.IsTrue(status.Found, status.ToString());
                    Assert.AreEqual(TestInitialValue, output.value.value);
                }
                else
                {
                    otherGate.Wait();
                    var status = session.Read(ref collidingKey, ref input, ref output, ref readOptions, out _);
                    Assert.IsTrue(status.IsPending, status.ToString());
                    session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
                    (status, output) = GetSinglePendingResult(completedOutputs);
                    Assert.IsTrue(status.Found, status.ToString());
                    Assert.AreEqual(TestCollidingValue, output.value.value);
                }
            }

            var tasks = new[]
            {
                Task.Factory.StartNew(() => DoRead(functions1)),
                Task.Factory.StartNew(() => DoRead(functions2))
            };
            Task.WaitAll(tasks);

            if (fht.LockTable.IsEnabled || !initialReadCacheInsert)     // This allows true splice, so we generated a conflict.
                Assert.AreEqual(DisposeHandler.SingleWriter, functions1.handlerQueue.Dequeue());
            Assert.AreEqual(DisposeHandler.DeserializedFromDisk, functions1.handlerQueue.Dequeue());
            Assert.IsEmpty(functions1.handlerQueue);
        }
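
        // The remaining tests use a single session with no colliding insert, so no CAS conflict occurs and the only
        // expected disposal is DisposeDeserializedFromDisk for the record brought back from disk by the pending operation.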

        [Test]
        [Category("FasterKV")]
        [Category("Smoke")]
        public void DisposePendingReadWithNoInsertTest([Values(ConcurrencyControlMode.RecordIsolation, ConcurrencyControlMode.None)] ConcurrencyControlMode concurrencyControlMode)
        {
            var functions = new DisposeFunctionsNoSync();

            MyKey key = new() { key = TestKey };
            MyValue value = new() { value = TestInitialValue };

            // Do initial insert
            using var session = fht.NewSession(functions);
            session.Upsert(ref key, ref value);

            // FlushAndEvict so we go pending
            DoFlush(FlushMode.OnDisk);

            MyOutput output = new();
            var status = session.Read(ref key, ref output);
            Assert.IsTrue(status.IsPending, status.ToString());
            session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
            (status, output) = GetSinglePendingResult(completedOutputs);
            Assert.AreEqual(TestInitialValue, output.value.value);

            Assert.AreEqual(DisposeHandler.DeserializedFromDisk, functions.handlerQueue.Dequeue());
        }

        [Test]
        [Category("FasterKV")]
        [Category("Smoke")]
        public void DisposePendingRmwWithNoConflictTest([Values(ConcurrencyControlMode.RecordIsolation, ConcurrencyControlMode.None)] ConcurrencyControlMode concurrencyControlMode)
        {
            var functions = new DisposeFunctionsNoSync();

            MyKey key = new() { key = TestKey };
            MyValue value = new() { value = TestInitialValue };

            // Do initial insert
            using var session = fht.NewSession(functions);
            session.Upsert(ref key, ref value);

            // FlushAndEvict so we go pending
            DoFlush(FlushMode.OnDisk);

            MyInput input = new() { value = TestUpdatedValue };
            MyOutput output = new();
            var status = session.RMW(ref key, ref input, ref output);
            Assert.IsTrue(status.IsPending, status.ToString());
            session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
            (status, output) = GetSinglePendingResult(completedOutputs);
            Assert.AreEqual(TestInitialValue + TestUpdatedValue, output.value.value);

            Assert.AreEqual(DisposeHandler.DeserializedFromDisk, functions.handlerQueue.Dequeue());
        }
    }
}
#endif