
397 строки
16 KiB

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;
using NUnit.Framework;
using FASTER.devices;
namespace FASTER.test.recovery.sumstore.simple
public class RecoveryTests
const int numOps = 5000;
AdId[] inputArray;
private byte[] commitCookie;
string checkpointDir;
ICheckpointManager checkpointManager;
private FasterKV<AdId, NumClicks> fht1;
private FasterKV<AdId, NumClicks> fht2;
private IDevice log;
public void Setup()
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
checkpointManager = default;
checkpointDir = default;
inputArray = new AdId[numOps];
for (int i = 0; i < numOps; i++)
inputArray[i].adId = i;
public void TearDown()
fht1 = null;
fht2 = null;
log = null;
[Category("FasterKV"), Category("CheckpointRestore")]
public async ValueTask PageBlobSimpleRecoveryTest([Values]CheckpointType checkpointType, [Values]bool isAsync, [Values]bool testCommitCookie)
checkpointManager = new DeviceLogCommitCheckpointManager(
new AzureStorageNamedDeviceFactory(TestUtils.AzureEmulatedStorageString),
new DefaultCheckpointNamingScheme($"{TestUtils.AzureTestContainer}/{TestUtils.AzureTestDirectory}"));
await SimpleRecoveryTest1_Worker(checkpointType, isAsync, testCommitCookie);
public async ValueTask LocalDeviceSimpleRecoveryTest([Values] CheckpointType checkpointType, [Values] bool isAsync, [Values]bool testCommitCookie)
checkpointManager = new DeviceLogCommitCheckpointManager(
new LocalStorageNamedDeviceFactory(),
new DefaultCheckpointNamingScheme($"{TestUtils.MethodTestDir}/chkpt"));
await SimpleRecoveryTest1_Worker(checkpointType, isAsync, testCommitCookie);
[Category("FasterKV"), Category("CheckpointRestore")]
public async ValueTask SimpleRecoveryTest1([Values]CheckpointType checkpointType, [Values]bool isAsync, [Values]bool testCommitCookie)
await SimpleRecoveryTest1_Worker(checkpointType, isAsync, testCommitCookie);
private async ValueTask SimpleRecoveryTest1_Worker(CheckpointType checkpointType, bool isAsync, bool testCommitCookie)
if (testCommitCookie)
// Generate a new unique byte sequence for test
commitCookie = Guid.NewGuid().ToByteArray();
if (checkpointManager is null)
checkpointDir = TestUtils.MethodTestDir + $"/checkpoints";
log = Devices.CreateLogDevice(TestUtils.MethodTestDir + "/SimpleRecoveryTest1.log", deleteOnClose: true);
fht1 = new FasterKV<AdId, NumClicks>(128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = checkpointDir, CheckpointManager = checkpointManager }
fht2 = new FasterKV<AdId, NumClicks>(128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = checkpointDir, CheckpointManager = checkpointManager }
NumClicks value;
AdInput inputArg = default;
Output output = default;
var session1 = fht1.NewSession(new AdSimpleFunctions());
for (int key = 0; key < numOps; key++)
value.numClicks = key;
session1.Upsert(ref inputArray[key], ref value, Empty.Default, 0);
if (testCommitCookie)
fht1.CommitCookie = commitCookie;
fht1.TryInitiateFullCheckpoint(out Guid token, checkpointType);
if (isAsync)
await fht2.RecoverAsync(token);
if (testCommitCookie)
var session2 = fht2.NewSession(new AdSimpleFunctions());
Assert.AreEqual(2, session2.ID);
for (int key = 0; key < numOps; key++)
var status = session2.Read(ref inputArray[key], ref inputArg, ref output, Empty.Default, 0);
if (status.IsPending)
session2.CompletePendingWithOutputs(out var outputs, wait: true);
output = outputs.Current.Output;
Assert.AreEqual(key, output.value.numClicks);
[Category("FasterKV"), Category("CheckpointRestore")]
public async ValueTask SimpleRecoveryTest2([Values]CheckpointType checkpointType, [Values]bool isAsync)
checkpointManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(), new DefaultCheckpointNamingScheme(TestUtils.MethodTestDir + "/checkpoints4"), false);
log = Devices.CreateLogDevice(TestUtils.MethodTestDir + "/SimpleRecoveryTest2.log", deleteOnClose: true);
fht1 = new FasterKV<AdId, NumClicks>(128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointManager = checkpointManager }
fht2 = new FasterKV<AdId, NumClicks>(128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointManager = checkpointManager }
NumClicks value;
AdInput inputArg = default;
Output output = default;
var session1 = fht1.NewSession(new AdSimpleFunctions());
for (int key = 0; key < numOps; key++)
value.numClicks = key;
session1.Upsert(ref inputArray[key], ref value, Empty.Default, 0);
fht1.TryInitiateFullCheckpoint(out Guid token, checkpointType);
if (isAsync)
await fht2.RecoverAsync(token);
var session2 = fht2.NewSession(new AdSimpleFunctions());
for (int key = 0; key < numOps; key++)
var status = session2.Read(ref inputArray[key], ref inputArg, ref output, Empty.Default, 0);
if (status.IsPending)
Assert.AreEqual(key, output.value.numClicks);
[Category("FasterKV"), Category("CheckpointRestore")]
public async ValueTask ShouldRecoverBeginAddress([Values]bool isAsync)
log = Devices.CreateLogDevice(TestUtils.MethodTestDir + "/SimpleRecoveryTest2.log", deleteOnClose: true);
checkpointDir = TestUtils.MethodTestDir + "/checkpoints6";
fht1 = new FasterKV<AdId, NumClicks>(128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = checkpointDir }
fht2 = new FasterKV<AdId, NumClicks>(128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = checkpointDir }
NumClicks value;
var session1 = fht1.NewSession(new AdSimpleFunctions());
var address = 0L;
for (int key = 0; key < numOps; key++)
value.numClicks = key;
session1.Upsert(ref inputArray[key], ref value, Empty.Default, 0);
if (key == 2999)
address = fht1.Log.TailAddress;
fht1.TryInitiateFullCheckpoint(out Guid token, CheckpointType.FoldOver);
if (isAsync)
await fht2.RecoverAsync(token);
Assert.AreEqual(address, fht2.Log.BeginAddress);
[Category("FasterKV"), Category("CheckpointRestore")]
public void SimpleReadAndUpdateInfoTest()
checkpointManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(), new DefaultCheckpointNamingScheme(TestUtils.MethodTestDir + "/checkpoints"), false);
log = Devices.CreateLogDevice(TestUtils.MethodTestDir + "/SimpleReadAndUpdateInfoTest.log", deleteOnClose: true);
fht1 = new FasterKV<AdId, NumClicks>(128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointManager = checkpointManager }
fht2 = new FasterKV<AdId, NumClicks>(128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointManager = checkpointManager }
NumClicks value;
AdInput inputArg = default;
Output output = default;
AdSimpleFunctions functions1 = new(1);
AdSimpleFunctions functions2 = new(2);
var session1 = fht1.NewSession(functions1);
for (int key = 0; key < numOps; key++)
value.numClicks = key;
if ((key & 1) > 0)
session1.Upsert(ref inputArray[key], ref value, Empty.Default, 0);
AdInput input = new() { adId = inputArray[key], numClicks = value };
session1.RMW(ref inputArray[key], ref input);
fht1.TryInitiateFullCheckpoint(out Guid token, CheckpointType.FoldOver);
var session2 = fht2.NewSession(functions2);
// Just need one operation here to verify readInfo/upsertInfo in the functions
var lastKey = inputArray.Length - 1;
var status = session2.Read(ref inputArray[lastKey], ref inputArg, ref output, Empty.Default, 0);
Assert.IsFalse(status.IsPending, status.ToString());
value.numClicks = lastKey;
status = session2.Upsert(ref inputArray[lastKey], ref value, Empty.Default, 0);
Assert.IsFalse(status.IsPending, status.ToString());
inputArg = new() { adId = inputArray[lastKey], numClicks = new NumClicks { numClicks = 0} }; // CopyUpdater adds, so make this 0
status = session2.RMW(ref inputArray[lastKey], ref inputArg);
Assert.IsFalse(status.IsPending, status.ToString());
// Now verify Pending
fht2.Log.FlushAndEvict(wait: true);
output.value = new() { numClicks = lastKey };
inputArg.numClicks = new() { numClicks = lastKey };
status = session2.Read(ref inputArray[lastKey], ref inputArg, ref output, Empty.Default, 0);
Assert.IsTrue(status.IsPending, status.ToString());
session2.CompletePending(wait: true);
// Upsert does not go pending so is skipped here
output.value = new() { numClicks = lastKey };
inputArg.numClicks = new() { numClicks = lastKey };
status = session2.RMW(ref inputArray[lastKey], ref inputArg);
Assert.IsTrue(status.IsPending, status.ToString());
session2.CompletePending(wait: true);
public class AdSimpleFunctions : FunctionsBase<AdId, NumClicks, AdInput, Output, Empty>
long expectedVersion;
internal AdSimpleFunctions(long ver = -1) => this.expectedVersion = ver;
public override void ReadCompletionCallback(ref AdId key, ref AdInput input, ref Output output, Empty ctx, Status status, RecordMetadata recordMetadata)
Assert.AreEqual(key.adId, output.value.numClicks);
// Read functions
public override bool SingleReader(ref AdId key, ref AdInput input, ref NumClicks value, ref Output dst, ref ReadInfo readInfo)
if (expectedVersion >= 0)
Assert.AreEqual(expectedVersion, readInfo.Version);
dst.value = value;
return true;
public override bool ConcurrentReader(ref AdId key, ref AdInput input, ref NumClicks value, ref Output dst, ref ReadInfo readInfo)
if (expectedVersion >= 0)
Assert.AreEqual(expectedVersion, readInfo.Version);
dst.value = value;
return true;
// RMW functions
public override bool InitialUpdater(ref AdId key, ref AdInput input, ref NumClicks value, ref Output output, ref RMWInfo rmwInfo)
if (expectedVersion >= 0)
Assert.AreEqual(expectedVersion, rmwInfo.Version);
value = input.numClicks;
return true;
public override bool InPlaceUpdater(ref AdId key, ref AdInput input, ref NumClicks value, ref Output output, ref RMWInfo rmwInfo)
if (expectedVersion >= 0)
Assert.AreEqual(expectedVersion, rmwInfo.Version);
Interlocked.Add(ref value.numClicks, input.numClicks.numClicks);
return true;
public override bool NeedCopyUpdate(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref Output output, ref RMWInfo rmwInfo)
if (expectedVersion >= 0)
Assert.AreEqual(expectedVersion, rmwInfo.Version);
return true;
public override bool CopyUpdater(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref NumClicks newValue, ref Output output, ref RMWInfo rmwInfo)
if (expectedVersion >= 0)
Assert.AreEqual(expectedVersion, rmwInfo.Version);
newValue.numClicks += oldValue.numClicks + input.numClicks.numClicks;
return true;