Refactoring so that PMM does not need to be codegen'ed (#65)

This commit is contained in:
Badrish Chandramouli 2018-11-14 19:38:40 -08:00 коммит произвёл GitHub
Родитель 6a93d381b4
Коммит 7c77bfe5cb
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
19 изменённых файлов: 351 добавлений и 246 удалений

Просмотреть файл

@ -93,7 +93,7 @@ namespace FASTER.benchmark
#else
store = new FasterKV
#endif
(kMaxKey / 2, device, null);
(kMaxKey / 2, new LogSettings { LogDevice = device });
}
private void RunYcsb(int thread_idx)

Просмотреть файл

@ -119,7 +119,7 @@ namespace ManagedSample3
var h = FasterFactory.Create
<MyKey, MyValue, MyInput, MyOutput, MyContext, MyFunctions>
(128, new MyFunctions(),
new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MemorySizeBits = 14, PageSizeBits = 10 }
new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MemorySizeBits = 29 }
);
h.StartSession();

Просмотреть файл

@ -130,7 +130,7 @@ namespace ManagedSample4
var h = FasterFactory.Create
<Wrap<int>, Wrap<int>, Wrap<int>, Wrap<int>, MyContext, MyFunctions>
(128, new MyFunctions(),
new LogSettings { LogDevice = log, MemorySizeBits = 14, PageSizeBits = 10 }
new LogSettings { LogDevice = log, MemorySizeBits = 29 }
);
h.StartSession();

Просмотреть файл

@ -49,7 +49,7 @@ namespace NestedTypesTest
#endif
, MyFunctions>
(128, new MyFunctions(),
new LogSettings { LogDevice = log, MemorySizeBits = 14, PageSizeBits = 10 }
new LogSettings { LogDevice = log, MemorySizeBits = 29 }
);
h.StartSession();

Просмотреть файл

