important bug fixes for continue and added a simple recovery case

This commit is contained in:
Gunaprasaad Jeganathan 2019-01-19 02:42:36 +00:00
Родитель c94dc90b94
Коммит 1eab86ffee
4 изменённых файлов: 240 добавлений и 10 удалений

Просмотреть файл

@ -0,0 +1,200 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using FASTER.core;
namespace SumStore
{
class LatestRecoveryTest : IFasterRecoveryTest
{
const long numUniqueKeys = (1 << 23);
const long numOps = 4 * numUniqueKeys;
const long refreshInterval = (1 << 8);
const long completePendingInterval = (1 << 12);
const int checkpointInterval = 10 * 1000;
FasterKV<AdId, NumClicks, Input, Output, Empty, Functions> fht;
Input[] inputArray;
public LatestRecoveryTest()
{
// Create FASTER index
var log = Devices.CreateLogDevice("D:\\logs\\hlog");
fht = new FasterKV
<AdId, NumClicks, Input, Output, Empty, Functions>
(numUniqueKeys, new Functions(),
new LogSettings { LogDevice = log },
new CheckpointSettings { CheckpointDir = "D:\\logs" });
}
public void Populate()
{
// Register thread with FASTER
Guid sessionGuid = fht.StartSession();
// Persist session id
System.IO.File.WriteAllText("D:\\clients\\0.txt", sessionGuid.ToString());
// Create a thread to issue periodic checkpoints
var t = new Thread(() => PeriodicCheckpoints());
t.Start();
// Generate clicks from start
GenerateClicks(0);
}
public void Continue()
{
// Recover the latest checkpoint
fht.Recover();
// Find out session Guid
string sessionGuidText = System.IO.File.ReadAllText("D:\\clients\\0.txt");
Guid.TryParse(sessionGuidText, out Guid sessionGuid);
// Register with thread
long sno = fht.ContinueSession(sessionGuid);
Console.WriteLine("Session {0} recovered until {1}", sessionGuid, sno);
// Create a thread to issue periodic checkpoints
var t = new Thread(() => PeriodicCheckpoints());
t.Start();
GenerateClicks(sno + 1);
}
public void RecoverAndTest()
{
// Recover the latest checkpoint
fht.Recover();
// Find out session Guid
string sessionGuidText = System.IO.File.ReadAllText("D:\\clients\\0.txt");
Guid.TryParse(sessionGuidText, out Guid sessionGuid);
// Register with thread
long sno = fht.ContinueSession(sessionGuid);
Console.WriteLine("Session {0} recovered until {1}", sessionGuid, sno);
// De-register session
fht.StopSession();
Test(sno);
}
private void GenerateClicks(long sno)
{
// Prepare the dataset
inputArray = new Input[numUniqueKeys];
for (int i = 0; i < numUniqueKeys; i++)
{
inputArray[i].adId.adId = (i % numUniqueKeys);
inputArray[i].numClicks.numClicks = 1;
}
// Increment in round-robin fashion starting from sno
while (true)
{
var key = (sno % numUniqueKeys);
fht.RMW(ref inputArray[key].adId, ref inputArray[key], Empty.Default, sno);
sno++;
if (sno % refreshInterval == 0)
{
fht.Refresh();
}
else if (sno % completePendingInterval == 0)
{
fht.CompletePending(false);
}
else if (sno % numUniqueKeys == 0)
{
fht.CompletePending(true);
}
}
}
private void PeriodicCheckpoints()
{
Thread.Sleep(10 * 1000);
Console.WriteLine("Started checkpoint thread");
while(true)
{
Thread.Sleep(checkpointInterval);
fht.StartSession();
fht.TakeFullCheckpoint(out Guid token);
fht.CompleteCheckpoint(true);
fht.StopSession();
Console.WriteLine("Completed checkpoint {0}", token);
}
}
public void RecoverAndTest(Guid indexToken, Guid hybridLogToken)
{
throw new NotImplementedException();
}
private void Test(long sno)
{
// Initalize array to store values
Input[] inputArray = new Input[numUniqueKeys];
for (int i = 0; i < numUniqueKeys; i++)
{
inputArray[i].adId.adId = (i % numUniqueKeys);
inputArray[i].numClicks.numClicks = 0;
}
// Start a new session
fht.StartSession();
// Issue read requests
Output output = default(Output);
for (int i = 0; i < numUniqueKeys; i++)
{
var status = fht.Read(ref inputArray[i].adId, ref inputArray[i], ref output, Empty.Default, i);
Debug.Assert(status == Status.OK || status == Status.NOTFOUND);
inputArray[i].numClicks.numClicks = output.value.numClicks;
}
// Complete all pending requests
fht.CompletePending(true);
// Release
fht.StopSession();
// Compute expected array
long[] expected = new long[numUniqueKeys];
for (long i = 0; i <= sno; i++)
{
var id = i % numUniqueKeys;
expected[id]++;
}
// Assert if expected is same as found
for (long i = 0; i < numUniqueKeys; i++)
{
if (expected[i] != inputArray[i].numClicks.numClicks)
{
Console.WriteLine("Debug error for AdId {0}: Expected ({1}), Found({2})", inputArray[i].adId.adId, expected[i], inputArray[i].numClicks.numClicks);
}
}
Console.WriteLine("Test successful");
Console.ReadLine();
}
}
}

