From 7c77bfe5cbfa66c707930a76f7296c97fd7ca3bc Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Wed, 14 Nov 2018 19:38:40 -0800 Subject: [PATCH] Refactoring so that PMM does not need to be codegen'ed (#65) --- cs/benchmark/FasterYcsbBenchmark.cs | 2 +- cs/playground/ManagedSample3/Program.cs | 2 +- cs/playground/ManagedSample4/Program.cs | 2 +- cs/playground/NestedTypesTest/Program.cs | 2 +- cs/src/core/Allocator/PMMAsyncIO.cs | 13 +- .../core/Allocator/PersistentMemoryMalloc.cs | 200 +++++++++++------- .../core/Codegen/FasterHashTableCompiler.cs | 14 +- cs/src/core/Codegen/HashTableManager.cs | 6 +- cs/src/core/Index/FASTER/Checkpoint.cs | 6 +- cs/src/core/Index/FASTER/FASTER.cs | 28 ++- cs/src/core/Index/FASTER/FASTERBase.cs | 2 - cs/src/core/Index/FASTER/FASTERImpl.cs | 4 +- cs/src/core/Index/FASTER/Recovery.cs | 75 +++---- cs/src/core/ManagedLayer/FASTERFactory.cs | 10 +- cs/src/core/Utilities/AsyncResultTypes.cs | 78 ------- cs/src/core/Utilities/PageAsyncResultTypes.cs | 135 ++++++++++++ cs/test/LargeObjectTests.cs | 8 +- cs/test/ObjectFASTERTests.cs | 2 +- cs/test/SimpleRecoveryTest.cs | 8 +- 19 files changed, 351 insertions(+), 246 deletions(-) create mode 100644 cs/src/core/Utilities/PageAsyncResultTypes.cs diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs index b9421ece..2824d182 100644 --- a/cs/benchmark/FasterYcsbBenchmark.cs +++ b/cs/benchmark/FasterYcsbBenchmark.cs @@ -93,7 +93,7 @@ namespace FASTER.benchmark #else store = new FasterKV #endif - (kMaxKey / 2, device, null); + (kMaxKey / 2, new LogSettings { LogDevice = device }); } private void RunYcsb(int thread_idx) diff --git a/cs/playground/ManagedSample3/Program.cs b/cs/playground/ManagedSample3/Program.cs index 098e2f95..25c3f478 100644 --- a/cs/playground/ManagedSample3/Program.cs +++ b/cs/playground/ManagedSample3/Program.cs @@ -119,7 +119,7 @@ namespace ManagedSample3 var h = FasterFactory.Create (128, new MyFunctions(), - new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MemorySizeBits = 14, PageSizeBits = 10 } + new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MemorySizeBits = 29 } ); h.StartSession(); diff --git a/cs/playground/ManagedSample4/Program.cs b/cs/playground/ManagedSample4/Program.cs index bde09e61..a831bae0 100644 --- a/cs/playground/ManagedSample4/Program.cs +++ b/cs/playground/ManagedSample4/Program.cs @@ -130,7 +130,7 @@ namespace ManagedSample4 var h = FasterFactory.Create , Wrap, Wrap, Wrap, MyContext, MyFunctions> (128, new MyFunctions(), - new LogSettings { LogDevice = log, MemorySizeBits = 14, PageSizeBits = 10 } + new LogSettings { LogDevice = log, MemorySizeBits = 29 } ); h.StartSession(); diff --git a/cs/playground/NestedTypesTest/Program.cs b/cs/playground/NestedTypesTest/Program.cs index 6d590114..e3cf279d 100644 --- a/cs/playground/NestedTypesTest/Program.cs +++ b/cs/playground/NestedTypesTest/Program.cs @@ -49,7 +49,7 @@ namespace NestedTypesTest #endif , MyFunctions> (128, new MyFunctions(), - new LogSettings { LogDevice = log, MemorySizeBits = 14, PageSizeBits = 10 } + new LogSettings { LogDevice = log, MemorySizeBits = 29 } ); h.StartSession(); diff --git a/cs/src/core/Allocator/PMMAsyncIO.cs b/cs/src/core/Allocator/PMMAsyncIO.cs index 5dfdeef7..070781d4 100644 --- a/cs/src/core/Allocator/PMMAsyncIO.cs +++ b/cs/src/core/Allocator/PMMAsyncIO.cs @@ -19,11 +19,6 @@ namespace FASTER.core /// public unsafe partial class PersistentMemoryMalloc : IAllocator { - // Size of object chunks beign written to storage - const int kObjectBlockSize = 100 * (1 << 20); - // Tail offsets per segment, in object log - public long[] segmentOffsets = new long[SegmentBufferSize]; - #region Async file operations /// @@ -239,7 +234,7 @@ namespace FASTER.core // Set status to in-progress PageStatusIndicator[flushPage % BufferSize].PageFlushCloseStatus - = new FlushCloseStatus { PageFlushStatus = FlushStatus.InProgress, PageCloseStatus = CloseStatus.Open }; + = new FlushCloseStatus { PageFlushStatus = PMMFlushStatus.InProgress, PageCloseStatus = PMMCloseStatus.Open }; } PageStatusIndicator[flushPage % BufferSize].LastFlushedUntilAddress = -1; @@ -340,7 +335,7 @@ namespace FASTER.core var _alignedLength = (_s.Length + (sectorSize - 1)) & ~(sectorSize - 1); - var _objAddr = Interlocked.Add(ref localSegmentOffsets[(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength; + var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength; fixed (void* src = _s) Buffer.MemoryCopy(src, _objBuffer.aligned_pointer, _s.Length, _s.Length); @@ -422,12 +417,12 @@ namespace FASTER.core while (true) { var oldStatus = PageStatusIndicator[result.page % BufferSize].PageFlushCloseStatus; - if (oldStatus.PageCloseStatus == CloseStatus.Closed) + if (oldStatus.PageCloseStatus == PMMCloseStatus.Closed) { ClearPage((int)(result.page % BufferSize), result.page == 0); } var newStatus = oldStatus; - newStatus.PageFlushStatus = FlushStatus.Flushed; + newStatus.PageFlushStatus = PMMFlushStatus.Flushed; if (oldStatus.value == Interlocked.CompareExchange(ref PageStatusIndicator[result.page % BufferSize].PageFlushCloseStatus.value, newStatus.value, oldStatus.value)) { break; diff --git a/cs/src/core/Allocator/PersistentMemoryMalloc.cs b/cs/src/core/Allocator/PersistentMemoryMalloc.cs index b172fc42..e80a79a6 100644 --- a/cs/src/core/Allocator/PersistentMemoryMalloc.cs +++ b/cs/src/core/Allocator/PersistentMemoryMalloc.cs @@ -20,9 +20,9 @@ namespace FASTER.core void CheckForAllocateComplete(ref long address); } - internal enum FlushStatus : int { Flushed, InProgress }; + internal enum PMMFlushStatus : int { Flushed, InProgress }; - internal enum CloseStatus : int { Closed, Open }; + internal enum PMMCloseStatus : int { Closed, Open }; internal struct FullPageStatus { @@ -34,9 +34,9 @@ namespace FASTER.core internal struct FlushCloseStatus { [FieldOffset(0)] - public FlushStatus PageFlushStatus; + public PMMFlushStatus PageFlushStatus; [FieldOffset(4)] - public CloseStatus PageCloseStatus; + public PMMCloseStatus PageCloseStatus; [FieldOffset(0)] public long value; } @@ -55,7 +55,7 @@ namespace FASTER.core public unsafe partial class PersistentMemoryMalloc : IAllocator { // Epoch information - public LightEpoch epoch; + private LightEpoch epoch; // Read buffer pool NativeSectorAlignedBufferPool readBufferPool; @@ -71,35 +71,42 @@ namespace FASTER.core private readonly int AlignedPageSizeBytes; // Segment size - private const int LogSegmentSizeBits = 30; - private const long SegmentSize = 1 << LogSegmentSizeBits; - private const long SegmentSizeMask = SegmentSize - 1; - private const int SegmentBufferSize = 1 + - (LogTotalSizeBytes / SegmentSize < 1 ? 1 : (int)(LogTotalSizeBytes / SegmentSize)); + private readonly int LogSegmentSizeBits; + private readonly long SegmentSize; + private readonly long SegmentSizeMask; + private readonly int SegmentBufferSize; // Total HLOG size - private const int LogTotalSizeBits = 34; - private const long LogTotalSizeBytes = 1L << LogTotalSizeBits; - private const int BufferSize = (int)(LogTotalSizeBytes / (1L << LogPageSizeBits)); + private readonly int LogTotalSizeBits; + private readonly long LogTotalSizeBytes; + private readonly int BufferSize; // HeadOffset lag (from tail) private const int HeadOffsetLagNumPages = 4; - private const int HeadOffsetLagSize = BufferSize - HeadOffsetLagNumPages; - private const long HeadOffsetLagAddress = (long)HeadOffsetLagSize << LogPageSizeBits; + private readonly int HeadOffsetLagSize; + private readonly long HeadOffsetLagAddress; // ReadOnlyOffset lag (from tail) - public const double LogMutableFraction = 0.9; - public const long ReadOnlyLagAddress = (long)(LogMutableFraction * BufferSize) << LogPageSizeBits; + private readonly double LogMutableFraction; + private readonly long ReadOnlyLagAddress; // Circular buffer definition - private byte[][] values = new byte[BufferSize][]; - private GCHandle[] handles = new GCHandle[BufferSize]; - private long[] pointers = new long[BufferSize]; - private GCHandle ptrHandle; - private long* nativePointers; + private readonly byte[][] values; + private readonly GCHandle[] handles; + private readonly long[] pointers; + private readonly GCHandle ptrHandle; + private readonly long* nativePointers; // Array that indicates the status of each buffer page - private FullPageStatus[] PageStatusIndicator = new FullPageStatus[BufferSize]; + private readonly FullPageStatus[] PageStatusIndicator; + + // Size of object chunks beign written to storage + private const int kObjectBlockSize = 100 * (1 << 20); + + /// + /// Tail offsets per segment, in object log + /// + public readonly long[] segmentOffsets; NativeSectorAlignedBufferPool ioBufferPool; @@ -145,11 +152,12 @@ namespace FASTER.core /// /// Create instance of PMM /// - /// - /// + /// /// - public PersistentMemoryMalloc(IDevice device, IDevice objectLogDevice, IPageHandlers pageHandlers) : this(device, objectLogDevice, 0, pageHandlers) + public PersistentMemoryMalloc(LogSettings settings, IPageHandlers pageHandlers) : this(settings, 0, pageHandlers) { + + Allocate(Constants.kFirstValidAddress); // null pointer ReadOnlyAddress = GetTailAddress(); SafeReadOnlyAddress = ReadOnlyAddress; @@ -162,20 +170,45 @@ namespace FASTER.core /// /// Create instance of PMM /// - /// - /// + /// /// /// - internal PersistentMemoryMalloc(IDevice device, IDevice objectLogDevice, long startAddress, IPageHandlers pageHandlers) + internal PersistentMemoryMalloc(LogSettings settings, long startAddress, IPageHandlers pageHandlers) { + // Segment size + LogSegmentSizeBits = settings.SegmentSizeBits; + SegmentSize = 1 << LogSegmentSizeBits; + SegmentSizeMask = SegmentSize - 1; + SegmentBufferSize = 1 + + (LogTotalSizeBytes / SegmentSize < 1 ? 1 : (int)(LogTotalSizeBytes / SegmentSize)); + + // Total HLOG size + LogTotalSizeBits = settings.MemorySizeBits; + LogTotalSizeBytes = 1L << LogTotalSizeBits; + BufferSize = (int)(LogTotalSizeBytes / (1L << LogPageSizeBits)); + + // HeadOffset lag (from tail) + HeadOffsetLagSize = BufferSize - HeadOffsetLagNumPages; + HeadOffsetLagAddress = (long)HeadOffsetLagSize << LogPageSizeBits; + + // ReadOnlyOffset lag (from tail) + LogMutableFraction = settings.MutableFraction; + ReadOnlyLagAddress = (long)(LogMutableFraction * BufferSize) << LogPageSizeBits; + + values = new byte[BufferSize][]; + handles = new GCHandle[BufferSize]; + pointers = new long[BufferSize]; + PageStatusIndicator = new FullPageStatus[BufferSize]; + segmentOffsets = new long[SegmentBufferSize]; + + if (BufferSize < 16) { throw new Exception("HLOG buffer must be at least 16 pages"); } - this.device = device; - - this.objectLogDevice = objectLogDevice; + device = settings.LogDevice; + objectLogDevice = settings.ObjectLogDevice; if (pageHandlers.HasObjects()) { @@ -247,11 +280,8 @@ namespace FASTER.core if (handles[i].IsAllocated) handles[i].Free(); values[i] = null; - PageStatusIndicator[i].PageFlushCloseStatus = new FlushCloseStatus { PageFlushStatus = FlushStatus.Flushed, PageCloseStatus = CloseStatus.Closed }; + PageStatusIndicator[i].PageFlushCloseStatus = new FlushCloseStatus { PageFlushStatus = PMMFlushStatus.Flushed, PageCloseStatus = PMMCloseStatus.Closed }; } - handles = null; - pointers = null; - values = null; TailPageOffset.Page = 0; TailPageOffset.Offset = 0; SafeReadOnlyAddress = 0; @@ -261,55 +291,95 @@ namespace FASTER.core BeginAddress = 1; } + /// + /// Get tail address + /// + /// public long GetTailAddress() { var local = TailPageOffset; return ((long)local.Page << LogPageSizeBits) | (uint)local.Offset; } - // Simple Accessor Functions + /// + /// Get page + /// + /// + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public long GetPage(long logicalAddress) { return (logicalAddress >> LogPageSizeBits); } + /// + /// Get page index for page + /// + /// + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public int GetPageIndexForPage(long page) { return (int)(page % BufferSize); } + /// + /// Get page index for address + /// + /// + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public int GetPageIndexForAddress(long address) { return (int)((address >> LogPageSizeBits) % BufferSize); } + /// + /// Get capacity (number of pages) + /// + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public int GetCapacityNumPages() { return BufferSize; } + /// + /// Get start logical address + /// + /// + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public long GetStartLogicalAddress(long page) { return page << LogPageSizeBits; } + /// + /// Get page size + /// + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public long GetPageSize() { return PageSize; } + /// + /// Get offset in page + /// + /// + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public long GetOffsetInPage(long address) { return address & PageSizeMask; } + /// + /// Get offset lag in pages + /// + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public long GetHeadOffsetLagInPages() { @@ -325,14 +395,10 @@ namespace FASTER.core public long GetPhysicalAddress(long logicalAddress) { // Offset within page - int offset = (int)(logicalAddress & PageSizeMask); - - // Global page address - long page = (logicalAddress >> LogPageSizeBits); + int offset = (int)(logicalAddress & ((1L << LogPageSizeBits) -1)); // Index of page within the circular buffer - int pageIndex = (int)(page % BufferSize); - + int pageIndex = (int)(logicalAddress >> LogPageSizeBits); return *(nativePointers+pageIndex) + offset; } @@ -420,8 +486,8 @@ namespace FASTER.core } //Invert the address if either the previous page is not flushed or if it is null - if ((PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus != FlushStatus.Flushed) || - (PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus != CloseStatus.Closed) || + if ((PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus != PMMFlushStatus.Flushed) || + (PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus != PMMCloseStatus.Closed) || (values[pageIndex] == null)) { address = -address; @@ -483,8 +549,8 @@ namespace FASTER.core PageAlignedShiftHeadAddress(currentTailAddress); //Check if I can allocate pageIndex at all - if ((PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus != FlushStatus.Flushed) || - (PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus != CloseStatus.Closed) || + if ((PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus != PMMFlushStatus.Flushed) || + (PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus != PMMCloseStatus.Closed) || (values[pageIndex] == null)) { return; @@ -515,6 +581,11 @@ namespace FASTER.core } } + /// + /// Shift begin address + /// + /// + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public void ShiftBeginAddress(long oldBeginAddress, long newBeginAddress) { @@ -525,26 +596,6 @@ namespace FASTER.core }); } - /// - /// Checks if until address has been flushed! - /// - /// - /// - public bool CheckFlushedUntil(long address) - { - return FlushedUntilAddress >= address; - } - - public void KillFuzzyRegion() - { - while (SafeReadOnlyAddress != ReadOnlyAddress) - { - Interlocked.CompareExchange(ref SafeReadOnlyAddress, - ReadOnlyAddress, - SafeReadOnlyAddress); - } - } - /// /// Seal: make sure there are no longer any threads writing to the page /// Flush: send page to secondary store @@ -595,7 +646,7 @@ namespace FASTER.core while (true) { var oldStatus = PageStatusIndicator[closePage].PageFlushCloseStatus; - if (oldStatus.PageFlushStatus == FlushStatus.Flushed) + if (oldStatus.PageFlushStatus == PMMFlushStatus.Flushed) { ClearPage(closePage, (closePageAddress >> LogPageSizeBits) == 0); @@ -614,7 +665,7 @@ namespace FASTER.core throw new Exception("Impossible"); } var newStatus = oldStatus; - newStatus.PageCloseStatus = CloseStatus.Closed; + newStatus.PageCloseStatus = PMMCloseStatus.Closed; if (oldStatus.value == Interlocked.CompareExchange(ref PageStatusIndicator[closePage].PageFlushCloseStatus.value, newStatus.value, oldStatus.value)) { break; @@ -657,8 +708,8 @@ namespace FASTER.core pointers[index] = (p + (sectorSize - 1)) & ~(sectorSize - 1); values[index] = tmp; - PageStatusIndicator[index].PageFlushCloseStatus.PageFlushStatus = FlushStatus.Flushed; - PageStatusIndicator[index].PageFlushCloseStatus.PageCloseStatus = CloseStatus.Closed; + PageStatusIndicator[index].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed; + PageStatusIndicator[index].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Closed; Interlocked.MemoryBarrier(); } @@ -761,6 +812,11 @@ namespace FASTER.core return false; } + /// + /// Reset for recovery + /// + /// + /// public void RecoveryReset(long tailAddress, long headAddress) { long tailPage = GetPage(tailAddress); @@ -779,7 +835,7 @@ namespace FASTER.core for (var addr = headAddress; addr < tailAddress; addr += PageSize) { var pageIndex = GetPageIndexForAddress(addr); - PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus = CloseStatus.Open; + PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Open; } } } diff --git a/cs/src/core/Codegen/FasterHashTableCompiler.cs b/cs/src/core/Codegen/FasterHashTableCompiler.cs index fc8d75b7..808d6610 100644 --- a/cs/src/core/Codegen/FasterHashTableCompiler.cs +++ b/cs/src/core/Codegen/FasterHashTableCompiler.cs @@ -38,8 +38,6 @@ namespace FASTER.core.Roslyn "IndexRecovery", "IndexCheckpoint", "StateTransitions", - "PersistentMemoryMalloc", - "PMMAsyncIO", }; @@ -47,10 +45,10 @@ namespace FASTER.core.Roslyn /// /// /// The generated type (to be instantiated). If null, then the error messages giving the reason for failing to generate the type. - public static Tuple GenerateFasterHashTableClass(bool persistGeneratedCode, bool optimizeCode, long LogTotalSizeBits, double LogMutableFraction, int LogPageSizeBits, int LogSegmentSizeBits, bool kFoldOverSnapshot) + public static Tuple GenerateFasterHashTableClass(bool persistGeneratedCode, bool optimizeCode) { var c = new FasterHashTableCompiler(); - c.Run(persistGeneratedCode, optimizeCode, LogTotalSizeBits, LogMutableFraction, LogPageSizeBits, LogSegmentSizeBits, kFoldOverSnapshot); + c.Run(persistGeneratedCode, optimizeCode); var name = String.Format("FASTER.core.Codegen_{0}.FasterKV", c.compilation.AssemblyName); var r = c.Compile(persistGeneratedCode); var a = r.Item1; @@ -66,7 +64,7 @@ namespace FASTER.core.Roslyn /// /// Runs the transformations needed to produce a valid compilation unit. /// - public void Run(bool persistGeneratedCode, bool optimizeCode, long LogTotalSizeBits, double LogMutableFraction, int LogPageSizeBits, int LogSegmentSizeBits, bool kFoldOverSnapshot) + public void Run(bool persistGeneratedCode, bool optimizeCode) { #if TIMING Stopwatch sw = new Stopwatch(); @@ -115,12 +113,6 @@ namespace FASTER.core.Roslyn compilation = compilation.ReplaceSyntaxTree(oldTree, newTree); } - compilation = RoslynHelpers.ReplaceConstantValue(compilation, "LogMutableFraction", LogMutableFraction.ToString(CultureInfo.InvariantCulture)); - compilation = RoslynHelpers.ReplaceConstantValue(compilation, "LogTotalSizeBits", LogTotalSizeBits.ToString(CultureInfo.InvariantCulture)); - compilation = RoslynHelpers.ReplaceConstantValue(compilation, "LogPageSizeBits", LogPageSizeBits.ToString(CultureInfo.InvariantCulture)); - compilation = RoslynHelpers.ReplaceConstantValue(compilation, "LogSegmentSizeBits", LogSegmentSizeBits.ToString(CultureInfo.InvariantCulture)); - compilation = RoslynHelpers.ReplaceConstantValue(compilation, "kFoldOverSnapshot", kFoldOverSnapshot ? "true" : "false"); - #if TIMING sw.Stop(); System.Diagnostics.Debug.WriteLine("Time to run the FasterHashTable compiler: {0}ms", sw.ElapsedMilliseconds); diff --git a/cs/src/core/Codegen/HashTableManager.cs b/cs/src/core/Codegen/HashTableManager.cs index 6f1e589e..cc2ba692 100644 --- a/cs/src/core/Codegen/HashTableManager.cs +++ b/cs/src/core/Codegen/HashTableManager.cs @@ -27,11 +27,11 @@ namespace FASTER.core #endif ; public static TIFaster GetFasterHashTable - (long size, IDevice logDevice, IDevice objectLogDevice, string checkpointDir, long LogTotalSizeBits, double LogMutableFraction, int LogPageSizeBits, int LogSegmentSizeBits, bool kFoldOverSnapshot, bool persistDll = PersistDll, bool optimizeCode = OptimizeCode) + (long size, LogSettings logSettings, CheckpointSettings checkpointSettings, bool persistDll = PersistDll, bool optimizeCode = OptimizeCode) { - var s = Roslyn.FasterHashTableCompiler.GenerateFasterHashTableClass(persistDll, optimizeCode, LogTotalSizeBits, LogMutableFraction, LogPageSizeBits, LogSegmentSizeBits, kFoldOverSnapshot); + var s = Roslyn.FasterHashTableCompiler.GenerateFasterHashTableClass(persistDll, optimizeCode); var t = s.Item1; - var instance = Activator.CreateInstance(t, size, logDevice, objectLogDevice, checkpointDir); + var instance = Activator.CreateInstance(t, size, logSettings, checkpointSettings); return (TIFaster)instance; } diff --git a/cs/src/core/Index/FASTER/Checkpoint.cs b/cs/src/core/Index/FASTER/Checkpoint.cs index 2fa1873b..836165c3 100644 --- a/cs/src/core/Index/FASTER/Checkpoint.cs +++ b/cs/src/core/Index/FASTER/Checkpoint.cs @@ -224,7 +224,7 @@ namespace FASTER.core ObtainCurrentTailAddress(ref _hybridLogCheckpoint.info.startLogicalAddress); - if (!Constants.kFoldOverSnapshot) + if (!FoldOverSnapshot) { _hybridLogCheckpoint.info.flushedLogicalAddress = hlog.FlushedUntilAddress; _hybridLogCheckpoint.info.useSnapshotFile = 1; @@ -254,7 +254,7 @@ namespace FASTER.core WriteIndexMetaFile(); } - if (Constants.kFoldOverSnapshot) + if (FoldOverSnapshot) { hlog.ShiftReadOnlyToTail(out long tailAddress); @@ -507,7 +507,7 @@ namespace FASTER.core if (!prevThreadCtx.markers[EpochPhaseIdx.WaitFlush]) { var notify = false; - if (Constants.kFoldOverSnapshot) + if (FoldOverSnapshot) { notify = (hlog.FlushedUntilAddress >= _hybridLogCheckpoint.info.finalLogicalAddress); } diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index d715b7a4..fbed3fa3 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -14,6 +14,17 @@ using System.Threading; namespace FASTER.core { + /// + /// FASTER configuration + /// + public static class Config + { + /// + /// Checkpoint directory + /// + public static string CheckpointDirectory = "C:\\data"; + } + public unsafe partial class FasterKV : FasterBase, IFasterKV, IPageHandlers { private PersistentMemoryMalloc hlog; @@ -22,6 +33,7 @@ namespace FASTER.core private const bool kCopyReadsToTail = false; private const bool breakWhenClassIsLoaded = false; + private readonly bool FoldOverSnapshot = false; /// /// Tail address of log @@ -75,15 +87,17 @@ namespace FASTER.core /// Create FASTER instance /// /// - /// - /// - /// - public FasterKV(long size, IDevice logDevice, IDevice objectLogDevice, string checkpointDir = null) + /// + /// + public FasterKV(long size, LogSettings logSettings, CheckpointSettings checkpointSettings = null) { - if (checkpointDir != null) - Config.CheckpointDirectory = checkpointDir; + if (checkpointSettings == null) + checkpointSettings = new CheckpointSettings(); - hlog = new PersistentMemoryMalloc(logDevice, objectLogDevice, this); + Config.CheckpointDirectory = checkpointSettings.CheckpointDir; + FoldOverSnapshot = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver; + + hlog = new PersistentMemoryMalloc(logSettings, this); var recordSize = Layout.EstimatePhysicalSize(null, null); Initialize(size, hlog.GetSectorSize()); diff --git a/cs/src/core/Index/FASTER/FASTERBase.cs b/cs/src/core/Index/FASTER/FASTERBase.cs index d1bafd80..0a23bf98 100644 --- a/cs/src/core/Index/FASTER/FASTERBase.cs +++ b/cs/src/core/Index/FASTER/FASTERBase.cs @@ -18,8 +18,6 @@ namespace FASTER.core /// Size of cache line in bytes public const int kCacheLineBytes = 64; - public const bool kFoldOverSnapshot = false; - public const bool kFineGrainedHandoverRecord = false; public const bool kFineGrainedHandoverBucket = true; diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index badd74cc..ea858911 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -736,7 +736,7 @@ namespace FASTER.core // Mutable Region: Update the record in-place if (logicalAddress >= hlog.ReadOnlyAddress) { - if(Constants.kFoldOverSnapshot) + if(FoldOverSnapshot) { Debug.Assert(Layout.GetInfo(physicalAddress)->Version == threadCtx.version); } @@ -1012,7 +1012,7 @@ namespace FASTER.core // Mutable Region: Update the record in-place if (logicalAddress >= hlog.ReadOnlyAddress) { - if (Constants.kFoldOverSnapshot) + if (FoldOverSnapshot) { Debug.Assert(Layout.GetInfo(physicalAddress)->Version == threadCtx.version); } diff --git a/cs/src/core/Index/FASTER/Recovery.cs b/cs/src/core/Index/FASTER/Recovery.cs index 847ebd35..eb012ef9 100644 --- a/cs/src/core/Index/FASTER/Recovery.cs +++ b/cs/src/core/Index/FASTER/Recovery.cs @@ -12,6 +12,41 @@ using System.Threading; namespace FASTER.core { + + internal enum ReadStatus { Pending, Done }; + internal enum FlushStatus { Pending, Done }; + + internal class RecoveryStatus + { + public long startPage; + public long endPage; + public int capacity; + + public IDevice recoveryDevice; + public long recoveryDevicePageOffset; + public IDevice objectLogRecoveryDevice; + + public ReadStatus[] readStatus; + public FlushStatus[] flushStatus; + + public RecoveryStatus(int capacity, + long startPage, + long endPage) + { + this.capacity = capacity; + this.startPage = startPage; + this.endPage = endPage; + readStatus = new ReadStatus[capacity]; + flushStatus = new FlushStatus[capacity]; + for (int i = 0; i < capacity; i++) + { + flushStatus[i] = FlushStatus.Done; + readStatus[i] = ReadStatus.Pending; + } + } + } + + /// /// Partial class for recovery code in FASTER /// @@ -24,7 +59,7 @@ namespace FASTER.core // Recover segment offsets for object log if (_hybridLogCheckpoint.info.objectLogSegmentOffsets != null) - hlog.segmentOffsets = _hybridLogCheckpoint.info.objectLogSegmentOffsets; + Array.Copy(_hybridLogCheckpoint.info.objectLogSegmentOffsets, hlog.segmentOffsets, _hybridLogCheckpoint.info.objectLogSegmentOffsets.Length); _indexCheckpoint.main_ht_device = new LocalStorageDevice(DirectoryConfiguration.GetPrimaryHashTableFileName(_indexCheckpoint.info.token)); _indexCheckpoint.ofb_device = new LocalStorageDevice(DirectoryConfiguration.GetOverflowBucketsFileName(_indexCheckpoint.info.token)); @@ -46,7 +81,7 @@ namespace FASTER.core DeleteTentativeEntries(); - if (Constants.kFoldOverSnapshot) + if (FoldOverSnapshot) { RecoverHybridLog(_indexCheckpoint.info, _hybridLogCheckpoint.info); } @@ -114,37 +149,6 @@ namespace FASTER.core hlog.RecoveryReset(untilAddress, headAddress); } - private enum ReadStatus { Pending, Done }; - private enum FlushStatus { Pending, Done }; - private class RecoveryStatus - { - public long startPage; - public long endPage; - public int capacity; - - public IDevice recoveryDevice; - public long recoveryDevicePageOffset; - public IDevice objectLogRecoveryDevice; - - public ReadStatus[] readStatus; - public FlushStatus[] flushStatus; - - public RecoveryStatus(int capacity, - long startPage, - long endPage) - { - this.capacity = capacity; - this.startPage = startPage; - this.endPage = endPage; - readStatus = new ReadStatus[capacity]; - flushStatus = new FlushStatus[capacity]; - for (int i = 0; i < capacity; i++) - { - flushStatus[i] = FlushStatus.Done; - readStatus[i] = ReadStatus.Pending; - } - } - } private void RecoverHybridLog(IndexRecoveryInfo indexRecoveryInfo, HybridLogRecoveryInfo recoveryInfo) @@ -422,15 +426,12 @@ namespace FASTER.core if (Interlocked.Decrement(ref result.count) == 0) { - // We don't write partial pages during recovery - Debug.Assert(result.partial == false); - int index = hlog.GetPageIndexForPage(result.page); result.context.flushStatus[index] = FlushStatus.Done; if (result.page + result.context.capacity < result.context.endPage) { long readPage = result.page + result.context.capacity; - if (Constants.kFoldOverSnapshot) + if (FoldOverSnapshot) { hlog.AsyncReadPagesFromDevice(readPage, 1, AsyncReadPagesCallbackForRecovery, result.context); } diff --git a/cs/src/core/ManagedLayer/FASTERFactory.cs b/cs/src/core/ManagedLayer/FASTERFactory.cs index f2e9a591..b17a8253 100644 --- a/cs/src/core/ManagedLayer/FASTERFactory.cs +++ b/cs/src/core/ManagedLayer/FASTERFactory.cs @@ -24,11 +24,6 @@ namespace FASTER.core /// public IDevice ObjectLogDevice = new NullDevice(); - /// - /// Size of page, in bits - /// - public int PageSizeBits = 25; - /// /// Size of a segment (group of pages), in bits /// @@ -164,10 +159,7 @@ namespace FASTER.core HashTableManager.GetFasterHashTable - (indexSizeBuckets, logSettings.LogDevice, logSettings.ObjectLogDevice, - checkpointSettings.CheckpointDir, logSettings.MemorySizeBits, - logSettings.MutableFraction, logSettings.PageSizeBits, - logSettings.SegmentSizeBits, checkpointSettings.CheckPointType == CheckpointType.FoldOver); + (indexSizeBuckets, logSettings, checkpointSettings); } /// diff --git a/cs/src/core/Utilities/AsyncResultTypes.cs b/cs/src/core/Utilities/AsyncResultTypes.cs index 31f3f57a..ff3fb92c 100644 --- a/cs/src/core/Utilities/AsyncResultTypes.cs +++ b/cs/src/core/Utilities/AsyncResultTypes.cs @@ -12,17 +12,6 @@ using System.IO; namespace FASTER.core { - /// - /// FASTER configuration - /// - public static class Config - { - /// - /// Checkpoint directory - /// - public static string CheckpointDirectory = "C:\\data"; - } - internal struct AsyncGetFromDiskResult : IAsyncResult { public TContext context; @@ -36,73 +25,6 @@ namespace FASTER.core public bool CompletedSynchronously => throw new NotImplementedException(); } - internal struct PageAsyncReadResult : IAsyncResult - { - public long page; - public TContext context; - public CountdownEvent handle; - public SectorAlignedMemory freeBuffer1; - public IOCompletionCallback callback; - public int count; - public IDevice objlogDevice; - public long resumeptr; - public long untilptr; - - public bool IsCompleted => throw new NotImplementedException(); - - public WaitHandle AsyncWaitHandle => throw new NotImplementedException(); - - public object AsyncState => throw new NotImplementedException(); - - public bool CompletedSynchronously => throw new NotImplementedException(); - - public void Free() - { - if (freeBuffer1.buffer != null) - freeBuffer1.Return(); - - if (handle != null) - { - handle.Signal(); - } - } - } - - internal class PageAsyncFlushResult : IAsyncResult - { - public long page; - public TContext context; - public bool partial; - public long untilAddress; - public int count; - public CountdownEvent handle; - public IDevice objlogDevice; - public SectorAlignedMemory freeBuffer1; - public SectorAlignedMemory freeBuffer2; - public AutoResetEvent done; - - public bool IsCompleted => throw new NotImplementedException(); - - public WaitHandle AsyncWaitHandle => throw new NotImplementedException(); - - public object AsyncState => throw new NotImplementedException(); - - public bool CompletedSynchronously => throw new NotImplementedException(); - - public void Free() - { - if (freeBuffer1.buffer != null) - freeBuffer1.Return(); - if (freeBuffer2.buffer != null) - freeBuffer2.Return(); - - if (handle != null) - { - handle.Signal(); - } - } - } - internal unsafe class HashIndexPageAsyncFlushResult : IAsyncResult { public HashBucket* start; diff --git a/cs/src/core/Utilities/PageAsyncResultTypes.cs b/cs/src/core/Utilities/PageAsyncResultTypes.cs new file mode 100644 index 00000000..99d00ff2 --- /dev/null +++ b/cs/src/core/Utilities/PageAsyncResultTypes.cs @@ -0,0 +1,135 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +#define CALLOC + +using System; +using System.Threading; + +namespace FASTER.core +{ + /// + /// Result of async page read + /// + /// + public struct PageAsyncReadResult : IAsyncResult + { + /// + /// Page + /// + public long page; + /// + /// Context + /// + public TContext context; + /// + /// Count + /// + public int count; + + internal CountdownEvent handle; + internal SectorAlignedMemory freeBuffer1; + internal IOCompletionCallback callback; + internal IDevice objlogDevice; + internal long resumeptr; + internal long untilptr; + + /// + /// + /// + public bool IsCompleted => throw new NotImplementedException(); + + /// + /// + /// + public WaitHandle AsyncWaitHandle => throw new NotImplementedException(); + + /// + /// + /// + public object AsyncState => throw new NotImplementedException(); + + /// + /// + /// + public bool CompletedSynchronously => throw new NotImplementedException(); + + /// + /// Free + /// + public void Free() + { + if (freeBuffer1.buffer != null) + freeBuffer1.Return(); + + if (handle != null) + { + handle.Signal(); + } + } + } + + /// + /// Page async flush result + /// + /// + public class PageAsyncFlushResult : IAsyncResult + { + /// + /// Page + /// + public long page; + /// + /// Context + /// + public TContext context; + /// + /// Count + /// + public int count; + + internal bool partial; + internal long untilAddress; + internal CountdownEvent handle; + internal IDevice objlogDevice; + internal SectorAlignedMemory freeBuffer1; + internal SectorAlignedMemory freeBuffer2; + internal AutoResetEvent done; + + /// + /// + /// + public bool IsCompleted => throw new NotImplementedException(); + + /// + /// + /// + public WaitHandle AsyncWaitHandle => throw new NotImplementedException(); + + /// + /// + /// + public object AsyncState => throw new NotImplementedException(); + + /// + /// + /// + public bool CompletedSynchronously => throw new NotImplementedException(); + + /// + /// Free + /// + public void Free() + { + if (freeBuffer1.buffer != null) + freeBuffer1.Return(); + if (freeBuffer2.buffer != null) + freeBuffer2.Return(); + + if (handle != null) + { + handle.Signal(); + } + } + } +} diff --git a/cs/test/LargeObjectTests.cs b/cs/test/LargeObjectTests.cs index 8a332121..829f7cfc 100644 --- a/cs/test/LargeObjectTests.cs +++ b/cs/test/LargeObjectTests.cs @@ -32,14 +32,14 @@ namespace FASTER.test.largeobjects fht1 = FasterFactory.Create (indexSizeBuckets: 128, functions: new MyLargeFunctions(), - logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 }, + logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 29 }, checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.Snapshot } ); fht2 = FasterFactory.Create (indexSizeBuckets: 128, functions: new MyLargeFunctions(), - logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 }, + logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 29 }, checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.Snapshot } ); @@ -96,14 +96,14 @@ namespace FASTER.test.largeobjects fht1 = FasterFactory.Create (indexSizeBuckets: 128, functions: new MyLargeFunctions(), - logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 }, + logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 29 }, checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.FoldOver } ); fht2 = FasterFactory.Create (indexSizeBuckets: 128, functions: new MyLargeFunctions(), - logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 }, + logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 29 }, checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.FoldOver } ); diff --git a/cs/test/ObjectFASTERTests.cs b/cs/test/ObjectFASTERTests.cs index 6db90711..25f220f3 100644 --- a/cs/test/ObjectFASTERTests.cs +++ b/cs/test/ObjectFASTERTests.cs @@ -29,7 +29,7 @@ namespace FASTER.test fht = FasterFactory.Create (indexSizeBuckets: 128, functions: new MyFunctions(), - logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 }, + logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 29 }, checkpointSettings: new CheckpointSettings { CheckPointType = CheckpointType.FoldOver } ); fht.StartSession(); diff --git a/cs/test/SimpleRecoveryTest.cs b/cs/test/SimpleRecoveryTest.cs index 1a0db1e0..92bb8287 100644 --- a/cs/test/SimpleRecoveryTest.cs +++ b/cs/test/SimpleRecoveryTest.cs @@ -32,14 +32,14 @@ namespace FASTER.test.recovery.sumstore.simple fht1 = FasterFactory.Create (indexSizeBuckets: 128, - logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 }, + logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 }, checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.Snapshot } ); fht2 = FasterFactory.Create (indexSizeBuckets: 128, - logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 }, + logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 }, checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.Snapshot } ); @@ -99,14 +99,14 @@ namespace FASTER.test.recovery.sumstore.simple fht1 = FasterFactory.Create (indexSizeBuckets: 128, - logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 }, + logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 }, checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.FoldOver } ); fht2 = FasterFactory.Create (indexSizeBuckets: 128, - logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, PageSizeBits = 9, MemorySizeBits = 13 }, + logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 }, checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints", CheckPointType = CheckpointType.FoldOver } );