@ -19,11 +19,6 @@ namespace FASTER.core
/// </summary>
public unsafe partial class PersistentMemoryMalloc : IAllocator
{
// Size of object chunks beign written to storage
const int kObjectBlockSize = 100 * (1 << 20);
// Tail offsets per segment, in object log
public long[] segmentOffsets = new long[SegmentBufferSize];
#region Async file operations
/// <summary>
@ -239,7 +234,7 @@ namespace FASTER.core
// Set status to in-progress
PageStatusIndicator[flushPage % BufferSize].PageFlushCloseStatus
= new FlushCloseStatus { PageFlushStatus = FlushStatus.InProgress, PageCloseStatus = CloseStatus.Open };
= new FlushCloseStatus { PageFlushStatus = PMMFlushStatus.InProgress, PageCloseStatus = PMMCloseStatus.Open };
}
PageStatusIndicator[flushPage % BufferSize].LastFlushedUntilAddress = -1;
@ -340,7 +335,7 @@ namespace FASTER.core
var _alignedLength = (_s.Length + (sectorSize - 1)) & ~(sectorSize - 1);
var _objAddr = Interlocked.Add(ref localSegmentOffsets[(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength;
var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength;
fixed (void* src = _s)
Buffer.MemoryCopy(src, _objBuffer.aligned_pointer, _s.Length, _s.Length);
@ -422,12 +417,12 @@ namespace FASTER.core
while (true)
{
var oldStatus = PageStatusIndicator[result.page % BufferSize].PageFlushCloseStatus;
if (oldStatus.PageCloseStatus == CloseStatus.Closed)
if (oldStatus.PageCloseStatus == PMMCloseStatus.Closed)
{
ClearPage((int)(result.page % BufferSize), result.page == 0);
}
var newStatus = oldStatus;
newStatus.PageFlushStatus = FlushStatus.Flushed;
newStatus.PageFlushStatus = PMMFlushStatus.Flushed;
if (oldStatus.value == Interlocked.CompareExchange(ref PageStatusIndicator[result.page % BufferSize].PageFlushCloseStatus.value, newStatus.value, oldStatus.value))
{
break;

Просмотреть файл

@ -20,9 +20,9 @@ namespace FASTER.core
void CheckForAllocateComplete(ref long address);
}
internal enum FlushStatus : int { Flushed, InProgress };
internal enum PMMFlushStatus : int { Flushed, InProgress };
internal enum CloseStatus : int { Closed, Open };
internal enum PMMCloseStatus : int { Closed, Open };
internal struct FullPageStatus
{
@ -34,9 +34,9 @@ namespace FASTER.core
internal struct FlushCloseStatus
{
[FieldOffset(0)]
public FlushStatus PageFlushStatus;
public PMMFlushStatus PageFlushStatus;
[FieldOffset(4)]
public CloseStatus PageCloseStatus;
public PMMCloseStatus PageCloseStatus;
[FieldOffset(0)]
public long value;
}
@ -55,7 +55,7 @@ namespace FASTER.core
public unsafe partial class PersistentMemoryMalloc : IAllocator
{
// Epoch information
public LightEpoch epoch;
private LightEpoch epoch;
// Read buffer pool
NativeSectorAlignedBufferPool readBufferPool;
@ -71,35 +71,42 @@ namespace FASTER.core
private readonly int AlignedPageSizeBytes;
// Segment size
private const int LogSegmentSizeBits = 30;
private const long SegmentSize = 1 << LogSegmentSizeBits;
private const long SegmentSizeMask = SegmentSize - 1;
private const int SegmentBufferSize = 1 +
(LogTotalSizeBytes / SegmentSize < 1 ? 1 : (int)(LogTotalSizeBytes / SegmentSize));
private readonly int LogSegmentSizeBits;
private readonly long SegmentSize;
private readonly long SegmentSizeMask;
private readonly int SegmentBufferSize;
// Total HLOG size
private const int LogTotalSizeBits = 34;
private const long LogTotalSizeBytes = 1L << LogTotalSizeBits;
private const int BufferSize = (int)(LogTotalSizeBytes / (1L << LogPageSizeBits));
private readonly int LogTotalSizeBits;
private readonly long LogTotalSizeBytes;
private readonly int BufferSize;
// HeadOffset lag (from tail)
private const int HeadOffsetLagNumPages = 4;
private const int HeadOffsetLagSize = BufferSize - HeadOffsetLagNumPages;
private const long HeadOffsetLagAddress = (long)HeadOffsetLagSize << LogPageSizeBits;
private readonly int HeadOffsetLagSize;
private readonly long HeadOffsetLagAddress;
// ReadOnlyOffset lag (from tail)
public const double LogMutableFraction = 0.9;
public const long ReadOnlyLagAddress = (long)(LogMutableFraction * BufferSize) << LogPageSizeBits;
private readonly double LogMutableFraction;
private readonly long ReadOnlyLagAddress;
// Circular buffer definition
private byte[][] values = new byte[BufferSize][];
private GCHandle[] handles = new GCHandle[BufferSize];
private long[] pointers = new long[BufferSize];
private GCHandle ptrHandle;
private long* nativePointers;
private readonly byte[][] values;
private readonly GCHandle[] handles;
private readonly long[] pointers;
private readonly GCHandle ptrHandle;
private readonly long* nativePointers;
// Array that indicates the status of each buffer page
private FullPageStatus[] PageStatusIndicator = new FullPageStatus[BufferSize];
private readonly FullPageStatus[] PageStatusIndicator;
// Size of object chunks beign written to storage
private const int kObjectBlockSize = 100 * (1 << 20);
/// <summary>
/// Tail offsets per segment, in object log
/// </summary>
public readonly long[] segmentOffsets;
NativeSectorAlignedBufferPool ioBufferPool;
@ -145,11 +152,12 @@ namespace FASTER.core
/// <summary>
/// Create instance of PMM
/// </summary>
/// <param name="device"></param>
/// <param name="objectLogDevice"></param>
/// <param name="settings"></param>
/// <param name="pageHandlers"></param>
public PersistentMemoryMalloc(IDevice device, IDevice objectLogDevice, IPageHandlers pageHandlers) : this(device, objectLogDevice, 0, pageHandlers)
public PersistentMemoryMalloc(LogSettings settings, IPageHandlers pageHandlers) : this(settings, 0, pageHandlers)
{
Allocate(Constants.kFirstValidAddress); // null pointer
ReadOnlyAddress = GetTailAddress();
SafeReadOnlyAddress = ReadOnlyAddress;
@ -162,20 +170,45 @@ namespace FASTER.core
/// <summary>
/// Create instance of PMM
/// </summary>
/// <param name="device"></param>
/// <param name="objectLogDevice"></param>
/// <param name="settings"></param>
/// <param name="startAddress"></param>
/// <param name="pageHandlers"></param>
internal PersistentMemoryMalloc(IDevice device, IDevice objectLogDevice, long startAddress, IPageHandlers pageHandlers)
internal PersistentMemoryMalloc(LogSettings settings, long startAddress, IPageHandlers pageHandlers)
{
// Segment size
LogSegmentSizeBits = settings.SegmentSizeBits;
SegmentSize = 1 << LogSegmentSizeBits;
SegmentSizeMask = SegmentSize - 1;
SegmentBufferSize = 1 +
(LogTotalSizeBytes / SegmentSize < 1 ? 1 : (int)(LogTotalSizeBytes / SegmentSize));
// Total HLOG size
LogTotalSizeBits = settings.MemorySizeBits;
LogTotalSizeBytes = 1L << LogTotalSizeBits;
BufferSize = (int)(LogTotalSizeBytes / (1L << LogPageSizeBits));
// HeadOffset lag (from tail)
HeadOffsetLagSize = BufferSize - HeadOffsetLagNumPages;
HeadOffsetLagAddress = (long)HeadOffsetLagSize << LogPageSizeBits;
// ReadOnlyOffset lag (from tail)
LogMutableFraction = settings.MutableFraction;
ReadOnlyLagAddress = (long)(LogMutableFraction * BufferSize) << LogPageSizeBits;
values = new byte[BufferSize][];
handles = new GCHandle[BufferSize];
pointers = new long[BufferSize];
PageStatusIndicator = new FullPageStatus[BufferSize];
segmentOffsets = new long[SegmentBufferSize];
if (BufferSize < 16)
{
throw new Exception("HLOG buffer must be at least 16 pages");
}
this.device = device;
this.objectLogDevice = objectLogDevice;
device = settings.LogDevice;
objectLogDevice = settings.ObjectLogDevice;
if (pageHandlers.HasObjects())
{
@ -247,11 +280,8 @@ namespace FASTER.core
if (handles[i].IsAllocated)
handles[i].Free();
values[i] = null;
PageStatusIndicator[i].PageFlushCloseStatus = new FlushCloseStatus { PageFlushStatus = FlushStatus.Flushed, PageCloseStatus = CloseStatus.Closed };
PageStatusIndicator[i].PageFlushCloseStatus = new FlushCloseStatus { PageFlushStatus = PMMFlushStatus.Flushed, PageCloseStatus = PMMCloseStatus.Closed };
}
handles = null;
pointers = null;
values = null;
TailPageOffset.Page = 0;
TailPageOffset.Offset = 0;
SafeReadOnlyAddress = 0;
@ -261,55 +291,95 @@ namespace FASTER.core
BeginAddress = 1;
}
/// <summary>
/// Get tail address
/// </summary>
/// <returns></returns>
public long GetTailAddress()
{
var local = TailPageOffset;
return ((long)local.Page << LogPageSizeBits) | (uint)local.Offset;
}
// Simple Accessor Functions
/// <summary>
/// Get page
/// </summary>
/// <param name="logicalAddress"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public long GetPage(long logicalAddress)
{
return (logicalAddress >> LogPageSizeBits);
}
/// <summary>
/// Get page index for page
/// </summary>
/// <param name="page"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int GetPageIndexForPage(long page)
{
return (int)(page % BufferSize);
}
/// <summary>
/// Get page index for address
/// </summary>
/// <param name="address"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int GetPageIndexForAddress(long address)
{
return (int)((address >> LogPageSizeBits) % BufferSize);
}
/// <summary>
/// Get capacity (number of pages)
/// </summary>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int GetCapacityNumPages()
{
return BufferSize;
}
/// <summary>
/// Get start logical address
/// </summary>
/// <param name="page"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public long GetStartLogicalAddress(long page)
{
return page << LogPageSizeBits;
}
/// <summary>
/// Get page size
/// </summary>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public long GetPageSize()
{
return PageSize;
}
/// <summary>
/// Get offset in page
/// </summary>
/// <param name="address"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public long GetOffsetInPage(long address)
{
return address & PageSizeMask;
}
/// <summary>
/// Get offset lag in pages
/// </summary>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public long GetHeadOffsetLagInPages()
{
@ -325,14 +395,10 @@ namespace FASTER.core
public long GetPhysicalAddress(long logicalAddress)
{
// Offset within page
int offset = (int)(logicalAddress & PageSizeMask);
// Global page address
long page = (logicalAddress >> LogPageSizeBits);
int offset = (int)(logicalAddress & ((1L << LogPageSizeBits) -1));
// Index of page within the circular buffer
int pageIndex = (int)(page % BufferSize);
int pageIndex = (int)(logicalAddress >> LogPageSizeBits);
return *(nativePointers+pageIndex) + offset;
}
@ -420,8 +486,8 @@ namespace FASTER.core
}
//Invert the address if either the previous page is not flushed or if it is null
if ((PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus != FlushStatus.Flushed) ||
(PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus != CloseStatus.Closed) ||
if ((PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus != PMMFlushStatus.Flushed) ||
(PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus != PMMCloseStatus.Closed) ||
(values[pageIndex] == null))
{
address = -address;
@ -483,8 +549,8 @@ namespace FASTER.core
PageAlignedShiftHeadAddress(currentTailAddress);
//Check if I can allocate pageIndex at all
if ((PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus != FlushStatus.Flushed) ||
(PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus != CloseStatus.Closed) ||
if ((PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus != PMMFlushStatus.Flushed) ||
(PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus != PMMCloseStatus.Closed) ||
(values[pageIndex] == null))
{
return;
@ -515,6 +581,11 @@ namespace FASTER.core
}
}
/// <summary>
/// Shift begin address
/// </summary>
/// <param name="oldBeginAddress"></param>
/// <param name="newBeginAddress"></param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void ShiftBeginAddress(long oldBeginAddress, long newBeginAddress)
{
@ -525,26 +596,6 @@ namespace FASTER.core
});
}
/// <summary>
/// Checks if until address has been flushed!
/// </summary>
/// <param name="address"></param>
/// <returns></returns>
public bool CheckFlushedUntil(long address)
{
return FlushedUntilAddress >= address;
}
public void KillFuzzyRegion()
{
while (SafeReadOnlyAddress != ReadOnlyAddress)
{
Interlocked.CompareExchange(ref SafeReadOnlyAddress,
ReadOnlyAddress,
SafeReadOnlyAddress);
}
}
/// <summary>
/// Seal: make sure there are no longer any threads writing to the page
/// Flush: send page to secondary store
@ -595,7 +646,7 @@ namespace FASTER.core
while (true)
{
var oldStatus = PageStatusIndicator[closePage].PageFlushCloseStatus;
if (oldStatus.PageFlushStatus == FlushStatus.Flushed)
if (oldStatus.PageFlushStatus == PMMFlushStatus.Flushed)
{
ClearPage(closePage, (closePageAddress >> LogPageSizeBits) == 0);
@ -614,7 +665,7 @@ namespace FASTER.core
throw new Exception("Impossible");
}
var newStatus = oldStatus;
newStatus.PageCloseStatus = CloseStatus.Closed;
newStatus.PageCloseStatus = PMMCloseStatus.Closed;
if (oldStatus.value == Interlocked.CompareExchange(ref PageStatusIndicator[closePage].PageFlushCloseStatus.value, newStatus.value, oldStatus.value))
{
break;
@ -657,8 +708,8 @@ namespace FASTER.core
pointers[index] = (p + (sectorSize - 1)) & ~(sectorSize - 1);
values[index] = tmp;
PageStatusIndicator[index].PageFlushCloseStatus.PageFlushStatus = FlushStatus.Flushed;
PageStatusIndicator[index].PageFlushCloseStatus.PageCloseStatus = CloseStatus.Closed;
PageStatusIndicator[index].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed;
PageStatusIndicator[index].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Closed;
Interlocked.MemoryBarrier();
}
@ -761,6 +812,11 @@ namespace FASTER.core
return false;
}
/// <summary>
/// Reset for recovery
/// </summary>
/// <param name="tailAddress"></param>
/// <param name="headAddress"></param>
public void RecoveryReset(long tailAddress, long headAddress)
{
long tailPage = GetPage(tailAddress);
@ -779,7 +835,7 @@ namespace FASTER.core
for (var addr = headAddress; addr < tailAddress; addr += PageSize)
{
var pageIndex = GetPageIndexForAddress(addr);
PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus = CloseStatus.Open;
PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Open;
}
}
}

Просмотреть файл

@ -38,8 +38,6 @@ namespace FASTER.core.Roslyn
"IndexRecovery",
"IndexCheckpoint",
"StateTransitions",
"PersistentMemoryMalloc",
"PMMAsyncIO",
};
@ -47,10 +45,10 @@ namespace FASTER.core.Roslyn
///
/// </summary>
/// <returns>The generated type (to be instantiated). If null, then the error messages giving the reason for failing to generate the type.</returns>
public static Tuple<Type, string> GenerateFasterHashTableClass(bool persistGeneratedCode, bool optimizeCode, long LogTotalSizeBits, double LogMutableFraction, int LogPageSizeBits, int LogSegmentSizeBits, bool kFoldOverSnapshot)
public static Tuple<Type, string> GenerateFasterHashTableClass(bool persistGeneratedCode, bool optimizeCode)
{
var c = new FasterHashTableCompiler<TKey, TValue, TInput, TOutput, TContext, TFunctions, TIFaster>();
c.Run(persistGeneratedCode, optimizeCode, LogTotalSizeBits, LogMutableFraction, LogPageSizeBits, LogSegmentSizeBits, kFoldOverSnapshot);
c.Run(persistGeneratedCode, optimizeCode);
var name = String.Format("FASTER.core.Codegen_{0}.FasterKV", c.compilation.AssemblyName);
var r = c.Compile(persistGeneratedCode);
var a = r.Item1;
@ -66,7 +64,7 @@ namespace FASTER.core.Roslyn
/// <summary>
/// Runs the transformations needed to produce a valid compilation unit.
/// </summary>
public void Run(bool persistGeneratedCode, bool optimizeCode, long LogTotalSizeBits, double LogMutableFraction, int LogPageSizeBits, int LogSegmentSizeBits, bool kFoldOverSnapshot)
public void Run(bool persistGeneratedCode, bool optimizeCode)
{
#if TIMING
Stopwatch sw = new Stopwatch();
@ -115,12 +113,6 @@ namespace FASTER.core.Roslyn
compilation = compilation.ReplaceSyntaxTree(oldTree, newTree);
}
compilation = RoslynHelpers.ReplaceConstantValue(compilation, "LogMutableFraction", LogMutableFraction.ToString(CultureInfo.InvariantCulture));
compilation = RoslynHelpers.ReplaceConstantValue(compilation, "LogTotalSizeBits", LogTotalSizeBits.ToString(CultureInfo.InvariantCulture));
compilation = RoslynHelpers.ReplaceConstantValue(compilation, "LogPageSizeBits", LogPageSizeBits.ToString(CultureInfo.InvariantCulture));
compilation = RoslynHelpers.ReplaceConstantValue(compilation, "LogSegmentSizeBits", LogSegmentSizeBits.ToString(CultureInfo.InvariantCulture));
compilation = RoslynHelpers.ReplaceConstantValue(compilation, "kFoldOverSnapshot", kFoldOverSnapshot ? "true" : "false");
#if TIMING
sw.Stop();
System.Diagnostics.Debug.WriteLine("Time to run the FasterHashTable compiler: {0}ms", sw.ElapsedMilliseconds);

Просмотреть файл

@ -27,11 +27,11 @@ namespace FASTER.core
#endif
;
public static TIFaster GetFasterHashTable<TKey, TValue, TInput, TOutput, TContext, TFunctions, TIFaster>
(long size, IDevice logDevice, IDevice objectLogDevice, string checkpointDir, long LogTotalSizeBits, double LogMutableFraction, int LogPageSizeBits, int LogSegmentSizeBits, bool kFoldOverSnapshot, bool persistDll = PersistDll, bool optimizeCode = OptimizeCode)
(long size, LogSettings logSettings, CheckpointSettings checkpointSettings, bool persistDll = PersistDll, bool optimizeCode = OptimizeCode)
{
var s = Roslyn.FasterHashTableCompiler<TKey, TValue, TInput, TOutput, TContext, TFunctions, TIFaster>.GenerateFasterHashTableClass(persistDll, optimizeCode, LogTotalSizeBits, LogMutableFraction, LogPageSizeBits, LogSegmentSizeBits, kFoldOverSnapshot);
var s = Roslyn.FasterHashTableCompiler<TKey, TValue, TInput, TOutput, TContext, TFunctions, TIFaster>.GenerateFasterHashTableClass(persistDll, optimizeCode);
var t = s.Item1;
var instance = Activator.CreateInstance(t, size, logDevice, objectLogDevice, checkpointDir);
var instance = Activator.CreateInstance(t, size, logSettings, checkpointSettings);
return (TIFaster)instance;
}

Просмотреть файл

@ -224,7 +224,7 @@ namespace FASTER.core
ObtainCurrentTailAddress(ref _hybridLogCheckpoint.info.startLogicalAddress);
if (!Constants.kFoldOverSnapshot)
if (!FoldOverSnapshot)
{
_hybridLogCheckpoint.info.flushedLogicalAddress = hlog.FlushedUntilAddress;
_hybridLogCheckpoint.info.useSnapshotFile = 1;
@ -254,7 +254,7 @@ namespace FASTER.core
WriteIndexMetaFile();
}
if (Constants.kFoldOverSnapshot)
if (FoldOverSnapshot)
{
hlog.ShiftReadOnlyToTail(out long tailAddress);
@ -507,7 +507,7 @@ namespace FASTER.core
if (!prevThreadCtx.markers[EpochPhaseIdx.WaitFlush])
{
var notify = false;
if (Constants.kFoldOverSnapshot)
if (FoldOverSnapshot)
{
notify = (hlog.FlushedUntilAddress >= _hybridLogCheckpoint.info.finalLogicalAddress);
}

Просмотреть файл

@ -14,6 +14,17 @@ using System.Threading;
namespace FASTER.core
{
/// <summary>
/// FASTER configuration
/// </summary>
public static class Config
{
/// <summary>
/// Checkpoint directory
/// </summary>
public static string CheckpointDirectory = "C:\\data";
}
public unsafe partial class FasterKV : FasterBase, IFasterKV, IPageHandlers
{
private PersistentMemoryMalloc hlog;
@ -22,6 +33,7 @@ namespace FASTER.core
private const bool kCopyReadsToTail = false;
private const bool breakWhenClassIsLoaded = false;
private readonly bool FoldOverSnapshot = false;
/// <summary>
/// Tail address of log
@ -75,15 +87,17 @@ namespace FASTER.core
/// Create FASTER instance
/// </summary>
/// <param name="size"></param>
/// <param name="logDevice"></param>
/// <param name="objectLogDevice"></param>
/// <param name="checkpointDir"></param>
public FasterKV(long size, IDevice logDevice, IDevice objectLogDevice, string checkpointDir = null)
/// <param name="logSettings"></param>
/// <param name="checkpointSettings"></param>
public FasterKV(long size, LogSettings logSettings, CheckpointSettings checkpointSettings = null)
{
if (checkpointDir != null)
Config.CheckpointDirectory = checkpointDir;
if (checkpointSettings == null)
checkpointSettings = new CheckpointSettings();
hlog = new PersistentMemoryMalloc(logDevice, objectLogDevice, this);
Config.CheckpointDirectory = checkpointSettings.CheckpointDir;
FoldOverSnapshot = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver;
hlog = new PersistentMemoryMalloc(logSettings, this);
var recordSize = Layout.EstimatePhysicalSize(null, null);
Initialize(size, hlog.GetSectorSize());

Просмотреть файл

@ -18,8 +18,6 @@ namespace FASTER.core
/// Size of cache line in bytes
public const int kCacheLineBytes = 64;
public const bool kFoldOverSnapshot = false;
public const bool kFineGrainedHandoverRecord = false;
public const bool kFineGrainedHandoverBucket = true;

Просмотреть файл

@ -736,7 +736,7 @@ namespace FASTER.core
// Mutable Region: Update the record in-place
if (logicalAddress >= hlog.ReadOnlyAddress)
{
if(Constants.kFoldOverSnapshot)
if(FoldOverSnapshot)
{
Debug.Assert(Layout.GetInfo(physicalAddress)->Version == threadCtx.version);
}
@ -1012,7 +1012,7 @@ namespace FASTER.core
// Mutable Region: Update the record in-place
if (logicalAddress >= hlog.ReadOnlyAddress)
{
if (Constants.kFoldOverSnapshot)
if (FoldOverSnapshot)
{
Debug.Assert(Layout.GetInfo(physicalAddress)->Version == threadCtx.version);
}

Просмотреть файл

@ -12,6 +12,41 @@ using System.Threading;
namespace FASTER.core
{
internal enum ReadStatus { Pending, Done };
internal enum FlushStatus { Pending, Done };
internal class RecoveryStatus
{
public long startPage;
public long endPage;
public int capacity;
public IDevice recoveryDevice;
public long recoveryDevicePageOffset;
public IDevice objectLogRecoveryDevice;
public ReadStatus[] readStatus;
public FlushStatus[] flushStatus;
public RecoveryStatus(int capacity,
long startPage,
long endPage)
{
this.capacity = capacity;
this.startPage = startPage;
this.endPage = endPage;
readStatus = new ReadStatus[capacity];
flushStatus = new FlushStatus[capacity];
for (int i = 0; i < capacity; i++)
{
flushStatus[i] = FlushStatus.Done;
readStatus[i] = ReadStatus.Pending;
}
}
}
/// <summary>
/// Partial class for recovery code in FASTER
/// </summary>
@ -24,7 +59,7 @@ namespace FASTER.core
// Recover segment offsets for object log
if (_hybridLogCheckpoint.info.objectLogSegmentOffsets != null)
hlog.segmentOffsets = _hybridLogCheckpoint.info.objectLogSegmentOffsets;
Array.Copy(_hybridLogCheckpoint.info.objectLogSegmentOffsets, hlog.segmentOffsets, _hybridLogCheckpoint.info.objectLogSegmentOffsets.Length);
_indexCheckpoint.main_ht_device = new LocalStorageDevice(DirectoryConfiguration.GetPrimaryHashTableFileName(_indexCheckpoint.info.token));
_indexCheckpoint.ofb_device = new LocalStorageDevice(DirectoryConfiguration.GetOverflowBucketsFileName(_indexCheckpoint.info.token));
@ -46,7 +81,7 @@ namespace FASTER.core
DeleteTentativeEntries();
if (Constants.kFoldOverSnapshot)
if (FoldOverSnapshot)
{
RecoverHybridLog(_indexCheckpoint.info, _hybridLogCheckpoint.info);
}
@ -114,37 +149,6 @@ namespace FASTER.core
hlog.RecoveryReset(untilAddress, headAddress);
}
private enum ReadStatus { Pending, Done };
private enum FlushStatus { Pending, Done };
private class RecoveryStatus
{
public long startPage;
public long endPage;
public int capacity;
public IDevice recoveryDevice;
public long recoveryDevicePageOffset;
public IDevice objectLogRecoveryDevice;
public ReadStatus[] readStatus;
public FlushStatus[] flushStatus;
public RecoveryStatus(int capacity,
long startPage,
long endPage)
{
this.capacity = capacity;
this.startPage = startPage;
this.endPage = endPage;
readStatus = new ReadStatus[capacity];
flushStatus = new FlushStatus[capacity];
for (int i = 0; i < capacity; i++)
{
flushStatus[i] = FlushStatus.Done;
readStatus[i] = ReadStatus.Pending;
}
}
}
private void RecoverHybridLog(IndexRecoveryInfo indexRecoveryInfo,
HybridLogRecoveryInfo recoveryInfo)
@ -422,15 +426,12 @@ namespace FASTER.core
if (Interlocked.Decrement(ref result.count) == 0)
{
// We don't write partial pages during recovery
Debug.Assert(result.partial == false);
int index = hlog.GetPageIndexForPage(result.page);
result.context.flushStatus[index] = FlushStatus.Done;
if (result.page + result.context.capacity < result.context.endPage)
{
long readPage = result.page + result.context.capacity;
if (Constants.kFoldOverSnapshot)
if (FoldOverSnapshot)
{
hlog.AsyncReadPagesFromDevice(readPage, 1, AsyncReadPagesCallbackForRecovery, result.context);
}

Просмотреть файл

@ -24,11 +24,6 @@ namespace FASTER.core
/// </summary>
public IDevice ObjectLogDevice = new NullDevice();
/// <summary>
/// Size of page, in bits
/// </summary>
public int PageSizeBits = 25;
/// <summary>
/// Size of a segment (group of pages), in bits
/// </summary>
@ -164,10 +159,7 @@ namespace FASTER.core
HashTableManager.GetFasterHashTable
<TKey, TValue, TInput, TOutput,
TContext, TFunctions, TIFaster>
(indexSizeBuckets, logSettings.LogDevice, logSettings.ObjectLogDevice,
checkpointSettings.CheckpointDir, logSettings.MemorySizeBits,
logSettings.MutableFraction, logSettings.PageSizeBits,
logSettings.SegmentSizeBits, checkpointSettings.CheckPointType == CheckpointType.FoldOver);
(indexSizeBuckets, logSettings, checkpointSettings);
}
/// <summary>

Просмотреть файл

@ -12,17 +12,6 @@ using System.IO;
namespace FASTER.core
{
/// <summary>
/// FASTER configuration
/// </summary>
public static class Config
{
/// <summary>
/// Checkpoint directory
/// </summary>
public static string CheckpointDirectory = "C:\\data";
}
internal struct AsyncGetFromDiskResult<TContext> : IAsyncResult
{
public TContext context;
@ -36,73 +25,6 @@ namespace FASTER.core
public bool CompletedSynchronously => throw new NotImplementedException();
}
internal struct PageAsyncReadResult<TContext> : IAsyncResult
{
public long page;
public TContext context;
public CountdownEvent handle;
public SectorAlignedMemory freeBuffer1;
public IOCompletionCallback callback;
public int count;
public IDevice objlogDevice;
public long resumeptr;
public long untilptr;
public bool IsCompleted => throw new NotImplementedException();
public WaitHandle AsyncWaitHandle => throw new NotImplementedException();
public object AsyncState => throw new NotImplementedException();
public bool CompletedSynchronously => throw new NotImplementedException();
public void Free()
{
if (freeBuffer1.buffer != null)
freeBuffer1.Return();
if (handle != null)
{
handle.Signal();
}
}
}
internal class PageAsyncFlushResult<TContext> : IAsyncResult
{
public long page;
public TContext context;
public bool partial;
public long untilAddress;
public int count;
public CountdownEvent handle;
public IDevice objlogDevice;
public SectorAlignedMemory freeBuffer1;
public SectorAlignedMemory freeBuffer2;
public AutoResetEvent done;
public bool IsCompleted => throw new NotImplementedException();
public WaitHandle AsyncWaitHandle => throw new NotImplementedException();
public object AsyncState => throw new NotImplementedException();
public bool CompletedSynchronously => throw new NotImplementedException();
public void Free()
{
if (freeBuffer1.buffer != null)
freeBuffer1.Return();
if (freeBuffer2.buffer != null)
freeBuffer2.Return();
if (handle != null)
{
handle.Signal();
}
}
}
internal unsafe class HashIndexPageAsyncFlushResult : IAsyncResult
{
public HashBucket* start;

Просмотреть файл

@ -0,0 +1,135 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
#define CALLOC
using System;
using System.Threading;
namespace FASTER.core
{
/// <summary>
/// Result of async page read
/// </summary>
/// <typeparam name="TContext"></typeparam>
public struct PageAsyncReadResult<TContext> : IAsyncResult
{
/// <summary>
/// Page
/// </summary>
public long page;
/// <summary>
/// Context
/// </summary>
public TContext context;
/// <summary>
/// Count
/// </summary>
public int count;
internal CountdownEvent handle;
internal SectorAlignedMemory freeBuffer1;
internal IOCompletionCallback callback;
internal IDevice objlogDevice;
internal long resumeptr;
internal long untilptr;
/// <summary>
///
/// </summary>
public bool IsCompleted => throw new NotImplementedException();
/// <summary>
///
/// </summary>
public WaitHandle AsyncWaitHandle => throw new NotImplementedException();
/// <summary>
///
/// </summary>
public object AsyncState => throw new NotImplementedException();
/// <summary>
///
/// </summary>
public bool CompletedSynchronously => throw new NotImplementedException();
/// <summary>
/// Free
/// </summary>
public void Free()
{
if (freeBuffer1.buffer != null)
freeBuffer1.Return();
if (handle != null)
{
handle.Signal();
}
}
}
/// <summary>
/// Page async flush result
/// </summary>
/// <typeparam name="TContext"></typeparam>
public class PageAsyncFlushResult<TContext> : IAsyncResult
{
/// <summary>
/// Page
/// </summary>
public long page;
/// <summary>
/// Context
/// </summary>
public TContext context;
/// <summary>
/// Count
/// </summary>
public int count;
internal bool partial;
internal long untilAddress;
internal CountdownEvent handle;
internal IDevice objlogDevice;
internal SectorAlignedMemory freeBuffer1;
internal SectorAlignedMemory freeBuffer2;
internal AutoResetEvent done;
/// <summary>
///
/// </summary>
public bool IsCompleted => throw new NotImplementedException();
/// <summary>
///
/// </summary>
public WaitHandle AsyncWaitHandle => throw new NotImplementedException();
/// <summary>
///
/// </summary>
public object AsyncState => throw new NotImplementedException();
/// <summary>
///
/// </summary>
public bool CompletedSynchronously => throw new NotImplementedException();
/// <summary>
/// Free
/// </summary>
public void Free()
{
if (freeBuffer1.buffer != null)
freeBuffer1.Return();
if (freeBuffer2.buffer != null)
freeBuffer2.Return();
if (handle != null)
{
handle.Signal();
}
}
}
}

Просмотреть файл

@ -32,14 +32,14 @@ namespace FASTER.test.largeobjects
fht1 = FasterFactory.Create
<MyKey, MyLargeValue, MyInput, MyLargeOutput, MyContext, MyLargeFunctions>
(indexSizeBuckets: 128, functions: new MyLargeFunctions(),
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 },
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.Snapshot }
);
fht2 = FasterFactory.Create
<MyKey, MyLargeValue, MyInput, MyLargeOutput, MyContext, MyLargeFunctions>
(indexSizeBuckets: 128, functions: new MyLargeFunctions(),
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 },
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.Snapshot }
);
@ -96,14 +96,14 @@ namespace FASTER.test.largeobjects
fht1 = FasterFactory.Create
<MyKey, MyLargeValue, MyInput, MyLargeOutput, MyContext, MyLargeFunctions>
(indexSizeBuckets: 128, functions: new MyLargeFunctions(),
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 },
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.FoldOver }
);
fht2 = FasterFactory.Create
<MyKey, MyLargeValue, MyInput, MyLargeOutput, MyContext, MyLargeFunctions>
(indexSizeBuckets: 128, functions: new MyLargeFunctions(),
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 },
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.FoldOver }
);

Просмотреть файл

@ -29,7 +29,7 @@ namespace FASTER.test
fht = FasterFactory.Create
<MyKey, MyValue, MyInput, MyOutput, MyContext, MyFunctions>
(indexSizeBuckets: 128, functions: new MyFunctions(),
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 },
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckPointType = CheckpointType.FoldOver }
);
fht.StartSession();

Просмотреть файл

@ -32,14 +32,14 @@ namespace FASTER.test.recovery.sumstore.simple
fht1 = FasterFactory.Create
<AdId, NumClicks, Input, Output, Empty, SimpleFunctions, ICustomFaster>
(indexSizeBuckets: 128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 },
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.Snapshot }
);
fht2 = FasterFactory.Create
<AdId, NumClicks, Input, Output, Empty, SimpleFunctions, ICustomFaster>
(indexSizeBuckets: 128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 },
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.Snapshot }
);
@ -99,14 +99,14 @@ namespace FASTER.test.recovery.sumstore.simple
fht1 = FasterFactory.Create
<AdId, NumClicks, Input, Output, Empty, SimpleFunctions, ICustomFaster>
(indexSizeBuckets: 128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 },
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.FoldOver }
);
fht2 = FasterFactory.Create
<AdId, NumClicks, Input, Output, Empty, SimpleFunctions, ICustomFaster>
(indexSizeBuckets: 128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 },
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.FoldOver }
);