Просмотреть файл

@ -24,29 +24,36 @@ namespace SumStore
{
if (args.Length == 0)
{
Console.WriteLine("Usage: SumStore.exe [single|concurrent|test] [populate|recover|continue] [guid]");
Console.WriteLine("Usage: SumStore.exe [single|concurrent|test|latest] [populate|recover|continue] [guid]");
return;
}
if (!Directory.Exists("logs"))
Directory.CreateDirectory("logs");
if (!Directory.Exists("D:\\logs"))
Directory.CreateDirectory("D:\\logs");
if (!Directory.Exists("D:\\clients"))
Directory.CreateDirectory("D:\\clients");
int nextArg = 0;
var test = default(IFasterRecoveryTest);
var type = args[nextArg++];
if(type == "single")
if (type == "single")
{
test = new SingleThreadedRecoveryTest();
}
else if(type == "concurrent")
else if (type == "concurrent")
{
int threadCount = int.Parse(args[nextArg++]);
test = new ConcurrentRecoveryTest(threadCount);
}
else if(type == "test")
else if (type == "test")
{
int threadCount = int.Parse(args[nextArg++]);
test = new ConcurrentTest(threadCount);
}
else if (type == "latest")
{
test = new LatestRecoveryTest();
}
else
{
Debug.Assert(false);
@ -57,12 +64,19 @@ namespace SumStore
{
test.Populate();
}
else if(task == "recover")
else if (task == "recover")
{
Guid version = Guid.Parse(args[nextArg++]);
test.RecoverAndTest(version, version);
if(type == "latest")
{
((LatestRecoveryTest)test).RecoverAndTest();
}
else
{
Guid version = Guid.Parse(args[nextArg++]);
test.RecoverAndTest(version, version);
}
}
else if(task == "continue")
else if (task == "continue")
{
test.Continue();
}

Просмотреть файл

@ -0,0 +1,8 @@
{
"profiles": {
"SumStore": {
"commandName": "Project",
"commandLineArgs": "test"
}
}
}

Просмотреть файл

@ -35,6 +35,14 @@ namespace FASTER.core
{
if (_hybridLogCheckpoint.info.continueTokens.TryGetValue(guid, out long serialNum))
{
Phase phase = _systemState.phase;
if(phase != Phase.REST)
{
throw new Exception("Can continue only in REST phase");
}
InitLocalContext(ref threadCtx, guid);
threadCtx.serialNum = serialNum;
InternalRefresh();
return serialNum;
}
}